1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package com.github.dexecutor.core;
19
20 import java.io.Writer;
21 import java.util.Collection;
22 import java.util.Date;
23 import java.util.HashSet;
24 import java.util.Set;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.CopyOnWriteArraySet;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import com.github.dexecutor.core.graph.Dag;
37 import com.github.dexecutor.core.graph.DefaultDag;
38 import com.github.dexecutor.core.graph.Node;
39 import com.github.dexecutor.core.graph.Traversar;
40 import com.github.dexecutor.core.graph.Validator;
41 import com.github.dexecutor.core.task.ExecutionResult;
42 import com.github.dexecutor.core.task.ExecutionResults;
43 import com.github.dexecutor.core.task.ExecutionStatus;
44 import com.github.dexecutor.core.task.Task;
45 import com.github.dexecutor.core.task.TaskFactory;
46 import com.github.dexecutor.core.task.TaskProvider;
47
48
49
50
51
52
53
54
55
56
57
58 public final class DefaultDependentTasksExecutor <T extends Comparable<T>, R> implements DependentTasksExecutor<T> {
59
60 private static final Logger logger = LoggerFactory.getLogger(DefaultDependentTasksExecutor.class);
61
62 private TaskProvider<T, R> taskProvider;
63 private ExecutionEngine<T, R> executionEngine;
64 private Validator<T, R> validator;
65 private Traversar<T, R> traversar;
66 private Dag<T, R> graph;
67
68 private Collection<Node<T, R>> processedNodes = new CopyOnWriteArrayList<Node<T, R>>();
69 private Collection<Node<T, R>> continueAfterSuccess = new CopyOnWriteArraySet<Node<T, R>>();
70
71 private AtomicInteger nodesCount = new AtomicInteger(0);
72 private ExecutorService immediatelyRetryExecutor;
73 private ScheduledExecutorService scheduledRetryExecutor;
74
75 private Phase currentPhase = Phase.BUILDING;
76
77 public DefaultDependentTasksExecutor(final ExecutionEngine<T, R> executionEngine, final TaskProvider<T, R> taskProvider) {
78 this(new DependentTasksExecutorConfig<>(executionEngine, taskProvider));
79 }
80
81
82
83
84
85 public DefaultDependentTasksExecutor(final DependentTasksExecutorConfig<T, R> config) {
86 config.validate();
87
88 this.immediatelyRetryExecutor = Executors.newFixedThreadPool(config.getImmediateRetryPoolThreadsCount());
89 this.scheduledRetryExecutor = Executors.newScheduledThreadPool(config.getScheduledRetryPoolThreadsCount());
90
91 this.executionEngine = config.getExecutorEngine();
92 this.validator = config.getValidator();
93 this.traversar = config.getTraversar();
94 this.graph = new DefaultDag<T, R>();
95 this.taskProvider = config.getTaskProvider();
96 }
97
98 public void print(final Writer writer) {
99 this.traversar.traverse(this.graph, writer);
100 }
101
102 public void addIndependent(final T nodeValue) {
103 checkValidPhase();
104 this.graph.addIndependent(nodeValue);
105 }
106
107 public void addDependency(final T evalFirstNode, final T evalLaterNode) {
108 checkValidPhase();
109 this.graph.addDependency(evalFirstNode, evalLaterNode);
110 }
111
112 public void addAsDependentOnAllLeafNodes(final T nodeValue) {
113 checkValidPhase();
114 if (this.graph.size() == 0) {
115 addIndependent(nodeValue);
116 } else {
117 for (Node<T, R> node : this.graph.getLeafNodes()) {
118 addDependency(node.getValue(), nodeValue);
119 }
120 }
121 }
122
123 @Override
124 public void addAsDependencyToAllInitialNodes(final T nodeValue) {
125 checkValidPhase();
126 if (this.graph.size() == 0) {
127 addIndependent(nodeValue);
128 } else {
129 for (Node<T, R> node : this.graph.getInitialNodes()) {
130 addDependency(nodeValue, node.getValue());
131 }
132 }
133 }
134
135 public void execute(final ExecutionConfig config) {
136 validate(config);
137
138 this.currentPhase = Phase.RUNNING;
139
140 Set<Node<T, R>> initialNodes = this.graph.getInitialNodes();
141
142 long start = new Date().getTime();
143
144 doProcessNodes(config, initialNodes);
145 shutdownExecutors();
146
147 long end = new Date().getTime();
148
149 this.currentPhase = Phase.TERMINATED;
150
151 logger.debug("Total Time taken to process {} jobs is {} ms.", graph.size(), end - start);
152 logger.debug("Processed Nodes Ordering {}", this.processedNodes);
153 }
154
155 private void shutdownExecutors() {
156 this.immediatelyRetryExecutor.shutdown();
157 this.scheduledRetryExecutor.shutdown();
158 try {
159 this.immediatelyRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
160 this.scheduledRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
161 } catch (InterruptedException e) {
162 logger.error("Error Shuting down Executor", e);
163 }
164 }
165
166 private void validate(ExecutionConfig config) {
167 checkValidPhase();
168 config.validate();
169 this.validator.validate(this.graph);
170 }
171
172 private void checkValidPhase() {
173 throwExceptionIfTerminated();
174 throwExceptionIfRunning();
175 }
176
177 private void throwExceptionIfRunning() {
178 if (Phase.RUNNING.equals(this.currentPhase)) {
179 throw new IllegalStateException("Dexecutor is already running!");
180 }
181 }
182
183 private void throwExceptionIfTerminated() {
184 if (Phase.TERMINATED.equals(this.currentPhase)) {
185 throw new IllegalStateException("Dexecutor has been terminated!");
186 }
187 }
188
189 private void doProcessNodes(final ExecutionConfig config, final Set<Node<T, R>> nodes) {
190 doExecute(nodes, config);
191 doWaitForExecution(config);
192 }
193
194 private void doExecute(final Collection<Node<T, R>> nodes, final ExecutionConfig config) {
195 for (Node<T, R> node : nodes) {
196 if (shouldProcess(node)) {
197 Task<T, R> task = newTask(config, node);
198 if (shouldExecute(node, task)) {
199 nodesCount.incrementAndGet();
200 logger.debug("Going to schedule {} node", node.getValue());
201 this.executionEngine.submit(task);
202 } else {
203 node.setSkipped();
204 logger.debug("Execution Skipped for node # {} ", node.getValue());
205 this.processedNodes.add(node);
206 doExecute(node.getOutGoingNodes(), config);
207 }
208 } else {
209 logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
210 }
211 }
212 }
213
214 private boolean shouldProcess(final Node<T, R> node) {
215 return !isAlreadyProcessed(node) && allIncomingNodesProcessed(node);
216 }
217
218 private boolean isAlreadyProcessed(final Node<T, R> node) {
219 return this.processedNodes.contains(node);
220 }
221
222 private boolean allIncomingNodesProcessed(final Node<T, R> node) {
223 if (node.getInComingNodes().isEmpty() || areAlreadyProcessed(node.getInComingNodes())) {
224 return true;
225 }
226 return false;
227 }
228
229 private boolean areAlreadyProcessed(final Set<Node<T, R>> nodes) {
230 return this.processedNodes.containsAll(nodes);
231 }
232
233 private boolean shouldExecute(final Node<T, R> node, final Task<T, R> task) {
234 if (task.shouldExecute(parentResults(node))) {
235 return true;
236 }
237 return false;
238 }
239
240 private ExecutionResults<T, R> parentResults(final Node<T, R> node) {
241 ExecutionResults<T, R> parentResult = new ExecutionResults<T, R>();
242 for (Node<T, R> pNode : node.getInComingNodes()) {
243 parentResult.add(new ExecutionResult<T, R>(pNode.getValue(), pNode.getResult(), status(pNode)));
244 }
245 return parentResult;
246 }
247
248 private ExecutionStatus status(final Node<T, R> node) {
249 ExecutionStatus status = ExecutionStatus.SUCCESS;
250 if (node.isErrored()) {
251 status = ExecutionStatus.ERRORED;
252 } else if (node.isSkipped()) {
253 status = ExecutionStatus.SKIPPED;
254 }
255 return status;
256 }
257
258 private void doWaitForExecution(final ExecutionConfig config) {
259 while (nodesCount.get() > 0) {
260
261 ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
262 nodesCount.decrementAndGet();
263 logger.debug("Processing of node {} done, with status {}", executionResult.getId(), executionResult.getStatus());
264
265 final Node<T, R> processedNode = this.graph.get(executionResult.getId());
266 updateNode(executionResult, processedNode);
267 this.processedNodes.add(processedNode);
268
269 if (executionResult.isSuccess() && !this.executionEngine.isAnyTaskInError() && !this.continueAfterSuccess.isEmpty()) {
270 Collection<Node<T, R>> recover = new HashSet<>(this.continueAfterSuccess);
271 this.continueAfterSuccess.clear();
272 doExecute(recover, config);
273 }
274
275 if (config.isNonTerminating() || (!this.executionEngine.isAnyTaskInError())) {
276 doExecute(processedNode.getOutGoingNodes(), config);
277 } else if (this.executionEngine.isAnyTaskInError() && executionResult.isSuccess()) {
278 this.continueAfterSuccess.addAll(processedNode.getOutGoingNodes());
279 } else if (shouldDoImmediateRetry(config, executionResult, processedNode)) {
280 logger.debug("Submitting for Immediate retry, node {}", executionResult.getId());
281 submitForImmediateRetry(config, processedNode);
282 } else if (shouldScheduleRetry(config, executionResult, processedNode)) {
283 logger.debug("Submitting for Scheduled retry, node {}", executionResult.getId());
284 submitForScheduledRetry(config, processedNode);
285 }
286 }
287 }
288
289 private boolean shouldScheduleRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
290 final Node<T, R> processedNode) {
291 return executionResult.isErrored() && config.isScheduledRetrying() && config.shouldRetry(getExecutionCount(processedNode));
292 }
293
294 private boolean shouldDoImmediateRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
295 final Node<T, R> processedNode) {
296 return executionResult.isErrored() && config.isImmediatelyRetrying() && config.shouldRetry(getExecutionCount(processedNode));
297 }
298
299 private void submitForImmediateRetry(final ExecutionConfig config, final Node<T, R> node) {
300 Task<T, R> task = newTask(config, node);
301 this.immediatelyRetryExecutor.submit(retryingTask(config, task));
302 }
303
304 private void submitForScheduledRetry(ExecutionConfig config, Node<T, R> node) {
305 Task<T, R> task = newTask(config, node);
306 this.scheduledRetryExecutor.schedule(retryingTask(config, task), config.getRetryDelay().getDuration(), config.getRetryDelay().getTimeUnit());
307 }
308
309 private Task<T, R> newTask(final ExecutionConfig config, final Node<T, R> node) {
310 Task<T, R> task = this.taskProvider.provideTask(node.getValue());
311 task.setId(node.getValue());
312 updateConsiderExecutionStatus(config, task);
313 return TaskFactory.newWorker(task);
314 }
315
316 private void updateConsiderExecutionStatus(final ExecutionConfig config, final Task<T, R> task) {
317 if (config.isImmediatelyRetrying() || config.isScheduledRetrying()) {
318 Node<T, R> node = this.graph.get(task.getId());
319 Integer currentCount = getExecutionCount(node);
320 if (currentCount < config.getRetryCount()) {
321 task.setConsiderExecutionError(false);
322 } else {
323 task.setConsiderExecutionError(true);
324 }
325 }
326 }
327
328 private Runnable retryingTask(final ExecutionConfig config, final Task<T, R> task) {
329 nodesCount.incrementAndGet();
330 return new Runnable() {
331 @Override
332 public void run() {
333 executionEngine.submit(task);
334 }
335 };
336 }
337
338 private void updateExecutionCount(final Node<T, R> node) {
339 Integer count = getExecutionCount(node);
340 count++;
341 node.setData(count);
342 }
343
344 private Integer getExecutionCount(final Node<T, R> node) {
345 Integer count = (Integer) node.getData();
346 if (count == null) {
347 count = 0;
348 }
349 return count;
350 }
351
352 private void updateNode(final ExecutionResult<T, R> executionResult, final Node<T, R> processedNode) {
353 updateExecutionCount(processedNode);
354 processedNode.setResult(executionResult.getResult());
355 if(executionResult.isErrored()) {
356 processedNode.setErrored();
357 }
358
359 else {
360 processedNode.setSuccess();
361 }
362 }
363
364 private static enum Phase {
365 BUILDING, RUNNING, TERMINATED;
366 }
367 }