1 22 package org.jboss.mq; 23 24 import java.util.ArrayList ; 25 import java.util.LinkedList ; 26 27 import javax.jms.ConnectionConsumer ; 28 import javax.jms.Destination ; 29 import javax.jms.JMSException ; 30 import javax.jms.ServerSession ; 31 import javax.jms.ServerSessionPool ; 32 33 import org.jboss.logging.Logger; 34 35 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 36 37 44 public class SpyConnectionConsumer implements ConnectionConsumer , SpyConsumer, Runnable 45 { 46 48 49 static Logger log = Logger.getLogger(SpyConnectionConsumer.class); 50 51 52 static boolean trace = log.isTraceEnabled(); 53 54 56 57 Connection connection; 58 59 Destination destination; 60 61 javax.jms.ServerSessionPool serverSessionPool; 62 63 int maxMessages; 64 66 LinkedList queue = new LinkedList (); 67 68 boolean closed = false; 69 70 boolean waitingForMessage = false; 71 72 Subscription subscription = new Subscription(); 73 75 Thread internalThread; 76 77 int id; 78 79 static SynchronizedInt threadId = new SynchronizedInt(0); 80 81 83 85 95 public SpyConnectionConsumer(Connection connection, Destination destination, String messageSelector, 96 ServerSessionPool serverSessionPool, int maxMessages) throws JMSException 97 { 98 trace = log.isTraceEnabled(); 99 100 this.connection = connection; 101 this.destination = destination; 102 this.serverSessionPool = serverSessionPool; 103 this.maxMessages = maxMessages; 104 if (this.maxMessages < 1) 105 this.maxMessages = 1; 106 107 subscription.destination = (SpyDestination) destination; 108 subscription.messageSelector = messageSelector; 109 subscription.noLocal = false; 110 111 connection.addConsumer(this); 112 id = threadId.increment(); 113 internalThread = new Thread (this, "Connection Consumer for dest " + subscription + " id=" + id); 114 internalThread.start(); 115 116 if (trace) 117 log.trace("New " + this); 118 } 119 120 122 127 public Subscription getSubscription() 128 { 129 return subscription; 130 } 131 132 138 public void addMessage(SpyMessage mes) throws JMSException 139 { 140 synchronized (queue) 141 { 142 if (closed) 143 { 144 if (trace) 145 log.trace("Consumer close nacking message=" + mes.header.jmsMessageID + " " + this); 146 log.warn("NACK issued. The connection consumer was closed."); 147 connection.send(mes.getAcknowledgementRequest(false)); 148 return; 149 } 150 151 if (trace) 152 log.trace("Add message=" + mes.header.jmsMessageID + " " + this); 153 154 if (waitingForMessage) 155 { 156 queue.addLast(mes); 157 queue.notifyAll(); 158 } 159 else 160 { 161 if (trace) 162 log.trace("Consumer not waiting nacking message=" + mes.header.jmsMessageID + " " + this); 163 connection.send(mes.getAcknowledgementRequest(false)); 164 } 165 } 166 } 167 168 170 public ServerSessionPool getServerSessionPool() throws JMSException 171 { 172 return serverSessionPool; 173 } 174 175 public void close() throws javax.jms.JMSException 176 { 177 synchronized (queue) 178 { 179 if (closed) 180 return; 181 182 closed = true; 183 queue.notifyAll(); 184 } 185 186 if (trace) 187 log.trace("Close " + this); 188 189 if (internalThread != null && !internalThread.equals(Thread.currentThread())) 190 { 191 try 192 { 193 194 if (trace) 195 log.trace("Joining thread " + this); 196 internalThread.join(); 197 } 198 catch (InterruptedException e) 199 { 200 if (trace) 201 log.trace("Ignoring interrupting while joining thread " + this); 202 } 203 } 204 synchronized (queue) 205 { 206 if (trace) 207 log.trace("Nacking messages on queue " + this); 208 try 209 { 210 while (queue.isEmpty() == false) 211 { 212 SpyMessage message = (SpyMessage) queue.removeFirst(); 213 connection.send(message.getAcknowledgementRequest(false)); 214 } 215 } 216 catch (Throwable ignore) 217 { 218 if (trace) 219 log.trace("Ignoring error nacking messages in queue " + this, ignore); 220 } 221 try 222 { 223 connection.removeConsumer(this); 224 } 225 catch (Throwable ignore) 226 { 227 if (trace) 228 log.trace("Ignoring error removing consumer from connection " + this, ignore); 229 } 230 } 231 } 232 233 235 public void run() 236 { 237 ArrayList mesList = new ArrayList (); 238 try 239 { 240 outer : while (true) 241 { 242 synchronized (queue) 243 { 244 if (closed) 245 { 246 if (trace) 247 log.trace("run() closed " + this); 248 break outer; 249 } 250 } 251 252 for (int i = 0; i < maxMessages; i++) 253 { 254 SpyMessage mes = connection.receive(subscription, -1); 255 if (mes == null) 256 { 257 if (trace) 258 log.trace("run() receivedNoWait got no message" + this); 259 break; 260 } 261 else 262 { 263 if (trace) 264 log.trace("run() receivedNoWait message=" + mes.header.jmsMessageID + " " + this); 265 mesList.add(mes); 266 } 267 } 268 269 if (mesList.isEmpty()) 270 { 271 SpyMessage mes = null; 272 synchronized (queue) 273 { 274 mes = connection.receive(subscription, 0); 275 if (mes == null) 276 { 277 waitingForMessage = true; 278 while (queue.isEmpty() && !closed) 279 { 280 if (trace) 281 log.trace("run() waiting for message " + this); 282 try 283 { 284 queue.wait(); 285 } 286 catch (InterruptedException e) 287 { 288 if (trace) 289 log.trace("Ignoring interruption waiting for message " + this, e); 290 } 291 } 292 if (closed) 293 { 294 if (trace) 295 log.trace("run() closed while waiting " + this); 296 waitingForMessage = false; 297 break outer; 298 } 299 mes = (SpyMessage) queue.removeFirst(); 300 waitingForMessage = false; 301 if (trace) 302 log.trace("run() got message message=" + mes.header.jmsMessageID + " " + this); 303 } 304 } 305 mesList.add(mes); 306 } 307 308 if (trace) 309 log.trace("Waiting for serverSesionPool " + this); 310 ServerSession serverSession = serverSessionPool.getServerSession(); 311 SpySession spySession = (SpySession) serverSession.getSession(); 312 if (trace) 313 log.trace("Waited serverSesion=" + serverSession + " session=" + spySession + " " + this); 314 315 if (spySession.sessionConsumer == null) 316 { 317 if (trace) 318 log.trace("Session did not have a set MessageListner " + spySession + " " + this); 319 } 320 else 321 { 322 spySession.sessionConsumer.subscription = subscription; 323 } 324 325 for (int i = 0; i < mesList.size(); i++) 326 spySession.addMessage((SpyMessage) mesList.get(i)); 327 328 if (trace) 329 log.trace(" Starting the ServerSession=" + serverSession + " " + this); 330 serverSession.start(); 331 mesList.clear(); 332 } 333 } 334 catch (Throwable t) 335 { 336 log.warn("Connection consumer closing due to error in listening thread " + this, t); 337 try 338 { 339 for (int i = 0; i < mesList.size(); i++) 340 { 341 SpyMessage msg = (SpyMessage) mesList.get(i); 342 connection.send(msg.getAcknowledgementRequest(false)); 343 } 344 } 345 catch (Throwable ignore) 346 { 347 if (trace) 348 log.trace("Ignoring error nacking message " + this, ignore); 349 } 350 try 351 { 352 close(); 353 } 354 catch (Throwable ignore) 355 { 356 if (trace) 357 log.trace("Ignoring error during close " + this, ignore); 358 } 359 } 360 } 361 362 364 public String toString() 365 { 366 StringBuffer buffer = new StringBuffer (100); 367 buffer.append("SpyConnectionConsumer[sub=").append(subscription); 368 if (closed) 369 buffer.append(" CLOSED"); 370 buffer.append(" messages=").append(queue.size()); 371 buffer.append(" waitingForMessage=").append(waitingForMessage); 372 if (internalThread != null) 373 buffer.append(" internalThread=").append(internalThread); 374 buffer.append(" sessionPool=").append(serverSessionPool); 375 buffer.append(" connection=").append(connection); 376 buffer.append(']'); 377 return buffer.toString(); 378 } 379 380 382 384 386 } | Popular Tags |