package com.presumo.jms.router;

import com.presumo.jms.message.AckHelper;
import com.presumo.jms.message.JmsMessage;
import com.presumo.jms.plugin.MessageQueue;

import java.io.IOException;

import com.presumo.jms.resources.Resources;
import com.presumo.util.log.Logger;
import com.presumo.util.log.LoggerFactory;

/**
 * Skeleton for a router that drains an inbox {@link MessageQueue} on a
 * dedicated thread.
 *
 * <p>Subclasses implement {@link #routeMessages(int)} and may override
 * {@link #timerTick()}. The routing thread is created by
 * {@link #startRouter()} and joined by {@link #stopRouter()}; producers hand
 * messages to the router through {@link #queueMessage(JmsMessage)} and
 * {@link #queueMessages(JmsMessage[])}, which wake the routing thread via the
 * inbox's monitor.
 */
public abstract class RouterAdapter implements Runnable
{
  private static final Logger logger =
    LoggerFactory.getLogger(RouterAdapter.class, Resources.getBundle());

  /** Upper bound, in milliseconds, on one idle wait before timerTick() runs. */
  private static final long IDLE_WAIT_MILLIS = 3000;

  /** Set once by closeRouter(); start/stop are rejected afterwards. */
  private volatile boolean closed;

  /** True while no routing thread should be running (the initial state). */
  private volatile boolean stopRouting = true;

  /** Value passed to routeMessages(); may be changed from any thread. */
  private volatile int batchSize;

  /** Held by run() for its whole duration so only one routing loop executes. */
  private final Object startStopLock = new Object();

  /** Serializes start/stop/close and queue replacement. */
  private final Object routerLock = new Object();

  /** The thread created by startRouter(); cleared after a successful join. */
  private Thread routingThread;

  private final String threadName;

  /** The queue being drained; its monitor is used for wait/notify. */
  private volatile MessageQueue inbox;

  /**
   * Creates a stopped router.
   *
   * @param queue      the inbox the routing thread will drain
   * @param batchSize  the value handed to {@link #routeMessages(int)}
   * @param threadName name given to the routing thread and used as a log prefix
   */
  public RouterAdapter(MessageQueue queue, int batchSize, String threadName)
  {
    this.batchSize = batchSize;
    this.inbox = queue;
    this.threadName = threadName;
  }

  /**
   * Routing loop. Until the router is asked to stop it alternates between
   * calling {@link #routeMessages(int)} and, while the inbox is empty,
   * waiting on the inbox monitor. Each wait that ends with the inbox still
   * empty (and the router still running) triggers {@link #timerTick()}.
   */
  public void run()
  {
    logger.entry(threadName + ": run");
    synchronized (startStopLock) {
      while (!stopRouting) {
        routeMessages(batchSize);

        // Capture the queue once so the monitor we hold is the one we wait on.
        final MessageQueue queue = inbox;
        synchronized (queue) {
          while (!stopRouting && queue.size() == 0) {
            try {
              queue.wait(IDLE_WAIT_MILLIS);
            } catch (InterruptedException ignored) {
              // Deliberately not re-interrupting: with the flag set, every
              // following wait() would throw immediately and this loop would
              // spin. Shutdown is signalled through stopRouting instead.
            }
            if (!stopRouting && queue.size() == 0) {
              timerTick();
            }
          }
        }
      }
    }
    logger.exit(threadName + ": run");
  }

  /**
   * Replaces the inbox. Only permitted while the router is stopped.
   *
   * @param queue the new inbox
   * @throws IllegalStateException if the router is currently running
   */
  public void setMessageQueue(MessageQueue queue)
  {
    logger.entry("setMessageQueue", queue);

    synchronized (routerLock) {
      // The original test was inverted: it rejected the call on a stopped
      // router and allowed the swap underneath a live routing thread.
      if (!stopRouting) {
        throw new IllegalStateException(
          "Attempt to change message queue on running router");
      }

      final MessageQueue current = inbox;
      if (current == null) {
        inbox = queue;
      } else {
        // Exclude threads that are inside a block guarded by the old queue.
        synchronized (current) {
          inbox = queue;
        }
      }
    }
    logger.exit("setMessageQueue");
  }

  /**
   * Stops the router if necessary and marks it closed. Calling this on an
   * already closed router does nothing.
   */
  public void closeRouter()
  {
    logger.entry(threadName + ": closeRouter");

    synchronized (routerLock) {
      if (!closed) {
        stopRouter();
        closed = true;
      }
    }

    logger.exit(threadName + ": closeRouter");
  }

