1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package com.github.dexecutor.core;
19
20 import java.util.Collection;
21 import java.util.Date;
22 import java.util.HashSet;
23 import java.util.Set;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.TimeUnit;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import com.github.dexecutor.core.graph.Node;
33 import com.github.dexecutor.core.graph.Traversar;
34 import com.github.dexecutor.core.graph.TraversarAction;
35 import com.github.dexecutor.core.graph.Validator;
36 import com.github.dexecutor.core.task.ExecutionResult;
37 import com.github.dexecutor.core.task.ExecutionResults;
38 import com.github.dexecutor.core.task.ExecutionStatus;
39 import com.github.dexecutor.core.task.Task;
40 import com.github.dexecutor.core.task.TaskFactory;
41 import com.github.dexecutor.core.task.TaskProvider;
42
43
44
45
46
47
48
49
50
51
52
53 public class DefaultDexecutor <T extends Comparable<T>, R> implements Dexecutor<T, R> {
54
55 private static final Logger logger = LoggerFactory.getLogger(DefaultDexecutor.class);
56
57 private final Validator<T, R> validator;
58 private final TaskProvider<T, R> taskProvider;
59 private final ExecutionEngine<T, R> executionEngine;
60 private final ExecutorService immediatelyRetryExecutor;
61 private final ScheduledExecutorService scheduledRetryExecutor;
62
63 private final DexecutorState<T, R> state;
64
65
66
67
68
69 public DefaultDexecutor(final DexecutorConfig<T, R> config) {
70 config.validate();
71
72 this.immediatelyRetryExecutor = Executors.newFixedThreadPool(config.getImmediateRetryPoolThreadsCount());
73 this.scheduledRetryExecutor = Executors.newScheduledThreadPool(config.getScheduledRetryPoolThreadsCount());
74
75 this.executionEngine = config.getExecutorEngine();
76 this.validator = config.getValidator();
77 this.taskProvider = config.getTaskProvider();
78 this.state = config.getDexecutorState();
79 }
80
81 public void print(final Traversar<T, R> traversar, final TraversarAction<T, R> action) {
82 this.state.print(traversar, action);
83 }
84
85 public void addIndependent(final T nodeValue) {
86 checkValidPhase();
87 this.state.addIndependent(nodeValue);
88 }
89
90 public void addDependency(final T evalFirstNode, final T evalLaterNode) {
91 checkValidPhase();
92 this.state.addDependency(evalFirstNode, evalLaterNode);
93 }
94
95 public void addAsDependentOnAllLeafNodes(final T nodeValue) {
96 checkValidPhase();
97 this.state.addAsDependentOnAllLeafNodes(nodeValue);
98 }
99
100 @Override
101 public void addAsDependencyToAllInitialNodes(final T nodeValue) {
102 checkValidPhase();
103 this.state.addAsDependencyToAllInitialNodes(nodeValue);
104 }
105
106 @Override
107 public void recoverExecution(final ExecutionConfig config) {
108 if (Phase.TERMINATED.equals(this.state.getCurrentPhase())) {
109 throw new IllegalStateException("Can't recover terminated dexecutor");
110 } else {
111 logger.debug("Recovering Dexecutor.");
112 doWaitForExecution(config);
113 doExecute(this.state.getNonProcessedRootNodes(), config);
114 doWaitForExecution(config);
115 logger.debug("Processed Nodes Ordering {}", this.state.getProcessedNodes());
116 }
117 }
118
119 public void execute(final ExecutionConfig config) {
120 validate(config);
121
122 this.state.setCurrentPhase(Phase.RUNNING);
123
124 Set<Node<T, R>> initialNodes = this.state.getInitialNodes();
125
126 long start = new Date().getTime();
127
128 doProcessNodes(config, initialNodes);
129 shutdownExecutors();
130
131 long end = new Date().getTime();
132
133 this.state.setCurrentPhase(Phase.TERMINATED);
134
135 logger.debug("Total Time taken to process {} jobs is {} ms.", this.state.graphSize(), end - start);
136 logger.debug("Processed Nodes Ordering {}", this.state.getProcessedNodes());
137 }
138
139 private void shutdownExecutors() {
140 this.immediatelyRetryExecutor.shutdown();
141 this.scheduledRetryExecutor.shutdown();
142 try {
143 this.immediatelyRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
144 this.scheduledRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
145 } catch (InterruptedException e) {
146 logger.error("Error Shuting down Executor", e);
147 }
148 }
149
150 private void validate(final ExecutionConfig config) {
151 config.validate();
152 checkValidPhase();
153 this.state.validate(this.validator);
154 }
155
156 private void checkValidPhase() {
157 throwExceptionIfTerminated();
158 throwExceptionIfRunning();
159 }
160
161 private void throwExceptionIfRunning() {
162 if (Phase.RUNNING.equals(this.state.getCurrentPhase())) {
163 throw new IllegalStateException("Dexecutor is already running!");
164 }
165 }
166
167 private void throwExceptionIfTerminated() {
168 if (Phase.TERMINATED.equals(this.state.getCurrentPhase())) {
169 throw new IllegalStateException("Dexecutor has been terminated!");
170 }
171 }
172
173 private void doProcessNodes(final ExecutionConfig config, final Set<Node<T, R>> nodes) {
174 doExecute(nodes, config);
175 doWaitForExecution(config);
176 }
177
178 private void doExecute(final Collection<Node<T, R>> nodes, final ExecutionConfig config) {
179 for (Node<T, R> node : nodes) {
180 forceStopIfRequired();
181 if (this.state.shouldProcess(node)) {
182 Task<T, R> task = newTask(config, node);
183 if (node.isNotProcessed() && shouldExecute(node, task)) {
184 this.state.incrementUnProcessedNodesCount();
185 logger.debug("Going to schedule {} node", node.getValue());
186 this.executionEngine.submit(task);
187 } else if (node.isNotProcessed()){
188 node.setSkipped();
189 logger.debug("Execution Skipped for node # {} ", node.getValue());
190 this.state.markProcessingDone(node);
191 doExecute(node.getOutGoingNodes(), config);
192 }
193 } else {
194 logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
195 }
196 }
197 }
198
199 private boolean shouldExecute(final Node<T, R> node, final Task<T, R> task) {
200 if (task.shouldExecute(parentResults(node))) {
201 return true;
202 }
203 return false;
204 }
205
206 private ExecutionResults<T, R> parentResults(final Node<T, R> node) {
207 ExecutionResults<T, R> parentResult = new ExecutionResults<T, R>();
208 for (Node<T, R> pNode : node.getInComingNodes()) {
209 parentResult.add(new ExecutionResult<T, R>(pNode.getValue(), pNode.getResult(), status(pNode)));
210 }
211 return parentResult;
212 }
213
214 private ExecutionStatus status(final Node<T, R> node) {
215 ExecutionStatus status = ExecutionStatus.SUCCESS;
216 if (node.isErrored()) {
217 status = ExecutionStatus.ERRORED;
218 } else if (node.isSkipped()) {
219 status = ExecutionStatus.SKIPPED;
220 }
221 return status;
222 }
223
224 private void doWaitForExecution(final ExecutionConfig config) {
225 while (state.getUnProcessedNodesCount() > 0) {
226 forceStopIfRequired();
227 ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
228 doAfterExecutionDone(config, executionResult);
229 }
230 }
231
232
233 private void doAfterExecutionDone(final ExecutionConfig config, final ExecutionResult<T, R> executionResult) {
234 logger.debug("Processing of node {} done, with status {}", executionResult.getId(), executionResult.getStatus());
235 state.decrementUnProcessedNodesCount();
236
237 final Node<T, R> processedNode = state.getGraphNode(executionResult.getId());
238 updateNode(executionResult, processedNode);
239 state.markProcessingDone(processedNode);
240
241 if (executionResult.isSuccess() && !executionEngine.isAnyTaskInError() && state.isDiscontinuedNodesNotEmpty()) {
242 Collection<Node<T, R>> recover = new HashSet<>(state.getDiscontinuedNodes());
243 state.markDiscontinuedNodesProcessed();
244 doExecute(recover, config);
245 }
246
247 if (config.isNonTerminating() || (!executionEngine.isAnyTaskInError())) {
248 doExecute(processedNode.getOutGoingNodes(), config);
249 } else if (executionEngine.isAnyTaskInError() && executionResult.isSuccess()) {
250 state.processAfterNoError(processedNode.getOutGoingNodes());
251 } else if (shouldDoImmediateRetry(config, executionResult, processedNode)) {
252 logger.debug("Submitting for Immediate retry, node {}", executionResult.getId());
253 submitForImmediateRetry(config, processedNode);
254 } else if (shouldScheduleRetry(config, executionResult, processedNode)) {
255 logger.debug("Submitting for Scheduled retry, node {}", executionResult.getId());
256 submitForScheduledRetry(config, processedNode);
257 }
258 }
259
260 private boolean shouldScheduleRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
261 final Node<T, R> processedNode) {
262 return executionResult.isErrored() && config.isScheduledRetrying() && config.shouldRetry(getExecutionCount(processedNode));
263 }
264
265 private boolean shouldDoImmediateRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
266 final Node<T, R> processedNode) {
267 return executionResult.isErrored() && config.isImmediatelyRetrying() && config.shouldRetry(getExecutionCount(processedNode));
268 }
269
270 private void submitForImmediateRetry(final ExecutionConfig config, final Node<T, R> node) {
271 Task<T, R> task = newTask(config, node);
272 this.immediatelyRetryExecutor.submit(retryingTask(config, task));
273 }
274
275 private void submitForScheduledRetry(final ExecutionConfig config, final Node<T, R> node) {
276 Task<T, R> task = newTask(config, node);
277 this.scheduledRetryExecutor.schedule(retryingTask(config, task), config.getRetryDelay().getDuration(), config.getRetryDelay().getTimeUnit());
278 }
279
280 private Task<T, R> newTask(final ExecutionConfig config, final Node<T, R> node) {
281 Task<T, R> task = this.taskProvider.provideTask(node.getValue());
282 task.setId(node.getValue());
283 updateConsiderExecutionStatus(config, task);
284 return TaskFactory.newWorker(task);
285 }
286
287 private void updateConsiderExecutionStatus(final ExecutionConfig config, final Task<T, R> task) {
288 if (config.isImmediatelyRetrying() || config.isScheduledRetrying()) {
289 Node<T, R> node = this.state.getGraphNode(task.getId());
290 Integer currentCount = getExecutionCount(node);
291 if (currentCount < config.getRetryCount()) {
292 task.setConsiderExecutionError(false);
293 } else {
294 task.setConsiderExecutionError(true);
295 }
296 }
297 }
298
299 private Runnable retryingTask(final ExecutionConfig config, final Task<T, R> task) {
300 this.state.incrementUnProcessedNodesCount();
301 return new Runnable() {
302 @Override
303 public void run() {
304 executionEngine.submit(task);
305 }
306 };
307 }
308
309 private void updateExecutionCount(final Node<T, R> node) {
310 Integer count = getExecutionCount(node);
311 count++;
312 node.setData(count);
313 }
314
315 private Integer getExecutionCount(final Node<T, R> node) {
316 Integer count = (Integer) node.getData();
317 if (count == null) {
318 count = 0;
319 }
320 return count;
321 }
322
323 private void updateNode(final ExecutionResult<T, R> executionResult, final Node<T, R> processedNode) {
324 updateExecutionCount(processedNode);
325 processedNode.setResult(executionResult.getResult());
326 if(executionResult.isErrored()) {
327 processedNode.setErrored();
328 } else {
329 processedNode.setSuccess();
330 }
331 }
332
333 private void forceStopIfRequired() {
334 if (!shouldContinueProcessingNodes()) {
335 this.state.forcedStop();
336 this.immediatelyRetryExecutor.shutdownNow();
337 this.scheduledRetryExecutor.shutdownNow();
338 throw new IllegalStateException("Forced to Stop the instance of Dexecutor!");
339 }
340 }
341
342
343
344
345
346 protected boolean shouldContinueProcessingNodes() {
347 return true;
348 }
349 }