package com.scalagent.kjoram;

import com.scalagent.kjoram.jms.*;
import com.scalagent.kjoram.excepts.*;

import java.util.Vector;
import java.util.Enumeration;


/**
 * Connection-level consumer: receives <code>ConsumerMessages</code> replies
 * pushed into {@link #repliesIn} and hands the contained messages over to
 * sessions obtained from a <code>ServerSessionPool</code>, at most
 * <code>maxMessages</code> messages per session before that session is
 * started.
 * <p>
 * The dispatching work is done by an internal {@link CCDaemon} thread that
 * is created and started by the constructor.
 */
public class ConnectionConsumer
{
  /** The connection this consumer belongs to. */
  private Connection cnx;
  /** <code>true</code> when a subscription name was supplied for a non-queue destination. */
  private boolean durable = false;
  /** The selector passed to the consumer requests. */
  private String selector;
  /** The pool providing the server sessions that process the messages. */
  private ServerSessionPool sessionPool;
  /** Maximum number of messages pushed to a session before it is started. */
  private int maxMessages;
  /** The daemon dispatching the incoming replies to the server sessions. */
  private CCDaemon ccDaemon;
  /** The listener request currently registered in the connection's requests table. */
  private com.scalagent.kjoram.jms.AbstractJmsRequest currentReq = null;
  /** <code>true</code> once {@link #close()} has been called. */
  private boolean closed = false;

  /** Name of the queue, or name of the subscription, the consumer reads from. */
  String targetName;
  /** <code>true</code> when the destination is a <code>Queue</code>. */
  boolean queueMode = true;
  /** Queue in which the replies destined to this consumer are stored. */
  com.scalagent.kjoram.util.Queue repliesIn;


  /**
   * Builds a <code>ConnectionConsumer</code>, registers it on the connection,
   * starts its dispatching daemon and sends the initial listener request.
   *
   * @param cnx  The connection the consumer belongs to.
   * @param dest  The destination to consume from.
   * @param subName  Subscription name; when non-null and the destination is
   *          not a <code>Queue</code>, the subscription is flagged durable.
   *          When null for a non-queue destination, a name is obtained from
   *          <code>cnx.nextSubName()</code>.
   * @param selector  The selector forwarded to the consumer requests.
   * @param sessionPool  The pool of server sessions; must not be null.
   * @param maxMessages  Maximum number of messages passed to a session
   *          before it is started; must be strictly positive.
   *
   * @exception JMSException  If <code>sessionPool</code> is null, if
   *              <code>maxMessages</code> is not strictly positive, or if one
   *              of the requests sent through the connection fails.
   */
  ConnectionConsumer(Connection cnx, Destination dest, String subName,
                     String selector, ServerSessionPool sessionPool,
                     int maxMessages) throws JMSException
  {
    if (sessionPool == null)
      throw new JMSException("Invalid ServerSessionPool parameter: "
                             + sessionPool);
    if (maxMessages <= 0)
      throw new JMSException("Invalid maxMessages parameter: " + maxMessages);

    this.cnx = cnx;
    this.selector = selector;
    this.sessionPool = sessionPool;
    this.maxMessages = maxMessages;

    // Working out the target name and the consuming mode.
    if (dest instanceof Queue)
      targetName = dest.getName();
    else if (subName == null) {
      // Non-queue destination without a name: non durable subscription.
      queueMode = false;
      targetName = cnx.nextSubName();
    }
    else {
      // Non-queue destination with a name: durable subscription.
      queueMode = false;
      targetName = subName;
      durable = true;
    }

    repliesIn = new com.scalagent.kjoram.util.Queue();

    // Registering the consumer on its connection (table lazily created).
    if (cnx.cconsumers == null)
      cnx.cconsumers = new Vector();

    cnx.cconsumers.addElement(this);

    ccDaemon = new CCDaemon(this);
    ccDaemon.setDaemon(true);
    ccDaemon.start();

    // A non-queue destination requires a subscription before listening.
    if (! queueMode)
      cnx.syncRequest(new ConsumerSubRequest(dest.getName(), targetName,
                                             selector, false, durable));

    // Sending the listener request; replies are routed back to this consumer
    // through the connection's requests table.
    currentReq = new ConsumerSetListRequest(targetName, selector, queueMode);
    currentReq.setRequestId(cnx.nextRequestId());
    cnx.requestsTable.put(currentReq.getKey(), this);
    cnx.asyncRequest(currentReq);

    if (JoramTracing.dbgClient)
      JoramTracing.log(JoramTracing.DEBUG, this + ": created.");
  }

