1 21 22 package org.jacorb.notification.engine; 23 24 import java.util.ArrayList ; 25 import java.util.Iterator ; 26 import java.util.List ; 27 28 import org.jacorb.notification.interfaces.Disposable; 29 30 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 31 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 32 33 public class DefaultPushTaskExecutor implements PushTaskExecutor, Disposable 34 { 35 private static int noOfExecutors_ = 0; 36 private final int executorNr_ = noOfExecutors_++; 37 38 final LinkedQueue scheduledPushTasks_ = new LinkedQueue(); 39 40 final SynchronizedBoolean isActive_ = new SynchronizedBoolean(true); 41 42 final List workers_ = new ArrayList (); 43 44 private class Worker extends Thread 45 { 46 public Worker(String name) 47 { 48 super(name); 49 } 50 51 public void run() 52 { 53 while (isActive_.get()) 54 { 55 try 56 { 57 PushTaskExecutor.PushTask pushTask = (PushTaskExecutor.PushTask) scheduledPushTasks_ 58 .take(); 59 60 if (isActive_.get()) 61 { 62 pushTask.doPush(); 63 } 64 } catch (InterruptedException e) 65 { 66 } 68 } 69 } 70 } 71 72 public DefaultPushTaskExecutor(int numberOfWorkers) 73 { 74 if (numberOfWorkers < 1) 75 { 76 throw new IllegalArgumentException ("At least 1 Worker"); 77 } 78 79 createWorkers(numberOfWorkers); 80 81 startWorkers(); 82 } 83 84 public void executePush(PushTaskExecutor.PushTask pushTask) 85 { 86 if (isActive_.get()) 87 { 88 try 89 { 90 scheduledPushTasks_.put(pushTask); 91 } catch (InterruptedException e) 92 { 93 } 95 } 96 } 97 98 public void dispose() 99 { 100 isActive_.set(false); 101 102 try 103 { 104 while (!scheduledPushTasks_.isEmpty()) 105 { 106 PushTaskExecutor.PushTask pushTask = (PushTask) scheduledPushTasks_.take(); 107 pushTask.cancel(); 108 } 109 110 } catch (InterruptedException e) 111 { 112 } 114 115 disposeWorkers(); 116 } 117 118 private void createWorkers(int numberOfWorkers) 119 { 120 for (int x = 0; x < numberOfWorkers; ++x) 121 { 122 Worker worker = new Worker("PushTaskExecutor#" + executorNr_ + "-" + x); 123 workers_.add(worker); 124 } 125 } 126 127 private void startWorkers() 128 { 129 Iterator i = workers_.iterator(); 130 131 while (i.hasNext()) 132 { 133 ((Thread ) i.next()).start(); 134 } 135 } 136 137 private void disposeWorkers() 138 { 139 Iterator i = workers_.iterator(); 140 141 while (i.hasNext()) 142 { 143 Thread thread = (Thread ) i.next(); 144 145 thread.interrupt(); 146 } 147 148 workers_.clear(); 149 } 150 } 151 | Popular Tags |