1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package com.github.dexecutor.executor;
19
20 import java.io.Writer;
21 import java.util.Collection;
22 import java.util.Date;
23 import java.util.Set;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.CompletionService;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.ExecutorCompletionService;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import com.github.dexecutor.executor.TaskProvider.Task;
36 import com.github.dexecutor.executor.graph.DefaultGraph;
37 import com.github.dexecutor.executor.graph.Graph;
38 import com.github.dexecutor.executor.graph.Graph.Node;
39 import com.github.dexecutor.executor.graph.Traversar;
40 import com.github.dexecutor.executor.graph.Validator;
41
42
43
44
45
46
47
48
49
50
51
52 public final class DefaultDependentTasksExecutor <T extends Comparable<T>, R> implements DependentTasksExecutor<T> {
53
54 private static final Logger logger = LoggerFactory.getLogger(DefaultDependentTasksExecutor.class);
55
56 private ExecutorService executorService;
57 private TaskProvider<T, R> taskProvider;
58 private Validator<T, R> validator;
59 private Traversar<T, R> traversar;
60 private Graph<T, R> graph;
61
62 private Collection<Node<T, R>> processedNodes = new CopyOnWriteArrayList<Node<T, R>>();
63 private AtomicInteger nodesCount = new AtomicInteger(0);
64
65
66
67
68
69 public DefaultDependentTasksExecutor(final ExecutorService executorService, final TaskProvider<T, R> taskProvider) {
70 this(new DependentTasksExecutorConfig<T, R>(executorService, taskProvider));
71 }
72
73
74
75
76 public DefaultDependentTasksExecutor(final DependentTasksExecutorConfig<T, R> config) {
77 config.validate();
78 this.executorService = config.getExecutorService();
79 this.taskProvider = config.getTaskProvider();
80 this.validator = config.getValidator();
81 this.traversar = config.getTraversar();
82 this.graph = new DefaultGraph<T, R>();
83 }
84
85 public void print(final Writer writer) {
86 this.traversar.traverse(this.graph, writer);
87 }
88
89 public void addIndependent(final T nodeValue) {
90 this.graph.addIndependent(nodeValue);
91 }
92
93 public void addDependency(final T evalFirstNode, final T evalLaterNode) {
94 this.graph.addDependency(evalFirstNode, evalLaterNode);
95 }
96
97 public void addAsDependentOnAllLeafNodes(final T nodeValue) {
98 if (this.graph.size() == 0) {
99 addIndependent(nodeValue);
100 } else {
101 for (Node<T, R> node : this.graph.getLeafNodes()) {
102 addDependency(node.getValue(), nodeValue);
103 }
104 }
105 }
106
107 @Override
108 public void addAsDependencyToAllInitialNodes(final T nodeValue) {
109 if (this.graph.size() == 0) {
110 addIndependent(nodeValue);
111 } else {
112 for (Node<T, R> node : this.graph.getInitialNodes()) {
113 addDependency(nodeValue, node.getValue());
114 }
115 }
116 }
117
118 private boolean isAlreadyProcessed(final Node<T, R> node) {
119 return this.processedNodes.contains(node);
120 }
121
122 private boolean areAlreadyProcessed(final Set<Node<T, R>> nodes) {
123 return this.processedNodes.containsAll(nodes);
124 }
125
126 public void execute(final ExecutionBehavior behavior) {
127 validate();
128
129 Set<Node<T, R>> initialNodes = this.graph.getInitialNodes();
130 CompletionService<Node<T, R>> completionService = new ExecutorCompletionService<Node<T, R>>(executorService);
131
132 long start = new Date().getTime();
133
134 doProcessNodes(behavior, initialNodes, completionService);
135
136 long end = new Date().getTime();
137
138 logger.debug("Total Time taken to process {} jobs is {} ms.", graph.size(), end - start);
139 logger.debug("Processed Nodes Ordering {}", this.processedNodes);
140 }
141
142 private void doProcessNodes(final ExecutionBehavior behavior, final Set<Node<T, R>> nodes, final CompletionService<Node<T, R>> completionService) {
143 doExecute(nodes, completionService, behavior);
144 doWaitForExecution(completionService, behavior);
145 }
146
147 private void validate() {
148 this.validator.validate(this.graph);
149 }
150
151 private void doExecute(final Collection<Node<T, R>> nodes, final CompletionService<Node<T, R>> completionService, final ExecutionBehavior behavior) {
152 for (Node<T, R> node : nodes) {
153 if (shouldProcess(node) ) {
154 nodesCount.incrementAndGet();
155 logger.debug("Going to schedule {} node", node.getValue());
156 completionService.submit(newWorker(node, behavior));
157
158 } else {
159 logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
160 }
161 }
162 }
163
164 private boolean shouldProcess(final Node<T, R> node) {
165 return !this.executorService.isShutdown() && !isAlreadyProcessed(node) && allIncomingNodesProcessed(node);
166 }
167
168 private boolean allIncomingNodesProcessed(final Node<T, R> node) {
169 if (node.getInComingNodes().isEmpty() || areAlreadyProcessed(node.getInComingNodes())) {
170 return true;
171 }
172 return false;
173 }
174
175 private void doWaitForExecution(final CompletionService<Node<T, R>> completionService, final ExecutionBehavior behavior) {
176 int cuurentCount = 0;
177 while (cuurentCount != nodesCount.get()) {
178 try {
179 Future<Node<T, R>> future = completionService.take();
180 Node<T, R> processedNode = future.get();
181 logger.debug("Processing of node {} done", processedNode.getValue());
182 cuurentCount++;
183 this.processedNodes.add(processedNode);
184
185 doExecute(processedNode.getOutGoingNodes(), completionService, behavior);
186 } catch (Exception e) {
187 cuurentCount++;
188 logger.error("Task interrupted", e);
189 }
190 }
191 }
192
193 private Callable<Node<T, R>> newWorker(final Node<T, R> graphNode, final ExecutionBehavior behavior) {
194 if (ExecutionBehavior.NON_TERMINATING.equals(behavior)) {
195 return new NonTerminatingTask(graphNode);
196 } else if (ExecutionBehavior.RETRY_ONCE_TERMINATING.equals(behavior)) {
197 return new RetryOnceAndTerminateTask(graphNode);
198 } else {
199 return new TerminatingTask(graphNode);
200 }
201 }
202
203 private class TerminatingTask implements Callable<Node<T, R>> {
204 private Node<T, R> node;
205
206 public TerminatingTask(final Node<T, R> graphNode) {
207 this.node = graphNode;
208 }
209
210 public Node<T, R> call() throws Exception {
211 Task<T, R> task = newExecutorTask(this.node);
212 task.setConsiderExecutionError(true);
213 R result = task.execute();
214 this.node.setResult(result);
215 return this.node;
216 }
217 }
218
219 private class NonTerminatingTask implements Callable<Node<T, R>> {
220 private Node<T, R> node;
221
222 public NonTerminatingTask(final Node<T, R> graphNode) {
223 this.node = graphNode;
224 }
225
226 public Node<T, R> call() throws Exception {
227 try {
228 Task<T, R> task = newExecutorTask(this.node);
229 task.setConsiderExecutionError(false);
230 task.execute();
231 } catch(Exception ex) {
232 logger.error("Exception caught, executing node # " + this.node.getValue(), ex);
233 this.node.setErrored();
234 }
235 return this.node;
236 }
237 }
238
239 private class RetryOnceAndTerminateTask implements Callable<Node<T, R>> {
240 private Node<T, R> node;
241
242 public RetryOnceAndTerminateTask(final Node<T, R> graphNode) {
243 this.node = graphNode;
244 }
245
246 public Node<T, R> call() throws Exception {
247 Task<T, R> task = newExecutorTask(this.node);
248 boolean retry = shouldRetry(this.node.getValue());
249 task.setConsiderExecutionError(!retry);
250 try {
251 task.execute();
252 } catch(Exception ex) {
253 logger.error("Exception caught, executing node # " + this.node.getValue() + " Retry would happen : " + getYesNo(retry), ex);
254 if (retry) {
255 task.setConsiderExecutionError(true);
256 task.execute();
257 }
258 }
259 return this.node;
260 }
261
262 private String getYesNo(boolean retry) {
263 return retry ? "Yes" : "No";
264 }
265 }
266
267 protected boolean shouldRetry(final T node) {
268 return true;
269 }
270
271 private Task<T, R> newExecutorTask(final Node<T, R> node) {
272 return new ExecutorTask(node, this.taskProvider.provid(node.getValue()));
273 }
274
275 private class ExecutorTask extends Task<T, R> {
276 private final Task<T, R> task;
277 private final Node<T, R> node;
278 private int retryCount = 0;
279
280 public ExecutorTask(final Node<T, R> node, final Task<T, R> task) {
281 this.task = task;
282 this.node = node;
283 }
284
285 public R execute() {
286 R result = null;
287 if (shouldExecute(parentResults())) {
288 logger.debug("{} Node # {}", msg(this.retryCount), this.taskId());
289 this.retryCount ++;
290 result = this.task.execute();
291 this.node.setSuccess();
292 this.node.setResult(result);
293 logger.debug("Node # {}, Execution Done!", this.taskId());
294 } else {
295 logger.debug("Execution Skipped for node # {} ", this.taskId());
296 this.node.setSkipped();
297 }
298 return result;
299 }
300
301 @Override
302 public boolean shouldExecute(final ExecutionResults<T, R> parentResults) {
303 return task.shouldExecute(parentResults);
304 }
305
306 private ExecutionResults<T, R> parentResults() {
307 ExecutionResults<T, R> result = new ExecutionResults<T, R>();
308 for (Node<T, R> in : this.node.getInComingNodes()) {
309 result.add(new ExecutionResult<T, R>(in.getValue(), in.getResult(), status(in)));
310 }
311 return result;
312 }
313
314 private ExecutionStatus status(final Node<T, R> incomingNode) {
315 if (incomingNode.isSuccess()) {
316 return ExecutionStatus.SUCCESS;
317 } else if (incomingNode.isErrored()) {
318 return ExecutionStatus.ERRORED;
319 }
320 return ExecutionStatus.SKIPPED;
321 }
322
323 private T taskId() {
324 return this.node.getValue();
325 }
326
327 private String msg(int retryCount) {
328 return retryCount > 0 ? "Retrying(" + retryCount+ ") " : "Executing";
329 }
330
331 @Override
332 void setConsiderExecutionError(boolean considerExecutionError) {
333 this.task.setConsiderExecutionError(considerExecutionError);
334 }
335 }
336 }