package org.jboss.jms.serverless;

import org.jboss.logging.Logger;
import javax.jms.Session;
import java.util.List;
import java.util.ArrayList;
import javax.jms.JMSException;
import java.util.Iterator;
import javax.jms.Message;

/**
 * Keeps track of the sessions created on a {@link GroupConnection} and dispatches the objects
 * that arrive on the connection's delivery queue to those sessions.
 *
 * <p>The manager starts its own delivery thread in the constructor. That thread blocks on the
 * delivery queue and, for each object removed, either hands it to every registered session
 * (plain {@code javax.jms.Message}) or to one specific session/receiver pair
 * ({@code QueueCarrier}).
 *
 * <p>Access to the session list is guarded by the list's own monitor; session ID generation is
 * guarded by the manager's monitor.
 */
class SessionManager implements Runnable {

    private static final Logger log = Logger.getLogger(SessionManager.class);

    private GroupConnection connection;
    private org.jgroups.util.Queue deliveryQueue;
    private Thread deliveryThread;
    private List sessions;
    private int sessionCounter = 0;

    /**
     * Creates the manager and immediately starts the delivery thread.
     *
     * @param connection the connection this manager belongs to
     * @param deliveryQueue the queue the delivery thread reads incoming objects from
     */
    SessionManager(GroupConnection connection, org.jgroups.util.Queue deliveryQueue) {

        this.connection = connection;
        this.deliveryQueue = deliveryQueue;
        sessions = new ArrayList();
        // All fields are assigned before the thread is started, so run() never observes a
        // half-initialized manager.
        deliveryThread = new Thread(this, "Session Delivery Thread");
        deliveryThread.start();
    }

    /**
     * Creates a new session, registers it with this manager and returns it.
     *
     * @param transacted passed through to the new session
     * @param acknowledgeMode passed through to the new session
     * @return the newly created and registered session
     * @throws JMSException declared for callers; propagated if session construction throws it
     */
    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {

        Session s = new SessionImpl(this, generateSessionID(), transacted, acknowledgeMode);
        synchronized (sessions) {
            sessions.add(s);
        }
        return s;
    }

    /**
     * @return the connection this manager was created with
     */
    GroupConnection getConnection() {
        return connection;
    }

    /**
     * Hands the message to every session registered at the time of the call.
     *
     * <p>The session list is copied while holding its lock and the actual delivery happens
     * outside the lock. Delivering under the lock would let a session that re-enters
     * {@link #createSession(boolean, int)} on the delivery thread modify the list in the middle
     * of the iteration (the monitor is reentrant), which fails with a
     * {@code ConcurrentModificationException}. It also keeps other threads from being blocked
     * on the list for the whole duration of the delivery.
     *
     * @param m the message to deliver
     */
    private void deliver(Message m) {

        Object[] snapshot;
        synchronized (sessions) {
            snapshot = sessions.toArray();
        }
        for (int i = 0; i < snapshot.length; i++) {
            ((SessionImpl) snapshot[i]).deliver(m);
        }
    }

    /**
     * Hands the message to a single queue receiver of a single session. If no registered
     * session has the given ID, the failure is logged and the message is dropped.
     *
     * @param m the message to deliver
     * @param sessionID the ID of the target session
     * @param queueReceiverID the ID of the target receiver, passed through to the session
     */
    private void deliver(Message m, String sessionID, String queueReceiverID) {

        SessionImpl session = null;
        synchronized (sessions) {
            for (Iterator i = sessions.iterator(); i.hasNext(); ) {
                SessionImpl crts = (SessionImpl) i.next();
                if (crts.getID().equals(sessionID)) {
                    session = crts;
                    break;
                }
            }
        }
        if (session == null) {
            log.error("No such session: " + sessionID + ". Delivery failed!");
        }
        else {
            session.deliver(m, queueReceiverID);
        }
    }

    /**
     * Forwards a queue receiver advertisement to the connection.
     *
     * @param sessionID the ID of the session owning the receiver
     * @param qr the receiver being advertised; its queue name and ID are forwarded
     * @param isOn forwarded unchanged to the connection
     * @throws JMSException if the connection reports a {@code ProviderException}; the original
     *         exception is attached as the linked exception
     */
    void advertiseQueueReceiver(String sessionID, QueueReceiverImpl qr, boolean isOn)
        throws JMSException {
        try {
            connection.
                advertiseQueueReceiver(qr.getQueue().getQueueName(), sessionID, qr.getID(), isOn);
        }
        catch (ProviderException e) {
            String msg = "Cannot advertise queue receiver";
            JMSException jmse = new JMSException(msg);
            jmse.setLinkedException(e);
            throw jmse;
        }
    }

    /**
     * Generates a session ID from an incrementing counter that starts at 0.
     *
     * @return the decimal string form of the next counter value
     */
    private synchronized String generateSessionID() {
        return Integer.toString(sessionCounter++);
    }

    /**
     * Delivery thread body: removes objects from the delivery queue and dispatches them until
     * the queue is closed.
     */
    public void run() {

        while (true) {

            try {
                Object o = deliveryQueue.remove();
                if (o instanceof javax.jms.Message) {
                    deliver((javax.jms.Message) o);
                }
                else if (o instanceof QueueCarrier) {
                    QueueCarrier qc = (QueueCarrier) o;
                    deliver(qc.getJMSMessage(), qc.getSessionID(), qc.getReceiverID());
                }
                else {
                    log.warn("Unknown delivery object: " +
                             (o == null ? "null" : o.getClass().getName()));
                }
            }
            catch (org.jgroups.util.QueueClosedException e) {
                // A closed queue fails every subsequent remove() immediately; without this exit
                // the loop would spin forever, logging a warning on each iteration.
                log.debug("Delivery queue closed, session delivery thread exiting");
                return;
            }
            catch (Exception e) {
                // Also reached when a session throws during delivery; the thread keeps running
                // so one bad message does not stop delivery of the following ones.
                log.warn("Failed to remove element from the delivery queue", e);
            }
        }
    }

}