1 21 package com.presumo.jms.qmgr; 22 23 import java.util.ArrayList ; 24 25 import javax.jms.JMSException ; 26 import javax.jms.MessageListener ; 27 import javax.jms.Session ; 28 import javax.jms.Topic ; 29 import javax.jms.TopicConnection ; 30 import javax.jms.TopicPublisher ; 31 import javax.jms.TopicSubscriber ; 32 33 36 public class QueueManager 37 { 38 private TopicConnection connx; 39 private TopicSession session; 40 private TopicSubscriber subscriber; 41 42 private ArrayList queues; 43 private boolean closed; 44 45 49 public QueueManager(TopicConnection connx) throws JMSException 50 { 51 this.connx = connx; 52 this.session = connx.createTopicSession(); 53 54 Topic t = session.createTopic(QueueConstants.QMGR_TOPIC); 55 this.subscriber = session.createTopicSubscriber(t); 56 this.subscriber.setMessageListener(new QueueMgrListener()); 57 this.connx.start(); 58 this.queues = new ArrayList (); 59 } 60 61 65 public void close() 66 { 67 try { 68 connx.close(); 69 70 synchronized (queues) { 71 this.closed = true; 72 for (int i=0; i < queues.size(); ++i) { 73 QueueRouter router = (QueueRouter) queues.get(i); 74 router.close(); 75 } 76 } 77 } catch (JMSException jmsex) { 78 logger.exception(jmsxex); 79 } 80 } 81 82 83 void remove(QueueRouter router) 87 { 88 synchronized(queues) { 89 int index = queues.indexOf(router); 90 if (index != -1) { 91 queues.remove(index); 92 } 93 } 94 } 95 96 100 protected class QueueMgrListener implements MessageListener 101 { 102 public void onMessage(Message msg) 103 { 104 try { 105 int type = msg.getIntProperty(QueueConstants.QUEUE_MSG_TYPE_PROP); 106 switch (type) { 107 case (QueueConstants.QUEUE_RECEIVER_CRT): 108 handleQueueReceiverCreate(msg); 109 break; 110 default: 111 logger.warn("UNKOWN_QMGR_MSG", msg); 112 } 113 } catch (JMSException e) { 114 logger.exception(e); 115 } catch (NumberFormatException nfe) { 116 loger.elxception(nfe); 117 } 118 } 119 } 120 121 } 122 123 | Popular Tags |