1 16 package org.directwebremoting.contrib; 17 18 import org.directwebremoting.extend.Call; 19 import org.directwebremoting.extend.Calls; 20 import org.directwebremoting.extend.Replies; 21 import org.directwebremoting.extend.Reply; 22 import org.directwebremoting.impl.DefaultRemoter; 23 import org.directwebremoting.util.Logger; 24 25 import edu.emory.mathcs.backport.java.util.concurrent.Callable; 26 import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException; 27 import edu.emory.mathcs.backport.java.util.concurrent.Executors; 28 import edu.emory.mathcs.backport.java.util.concurrent.Future; 29 import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; 30 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; 31 import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException; 32 33 39 public class ParallelDefaultRemoter extends DefaultRemoter 40 { 41 class OneCall implements Callable 42 { 43 private Call call; 44 45 48 public OneCall(Call call) 49 { 50 this.call = call; 51 } 52 53 public Object call() 54 { 55 return execute(call); 56 } 57 } 58 59 66 public ParallelDefaultRemoter() 67 { 68 executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool(); 69 executorService.setCorePoolSize(corePoolsize); 70 executorService.setMaximumPoolSize(maximumPoolsize); 71 executorService.setKeepAliveTime(keepAliveTime, TimeUnit.MILLISECONDS); 72 73 log.info(executorService.getClass().getName().indexOf("edu.emory.mathcs.backport") > -1 ? "Backport of java.util.concurrent package used !" : "java.util.concurrent package used !"); 74 } 75 76 80 public void setParallelDefaultRemoterTimeout(long timeout) 81 { 82 this.timeout = timeout; 83 } 84 85 89 public void setParallelDefaultRemoterCorePoolsize(int corePoolsize) 90 { 91 this.corePoolsize = corePoolsize; 92 executorService.setCorePoolSize(corePoolsize); 93 } 94 95 99 public void setParallelDefaultRemoterMaximumPoolsize(int maximumPoolsize) 100 { 101 this.maximumPoolsize = maximumPoolsize; 102 executorService.setMaximumPoolSize(maximumPoolsize); 103 } 104 105 110 public void setParallelDefaultRemoterKeepAliveTime(long keepAliveTime) 111 { 112 this.keepAliveTime = keepAliveTime; 113 executorService.setKeepAliveTime(keepAliveTime, TimeUnit.MILLISECONDS); 114 } 115 116 122 public Replies execute(Calls calls) 123 { 124 Replies replies = new Replies(calls.getBatchId()); 125 Future future[] = new Future[calls.getCallCount()]; 126 127 if (calls.getCallCount() == 1) 128 { 129 return super.execute(calls); 130 } 131 else 132 { 133 for (int callNum = 0; callNum < calls.getCallCount(); callNum++) 134 { 135 Call call = calls.getCall(callNum); 136 future[callNum] = executorService.submit(new OneCall(call)); 137 } 138 for (int callNum = 0; callNum < calls.getCallCount(); callNum++) 139 { 140 try 141 { 142 Reply reply = (Reply) future[callNum].get(this.timeout, TimeUnit.MILLISECONDS); 143 replies.addReply(reply); 144 } 145 catch (InterruptedException ex) 146 { 147 log.warn("Method execution failed: ", ex); 148 replies.addReply(new Reply(calls.getCall(callNum).getCallId(), null, ex)); 149 } 150 catch (ExecutionException ex) 151 { 152 log.warn("Method execution failed: ", ex); 153 replies.addReply(new Reply(calls.getCall(callNum).getCallId(), null, ex)); 154 } 155 catch (TimeoutException ex) 156 { 157 log.warn("Method execution failed: ", ex); 158 replies.addReply(new Reply(calls.getCall(callNum).getCallId(), null, ex)); 159 } 160 } 161 return replies; 162 } 163 } 164 165 private static final Logger log = Logger.getLogger(ParallelDefaultRemoter.class); 166 167 private int corePoolsize = 10; 168 169 private int maximumPoolsize = 100; 170 171 private long keepAliveTime = 5000; 172 173 private long timeout = 10000; 174 175 private ThreadPoolExecutor executorService; 176 } 177
| Popular Tags
|