  /**
   * Starts the routing thread. Does nothing if the router is already running.
   *
   * @throws IllegalStateException if the router has been closed
   */
  public void startRouter()
  {
    logger.entry(threadName + ": startRouter");

    synchronized (routerLock) {
      if (closed) {
        throw new IllegalStateException("start called on a closed router");
      }
      if (stopRouting) {
        stopRouting = false;
        routingThread = new Thread(this, threadName);
        routingThread.start();
      }
    }

    logger.exit(threadName + ": startRouter");
  }

  /**
   * Signals the routing thread to stop and waits for it to finish. Does
   * nothing if the router is not running. If the calling thread is
   * interrupted while waiting, its interrupt status is restored and the
   * method returns without the join having completed.
   *
   * @throws IllegalStateException if the router has been closed
   */
  public void stopRouter()
  {
    logger.entry(threadName + ": stopRouter");

    synchronized (routerLock) {
      if (closed) {
        throw new IllegalStateException("stop() called on a closed connection");
      }
      if (!stopRouting) {
        final MessageQueue queue = inbox;
        // Flip the flag under the inbox monitor so a routing thread that is
        // about to wait cannot miss the wake-up.
        synchronized (queue) {
          stopRouting = true;
          queue.notifyAll();
        }

        try {
          routingThread.join();
          routingThread = null;
        } catch (InterruptedException ie) {
          // Preserve the interrupt for the caller instead of dumping a trace.
          Thread.currentThread().interrupt();
        }
      }
    }

    logger.exit(threadName + ": stopRouter");
  }

  /**
   * Changes the value passed to {@link #routeMessages(int)} on subsequent
   * iterations of the routing loop.
   *
   * @param batchSize the new batch size
   */
  public void setBatchSize(int batchSize)
  {
    this.batchSize = batchSize;
  }

  /**
   * Called by the routing thread on every loop iteration.
   *
   * @param number the current batch size
   */
  protected abstract void routeMessages(int number);

  /**
   * Hook invoked by the routing thread when an idle wait ends with the inbox
   * still empty. It is called while the inbox monitor is held. The default
   * implementation does nothing.
   */
  protected void timerTick()
  {
  }

  /**
   * Fetches messages from the inbox while holding its monitor.
   *
   * @param batchsize forwarded to {@code MessageQueue.getNext}
   * @return whatever the inbox returns for this request
   * @throws IOException if the inbox reports one
   */
  protected final JmsMessage [] getNext(int batchsize) throws IOException
  {
    final MessageQueue queue = inbox;
    synchronized (queue) {
      return queue.getNext(batchsize);
    }
  }

  /**
   * Reads the inbox size while holding its monitor.
   *
   * @return the size reported by the inbox
   */
  protected final int queueSize()
  {
    final MessageQueue queue = inbox;
    synchronized (queue) {
      return queue.size();
    }
  }

  /**
   * Pushes one message onto the inbox and wakes the routing thread. When the
   * inbox is persistent, the message's ack helper (if any) is first pointed at
   * the inbox.
   *
   * @param msg the message to enqueue
   * @throws IOException if the inbox reports one
   */
  protected final void queueMessage(JmsMessage msg) throws IOException
  {
    final MessageQueue queue = inbox;

    if (queue.isPersistent()) {
      AckHelper ah = msg.getAckHelper();
      if (ah != null) {
        ah.setMessageQueue(queue);
      }
    }

    synchronized (queue) {
      queue.push(msg);
      queue.notifyAll();
    }
  }

  /**
   * Pushes a batch of messages onto the inbox and wakes the routing thread.
   * When the inbox is persistent, each message's ack helper (if any) is first
   * pointed at the inbox.
   *
   * @param msgs the messages to enqueue
   * @throws IOException if the inbox reports one
   */
  protected final void queueMessages(JmsMessage [] msgs) throws IOException
  {
    final MessageQueue queue = inbox;

    if (queue.isPersistent()) {
      for (int i = 0; i < msgs.length; ++i) {
        AckHelper ah = msgs[i].getAckHelper();
        if (ah != null) {
          ah.setMessageQueue(queue);
        }
      }
    }

    synchronized (queue) {
      queue.push(msgs);
      queue.notifyAll();
    }
  }
}