View Javadoc
1   package com.github.dexecutor.executor;
2   
3   import java.io.Writer;
4   import java.util.Collection;
5   import java.util.Date;
6   import java.util.Set;
7   import java.util.concurrent.Callable;
8   import java.util.concurrent.CompletionService;
9   import java.util.concurrent.CopyOnWriteArrayList;
10  import java.util.concurrent.ExecutorCompletionService;
11  import java.util.concurrent.ExecutorService;
12  import java.util.concurrent.Future;
13  import java.util.concurrent.atomic.AtomicInteger;
14  
15  import org.slf4j.Logger;
16  import org.slf4j.LoggerFactory;
17  
18  import com.github.dexecutor.executor.TaskProvider.Task;
19  import com.github.dexecutor.executor.graph.DefaultGraph;
20  import com.github.dexecutor.executor.graph.Graph;
21  import com.github.dexecutor.executor.graph.Graph.Node;
22  import com.github.dexecutor.executor.graph.Traversar;
23  import com.github.dexecutor.executor.graph.Validator;
24  
25  /**
26   * Default implementation of @DependentTasksExecutor
27   * 
28   * @author Nadeem Mohammad
29   *
30   * @param <T>
31   */
32  public final class DefaultDependentTasksExecutor <T extends Comparable<T>> implements DependentTasksExecutor<T> {
33  
34  	private static final Logger logger = LoggerFactory.getLogger(DefaultDependentTasksExecutor.class);
35  
36  	private ExecutorService executorService;
37  	private TaskProvider<T> taskProvider;
38  	private Validator<T> validator;
39  	private Traversar<T> traversar;
40  	private Graph<T> graph;
41  
42  	private Collection<Node<T>> processedNodes = new CopyOnWriteArrayList<Node<T>>();
43  	private AtomicInteger nodesCount = new AtomicInteger(0);
44  	/**
45  	 * Creates the Executor with bare minimum required params
46  	 * @param executorService
47  	 * @param taskProvider
48  	 */
49  	public DefaultDependentTasksExecutor(final ExecutorService executorService, final TaskProvider<T> taskProvider) {
50  		this(new DependentTasksExecutorConfig<T>(executorService, taskProvider));
51  	}
52  	/**
53  	 * Creates the Executor with Config
54  	 * @param config
55  	 */
56  	public DefaultDependentTasksExecutor(final DependentTasksExecutorConfig<T> config) {
57  		config.validate();
58  		this.executorService = config.getExecutorService();
59  		this.taskProvider = config.getTaskProvider();
60  		this.validator = config.getValidator();
61  		this.traversar = config.getTraversar();
62  		this.graph = new DefaultGraph<T>();
63  	}
64  
65  	public void print(final Writer writer) {
66  		this.traversar.traverse(this.graph, writer);
67  	}
68  
69  	public void addIndependent(final T nodeValue) {
70  		this.graph.addIndependent(nodeValue);
71  	}
72  
73  	public void addDependency(final T evalFirstNode, final T evalLaterNode) {
74  		this.graph.addDependency(evalFirstNode, evalLaterNode);
75  	}
76  
77  	public void addAsDependentOnAllLeafNodes(final T nodeValue) {
78  		if (this.graph.size() == 0) {
79  			addIndependent(nodeValue);
80  		} else {
81  			for (Node<T> node : this.graph.getLeafNodes()) {
82  				addDependency(node.getValue(), nodeValue);
83  			}
84  		}
85  	}
86  
87  	@Override
88  	public void addAsDependencyToAllInitialNodes(final T nodeValue) {
89  		if (this.graph.size() == 0) {
90  			addIndependent(nodeValue);
91  		} else {
92  			for (Node<T> node : this.graph.getInitialNodes()) {
93  				addDependency(nodeValue, node.getValue());
94  			}
95  		}		
96  	}
97  
98  	private boolean isAlreadyProcessed(final Node<T> node) {
99  		return this.processedNodes.contains(node);
100 	}
101 
102 	private boolean areAlreadyProcessed(final Set<Node<T>> nodes) {
103         return this.processedNodes.containsAll(nodes);
104     }
105 
106 	public void execute(final ExecutionBehavior behavior) {
107 		validate();
108 
109 		Set<Node<T>> initialNodes = this.graph.getInitialNodes();
110 		CompletionService<Node<T>> completionService = new ExecutorCompletionService<Node<T>>(executorService);
111 
112 		long start = new Date().getTime();
113 		
114 		doProcessNodes(behavior, initialNodes, completionService);
115 
116 		long end = new Date().getTime();
117 
118 		logger.debug("Total Time taken to process {} jobs is {} ms.", graph.size(), end - start);
119 		logger.debug("Processed Nodes Ordering {}", this.processedNodes);
120 	}
121 
122 	private void doProcessNodes(final ExecutionBehavior behavior, final Set<Node<T>> nodes, final CompletionService<Node<T>> completionService) {
123 		doExecute(nodes, completionService, behavior);
124 		doWaitForExecution(completionService, behavior);	
125 	}
126 
127 	private void validate() {
128 		this.validator.validate(this.graph);
129 	}
130 
131 	private void doExecute(final Collection<Node<T>> nodes, final CompletionService<Node<T>> completionService, final ExecutionBehavior behavior) {
132 		for (Node<T> node : nodes) {
133 			if (shouldProcess(node) ) {
134 				nodesCount.incrementAndGet();
135 				logger.debug("Going to schedule {} node", node.getValue());
136 				completionService.submit(newWorker(node, behavior));
137 			} else {
138 				logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
139 			}
140 		}		
141 	}
142 
143 	private boolean shouldProcess(final Node<T> node) {
144 		return !this.executorService.isShutdown() && !isAlreadyProcessed(node) && allIncomingNodesProcessed(node);
145 	}
146 
147 	private boolean allIncomingNodesProcessed(final Node<T> node) {
148 		if (node.getInComingNodes().isEmpty() || areAlreadyProcessed(node.getInComingNodes())) {
149 			return true;
150 		}
151 		return false;
152 	}
153 
154 	private void doWaitForExecution(final CompletionService<Node<T>> completionService, final ExecutionBehavior behavior) {
155 		int cuurentCount = 0;
156 		while (cuurentCount != nodesCount.get()) {
157 			try {
158 				Future<Node<T>> future = completionService.take();
159 				Node<T> processedNode = future.get();
160 				logger.debug("Processing of node {} done", processedNode.getValue());
161 				cuurentCount++;
162 				this.processedNodes.add(processedNode);				
163 				//logger.debug(this.executorService.toString());
164 				doExecute(processedNode.getOutGoingNodes(), completionService, behavior);
165 			} catch (Exception e) {
166 				cuurentCount++;
167 				logger.error("Task interrupted", e);
168 			}
169 		}
170 	}
171 
172 	private Callable<Node<T>> newWorker(final Node<T> graphNode, final ExecutionBehavior behavior) {
173 		if (ExecutionBehavior.NON_TERMINATING.equals(behavior)) {
174 			return new NonTerminatingTask(graphNode);
175 		} else if (ExecutionBehavior.RETRY_ONCE_TERMINATING.equals(behavior)) { 
176 			return new RetryOnceAndTerminateTask(graphNode);
177 		} else {
178 			return new TerminatingTask(graphNode);
179 		}
180 	}
181 
182 	private class TerminatingTask implements Callable<Node<T>> {
183 		private Node<T> node;
184 
185 		public TerminatingTask(final Node<T> graphNode) {
186 			this.node = graphNode;
187 		}
188 
189 		public Node<T> call() throws Exception {
190 			Task task = newLoggingTask(this.node.getValue());
191 			task.setConsiderExecutionError(true);
192 			task.execute();
193 			return this.node;
194 		}		
195 	}
196 
197 	private class NonTerminatingTask implements Callable<Node<T>> {
198 		private Node<T> node;
199 
200 		public NonTerminatingTask(final Node<T> graphNode) {
201 			this.node = graphNode;
202 		}
203 
204 		public Node<T> call() throws Exception {
205 			try {
206 				Task task = newLoggingTask(this.node.getValue());
207 				task.setConsiderExecutionError(false);
208 				task.execute();
209 			} catch(Exception ex) {
210 				logger.error("Exception caught, executing node # " + this.node.getValue(), ex);
211 			}
212 			return this.node;
213 		}
214 	}
215 
216 	private class RetryOnceAndTerminateTask implements Callable<Node<T>> {
217 		private Node<T> node;
218 
219 		public RetryOnceAndTerminateTask(final Node<T> graphNode) {
220 			this.node = graphNode;
221 		}
222 
223 		public Node<T> call() throws Exception {
224 			Task task = newLoggingTask(this.node.getValue());
225 			boolean retry = shouldRetry(this.node.getValue());
226 			task.setConsiderExecutionError(!retry);
227 			try {
228 				task.execute();
229 			} catch(Exception ex) {
230 				logger.error("Exception caught, executing node # " + this.node.getValue() + " Retry would happen : " + getYesNo(retry), ex);
231 				if (retry) {
232 					task.setConsiderExecutionError(true);
233 					task.execute();
234 				}
235 			}
236 			return this.node;
237 		}
238 
239 		private String getYesNo(boolean retry) {
240 			return retry ? "Yes" : "No";
241 		}	
242 	}
243 
244 	protected boolean shouldRetry(final T node) {
245 		return true;
246 	}
247 
248 	private Task newLoggingTask(final T taskId) {
249 		return new LoggingTask(taskId, this.taskProvider.provid(taskId));
250 	}
251 
252 	private class LoggingTask extends Task {
253 		private final Task task;
254 		private final T taskId;
255 		private int retryCount = 0;
256 		
257 		public LoggingTask(final T taskId, final Task task) {
258 			this.task = task;
259 			this.taskId = taskId;
260 		}
261 
262 		public void execute() {
263 			logger.debug("{} Node # {}", msg(this.retryCount), this.taskId);
264 			this.retryCount ++;
265 			this.task.execute();
266 			logger.debug("Node # {}, Execution Done!", this.taskId);
267 		}
268 
269 		private String msg(int retryCount) {
270 			return retryCount > 0 ? "Retrying(" + retryCount+ ") " : "Executing";
271 		}
272 
273 		@Override
274 		void setConsiderExecutionError(boolean considerExecutionError) {
275 			this.task.setConsiderExecutionError(considerExecutionError);
276 		}
277 	}
278 }