1 package com.ubermq.jms.client.impl; 2 3 import EDU.oswego.cs.dl.util.concurrent.*; 4 import com.ubermq.jms.client.*; 5 import com.ubermq.jms.client.msg.*; 6 import com.ubermq.jms.common.datagram.*; 7 import com.ubermq.kernel.*; 8 import java.util.*; 9 import javax.jms.*; 10 11 import javax.jms.Queue ; 12 13 19 class Session 20 implements javax.jms.Session 21 { 22 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(Session.class); 23 24 private static final int SESSION_BUFFER_SIZE = 25 Integer.valueOf(Configurator.getProperty(ClientConfig.SESSION_BOUNDED_BUFFER_SIZE, "100")).intValue(); 26 27 private static final String TEMP_TOPIC_PREFIX = "$TT-"; 28 private static final String TEMP_QUEUE_PREFIX = "$TEMPQ-"; 29 30 static final boolean DEFAULT_NO_LOCAL = false; 31 32 35 Connection conn; 36 37 40 final IMessageDatagramFactory factory; 41 42 46 private MessageListener listener; 47 48 51 final int ackMode; 52 53 56 private List consumers; 57 58 private boolean isClosing = false; 60 61 private boolean needsAsyncDelivery = false; 63 64 private Channel deliveryQueue; 66 private PooledExecutor executor; 67 68 73 Session(Connection conn, IMessageDatagramFactory f, int ackMode) 74 { 75 this.conn = conn; 76 this.factory = f; 77 this.ackMode = ackMode; 78 this.consumers = new ArrayList(); 79 80 deliveryQueue = new BoundedPriorityQueue(SESSION_BUFFER_SIZE, 81 new Comparator() { 82 public int compare(Object o1, Object o2) 83 { 84 DeliveryTask dt1 = (DeliveryTask)o1, 85 dt2 = (DeliveryTask)o2; 86 87 return ((LocalMessage)dt1.msg).compareTo(dt2.msg); 88 } 89 }); 90 } 91 92 97 void addConsumer(AbstractConsumer c) 98 { 99 consumers.add(c); 100 } 101 102 108 void removeConsumer(AbstractConsumer c) 109 { 110 consumers.remove(c); 111 } 112 113 115 private String createRandomIdentifier() 116 { 117 return com.ubermq.util.Utility.allocateLocallyUniqueInt() + "-" + new Random().nextInt(); 118 } 119 120 122 public TopicSubscriber createDurableSubscriber(Topic t, String name, String selector, boolean noLocal) 123 throws JMSException 124 { 125 if (isClosing) 126 throw new javax.jms.IllegalStateException ("closed"); 127 128 return new LocalTopicSubscriber(t, 129 selector, 130 noLocal, 131 name, 132 this, 133 (selector != null) ? new NullDeliveryManager() : conn.delivery.newInstance()); 134 } 135 136 public Topic createTopic(String p0) throws JMSException 137 { 138 if (isClosing) 139 throw new javax.jms.IllegalStateException ("closed"); 140 141 return new LocalTopic(p0); 142 } 143 144 public TopicSubscriber createDurableSubscriber(Topic t, String name) throws JMSException 145 { 146 if (isClosing) 147 throw new javax.jms.IllegalStateException ("closed"); 148 149 return createDurableSubscriber(t, name, null, DEFAULT_NO_LOCAL); 150 } 151 152 public void unsubscribe(String name) throws JMSException 153 { 154 if (isClosing) 155 throw new javax.jms.IllegalStateException ("closed"); 156 157 conn.getClientProcessor().unregisterDurableSubscription(name); 158 } 159 160 public TemporaryTopic createTemporaryTopic() throws JMSException 161 { 162 if (isClosing) 163 throw new javax.jms.IllegalStateException ("closed"); 164 165 return new LocalTopic(TEMP_TOPIC_PREFIX + createRandomIdentifier()); 166 } 167 168 170 public TemporaryQueue createTemporaryQueue() throws JMSException 171 { 172 if (isClosing) 173 throw new javax.jms.IllegalStateException ("closed"); 174 175 return new LocalQueue(this, TEMP_QUEUE_PREFIX + createRandomIdentifier()); 176 } 177 178 public QueueBrowser createBrowser(Queue q) throws JMSException 179 { 180 if (isClosing) 181 throw new javax.jms.IllegalStateException ("closed"); 182 183 return createBrowser(q, null); 184 } 185 186 public QueueBrowser createBrowser(Queue q, String selector) throws JMSException 187 { 188 if (isClosing) 189 throw new javax.jms.IllegalStateException ("closed"); 190 191 throw new UnsupportedOperationException (); 192 } 193 194 public Queue createQueue(String name) throws JMSException 195 { 196 if (isClosing) 197 throw new javax.jms.IllegalStateException ("closed"); 198 199 return new LocalQueue(this, name); 200 } 201 202 204 public int getAcknowledgeMode() throws JMSException 205 { 206 return ackMode; 207 } 208 209 public MessageConsumer createConsumer(Destination p0) throws JMSException 210 { 211 if (isClosing) 212 throw new javax.jms.IllegalStateException ("closed"); 213 214 return createConsumer(p0, null, DEFAULT_NO_LOCAL); 215 } 216 217 public MessageConsumer createConsumer(Destination d, String selector) throws JMSException 218 { 219 if (isClosing) 220 throw new javax.jms.IllegalStateException ("closed"); 221 222 return createConsumer(d, selector, DEFAULT_NO_LOCAL); 223 } 224 225 public MessageConsumer createConsumer(Destination d, String selector, boolean noLocal) 226 throws JMSException 227 { 228 if (isClosing) 229 throw new javax.jms.IllegalStateException ("closed"); 230 231 AbstractConsumer c; 232 233 if (d instanceof Queue ) 234 c = new QueueReceiver((Queue )d, selector, this); 235 else if (d instanceof Topic) 236 { 237 c = new LocalTopicSubscriber((Topic)d, 238 selector, 239 noLocal, 240 this, 241 (selector != null) ? new NullDeliveryManager() : conn.delivery.newInstance()); 242 } 243 else 244 throw new InvalidDestinationException("destination must be a topic or queue"); 245 246 return c; 247 } 248 249 public MessageProducer createProducer(Destination p0) 250 throws JMSException 251 { 252 if (isClosing) 253 throw new javax.jms.IllegalStateException ("closed"); 254 255 return new AbstractProducer(p0, this); 256 } 257 258 260 public BytesMessage createBytesMessage() 261 throws JMSException 262 { 263 if (isClosing) 264 throw new javax.jms.IllegalStateException ("closed"); 265 266 return new LocalBytesMessage(factory); 267 } 268 269 public MapMessage createMapMessage() 270 throws JMSException 271 { 272 if (isClosing) 273 throw new javax.jms.IllegalStateException ("closed"); 274 275 return new LocalMapMessage(factory); 276 } 277 278 public Message createMessage() 279 throws JMSException 280 { 281 if (isClosing) 282 throw new javax.jms.IllegalStateException ("closed"); 283 284 return new LocalMessage(factory); 285 } 286 287 public ObjectMessage createObjectMessage() 288 throws JMSException 289 { 290 if (isClosing) 291 throw new javax.jms.IllegalStateException ("closed"); 292 293 return new LocalObjectMessage(factory); 294 } 295 296 public ObjectMessage createObjectMessage(java.io.Serializable object) 297 throws JMSException 298 { 299 if (isClosing) 300 throw new javax.jms.IllegalStateException ("closed"); 301 302 ObjectMessage om = createObjectMessage(); 303 om.setObject(object); 304 return om; 305 } 306 307 public StreamMessage createStreamMessage() 308 throws JMSException 309 { 310 if (isClosing) 311 throw new javax.jms.IllegalStateException ("closed"); 312 313 return new LocalStreamMessage(factory); 314 } 315 316 public TextMessage createTextMessage() 317 throws JMSException 318 { 319 if (isClosing) 320 throw new javax.jms.IllegalStateException ("closed"); 321 322 return new LocalTextMessage(factory); 323 } 324 325 public TextMessage createTextMessage(String text) 326 throws JMSException 327 { 328 TextMessage tm = createTextMessage(); 329 tm.setText(text); 330 return tm; 331 } 332 333 public boolean getTransacted() 334 { 335 return false; 336 } 337 338 public void commit() 339 throws JMSException 340 { 341 } 344 345 public void rollback() 346 throws JMSException 347 { 348 throw new JMSUnsupportedOperationException(); 351 } 352 353 boolean isClosing() {return isClosing;} 354 355 public void close() throws JMSException 356 { 357 if (!isClosing) 358 { 359 isClosing = true; 360 conn.removeSession(this); 361 stopDeliveryThread(); 362 } 363 } 364 365 public void recover() 366 throws JMSException 367 { 368 throw new JMSUnsupportedOperationException(); 374 } 375 376 public MessageListener getMessageListener() 377 throws JMSException 378 { 379 return listener; 380 } 381 382 public void setMessageListener(MessageListener listener) 383 throws JMSException 384 { 385 if (isClosing) 386 throw new javax.jms.IllegalStateException ("closed"); 387 388 this.listener = listener; 389 checkDeliveryThread(); 390 } 391 392 public void run() 393 { 394 } 395 396 404 void asyncDelivery(final Message msg, 405 final AbstractConsumer sub, 406 final MessageListener listener) 407 throws InterruptedException 408 { 409 deliveryQueue.put(new DeliveryTask(msg, sub, listener)); 410 } 411 412 private class DeliveryTask 413 implements Runnable , Comparable 414 { 415 private final Message msg; 416 private final AbstractConsumer sub; 417 private final MessageListener listener; 418 419 DeliveryTask(Message msg, 420 AbstractConsumer sub, 421 MessageListener listener) 422 { 423 this.msg = msg; 424 this.sub = sub; 425 this.listener = listener; 426 } 427 428 public int compareTo(Object o) 429 { 430 try 431 { 432 return msg.getJMSPriority() - ((DeliveryTask)o).msg.getJMSPriority(); 433 } 434 catch (JMSException e) { 435 return 0; 436 } 437 } 438 439 public void run() { 440 try { 441 if (!sub.isClosing() && 442 !isClosing()) 443 { 444 listener.onMessage(msg); 445 } 446 } catch(RuntimeException re) { 447 log.error("", re); 448 if (ackMode == Session.CLIENT_ACKNOWLEDGE) { 449 ; 451 } else { 452 try { 455 listener.onMessage(msg); 456 } catch(RuntimeException re2) { 457 ; } 459 } 460 } 461 } 462 } 463 464 467 synchronized private void startDeliveryThread() 468 { 469 executor = new PooledExecutor(deliveryQueue); 470 executor.setThreadFactory( new ThreadFactory() { 471 public Thread newThread(Runnable p0) 472 { 473 Thread t = new Thread (p0, "Session Delivery Thread"); 474 t.setDaemon(false); 475 return t; 476 } 477 }); 478 executor.waitWhenBlocked(); 479 executor.setKeepAliveTime(-1); 480 executor.setMinimumPoolSize(1); 481 executor.setMaximumPoolSize(1); 482 executor.createThreads(1); 483 } 484 485 490 synchronized void requestDeliveryThread() 491 { 492 this.needsAsyncDelivery = true; 493 } 494 495 500 synchronized void checkDeliveryThread() 501 { 502 if (executor == null && 503 needsAsyncDelivery) 504 startDeliveryThread(); 505 } 506 507 510 synchronized void stopDeliveryThread() 511 { 512 if (executor != null) { 513 executor.shutdownNow(); 514 executor = null; 515 } 516 } 517 518 521 void pause() 522 { 523 stopDeliveryThread(); 524 525 Iterator iter = consumers.iterator(); 526 while (iter.hasNext()) 527 { 528 AbstractConsumer lts = (AbstractConsumer)iter.next(); 529 lts.pause(); 530 } 531 } 532 533 536 void resume() 537 { 538 checkDeliveryThread(); 539 540 Iterator iter = consumers.iterator(); 541 while (iter.hasNext()) 542 { 543 AbstractConsumer lts = (AbstractConsumer)iter.next(); 544 lts.resume(); 545 } 546 } 547 548 } 549 550 | Popular Tags |