1 24 package org.objectweb.joram.client.jms; 25 26 import org.objectweb.joram.shared.client.*; 27 import org.objectweb.joram.client.jms.connection.ReplyListener; 28 import org.objectweb.joram.client.jms.connection.RequestMultiplexer; 29 import org.objectweb.joram.client.jms.connection.Requestor; 30 31 import java.util.Vector ; 32 33 import javax.jms.IllegalStateException ; 34 import javax.jms.InvalidSelectorException ; 35 import javax.jms.JMSException ; 36 37 import org.objectweb.util.monolog.api.BasicLevel; 38 39 42 public class ConnectionConsumer implements javax.jms.ConnectionConsumer { 43 44 private static class Status { 45 public static final int OPEN = 0; 46 public static final int CLOSE = 1; 47 48 private static final String [] names = { 49 "OPEN", "CLOSE"}; 50 51 public static String toString(int status) { 52 return names[status]; 53 } 54 } 55 56 57 private Connection cnx; 58 59 60 private boolean durable = false; 61 62 63 private String selector; 64 65 66 private javax.jms.ServerSessionPool sessionPool; 67 68 69 private int maxMessages; 70 71 75 private CCDaemon ccDaemon; 76 77 78 private String targetName; 79 80 81 private boolean queueMode; 82 83 87 fr.dyade.aaa.util.Queue repliesIn; 88 89 private RequestMultiplexer mtpx; 90 91 private Requestor requestor; 92 93 private int requestId; 94 95 private int status; 96 97 115 ConnectionConsumer(Connection cnx, 116 Destination dest, 117 String subName, 118 String selector, 119 javax.jms.ServerSessionPool sessionPool, 120 int maxMessages, 121 RequestMultiplexer mtpx) throws JMSException { 122 try { 123 org.objectweb.joram.shared.selectors.Selector.checks(selector); 124 } 125 catch (org.objectweb.joram.shared.excepts.SelectorException sE) { 126 throw new InvalidSelectorException ("Invalid selector syntax: " + sE); 127 } 128 129 if (sessionPool == null) 130 throw new JMSException ("Invalid ServerSessionPool parameter: " 131 + sessionPool); 132 if (maxMessages <= 0) 133 throw new JMSException ("Invalid maxMessages parameter: " + maxMessages); 134 135 this.cnx = cnx; 136 this.selector = selector; 137 this.sessionPool = sessionPool; 138 this.maxMessages = maxMessages; 139 140 this.mtpx = mtpx; 141 this.requestor = new Requestor(mtpx); 142 143 setStatus(Status.OPEN); 144 145 if (dest instanceof Queue) { 146 queueMode = true; 147 targetName = dest.getName(); 148 } else if (subName == null) { 149 queueMode = false; 150 targetName = cnx.nextSubName(); 151 } else { 152 queueMode = false; 153 targetName = subName; 154 durable = true; 155 } 156 157 repliesIn = new fr.dyade.aaa.util.Queue(); 158 159 ccDaemon = new CCDaemon(toString()); 160 ccDaemon.setDaemon(true); 161 ccDaemon.start(); 162 163 if (! queueMode) { 165 requestor.request( 166 new ConsumerSubRequest( 167 dest.getName(), targetName, 168 selector, false, durable)); 169 } 170 171 subscribe(); 173 } 174 175 192 ConnectionConsumer(Connection cnx, 193 Destination dest, 194 String selector, 195 javax.jms.ServerSessionPool sessionPool, 196 int maxMessages, 197 RequestMultiplexer mtpx) 198 throws JMSException { 199 this(cnx, dest, null, selector, 200 sessionPool, maxMessages, mtpx); 201 } 202 203 public final String getTargetName() { 204 return targetName; 205 } 206 207 public final boolean getQueueMode() { 208 return queueMode; 209 } 210 211 212 public String toString() 213 { 214 return "ConnCons:" + cnx.toString(); 215 } 216 217 private void setStatus(int status) { 218 this.status = status; 219 } 220 221 private synchronized void checkClosed() 222 throws IllegalStateException { 223 if (status == Status.CLOSE) 224 throw new IllegalStateException ("Forbidden call on a closed session."); 225 } 226 227 232 public javax.jms.ServerSessionPool getServerSessionPool() throws JMSException { 233 checkClosed(); 234 return sessionPool; 235 } 236 237 242 public synchronized void close() throws JMSException { 243 if (status == Status.CLOSE) return; 244 245 mtpx.abortRequest(requestId); 246 247 ccDaemon.stop(); 248 249 if (! queueMode) { 251 if (durable) { 252 requestor.request(new ConsumerCloseSubRequest(targetName)); 253 } else { 254 requestor.request(new ConsumerUnsubRequest(targetName)); 255 } 256 } 257 258 cnx.closeConnectionConsumer(this); 259 260 setStatus(Status.CLOSE); 261 } 262 263 private void subscribe() throws JMSException { 264 ConsumerSetListRequest req = 265 new ConsumerSetListRequest( 266 targetName, selector, queueMode, null, 1); 267 mtpx.sendRequest(req, new ReplyListener() { 268 public boolean replyReceived(AbstractJmsReply reply) { 269 repliesIn.push(reply); 270 return queueMode; 271 } 272 273 public void replyAborted(int requestId) {} 274 }); 275 requestId = req.getRequestId(); 276 } 277 278 282 class CCDaemon extends fr.dyade.aaa.util.Daemon { 283 284 288 CCDaemon(String name){ 289 super(name); 290 } 291 292 293 public void run() { 294 Vector deliveries = new Vector (); 295 try { 296 while (running) { 297 canStop = true; 298 try { 299 repliesIn.get(); 300 } catch (Exception iE) { 301 continue; 302 } 303 canStop = false; 304 305 try { 307 javax.jms.ServerSession serverSess = sessionPool.getServerSession(); 308 309 Session sess; 310 Object obj = serverSess.getSession(); 311 if (obj instanceof Session) { 312 sess = (Session)obj; 313 } else if (obj instanceof XASession) { 314 sess = ((XASession)obj).sess; 315 } else throw new Error ("Unexpected session type: " + obj); 316 317 sess.setConnectionConsumer(ConnectionConsumer.this); 318 int counter = 1; 319 320 while (counter <= maxMessages && repliesIn.size() > 0) { 323 324 if (queueMode) { 326 subscribe(); 327 } 328 329 ConsumerMessages reply = (ConsumerMessages) repliesIn.pop(); 330 Vector msgs = reply.getMessages(); 331 for (int i = 0; i < msgs.size(); i++) { 332 deliveries.add( 333 (org.objectweb.joram.shared.messages.Message) msgs.get(i)); 334 } 335 336 while (! deliveries.isEmpty()) { 337 while (counter <= maxMessages && ! deliveries.isEmpty()) { 338 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 339 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "Passes a" 340 + " message to a session."); 341 sess.onMessage( 342 (org.objectweb.joram.shared.messages.Message) deliveries.remove(0)); 343 counter++; 344 } 345 346 if (counter > maxMessages) { 347 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 348 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "Starts the" 349 + " session."); 350 serverSess.start(); 351 counter = 1; 352 353 if (! deliveries.isEmpty() || repliesIn.size() > 0) { 354 serverSess = sessionPool.getServerSession(); 355 obj = serverSess.getSession(); 356 if (obj instanceof Session) { 357 sess = (Session)obj; 358 } else if (obj instanceof XASession) { 359 sess = ((XASession)obj).sess; 360 } else throw new Error ("Unexpected session type: " + obj); 361 sess.setConnectionConsumer(ConnectionConsumer.this); 362 } 363 } 364 } 365 } 366 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 369 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "No more delivery."); 370 if (counter > 1) { 371 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 372 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "Starts the" 373 + " session."); 374 counter = 1; 375 serverSess.start(); 376 } 377 } 378 catch (JMSException jE) { 381 canStop = true; 382 try { 383 ConnectionConsumer.this.close(); 384 } 385 catch (JMSException jE2) {} 386 } 387 } 388 } 389 finally { 390 finish(); 391 } 392 } 393 394 protected void shutdown() {} 395 396 protected void close() {} 397 } 398 } 399 | Popular Tags |