View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package com.github.dexecutor.core;
19  
20  import java.io.Writer;
21  import java.util.Collection;
22  import java.util.Date;
23  import java.util.HashSet;
24  import java.util.Set;
25  import java.util.concurrent.CopyOnWriteArrayList;
26  import java.util.concurrent.CopyOnWriteArraySet;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.ScheduledExecutorService;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  import com.github.dexecutor.core.graph.Dag;
37  import com.github.dexecutor.core.graph.DefaultDag;
38  import com.github.dexecutor.core.graph.Node;
39  import com.github.dexecutor.core.graph.Traversar;
40  import com.github.dexecutor.core.graph.Validator;
41  import com.github.dexecutor.core.task.ExecutionResult;
42  import com.github.dexecutor.core.task.ExecutionResults;
43  import com.github.dexecutor.core.task.ExecutionStatus;
44  import com.github.dexecutor.core.task.Task;
45  import com.github.dexecutor.core.task.TaskFactory;
46  import com.github.dexecutor.core.task.TaskProvider;
47  
48  /**
49   * Default implementation of @DependentTasksExecutor
50   * 
51   * @author Nadeem Mohammad
52   * 
53   * @since 0.0.1
54   *
55   * @param <T> Type of Node/Task ID
56   * @param <R> Type of Node/Task result
57   */
58  public final class DefaultDependentTasksExecutor <T extends Comparable<T>, R> implements DependentTasksExecutor<T> {
59  
60  	private static final Logger logger = LoggerFactory.getLogger(DefaultDependentTasksExecutor.class);
61  
62  	private TaskProvider<T, R> taskProvider;
63  	private ExecutionEngine<T, R> executionEngine;
64  	private Validator<T, R> validator;
65  	private Traversar<T, R> traversar;
66  	private Dag<T, R> graph;
67  
68  	private Collection<Node<T, R>> processedNodes = new CopyOnWriteArrayList<Node<T, R>>();
69  	private Collection<Node<T, R>> continueAfterSuccess = new CopyOnWriteArraySet<Node<T, R>>();
70  
71  	private AtomicInteger nodesCount = new AtomicInteger(0);
72  	private ExecutorService immediatelyRetryExecutor;
73  	private ScheduledExecutorService scheduledRetryExecutor;
74  
75  	private Phase currentPhase = Phase.BUILDING;
76  
77  	public DefaultDependentTasksExecutor(final ExecutionEngine<T, R> executionEngine, final TaskProvider<T, R> taskProvider) {
78  		this(new DependentTasksExecutorConfig<>(executionEngine, taskProvider));
79  	}
80  
81  	/**
82  	 * Creates the Executor with Config
83  	 * @param config
84  	 */
85  	public DefaultDependentTasksExecutor(final DependentTasksExecutorConfig<T, R> config) {
86  		config.validate();
87  
88  		this.immediatelyRetryExecutor = Executors.newFixedThreadPool(config.getImmediateRetryPoolThreadsCount());
89  		this.scheduledRetryExecutor = Executors.newScheduledThreadPool(config.getScheduledRetryPoolThreadsCount());
90  
91  		this.executionEngine = config.getExecutorEngine();
92  		this.validator = config.getValidator();
93  		this.traversar = config.getTraversar();
94  		this.graph = new DefaultDag<T, R>();
95  		this.taskProvider = config.getTaskProvider();
96  	}
97  
98  	public void print(final Writer writer) {
99  		this.traversar.traverse(this.graph, writer);
100 	}
101 
102 	public void addIndependent(final T nodeValue) {
103 		checkValidPhase();
104 		this.graph.addIndependent(nodeValue);
105 	}
106 
107 	public void addDependency(final T evalFirstNode, final T evalLaterNode) {
108 		checkValidPhase();
109 		this.graph.addDependency(evalFirstNode, evalLaterNode);
110 	}
111 
112 	public void addAsDependentOnAllLeafNodes(final T nodeValue) {
113 		checkValidPhase();
114 		if (this.graph.size() == 0) {
115 			addIndependent(nodeValue);
116 		} else {
117 			for (Node<T, R> node : this.graph.getLeafNodes()) {
118 				addDependency(node.getValue(), nodeValue);
119 			}
120 		}
121 	}
122 
123 	@Override
124 	public void addAsDependencyToAllInitialNodes(final T nodeValue) {
125 		checkValidPhase();
126 		if (this.graph.size() == 0) {
127 			addIndependent(nodeValue);
128 		} else {
129 			for (Node<T, R> node : this.graph.getInitialNodes()) {
130 				addDependency(nodeValue, node.getValue());
131 			}
132 		}		
133 	}
134 
135 	public void execute(final ExecutionConfig config) {
136 		validate(config);
137 
138 		this.currentPhase = Phase.RUNNING;
139 
140 		Set<Node<T, R>> initialNodes = this.graph.getInitialNodes();
141 
142 		long start = new Date().getTime();
143 
144 		doProcessNodes(config, initialNodes);
145 		shutdownExecutors();
146 
147 		long end = new Date().getTime();
148 
149 		this.currentPhase = Phase.TERMINATED;
150 
151 		logger.debug("Total Time taken to process {} jobs is {} ms.", graph.size(), end - start);
152 		logger.debug("Processed Nodes Ordering {}", this.processedNodes);
153 	}
154 
155 	private void shutdownExecutors() {
156 		this.immediatelyRetryExecutor.shutdown();
157 		this.scheduledRetryExecutor.shutdown();
158 		try {
159 			this.immediatelyRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
160 			this.scheduledRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
161 		} catch (InterruptedException e) {
162 			logger.error("Error Shuting down Executor", e);
163 		}		
164 	}
165 
166 	private void validate(ExecutionConfig config) {
167 		checkValidPhase();
168 		config.validate();
169 		this.validator.validate(this.graph);
170 	}
171 
172 	private void checkValidPhase() {
173 		throwExceptionIfTerminated();
174 		throwExceptionIfRunning();
175 	}
176 
177 	private void throwExceptionIfRunning() {
178 		if (Phase.RUNNING.equals(this.currentPhase)) {
179 			throw new IllegalStateException("Dexecutor is already running!");
180 		}
181 	}
182 
183 	private void throwExceptionIfTerminated() {
184 		if (Phase.TERMINATED.equals(this.currentPhase)) {
185 			throw new IllegalStateException("Dexecutor has been terminated!");
186 		}
187 	}
188 
189 	private void doProcessNodes(final ExecutionConfig config, final Set<Node<T, R>> nodes) {
190 		doExecute(nodes, config);
191 		doWaitForExecution(config);	
192 	}
193 
194 	private void doExecute(final Collection<Node<T, R>> nodes, final ExecutionConfig config) {
195 		for (Node<T, R> node : nodes) {
196 			if (shouldProcess(node)) {				
197 				Task<T, R> task = newTask(config, node);
198 				if (shouldExecute(node, task)) {
199 					nodesCount.incrementAndGet();
200 					logger.debug("Going to schedule {} node", node.getValue());
201 					this.executionEngine.submit(task);
202 				} else {
203 					node.setSkipped();
204 					logger.debug("Execution Skipped for node # {} ", node.getValue());
205 					this.processedNodes.add(node);
206 					doExecute(node.getOutGoingNodes(), config);
207 				}
208 			} else {
209 				logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
210 			}
211 		}
212 	}
213 
214 	private boolean shouldProcess(final Node<T, R> node) {
215 		return !isAlreadyProcessed(node) && allIncomingNodesProcessed(node);
216 	}
217 
218 	private boolean isAlreadyProcessed(final Node<T, R> node) {
219 		return this.processedNodes.contains(node);
220 	}
221 
222 	private boolean allIncomingNodesProcessed(final Node<T, R> node) {
223 		if (node.getInComingNodes().isEmpty() || areAlreadyProcessed(node.getInComingNodes())) {
224 			return true;
225 		}
226 		return false;
227 	}
228 
229 	private boolean areAlreadyProcessed(final Set<Node<T, R>> nodes) {
230         return this.processedNodes.containsAll(nodes);
231     }
232 
233 	private boolean shouldExecute(final Node<T, R> node, final Task<T, R> task) {
234 		if (task.shouldExecute(parentResults(node))) {
235 			return true;
236 		}
237 		return false;
238 	}
239 
240 	private ExecutionResults<T, R> parentResults(final Node<T, R> node) {
241 		ExecutionResults<T, R> parentResult = new ExecutionResults<T, R>();
242 		for (Node<T, R> pNode : node.getInComingNodes()) {
243 			parentResult.add(new ExecutionResult<T, R>(pNode.getValue(), pNode.getResult(), status(pNode)));
244 		}
245 		return parentResult;
246 	}
247 
248 	private ExecutionStatus status(final Node<T, R> node) {
249 		ExecutionStatus status = ExecutionStatus.SUCCESS;
250 		if (node.isErrored()) {
251 			status = ExecutionStatus.ERRORED;
252 		} else if (node.isSkipped()) {
253 			status = ExecutionStatus.SKIPPED;
254 		}
255 		return status;
256 	}
257 
258 	private void doWaitForExecution(final ExecutionConfig config) {
259 		while (nodesCount.get() > 0) {
260 			ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
261 			nodesCount.decrementAndGet();
262 			logger.debug("Processing of node {} done, with status {}", executionResult.getId(), executionResult.getStatus());
263 
264 			final Node<T, R> processedNode = this.graph.get(executionResult.getId());
265 			updateNode(executionResult, processedNode);
266 			this.processedNodes.add(processedNode);
267 
268 			if (executionResult.isSuccess() && !this.executionEngine.isAnyTaskInError() && !this.continueAfterSuccess.isEmpty()) {
269 				Collection<Node<T, R>> recover = new HashSet<>(this.continueAfterSuccess);	
270 				this.continueAfterSuccess.clear();
271 				doExecute(recover, config);
272 			}
273 
274 			if (config.isNonTerminating() ||  (!this.executionEngine.isAnyTaskInError())) {
275 				doExecute(processedNode.getOutGoingNodes(), config);				
276 			} else if (this.executionEngine.isAnyTaskInError() && executionResult.isSuccess()) { 
277 				this.continueAfterSuccess.addAll(processedNode.getOutGoingNodes());
278 			} else if (shouldDoImmediateRetry(config, executionResult, processedNode)) {
279 				logger.debug("Submitting for Immediate retry, node {}", executionResult.getId());
280 				submitForImmediateRetry(config, processedNode);
281 			} else if (shouldScheduleRetry(config, executionResult, processedNode)) {
282 				logger.debug("Submitting for Scheduled retry, node {}", executionResult.getId());
283 				submitForScheduledRetry(config, processedNode);
284 			}
285 		}
286 	}
287 
288 	private boolean shouldScheduleRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
289 			final Node<T, R> processedNode) {
290 		return executionResult.isErrored() && config.isScheduledRetrying() && config.shouldRetry(getExecutionCount(processedNode));
291 	}
292 
293 	private boolean shouldDoImmediateRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
294 			final Node<T, R> processedNode) {
295 		return executionResult.isErrored() && config.isImmediatelyRetrying() && config.shouldRetry(getExecutionCount(processedNode));
296 	}
297 
298 	private void submitForImmediateRetry(final ExecutionConfig config, final Node<T, R> node) {
299 		Task<T, R> task = newTask(config, node);
300 		this.immediatelyRetryExecutor.submit(retryingTask(config, task));
301 	}
302 
303 	private void submitForScheduledRetry(ExecutionConfig config, Node<T, R> node) {
304 		Task<T, R> task = newTask(config, node);
305 		this.scheduledRetryExecutor.schedule(retryingTask(config, task), config.getRetryDelay().getDuration(), config.getRetryDelay().getTimeUnit());		
306 	}
307 
308 	private Task<T, R> newTask(final ExecutionConfig config, final Node<T, R> node) {
309 		Task<T, R> task = this.taskProvider.provideTask(node.getValue());
310 		task.setId(node.getValue());
311 		return TaskFactory.newWorker(task);
312 	}
313 
314 	private Runnable retryingTask(final ExecutionConfig config, final Task<T, R> task) {
315 		nodesCount.incrementAndGet();
316 		return new Runnable() {
317 			@Override
318 			public void run() {
319 				executionEngine.submit(task);
320 			}
321 		};
322 	}
323 
324 	private void updateExecutionCount(final Node<T, R> node) {
325 		Integer count = getExecutionCount(node);
326 		if (count == null) {
327 			count = 0;
328 		} else {
329 			count++;
330 		}
331 		node.setData(count);
332 	}
333 
334 	private Integer getExecutionCount(final Node<T, R> node) {
335 		return (Integer) node.getData();
336 	}
337 
338 	private void updateNode(final ExecutionResult<T, R> executionResult, final Node<T, R> processedNode) {
339 		updateExecutionCount(processedNode);
340 		processedNode.setResult(executionResult.getResult());
341 		if(executionResult.isErrored()) {
342 			processedNode.setErrored();
343 		} else if(executionResult.isSkipped()) {
344 			processedNode.setSkipped();
345 		} else {
346 			processedNode.setSuccess();
347 		}
348 	}
349 
350 	private static enum Phase {
351 		BUILDING, RUNNING, TERMINATED;
352 	}
353 }