  /**
   * Builds a <code>ConnectionConsumer</code> without a subscription name;
   * equivalent to calling the full constructor with a null
   * <code>subName</code>.
   *
   * @param cnx  The connection the consumer belongs to.
   * @param dest  The destination to consume from.
   * @param selector  The selector forwarded to the consumer requests.
   * @param sessionPool  The pool of server sessions; must not be null.
   * @param maxMessages  Maximum number of messages passed to a session
   *          before it is started; must be strictly positive.
   *
   * @exception JMSException  See the full constructor.
   */
  ConnectionConsumer(Connection cnx, Destination dest, String selector,
                     ServerSessionPool sessionPool,
                     int maxMessages) throws JMSException
  {
    this(cnx, dest, null, selector, sessionPool, maxMessages);
  }


  /** Returns a string image of this connection consumer. */
  public String toString()
  {
    return "ConnCons:" + cnx.toString();
  }


  /**
   * Returns the server session pool this consumer dispatches to.
   *
   * @return The <code>ServerSessionPool</code> given at construction.
   *
   * @exception IllegalStateException  If the consumer has been closed.
   */
  public ServerSessionPool getServerSessionPool() throws JMSException
  {
    if (closed)
      throw new com.scalagent.kjoram.excepts.
        IllegalStateException("Forbidden call on a closed"
                              + " ConnectionConsumer.");
    return sessionPool;
  }


  /**
   * Closes this consumer: unregisters the pending listener request, stops
   * the dispatching daemon, closes or removes the subscription when the
   * consumer is not in queue mode, and removes the consumer from its
   * connection.
   * <p>
   * Calling this method on an already closed consumer does nothing.
   *
   * @exception JMSException  Declared for interface compatibility; failures
   *              of the subscription closing request are ignored.
   */
  public void close() throws JMSException
  {
    // close() may be reached both from the daemon's error path and from an
    // external caller; a second call must not redo the work.
    if (closed)
      return;
    // Flag first so that getServerSessionPool() rejects calls from now on.
    closed = true;

    cnx.requestsTable.remove(currentReq.getKey());
    ccDaemon.stop();

    // A subscriber must also release its subscription: a durable one is
    // only closed, a non durable one is removed.
    if (! queueMode) {
      try {
        if (durable)
          cnx.syncRequest(new ConsumerCloseSubRequest(targetName));
        else
          cnx.syncRequest(new ConsumerUnsubRequest(targetName));
      }
      catch (JMSException ignored) {
        // Best effort: the request failing must not prevent the local
        // clean-up below from completing.
      }
    }
    cnx.cconsumers.removeElement(this);
  }


  /**
   * Daemon in charge of taking the replies stored in <code>repliesIn</code>
   * and of distributing their messages to server sessions.
   */
  class CCDaemon extends com.scalagent.kjoram.util.Daemon
  {
    /** The connection consumer the daemon works for. */
    private ConnectionConsumer cc;

    /**
     * Builds the daemon, named after the consumer's string image.
     *
     * @param cc  The connection consumer the daemon works for.
     */
    CCDaemon(ConnectionConsumer cc)
    {
      super(cc.toString());
      this.cc = cc;
    }


