View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package com.github.dexecutor.core;
19  
20  import java.io.Writer;
21  import java.util.Collection;
22  import java.util.Date;
23  import java.util.HashSet;
24  import java.util.Set;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.ScheduledExecutorService;
28  import java.util.concurrent.TimeUnit;
29  
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import com.github.dexecutor.core.graph.Node;
34  import com.github.dexecutor.core.graph.Traversar;
35  import com.github.dexecutor.core.graph.Validator;
36  import com.github.dexecutor.core.task.ExecutionResult;
37  import com.github.dexecutor.core.task.ExecutionResults;
38  import com.github.dexecutor.core.task.ExecutionStatus;
39  import com.github.dexecutor.core.task.Task;
40  import com.github.dexecutor.core.task.TaskFactory;
41  import com.github.dexecutor.core.task.TaskProvider;
42  
43  /**
44   * Default implementation of @Dexecutor
45   * 
46   * @author Nadeem Mohammad
47   * 
48   * @since 0.0.1
49   *
50   * @param <T> Type of Node/Task ID
51   * @param <R> Type of Node/Task result
52   */
53  public final class DefaultDexecutor <T extends Comparable<T>, R> implements Dexecutor<T> {
54  
55  	private static final Logger logger = LoggerFactory.getLogger(DefaultDexecutor.class);
56  
57  	private final Validator<T, R> validator;
58  	private final Traversar<T, R> traversar;
59  	private final TaskProvider<T, R> taskProvider;
60  	private final ExecutionEngine<T, R> executionEngine;
61  	private final ExecutorService immediatelyRetryExecutor;
62  	private final ScheduledExecutorService scheduledRetryExecutor;
63  
64  	private final DexecutorState<T, R> state;
65  
66  	public DefaultDexecutor(final ExecutionEngine<T, R> executionEngine, final TaskProvider<T, R> taskProvider) {
67  		this(new DexecutorConfig<>(executionEngine, taskProvider));
68  	}
69  
70  	/**
71  	 * Creates the Executor with Config
72  	 * @param config
73  	 */
74  	public DefaultDexecutor(final DexecutorConfig<T, R> config) {
75  		config.validate();
76  
77  		this.immediatelyRetryExecutor = Executors.newFixedThreadPool(config.getImmediateRetryPoolThreadsCount());
78  		this.scheduledRetryExecutor = Executors.newScheduledThreadPool(config.getScheduledRetryPoolThreadsCount());
79  
80  		this.executionEngine = config.getExecutorEngine();
81  		this.validator = config.getValidator();
82  		this.traversar = config.getTraversar();
83  		this.taskProvider = config.getTaskProvider();
84  		this.state = config.getDexecutorState();
85  
86  		this.state.initState();
87  	}
88  
89  	public void print(final Writer writer) {
90  		this.state.print(this.traversar, writer);
91  	}
92  
93  	public void addIndependent(final T nodeValue) {
94  		checkValidPhase();
95  		this.state.addIndependent(nodeValue);
96  	}
97  
98  	public void addDependency(final T evalFirstNode, final T evalLaterNode) {
99  		checkValidPhase();
100 		this.state.addDependency(evalFirstNode, evalLaterNode);
101 	}
102 
103 	public void addAsDependentOnAllLeafNodes(final T nodeValue) {
104 		checkValidPhase();
105 		this.state.addAsDependentOnAllLeafNodes(nodeValue);		
106 	}
107 
108 	@Override
109 	public void addAsDependencyToAllInitialNodes(final T nodeValue) {
110 		checkValidPhase();
111 		this.state.addAsDependencyToAllInitialNodes(nodeValue);				
112 	}
113 
114 	public void execute(final ExecutionConfig config) {
115 		validate(config);
116 
117 		this.state.setCurrentPhase(Phase.RUNNING);
118 
119 		Set<Node<T, R>> initialNodes = this.state.getInitialNodes();
120 
121 		long start = new Date().getTime();
122 
123 		doProcessNodes(config, initialNodes);
124 		shutdownExecutors();
125 
126 		long end = new Date().getTime();
127 
128 		this.state.setCurrentPhase(Phase.TERMINATED);
129 
130 		logger.debug("Total Time taken to process {} jobs is {} ms.", this.state.graphSize(), end - start);
131 		logger.debug("Processed Nodes Ordering {}", this.state.getProcessedNodes());
132 	}
133 
134 	private void shutdownExecutors() {
135 		this.immediatelyRetryExecutor.shutdown();
136 		this.scheduledRetryExecutor.shutdown();
137 		try {
138 			this.immediatelyRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
139 			this.scheduledRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
140 		} catch (InterruptedException e) {
141 			logger.error("Error Shuting down Executor", e);
142 		}		
143 	}
144 
145 	private void validate(final ExecutionConfig config) {
146 		checkValidPhase();
147 		config.validate();
148 		this.state.validate(this.validator);
149 	}
150 
151 	private void checkValidPhase() {
152 		throwExceptionIfTerminated();
153 		throwExceptionIfRunning();
154 	}
155 
156 	private void throwExceptionIfRunning() {
157 		if (Phase.RUNNING.equals(this.state.getCurrentPhase())) {
158 			throw new IllegalStateException("Dexecutor is already running!");
159 		}
160 	}
161 
162 	private void throwExceptionIfTerminated() {
163 		if (Phase.TERMINATED.equals(this.state.getCurrentPhase())) {
164 			throw new IllegalStateException("Dexecutor has been terminated!");
165 		}
166 	}
167 
168 	private void doProcessNodes(final ExecutionConfig config, final Set<Node<T, R>> nodes) {
169 		doExecute(nodes, config);
170 		doWaitForExecution(config);	
171 	}
172 
173 	private void doExecute(final Collection<Node<T, R>> nodes, final ExecutionConfig config) {
174 		for (Node<T, R> node : nodes) {
175 			if (this.state.shouldProcess(node)) {				
176 				Task<T, R> task = newTask(config, node);
177 				if (shouldExecute(node, task)) {					
178 					state.incrementUnProcessedNodesCount();
179 					logger.debug("Going to schedule {} node", node.getValue());
180 					this.executionEngine.submit(task);
181 				} else {
182 					node.setSkipped();
183 					logger.debug("Execution Skipped for node # {} ", node.getValue());
184 					this.state.markProcessingDone(node);
185 					doExecute(node.getOutGoingNodes(), config);
186 				}
187 			} else {
188 				logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
189 			}
190 		}
191 	}
192 
193 	private boolean shouldExecute(final Node<T, R> node, final Task<T, R> task) {
194 		if (task.shouldExecute(parentResults(node))) {
195 			return true;
196 		}
197 		return false;
198 	}
199 
200 	private ExecutionResults<T, R> parentResults(final Node<T, R> node) {
201 		ExecutionResults<T, R> parentResult = new ExecutionResults<T, R>();
202 		for (Node<T, R> pNode : node.getInComingNodes()) {
203 			parentResult.add(new ExecutionResult<T, R>(pNode.getValue(), pNode.getResult(), status(pNode)));
204 		}
205 		return parentResult;
206 	}
207 
208 	private ExecutionStatus status(final Node<T, R> node) {
209 		ExecutionStatus status = ExecutionStatus.SUCCESS;
210 		if (node.isErrored()) {
211 			status = ExecutionStatus.ERRORED;
212 		} else if (node.isSkipped()) {
213 			status = ExecutionStatus.SKIPPED;
214 		}
215 		return status;
216 	}
217 
218 	private void doWaitForExecution(final ExecutionConfig config) {
219 		while (state.getUnProcessedNodesCount() > 0) {
220 
221 			ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
222 			state.decrementUnProcessedNodesCount();
223 			logger.debug("Processing of node {} done, with status {}", executionResult.getId(), executionResult.getStatus());
224 
225 			final Node<T, R> processedNode = this.state.getGraphNode(executionResult.getId());
226 			updateNode(executionResult, processedNode);
227 			this.state.markProcessingDone(processedNode);
228 
229 			if (executionResult.isSuccess() && !this.executionEngine.isAnyTaskInError() && this.state.isDiscontinuedNodesNotEmpty()) {
230 				Collection<Node<T, R>> recover = new HashSet<>(this.state.getDiscontinuedNodes());	
231 				this.state.markDiscontinuedNodesProcessed();
232 				doExecute(recover, config);
233 			}
234 
235 			if (config.isNonTerminating() ||  (!this.executionEngine.isAnyTaskInError())) {
236 				doExecute(processedNode.getOutGoingNodes(), config);				
237 			} else if (this.executionEngine.isAnyTaskInError() && executionResult.isSuccess()) { 
238 				this.state.processAfterNoError(processedNode.getOutGoingNodes());
239 			} else if (shouldDoImmediateRetry(config, executionResult, processedNode)) {
240 				logger.debug("Submitting for Immediate retry, node {}", executionResult.getId());
241 				submitForImmediateRetry(config, processedNode);
242 			} else if (shouldScheduleRetry(config, executionResult, processedNode)) {
243 				logger.debug("Submitting for Scheduled retry, node {}", executionResult.getId());
244 				submitForScheduledRetry(config, processedNode);
245 			}
246 		}
247 	}
248 
249 	private boolean shouldScheduleRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
250 			final Node<T, R> processedNode) {
251 		return executionResult.isErrored() && config.isScheduledRetrying() && config.shouldRetry(getExecutionCount(processedNode));
252 	}
253 
254 	private boolean shouldDoImmediateRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
255 			final Node<T, R> processedNode) {
256 		return executionResult.isErrored() && config.isImmediatelyRetrying() && config.shouldRetry(getExecutionCount(processedNode));
257 	}
258 
259 	private void submitForImmediateRetry(final ExecutionConfig config, final Node<T, R> node) {
260 		Task<T, R> task = newTask(config, node);		
261 		this.immediatelyRetryExecutor.submit(retryingTask(config, task));
262 	}
263 
264 	private void submitForScheduledRetry(ExecutionConfig config, Node<T, R> node) {
265 		Task<T, R> task = newTask(config, node);
266 		this.scheduledRetryExecutor.schedule(retryingTask(config, task), config.getRetryDelay().getDuration(), config.getRetryDelay().getTimeUnit());
267 	}
268 
269 	private Task<T, R> newTask(final ExecutionConfig config, final Node<T, R> node) {
270 		Task<T, R> task = this.taskProvider.provideTask(node.getValue());
271 		task.setId(node.getValue());
272 		updateConsiderExecutionStatus(config, task);
273 		return TaskFactory.newWorker(task);
274 	}
275 
276 	private void updateConsiderExecutionStatus(final ExecutionConfig config, final Task<T, R> task) {
277 		if (config.isImmediatelyRetrying() || config.isScheduledRetrying()) {
278 			Node<T, R> node = this.state.getGraphNode(task.getId());
279 			Integer currentCount = getExecutionCount(node);
280 			if (currentCount < config.getRetryCount()) {
281 				task.setConsiderExecutionError(false);
282 			} else {
283 				task.setConsiderExecutionError(true);
284 			}
285 		}
286 	}
287 
288 	private Runnable retryingTask(final ExecutionConfig config, final Task<T, R> task) {
289 		this.state.incrementUnProcessedNodesCount();
290 		return new Runnable() {
291 			@Override
292 			public void run() {
293 				executionEngine.submit(task);
294 			}
295 		};
296 	}
297 
298 	private void updateExecutionCount(final Node<T, R> node) {
299 		Integer count = getExecutionCount(node);
300 		count++;
301 		node.setData(count);
302 	}
303 
304 	private Integer getExecutionCount(final Node<T, R> node) {
305 		Integer count = (Integer) node.getData();
306 		if (count == null) {
307 			count = 0;
308 		}
309 		return count;
310 	}
311 
312 	private void updateNode(final ExecutionResult<T, R> executionResult, final Node<T, R> processedNode) {
313 		updateExecutionCount(processedNode);
314 		processedNode.setResult(executionResult.getResult());
315 		if(executionResult.isErrored()) {
316 			processedNode.setErrored();
317 		} else {
318 			processedNode.setSuccess();
319 		}
320 	}
321 }