View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package com.github.dexecutor.core;
19  
20  import java.util.Collection;
21  import java.util.Date;
22  import java.util.HashSet;
23  import java.util.Set;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.ScheduledExecutorService;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  import com.github.dexecutor.core.graph.Node;
33  import com.github.dexecutor.core.graph.Traversar;
34  import com.github.dexecutor.core.graph.TraversarAction;
35  import com.github.dexecutor.core.graph.Validator;
36  import com.github.dexecutor.core.task.ExecutionResult;
37  import com.github.dexecutor.core.task.ExecutionResults;
38  import com.github.dexecutor.core.task.ExecutionStatus;
39  import com.github.dexecutor.core.task.Task;
40  import com.github.dexecutor.core.task.TaskFactory;
41  import com.github.dexecutor.core.task.TaskProvider;
42  
43  /**
44   * Default implementation of @Dexecutor
45   * 
46   * @author Nadeem Mohammad
47   * 
48   * @since 0.0.1
49   *
50   * @param <T> Type of Node/Task ID
51   * @param <R> Type of Node/Task result
52   */
53  public class DefaultDexecutor <T extends Comparable<T>, R> implements Dexecutor<T, R> {
54  
55  	private static final Logger logger = LoggerFactory.getLogger(DefaultDexecutor.class);
56  
57  	private final Validator<T, R> validator;
58  	private final TaskProvider<T, R> taskProvider;
59  	private final ExecutionEngine<T, R> executionEngine;
60  	private final ExecutorService immediatelyRetryExecutor;
61  	private final ScheduledExecutorService scheduledRetryExecutor;
62  
63  	private final DexecutorState<T, R> state;
64  
65  	/**
66  	 * Creates the Executor with Config
67  	 * @param config based on which dexecutor would  be constructed
68  	 */
69  	public DefaultDexecutor(final DexecutorConfig<T, R> config) {
70  		config.validate();
71  
72  		this.immediatelyRetryExecutor = Executors.newFixedThreadPool(config.getImmediateRetryPoolThreadsCount());
73  		this.scheduledRetryExecutor = Executors.newScheduledThreadPool(config.getScheduledRetryPoolThreadsCount());
74  
75  		this.executionEngine = config.getExecutorEngine();
76  		this.validator = config.getValidator();
77  		this.taskProvider = config.getTaskProvider();
78  		this.state = config.getDexecutorState();
79  	}
80  
81  	public void print(final Traversar<T, R> traversar, final TraversarAction<T, R> action) {
82  		this.state.print(traversar, action);
83  	}
84  
85  	public void addIndependent(final T nodeValue) {
86  		checkValidPhase();
87  		this.state.addIndependent(nodeValue);
88  	}
89  
90  	public void addDependency(final T evalFirstNode, final T evalLaterNode) {
91  		checkValidPhase();
92  		this.state.addDependency(evalFirstNode, evalLaterNode);
93  	}
94  
95  	public void addAsDependentOnAllLeafNodes(final T nodeValue) {
96  		checkValidPhase();
97  		this.state.addAsDependentOnAllLeafNodes(nodeValue);		
98  	}
99  
100 	@Override
101 	public void addAsDependencyToAllInitialNodes(final T nodeValue) {
102 		checkValidPhase();
103 		this.state.addAsDependencyToAllInitialNodes(nodeValue);				
104 	}
105 
106 	@Override
107 	public void recoverExecution(final ExecutionConfig config) {
108 		if (Phase.TERMINATED.equals(this.state.getCurrentPhase())) {
109 			throw new IllegalStateException("Can't recover terminated dexecutor");		
110 		} else {	
111 			logger.debug("Recovering Dexecutor.");
112 			doWaitForExecution(config);
113 			doExecute(this.state.getNonProcessedRootNodes(), config);
114 			doWaitForExecution(config);
115 			logger.debug("Processed Nodes Ordering {}", this.state.getProcessedNodes());
116 		}
117 	}
118 
119 	public void execute(final ExecutionConfig config) {
120 		validate(config);
121 
122 		this.state.setCurrentPhase(Phase.RUNNING);
123 
124 		Set<Node<T, R>> initialNodes = this.state.getInitialNodes();
125 
126 		long start = new Date().getTime();
127 
128 		doProcessNodes(config, initialNodes);
129 		shutdownExecutors();
130 
131 		long end = new Date().getTime();
132 
133 		this.state.setCurrentPhase(Phase.TERMINATED);
134 
135 		logger.debug("Total Time taken to process {} jobs is {} ms.", this.state.graphSize(), end - start);
136 		logger.debug("Processed Nodes Ordering {}", this.state.getProcessedNodes());
137 	}
138 
139 	private void shutdownExecutors() {
140 		this.immediatelyRetryExecutor.shutdown();
141 		this.scheduledRetryExecutor.shutdown();
142 		try {
143 			this.immediatelyRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
144 			this.scheduledRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
145 		} catch (InterruptedException e) {
146 			logger.error("Error Shuting down Executor", e);
147 		}		
148 	}
149 
150 	private void validate(final ExecutionConfig config) {
151 		config.validate();
152 		checkValidPhase();
153 		this.state.validate(this.validator);
154 	}
155 
156 	private void checkValidPhase() {
157 		throwExceptionIfTerminated();
158 		throwExceptionIfRunning();
159 	}
160 
161 	private void throwExceptionIfRunning() {
162 		if (Phase.RUNNING.equals(this.state.getCurrentPhase())) {
163 			throw new IllegalStateException("Dexecutor is already running!");
164 		}
165 	}
166 
167 	private void throwExceptionIfTerminated() {
168 		if (Phase.TERMINATED.equals(this.state.getCurrentPhase())) {
169 			throw new IllegalStateException("Dexecutor has been terminated!");
170 		}
171 	}
172 
173 	private void doProcessNodes(final ExecutionConfig config, final Set<Node<T, R>> nodes) {
174 		doExecute(nodes, config);
175 		doWaitForExecution(config);	
176 	}
177 
178 	private void doExecute(final Collection<Node<T, R>> nodes, final ExecutionConfig config) {
179 		for (Node<T, R> node : nodes) {
180 			forceStopIfRequired();
181 			if (this.state.shouldProcess(node)) {				
182 				Task<T, R> task = newTask(config, node);
183 				if (node.isNotProcessed() && shouldExecute(node, task)) {					
184 					this.state.incrementUnProcessedNodesCount();
185 					logger.debug("Going to schedule {} node", node.getValue());
186 					this.executionEngine.submit(task);
187 				} else if (node.isNotProcessed()){
188 					node.setSkipped();
189 					logger.debug("Execution Skipped for node # {} ", node.getValue());
190 					this.state.markProcessingDone(node);
191 					doExecute(node.getOutGoingNodes(), config);
192 				}
193 			} else {
194 				logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
195 			}
196 		}
197 	}
198 
199 	private boolean shouldExecute(final Node<T, R> node, final Task<T, R> task) {
200 		if (task.shouldExecute(parentResults(node))) {
201 			return true;
202 		}
203 		return false;
204 	}
205 
206 	private ExecutionResults<T, R> parentResults(final Node<T, R> node) {
207 		ExecutionResults<T, R> parentResult = new ExecutionResults<T, R>();
208 		for (Node<T, R> pNode : node.getInComingNodes()) {
209 			parentResult.add(new ExecutionResult<T, R>(pNode.getValue(), pNode.getResult(), status(pNode)));
210 		}
211 		return parentResult;
212 	}
213 
214 	private ExecutionStatus status(final Node<T, R> node) {
215 		ExecutionStatus status = ExecutionStatus.SUCCESS;
216 		if (node.isErrored()) {
217 			status = ExecutionStatus.ERRORED;
218 		} else if (node.isSkipped()) {
219 			status = ExecutionStatus.SKIPPED;
220 		}
221 		return status;
222 	}
223 
224 	private void doWaitForExecution(final ExecutionConfig config) {
225 		while (state.getUnProcessedNodesCount() > 0) {
226 			forceStopIfRequired();
227 			ExecutionResult<T, R> executionResult = this.executionEngine.processResult();			
228 			doAfterExecutionDone(config, executionResult);
229 		}
230 	}
231 
232 	//Check if it can run in separate thread
233 	private void doAfterExecutionDone(final ExecutionConfig config, final ExecutionResult<T, R> executionResult) {
234 		logger.debug("Processing of node {} done, with status {}", executionResult.getId(), executionResult.getStatus());
235 		state.decrementUnProcessedNodesCount();
236 		
237 		final Node<T, R> processedNode = state.getGraphNode(executionResult.getId());
238 		updateNode(executionResult, processedNode);
239 		state.markProcessingDone(processedNode);
240 
241 		if (executionResult.isSuccess() && !executionEngine.isAnyTaskInError() && state.isDiscontinuedNodesNotEmpty()) {
242 			Collection<Node<T, R>> recover = new HashSet<>(state.getDiscontinuedNodes());	
243 			state.markDiscontinuedNodesProcessed();
244 			doExecute(recover, config);
245 		}
246 
247 		if (config.isNonTerminating() ||  (!executionEngine.isAnyTaskInError())) {
248 			doExecute(processedNode.getOutGoingNodes(), config);				
249 		} else if (executionEngine.isAnyTaskInError() && executionResult.isSuccess()) { 
250 			state.processAfterNoError(processedNode.getOutGoingNodes());
251 		} else if (shouldDoImmediateRetry(config, executionResult, processedNode)) {
252 			logger.debug("Submitting for Immediate retry, node {}", executionResult.getId());
253 			submitForImmediateRetry(config, processedNode);
254 		} else if (shouldScheduleRetry(config, executionResult, processedNode)) {
255 			logger.debug("Submitting for Scheduled retry, node {}", executionResult.getId());
256 			submitForScheduledRetry(config, processedNode);
257 		}
258 	}
259 
260 	private boolean shouldScheduleRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
261 			final Node<T, R> processedNode) {
262 		return executionResult.isErrored() && config.isScheduledRetrying() && config.shouldRetry(getExecutionCount(processedNode));
263 	}
264 
265 	private boolean shouldDoImmediateRetry(final ExecutionConfig config, final ExecutionResult<T, R> executionResult,
266 			final Node<T, R> processedNode) {
267 		return executionResult.isErrored() && config.isImmediatelyRetrying() && config.shouldRetry(getExecutionCount(processedNode));
268 	}
269 
270 	private void submitForImmediateRetry(final ExecutionConfig config, final Node<T, R> node) {
271 		Task<T, R> task = newTask(config, node);		
272 		this.immediatelyRetryExecutor.submit(retryingTask(config, task));
273 	}
274 
275 	private void submitForScheduledRetry(final ExecutionConfig config, final Node<T, R> node) {
276 		Task<T, R> task = newTask(config, node);
277 		this.scheduledRetryExecutor.schedule(retryingTask(config, task), config.getRetryDelay().getDuration(), config.getRetryDelay().getTimeUnit());
278 	}
279 
280 	private Task<T, R> newTask(final ExecutionConfig config, final Node<T, R> node) {
281 		Task<T, R> task = this.taskProvider.provideTask(node.getValue());
282 		task.setId(node.getValue());
283 		updateConsiderExecutionStatus(config, task);
284 		return TaskFactory.newWorker(task);
285 	}
286 
287 	private void updateConsiderExecutionStatus(final ExecutionConfig config, final Task<T, R> task) {
288 		if (config.isImmediatelyRetrying() || config.isScheduledRetrying()) {
289 			Node<T, R> node = this.state.getGraphNode(task.getId());
290 			Integer currentCount = getExecutionCount(node);
291 			if (currentCount < config.getRetryCount()) {
292 				task.setConsiderExecutionError(false);
293 			} else {
294 				task.setConsiderExecutionError(true);
295 			}
296 		}
297 	}
298 
299 	private Runnable retryingTask(final ExecutionConfig config, final Task<T, R> task) {
300 		this.state.incrementUnProcessedNodesCount();
301 		return new Runnable() {
302 			@Override
303 			public void run() {
304 				executionEngine.submit(task);
305 			}
306 		};
307 	}
308 
309 	private void updateExecutionCount(final Node<T, R> node) {
310 		Integer count = getExecutionCount(node);
311 		count++;
312 		node.setData(count);
313 	}
314 
315 	private Integer getExecutionCount(final Node<T, R> node) {
316 		Integer count = (Integer) node.getData();
317 		if (count == null) {
318 			count = 0;
319 		}
320 		return count;
321 	}
322 
323 	private void updateNode(final ExecutionResult<T, R> executionResult, final Node<T, R> processedNode) {
324 		updateExecutionCount(processedNode);
325 		processedNode.setResult(executionResult.getResult());
326 		if(executionResult.isErrored()) {
327 			processedNode.setErrored();
328 		} else {
329 			processedNode.setSuccess();
330 		}
331 	}
332 
333 	private void forceStopIfRequired() {
334 		if (!shouldContinueProcessingNodes()) {
335 			this.state.forcedStop();
336 			this.immediatelyRetryExecutor.shutdownNow();
337 			this.scheduledRetryExecutor.shutdownNow();
338 			throw new IllegalStateException("Forced to Stop the instance of Dexecutor!");
339 		}		
340 	}
341 	/**
342 	 * Override this method if force stop is required
343 	 * 
344 	 * @return {@code true} if processing should continue otherwise {@code false}
345 	 */
346 	protected boolean shouldContinueProcessingNodes() {
347 		return true;
348 	}
349 }