View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package com.github.dexecutor.executor;
19  
20  import java.io.Writer;
21  import java.util.Collection;
22  import java.util.Date;
23  import java.util.Set;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.CompletionService;
26  import java.util.concurrent.CopyOnWriteArrayList;
27  import java.util.concurrent.ExecutorCompletionService;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Future;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  
35  import com.github.dexecutor.executor.TaskProvider.Task;
36  import com.github.dexecutor.executor.graph.DefaultGraph;
37  import com.github.dexecutor.executor.graph.Graph;
38  import com.github.dexecutor.executor.graph.Graph.Node;
39  import com.github.dexecutor.executor.graph.Traversar;
40  import com.github.dexecutor.executor.graph.Validator;
41  
42  /**
43   * Default implementation of @DependentTasksExecutor
44   * 
45   * @author Nadeem Mohammad
46   * 
47   * @since 0.0.1
48   *
49   * @param <T> Type of Node/Task ID
50   * @param <R> Type of Node/Task result
51   */
52  public final class DefaultDependentTasksExecutor <T extends Comparable<T>, R> implements DependentTasksExecutor<T> {
53  
54  	private static final Logger logger = LoggerFactory.getLogger(DefaultDependentTasksExecutor.class);
55  
56  	private ExecutorService executorService;
57  	private TaskProvider<T, R> taskProvider;
58  	private Validator<T, R> validator;
59  	private Traversar<T, R> traversar;
60  	private Graph<T, R> graph;
61  
62  	private Collection<Node<T, R>> processedNodes = new CopyOnWriteArrayList<Node<T, R>>();
63  	private AtomicInteger nodesCount = new AtomicInteger(0);
64  	/**
65  	 * Creates the Executor with bare minimum required params
66  	 * @param executorService
67  	 * @param taskProvider
68  	 */
69  	public DefaultDependentTasksExecutor(final ExecutorService executorService, final TaskProvider<T, R> taskProvider) {
70  		this(new DependentTasksExecutorConfig<T, R>(executorService, taskProvider));
71  	}
72  	/**
73  	 * Creates the Executor with Config
74  	 * @param config
75  	 */
76  	public DefaultDependentTasksExecutor(final DependentTasksExecutorConfig<T, R> config) {
77  		config.validate();
78  		this.executorService = config.getExecutorService();
79  		this.taskProvider = config.getTaskProvider();
80  		this.validator = config.getValidator();
81  		this.traversar = config.getTraversar();
82  		this.graph = new DefaultGraph<T, R>();
83  	}
84  
85  	public void print(final Writer writer) {
86  		this.traversar.traverse(this.graph, writer);
87  	}
88  
89  	public void addIndependent(final T nodeValue) {
90  		this.graph.addIndependent(nodeValue);
91  	}
92  
93  	public void addDependency(final T evalFirstNode, final T evalLaterNode) {
94  		this.graph.addDependency(evalFirstNode, evalLaterNode);
95  	}
96  
97  	public void addAsDependentOnAllLeafNodes(final T nodeValue) {
98  		if (this.graph.size() == 0) {
99  			addIndependent(nodeValue);
100 		} else {
101 			for (Node<T, R> node : this.graph.getLeafNodes()) {
102 				addDependency(node.getValue(), nodeValue);
103 			}
104 		}
105 	}
106 
107 	@Override
108 	public void addAsDependencyToAllInitialNodes(final T nodeValue) {
109 		if (this.graph.size() == 0) {
110 			addIndependent(nodeValue);
111 		} else {
112 			for (Node<T, R> node : this.graph.getInitialNodes()) {
113 				addDependency(nodeValue, node.getValue());
114 			}
115 		}		
116 	}
117 
118 	private boolean isAlreadyProcessed(final Node<T, R> node) {
119 		return this.processedNodes.contains(node);
120 	}
121 
122 	private boolean areAlreadyProcessed(final Set<Node<T, R>> nodes) {
123         return this.processedNodes.containsAll(nodes);
124     }
125 
126 	public void execute(final ExecutionBehavior behavior) {
127 		validate();
128 
129 		Set<Node<T, R>> initialNodes = this.graph.getInitialNodes();
130 		CompletionService<Node<T, R>> completionService = new ExecutorCompletionService<Node<T, R>>(executorService);
131 
132 		long start = new Date().getTime();
133 		
134 		doProcessNodes(behavior, initialNodes, completionService);
135 
136 		long end = new Date().getTime();
137 
138 		logger.debug("Total Time taken to process {} jobs is {} ms.", graph.size(), end - start);
139 		logger.debug("Processed Nodes Ordering {}", this.processedNodes);
140 	}
141 
142 	private void doProcessNodes(final ExecutionBehavior behavior, final Set<Node<T, R>> nodes, final CompletionService<Node<T, R>> completionService) {
143 		doExecute(nodes, completionService, behavior);
144 		doWaitForExecution(completionService, behavior);	
145 	}
146 
147 	private void validate() {
148 		this.validator.validate(this.graph);
149 	}
150 
151 	private void doExecute(final Collection<Node<T, R>> nodes, final CompletionService<Node<T, R>> completionService, final ExecutionBehavior behavior) {
152 		for (Node<T, R> node : nodes) {
153 			if (shouldProcess(node) ) {
154 				nodesCount.incrementAndGet();
155 				logger.debug("Going to schedule {} node", node.getValue());
156 				completionService.submit(newWorker(node, behavior));
157 				
158 			} else {
159 				logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
160 			}
161 		}		
162 	}
163 
164 	private boolean shouldProcess(final Node<T, R> node) {
165 		return !this.executorService.isShutdown() && !isAlreadyProcessed(node) && allIncomingNodesProcessed(node);
166 	}
167 
168 	private boolean allIncomingNodesProcessed(final Node<T, R> node) {
169 		if (node.getInComingNodes().isEmpty() || areAlreadyProcessed(node.getInComingNodes())) {
170 			return true;
171 		}
172 		return false;
173 	}
174 
175 	private void doWaitForExecution(final CompletionService<Node<T, R>> completionService, final ExecutionBehavior behavior) {
176 		int cuurentCount = 0;
177 		while (cuurentCount != nodesCount.get()) {
178 			try {
179 				Future<Node<T, R>> future = completionService.take();
180 				Node<T, R> processedNode = future.get();
181 				logger.debug("Processing of node {} done", processedNode.getValue());
182 				cuurentCount++;
183 				this.processedNodes.add(processedNode);
184 				//logger.debug(this.executorService.toString());
185 				doExecute(processedNode.getOutGoingNodes(), completionService, behavior);
186 			} catch (Exception e) {
187 				cuurentCount++;
188 				logger.error("Task interrupted", e);
189 			}
190 		}
191 	}
192 
193 	private Callable<Node<T, R>> newWorker(final Node<T, R> graphNode, final ExecutionBehavior behavior) {
194 		if (ExecutionBehavior.NON_TERMINATING.equals(behavior)) {
195 			return new NonTerminatingTask(graphNode);
196 		} else if (ExecutionBehavior.RETRY_ONCE_TERMINATING.equals(behavior)) { 
197 			return new RetryOnceAndTerminateTask(graphNode);
198 		} else {
199 			return new TerminatingTask(graphNode);
200 		}
201 	}
202 
203 	private class TerminatingTask implements Callable<Node<T, R>> {
204 		private Node<T, R> node;
205 
206 		public TerminatingTask(final Node<T, R> graphNode) {
207 			this.node = graphNode;
208 		}
209 
210 		public Node<T, R> call() throws Exception {
211 			Task<T, R> task = newExecutorTask(this.node);
212 			task.setConsiderExecutionError(true);
213 			R result = task.execute();
214 			this.node.setResult(result);
215 			return this.node;
216 		}		
217 	}
218 
219 	private class NonTerminatingTask implements Callable<Node<T, R>> {
220 		private Node<T, R> node;
221 
222 		public NonTerminatingTask(final Node<T, R> graphNode) {
223 			this.node = graphNode;
224 		}
225 
226 		public Node<T, R> call() throws Exception {
227 			try {
228 				Task<T, R> task = newExecutorTask(this.node);
229 				task.setConsiderExecutionError(false);
230 				task.execute();
231 			} catch(Exception ex) {
232 				logger.error("Exception caught, executing node # " + this.node.getValue(), ex);
233 				this.node.setErrored();
234 			}
235 			return this.node;
236 		}
237 	}
238 
239 	private class RetryOnceAndTerminateTask implements Callable<Node<T, R>> {
240 		private Node<T, R> node;
241 
242 		public RetryOnceAndTerminateTask(final Node<T, R> graphNode) {
243 			this.node = graphNode;
244 		}
245 
246 		public Node<T, R> call() throws Exception {
247 			Task<T, R> task = newExecutorTask(this.node);
248 			boolean retry = shouldRetry(this.node.getValue());
249 			task.setConsiderExecutionError(!retry);
250 			try {
251 				task.execute();
252 			} catch(Exception ex) {
253 				logger.error("Exception caught, executing node # " + this.node.getValue() + " Retry would happen : " + getYesNo(retry), ex);
254 				if (retry) {
255 					task.setConsiderExecutionError(true);
256 					task.execute();
257 				}
258 			}
259 			return this.node;
260 		}
261 
262 		private String getYesNo(boolean retry) {
263 			return retry ? "Yes" : "No";
264 		}	
265 	}
266 
267 	protected boolean shouldRetry(final T node) {
268 		return true;
269 	}
270 
271 	private Task<T, R> newExecutorTask(final Node<T, R> node) {
272 		return new ExecutorTask(node, this.taskProvider.provid(node.getValue()));
273 	}
274 
275 	private class ExecutorTask extends Task<T, R> {
276 		private final Task<T, R> task;
277 		private final Node<T, R> node;
278 		private int retryCount = 0;
279 
280 		public ExecutorTask(final Node<T, R> node, final Task<T, R> task) {
281 			this.task = task;
282 			this.node = node;
283 		}
284 
285 		public R execute() {
286 			R result = null;
287 			if (shouldExecute(parentResults())) {
288 				logger.debug("{} Node # {}", msg(this.retryCount), this.taskId());
289 				this.retryCount ++;
290 				result = this.task.execute();
291 				this.node.setSuccess();
292 				this.node.setResult(result);
293 				logger.debug("Node # {}, Execution Done!", this.taskId());
294 			} else {
295 				logger.debug("Execution Skipped for node # {} ", this.taskId());
296 				this.node.setSkipped();
297 			}			
298 			return result;
299 		}
300 
301 		@Override
302 		public boolean shouldExecute(final ExecutionResults<T, R> parentResults) {
303 			return task.shouldExecute(parentResults);
304 		}
305 		
306 		private ExecutionResults<T, R> parentResults() {
307 			ExecutionResults<T, R> result = new ExecutionResults<T, R>();
308 			for (Node<T, R> in : this.node.getInComingNodes()) {
309 				result.add(new ExecutionResult<T, R>(in.getValue(), in.getResult(), status(in)));
310 			}
311 			return result;
312 		}
313 
314 		private ExecutionStatus status(final Node<T, R> incomingNode) {
315 			if (incomingNode.isSuccess()) {
316 				return ExecutionStatus.SUCCESS;
317 			} else if (incomingNode.isErrored()) {
318 				return ExecutionStatus.ERRORED;
319 			}
320 			return ExecutionStatus.SKIPPED;
321 		}
322 
323 		private T taskId() {
324 			return this.node.getValue();
325 		}
326 
327 		private String msg(int retryCount) {
328 			return retryCount > 0 ? "Retrying(" + retryCount+ ") " : "Executing";
329 		}
330 
331 		@Override
332 		void setConsiderExecutionError(boolean considerExecutionError) {
333 			this.task.setConsiderExecutionError(considerExecutionError);
334 		}
335 	}
336 }