1 package com.github.dexecutor.executor;
2
3 import java.io.Writer;
4 import java.util.Collection;
5 import java.util.Date;
6 import java.util.Set;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.CompletionService;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.ExecutorCompletionService;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.atomic.AtomicInteger;
14
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 import com.github.dexecutor.executor.TaskProvider.Task;
19 import com.github.dexecutor.executor.graph.DefaultGraph;
20 import com.github.dexecutor.executor.graph.Graph;
21 import com.github.dexecutor.executor.graph.Graph.Node;
22 import com.github.dexecutor.executor.graph.Traversar;
23 import com.github.dexecutor.executor.graph.Validator;
24
25
26
27
28
29
30
31
32 public final class DefaultDependentTasksExecutor <T extends Comparable<T>> implements DependentTasksExecutor<T> {
33
34 private static final Logger logger = LoggerFactory.getLogger(DefaultDependentTasksExecutor.class);
35
36 private ExecutorService executorService;
37 private TaskProvider<T> taskProvider;
38 private Validator<T> validator;
39 private Traversar<T> traversar;
40 private Graph<T> graph;
41
42 private Collection<Node<T>> processedNodes = new CopyOnWriteArrayList<Node<T>>();
43 private AtomicInteger nodesCount = new AtomicInteger(0);
44
45
46
47
48
49 public DefaultDependentTasksExecutor(final ExecutorService executorService, final TaskProvider<T> taskProvider) {
50 this(new DependentTasksExecutorConfig<T>(executorService, taskProvider));
51 }
52
53
54
55
56 public DefaultDependentTasksExecutor(final DependentTasksExecutorConfig<T> config) {
57 config.validate();
58 this.executorService = config.getExecutorService();
59 this.taskProvider = config.getTaskProvider();
60 this.validator = config.getValidator();
61 this.traversar = config.getTraversar();
62 this.graph = new DefaultGraph<T>();
63 }
64
65 public void print(final Writer writer) {
66 this.traversar.traverse(this.graph, writer);
67 }
68
69 public void addIndependent(final T nodeValue) {
70 this.graph.addIndependent(nodeValue);
71 }
72
73 public void addDependency(final T evalFirstNode, final T evalLaterNode) {
74 this.graph.addDependency(evalFirstNode, evalLaterNode);
75 }
76
77 public void addAsDependentOnAllLeafNodes(final T nodeValue) {
78 if (this.graph.size() == 0) {
79 addIndependent(nodeValue);
80 } else {
81 for (Node<T> node : this.graph.getLeafNodes()) {
82 addDependency(node.getValue(), nodeValue);
83 }
84 }
85 }
86
87 @Override
88 public void addAsDependencyToAllInitialNodes(final T nodeValue) {
89 if (this.graph.size() == 0) {
90 addIndependent(nodeValue);
91 } else {
92 for (Node<T> node : this.graph.getInitialNodes()) {
93 addDependency(nodeValue, node.getValue());
94 }
95 }
96 }
97
98 private boolean isAlreadyProcessed(final Node<T> node) {
99 return this.processedNodes.contains(node);
100 }
101
102 private boolean areAlreadyProcessed(final Set<Node<T>> nodes) {
103 return this.processedNodes.containsAll(nodes);
104 }
105
106 public void execute(final ExecutionBehavior behavior) {
107 validate();
108
109 Set<Node<T>> initialNodes = this.graph.getInitialNodes();
110 CompletionService<Node<T>> completionService = new ExecutorCompletionService<Node<T>>(executorService);
111
112 long start = new Date().getTime();
113
114 doProcessNodes(behavior, initialNodes, completionService);
115
116 long end = new Date().getTime();
117
118 logger.debug("Total Time taken to process {} jobs is {} ms.", graph.size(), end - start);
119 logger.debug("Processed Nodes Ordering {}", this.processedNodes);
120 }
121
122 private void doProcessNodes(final ExecutionBehavior behavior, final Set<Node<T>> nodes, final CompletionService<Node<T>> completionService) {
123 doExecute(nodes, completionService, behavior);
124 doWaitForExecution(completionService, behavior);
125 }
126
127 private void validate() {
128 this.validator.validate(this.graph);
129 }
130
131 private void doExecute(final Collection<Node<T>> nodes, final CompletionService<Node<T>> completionService, final ExecutionBehavior behavior) {
132 for (Node<T> node : nodes) {
133 if (shouldProcess(node) ) {
134 nodesCount.incrementAndGet();
135 logger.debug("Going to schedule {} node", node.getValue());
136 completionService.submit(newWorker(node, behavior));
137 } else {
138 logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
139 }
140 }
141 }
142
143 private boolean shouldProcess(final Node<T> node) {
144 return !this.executorService.isShutdown() && !isAlreadyProcessed(node) && allIncomingNodesProcessed(node);
145 }
146
147 private boolean allIncomingNodesProcessed(final Node<T> node) {
148 if (node.getInComingNodes().isEmpty() || areAlreadyProcessed(node.getInComingNodes())) {
149 return true;
150 }
151 return false;
152 }
153
154 private void doWaitForExecution(final CompletionService<Node<T>> completionService, final ExecutionBehavior behavior) {
155 int cuurentCount = 0;
156 while (cuurentCount != nodesCount.get()) {
157 try {
158 Future<Node<T>> future = completionService.take();
159 Node<T> processedNode = future.get();
160 logger.debug("Processing of node {} done", processedNode.getValue());
161 cuurentCount++;
162 this.processedNodes.add(processedNode);
163
164 doExecute(processedNode.getOutGoingNodes(), completionService, behavior);
165 } catch (Exception e) {
166 cuurentCount++;
167 logger.error("Task interrupted", e);
168 }
169 }
170 }
171
172 private Callable<Node<T>> newWorker(final Node<T> graphNode, final ExecutionBehavior behavior) {
173 if (ExecutionBehavior.NON_TERMINATING.equals(behavior)) {
174 return new NonTerminatingTask(graphNode);
175 } else if (ExecutionBehavior.RETRY_ONCE_TERMINATING.equals(behavior)) {
176 return new RetryOnceAndTerminateTask(graphNode);
177 } else {
178 return new TerminatingTask(graphNode);
179 }
180 }
181
182 private class TerminatingTask implements Callable<Node<T>> {
183 private Node<T> node;
184
185 public TerminatingTask(final Node<T> graphNode) {
186 this.node = graphNode;
187 }
188
189 public Node<T> call() throws Exception {
190 Task task = newLoggingTask(this.node.getValue());
191 task.setConsiderExecutionError(true);
192 task.execute();
193 return this.node;
194 }
195 }
196
197 private class NonTerminatingTask implements Callable<Node<T>> {
198 private Node<T> node;
199
200 public NonTerminatingTask(final Node<T> graphNode) {
201 this.node = graphNode;
202 }
203
204 public Node<T> call() throws Exception {
205 try {
206 Task task = newLoggingTask(this.node.getValue());
207 task.setConsiderExecutionError(false);
208 task.execute();
209 } catch(Exception ex) {
210 logger.error("Exception caught, executing node # " + this.node.getValue(), ex);
211 }
212 return this.node;
213 }
214 }
215
216 private class RetryOnceAndTerminateTask implements Callable<Node<T>> {
217 private Node<T> node;
218
219 public RetryOnceAndTerminateTask(final Node<T> graphNode) {
220 this.node = graphNode;
221 }
222
223 public Node<T> call() throws Exception {
224 Task task = newLoggingTask(this.node.getValue());
225 boolean retry = shouldRetry(this.node.getValue());
226 task.setConsiderExecutionError(!retry);
227 try {
228 task.execute();
229 } catch(Exception ex) {
230 logger.error("Exception caught, executing node # " + this.node.getValue() + " Retry would happen : " + getYesNo(retry), ex);
231 if (retry) {
232 task.setConsiderExecutionError(true);
233 task.execute();
234 }
235 }
236 return this.node;
237 }
238
239 private String getYesNo(boolean retry) {
240 return retry ? "Yes" : "No";
241 }
242 }
243
244 protected boolean shouldRetry(final T node) {
245 return true;
246 }
247
248 private Task newLoggingTask(final T taskId) {
249 return new LoggingTask(taskId, this.taskProvider.provid(taskId));
250 }
251
252 private class LoggingTask extends Task {
253 private final Task task;
254 private final T taskId;
255 private int retryCount = 0;
256
257 public LoggingTask(final T taskId, final Task task) {
258 this.task = task;
259 this.taskId = taskId;
260 }
261
262 public void execute() {
263 logger.debug("{} Node # {}", msg(this.retryCount), this.taskId);
264 this.retryCount ++;
265 this.task.execute();
266 logger.debug("Node # {}, Execution Done!", this.taskId);
267 }
268
269 private String msg(int retryCount) {
270 return retryCount > 0 ? "Retrying(" + retryCount+ ") " : "Executing";
271 }
272
273 @Override
274 void setConsiderExecutionError(boolean considerExecutionError) {
275 this.task.setConsiderExecutionError(considerExecutionError);
276 }
277 }
278 }