1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package com.github.dexecutor.core;
19
20 import java.io.Writer;
21 import java.util.Collection;
22 import java.util.Date;
23 import java.util.HashSet;
24 import java.util.Set;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.TimeUnit;
29
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import com.github.dexecutor.core.graph.Node;
34 import com.github.dexecutor.core.graph.Traversar;
35 import com.github.dexecutor.core.graph.Validator;
36 import com.github.dexecutor.core.task.ExecutionResult;
37 import com.github.dexecutor.core.task.ExecutionResults;
38 import com.github.dexecutor.core.task.ExecutionStatus;
39 import com.github.dexecutor.core.task.Task;
40 import com.github.dexecutor.core.task.TaskFactory;
41 import com.github.dexecutor.core.task.TaskProvider;
42
43
44
45
46
47
48
49
50
51
52
53 public final class DefaultDexecutor <T extends Comparable<T>, R> implements Dexecutor<T> {
54
55 private static final Logger logger = LoggerFactory.getLogger(DefaultDexecutor.class);
56
57 private final Validator<T, R> validator;
58 private final Traversar<T, R> traversar;
59 private final TaskProvider<T, R> taskProvider;
60 private final ExecutionEngine<T, R> executionEngine;
61 private final ExecutorService immediatelyRetryExecutor;
62 private final ScheduledExecutorService scheduledRetryExecutor;
63
64 private final DexecutorState<T, R> state;
65
66 public DefaultDexecutor(final ExecutionEngine<T, R> executionEngine, final TaskProvider<T, R> taskProvider) {
67 this(new DexecutorConfig<>(executionEngine, taskProvider));
68 }
69
70
71
72
73
74 public DefaultDexecutor(final DexecutorConfig<T, R> config) {
75 config.validate();
76
77 this.immediatelyRetryExecutor = Executors.newFixedThreadPool(config.getImmediateRetryPoolThreadsCount());
78 this.scheduledRetryExecutor = Executors.newScheduledThreadPool(config.getScheduledRetryPoolThreadsCount());
79
80 this.executionEngine = config.getExecutorEngine();
81 this.validator = config.getValidator();
82 this.traversar = config.getTraversar();
83 this.taskProvider = config.getTaskProvider();
84 this.state = config.getDexecutorState();
85
86 this.state.initState();
87 }
88
89 public void print(final Writer writer) {
90 this.state.print(this.traversar, writer);
91 }
92
93 public void addIndependent(final T nodeValue) {
94 checkValidPhase();
95 this.state.addIndependent(nodeValue);
96 }
97
98 public void addDependency(final T evalFirstNode, final T evalLaterNode) {
99 checkValidPhase();
100 this.state.addDependency(evalFirstNode, evalLaterNode);
101 }
102
103 public void addAsDependentOnAllLeafNodes(final T nodeValue) {
104 checkValidPhase();
105 this.state.addAsDependentOnAllLeafNodes(nodeValue);
106 }
107
108 @Override
109 public void addAsDependencyToAllInitialNodes(final T nodeValue) {
110 checkValidPhase();
111 this.state.addAsDependencyToAllInitialNodes(nodeValue);
112 }
113
114 public void execute(final ExecutionConfig config) {
115 validate(config);
116
117 this.state.setCurrentPhase(Phase.RUNNING);
118
119 Set<Node<T, R>> initialNodes = this.state.getInitialNodes();
120
121 long start = new Date().getTime();
122
123 doProcessNodes(config, initialNodes);
124 shutdownExecutors();
125
126 long end = new Date().getTime();
127
128 this.state.setCurrentPhase(Phase.TERMINATED);
129
130 logger.debug("Total Time taken to process {} jobs is {} ms.", this.state.graphSize(), end - start);
131 logger.debug("Processed Nodes Ordering {}", this.state.getProcessedNodes());
132 }
133
134 private void shutdownExecutors() {
135 this.immediatelyRetryExecutor.shutdown();
136 this.scheduledRetryExecutor.shutdown();
137 try {
138 this.immediatelyRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
139 this.scheduledRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
140 } catch (InterruptedException e) {
141 logger.error("Error Shuting down Executor", e);
142 }
143 }
144
145 private void validate(final ExecutionConfig config) {
146 checkValidPhase();
147 config.validate();
148 this.state.validate(this.validator);
149 }
150
151 private void checkValidPhase() {
152 throwExceptionIfTerminated();
153 throwExceptionIfRunning();
154 }
155
156 private void throwExceptionIfRunning() {
157 if (Phase.RUNNING.equals(this.state.getCurrentPhase())) {
158 throw new IllegalStateException("Dexecutor is already running!");
159 }
160 }
161
162 private void throwExceptionIfTerminated() {
163 if (Phase.TERMINATED.equals(this.state.getCurrentPhase())) {
164 throw new IllegalStateException("Dexecutor has been terminated!");
165 }
166 }
167
168 private void doProcessNodes(final ExecutionConfig config, final Set<Node<T, R>> nodes) {
169 doExecute(nodes, config);
170 doWaitForExecution(config);
171 }
172
173 private void doExecute(final Collection<Node<T, R>> nodes, final ExecutionConfig config) {
174 for (Node<T, R> node : nodes) {
175 if (this.state.shouldProcess(node)) {
176 Task<T, R> task = newTask(config, node);
177 if (shouldExecute(node, task)) {
178 state.incrementUnProcessedNodesCount();
179 logger.debug("Going to schedule {} node", node.getValue());
180 this.executionEngine.submit(task);
181 } else {
182 node.setSkipped();
183 logger.debug("Execution Skipped for node # {} ", node.getValue());
184 this.state.markProcessingDone(node);
185 doExecute(node.getOutGoingNodes(), config);
186 }
187 } else {
188 logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
189 }
190 }
191 }
192
193 private boolean shouldExecute(final Node<T, R> node, final Task<T, R> task) {
194 if (task.shouldExecute(parentResults(node))) {
195 return true;
196 }
197 return false;
198 }
199
200 private ExecutionResults<T, R> parentResults(final Node<T, R> node) {
201 ExecutionResults<T, R> parentResult = new ExecutionResults<T, R>();
202 for (Node<T, R> pNode : node.getInComingNodes()) {
203 parentResult.add(new ExecutionResult<T, R>(pNode.getValue(), pNode.getResult(), status(pNode)));
204 }
205 return parentResult;
206 }
207
208 private ExecutionStatus status(final Node<T, R> node) {
209 ExecutionStatus status = ExecutionStatus.SUCCESS;
210 if (node.isErrored()) {
211 status = ExecutionStatus.ERRORED;
212 } else if (node.isSkipped()) {
213 status = ExecutionStatus.SKIPPED;
214 }
215 return status;
216 }
217
218 private void doWaitForExecution(final ExecutionConfig config) {
219 while (state.getUnProcessedNodesCount() > 0) {
220
221 ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
222 state.decrementUnProcessedNodesCount();
223 logger.debug("Processing of node {} done, with status {}", executionResult.getId(), executionResult.getStatus());
224
225 final Node<T, R> processedNode = this.state.getGraphNode(executionResult.getId());
226 updateNode(executionResult, processedNode);
227 this.state.markProcessingDone(processedNode);
228
229 if (executionResult.isSuccess() && !this.executionEngine.isAnyTaskInError() && this.state.isDiscontinuedNodesNotEmpty()) {
230 Collection<Node<T, R>> recover = new HashSet<>(this.state.getDiscontinuedNodes());
231 this.state.markDiscontinuedNodesProcessed();
232 doExecute(recover, config);
233 }
234
235 if (config.isNonTerminating() || (!this.executionEngine.isAnyTaskInError())) {
236 doExecute(processedNode.getOutGoingNodes(), config);
237 } else if (this.executionEngine.isAnyTaskInError() && executionResult.isSuccess()) {
238 this.state.processAfterNoError(processedNode.getOutGoingNodes());
239 } else if (shouldDoImmediateRetry(config, executionResult, processedNode)) {
240 logger.debug("Submitting for Immediate retry, node {}", executionResult.getId());
241 submitForImmediateRetry(config, processedNode);
242 } else if (shouldScheduleRetry(config, executionResult, processedNode)) {
243 logger.debug("Submitting for Scheduled retry, node {}", executionResult.getId());
244 submitForScheduledRetry(config, processedNode);
245 }
246 }
247 }
248
249 private boolean shouldScheduleRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
250 final Node<T, R> processedNode) {
251 return executionResult.isErrored() && config.isScheduledRetrying() && config.shouldRetry(getExecutionCount(processedNode));
252 }
253
254 private boolean shouldDoImmediateRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
255 final Node<T, R> processedNode) {
256 return executionResult.isErrored() && config.isImmediatelyRetrying() && config.shouldRetry(getExecutionCount(processedNode));
257 }
258
259 private void submitForImmediateRetry(final ExecutionConfig config, final Node<T, R> node) {
260 Task<T, R> task = newTask(config, node);
261 this.immediatelyRetryExecutor.submit(retryingTask(config, task));
262 }
263
264 private void submitForScheduledRetry(ExecutionConfig config, Node<T, R> node) {
265 Task<T, R> task = newTask(config, node);
266 this.scheduledRetryExecutor.schedule(retryingTask(config, task), config.getRetryDelay().getDuration(), config.getRetryDelay().getTimeUnit());
267 }
268
269 private Task<T, R> newTask(final ExecutionConfig config, final Node<T, R> node) {
270 Task<T, R> task = this.taskProvider.provideTask(node.getValue());
271 task.setId(node.getValue());
272 updateConsiderExecutionStatus(config, task);
273 return TaskFactory.newWorker(task);
274 }
275
276 private void updateConsiderExecutionStatus(final ExecutionConfig config, final Task<T, R> task) {
277 if (config.isImmediatelyRetrying() || config.isScheduledRetrying()) {
278 Node<T, R> node = this.state.getGraphNode(task.getId());
279 Integer currentCount = getExecutionCount(node);
280 if (currentCount < config.getRetryCount()) {
281 task.setConsiderExecutionError(false);
282 } else {
283 task.setConsiderExecutionError(true);
284 }
285 }
286 }
287
288 private Runnable retryingTask(final ExecutionConfig config, final Task<T, R> task) {
289 this.state.incrementUnProcessedNodesCount();
290 return new Runnable() {
291 @Override
292 public void run() {
293 executionEngine.submit(task);
294 }
295 };
296 }
297
298 private void updateExecutionCount(final Node<T, R> node) {
299 Integer count = getExecutionCount(node);
300 count++;
301 node.setData(count);
302 }
303
304 private Integer getExecutionCount(final Node<T, R> node) {
305 Integer count = (Integer) node.getData();
306 if (count == null) {
307 count = 0;
308 }
309 return count;
310 }
311
312 private void updateNode(final ExecutionResult<T, R> executionResult, final Node<T, R> processedNode) {
313 updateExecutionCount(processedNode);
314 processedNode.setResult(executionResult.getResult());
315 if(executionResult.isErrored()) {
316 processedNode.setErrored();
317 } else {
318 processedNode.setSuccess();
319 }
320 }
321 }