    /**
     * Dispatching loop: waits for a reply, then passes the available
     * messages to server sessions by batches of at most
     * <code>maxMessages</code>, starting each session once its batch is
     * complete or once no more message is available.
     * <p>
     * A <code>JMSException</code> raised while dispatching closes the
     * connection consumer.
     */
    public void run()
    {
      ConsumerMessages reply;
      Vector deliveries = new Vector();
      ServerSession serverSess;
      Session sess;
      int counter;

      try {
        while (running) {
          canStop = true;

          // Waiting for a reply; any failure of the wait simply leads to
          // re-evaluating the running flag.
          try {
            repliesIn.get();
          }
          catch (Exception iE) {
            continue;
          }
          canStop = false;

          try {
            if (JoramTracing.dbgClient)
              JoramTracing.log(JoramTracing.DEBUG, "--- " + cc
                               + ": got a delivery.");

            // Getting a first session to fill.
            serverSess = sessionPool.getServerSession();
            sess = (Session) serverSess.getSession();
            sess.connectionConsumer = cc;
            counter = 1;

            // As long as the current session is not full and replies remain:
            while (counter <= maxMessages && repliesIn.size() > 0) {

              // In queue mode a new listener request is sent for each
              // consumed reply, replacing the previously registered one.
              if (queueMode) {
                cnx.requestsTable.remove(currentReq.getKey());
                currentReq = new ConsumerSetListRequest(targetName, selector,
                                                        queueMode);
                currentReq.setRequestId(cnx.nextRequestId());
                cnx.requestsTable.put(currentReq.getKey(), cc);
                cnx.asyncRequest(currentReq);
              }

              // Moving the messages of the reply into the pending deliveries.
              reply = (ConsumerMessages) repliesIn.pop();
              for (Enumeration e = reply.getMessages().elements();
                   e.hasMoreElements(); ) {
                deliveries.addElement(e.nextElement());
              }

              while (! deliveries.isEmpty()) {
                // Filling the current session, oldest message first.
                while (counter <= maxMessages && ! deliveries.isEmpty()) {
                  if (JoramTracing.dbgClient)
                    JoramTracing.log(JoramTracing.DEBUG, "Passes a"
                                     + " message to a session.");
                  Object obj = deliveries.elementAt(0);
                  deliveries.removeElementAt(0);
                  sess.repliesIn.push(obj);
                  counter++;
                }
                // Session full: starting it, and getting a new one only if
                // there is still something to deliver.
                if (counter > maxMessages) {
                  if (JoramTracing.dbgClient)
                    JoramTracing.log(JoramTracing.DEBUG, "Starts the"
                                     + " session.");
                  serverSess.start();
                  counter = 1;

                  if (! deliveries.isEmpty() || repliesIn.size() > 0) {
                    serverSess = sessionPool.getServerSession();
                    sess = (Session) serverSess.getSession();
                    sess.connectionConsumer = cc;
                  }
                }
              }
            }

            if (JoramTracing.dbgClient)
              JoramTracing.log(JoramTracing.DEBUG, "No more delivery.");
            // Starting the last, partially filled, session.
            if (counter > 1) {
              if (JoramTracing.dbgClient)
                JoramTracing.log(JoramTracing.DEBUG, "Starts the"
                                 + " session.");
              counter = 1;
              serverSess.start();
            }
          }
          catch (JMSException jE) {
            // Dispatching failed: the consumer can't go on, closing it.
            canStop = true;
            try {
              cc.close();
            }
            catch (JMSException ignored) {
              // Nothing more can be done if the closing itself fails.
            }
          }
        }
      }
      finally {
        finish();
      }
    }


    /** Shuts the daemon down; nothing to do for this daemon. */
    public void shutdown()
    {}


    /** Releases the daemon's resources; only traces the termination. */
    public void close()
    {
      if (JoramTracing.dbgClient)
        JoramTracing.log(JoramTracing.DEBUG, "CCDaemon finished.");
    }
  }
}