package org.jfox.jms;

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.XAConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XAQueueSession;
import javax.jms.XASession;
import javax.jms.XATopicConnection;
import javax.jms.XATopicSession;

import org.jfox.jms.connector.JMSContainer;
import org.jfox.jms.message.JMSMessage;

/**
 * A single connection class that implements the plain, queue, topic and XA
 * flavours of the JMS connection interfaces, plus {@link JMSConnectionRemote}.
 * Lifecycle operations (start, stop, session registration, unregistration)
 * are delegated to the {@link JMSContainer} passed to the constructor.
 */
public class JMSConnection implements Connection,
        QueueConnection,
        TopicConnection,
        XAConnection,
        XAQueueConnection,
        XATopicConnection,
        JMSConnectionRemote {

    protected JMSContainer container = null;
    protected boolean started = false;
    protected boolean closed = false;

    protected String clientId = null;

    protected boolean isXA = false;

    /**
     * Sessions created by this connection, keyed by session id.
     * Every access is guarded by {@code synchronized (sessions)}.
     */
    protected transient Map<String, JMSSession> sessions = new HashMap<String, JMSSession>();

    /**
     * Creates a connection bound to the given container.
     *
     * @param clientId  client identifier of this connection
     * @param container container that lifecycle calls are delegated to
     * @param isXA      whether XA sessions may be created on this connection
     */
    public JMSConnection(String clientId, JMSContainer container, boolean isXA) {
        this.clientId = clientId;
        this.container = container;
        this.isXA = isXA;
    }

    /**
     * Creates a non-XA session, registers it with the container and remembers it.
     *
     * @param transacted      when {@code true} the acknowledge mode is forced to
     *                        {@link Session#SESSION_TRANSACTED}
     * @param acknowledgeMode acknowledge mode used when the session is not transacted
     * @return the newly created session
     * @throws JMSException if the connection is closed or the session cannot be created
     */
    public synchronized Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        checkClosed();
        if (transacted) {
            acknowledgeMode = Session.SESSION_TRANSACTED;
        }
        JMSSession session = new JMSSession(this, transacted, acknowledgeMode, false);
        container.registerSession(clientId, session.getSessionId());
        synchronized (sessions) {
            sessions.put(session.getSessionId(), session);
        }
        return session;
    }

    /**
     * @return the client identifier given at construction time
     */
    public String getClientID() throws JMSException {
        return clientId;
    }

    /**
     * Always fails: the client identifier is fixed at construction time.
     *
     * @param clientId ignored
     * @throws javax.jms.IllegalStateException always; the message distinguishes a
     *         started connection from one whose identifier was assigned by the system
     * @throws JMSException if the connection is closed
     */
    public void setClientID(String clientId) throws JMSException {
        checkClosed();
        // Use the JMS IllegalStateException (a JMSException), matching checkClosed();
        // the unqualified name would resolve to the unchecked java.lang one.
        if (started) {
            throw new javax.jms.IllegalStateException("connection has already started, can not set client identifier.");
        }
        throw new javax.jms.IllegalStateException("client identifier has already been setted by system.");
    }

    /** Not supported; always throws {@link JMSException}. */
    public ConnectionMetaData getMetaData() throws JMSException {
        throw new JMSException("not support now!");
    }

    /** Not supported; always throws {@link JMSException}. */
    public ExceptionListener getExceptionListener() throws JMSException {
        throw new JMSException("not support now!");
    }

    /** Not supported; always throws {@link JMSException}. */
    public void setExceptionListener(ExceptionListener listener) throws JMSException {
        throw new JMSException("not support now!");
    }

    /**
     * Asks the container to start this connection if it is not started yet.
     *
     * @throws JMSException if the connection is closed or the container call fails
     */
    public synchronized void start() throws JMSException {
        // A closed connection has been unregistered from the container; do not restart it.
        checkClosed();
        if (!started) {
            container.startConnection(getClientID());
            this.started = true;
        }
    }

    /**
     * Asks the container to stop this connection if it is currently started.
     *
     * @throws JMSException if the container call fails
     */
    public synchronized void stop() throws JMSException {
        if (started) {
            container.stopConnection(getClientID());
            started = false;
        }
    }

    /**
     * Stops the connection, closes every session it created, unregisters it from
     * the container and unexports it as a remote object. Calling this on an
     * already closed connection does nothing.
     *
     * @throws JMSException if stopping the connection or closing a session fails;
     *         unregistering and unexporting are still attempted when a session
     *         fails to close
     */
    public synchronized void close() throws JMSException {
        if (closed) {
            return;
        }
        this.stop();
        closed = true;

        // Iterate over a snapshot: closeSession() removes entries from the map.
        List<JMSSession> snapshot;
        synchronized (sessions) {
            snapshot = new ArrayList<JMSSession>(sessions.values());
        }

        try {
            for (JMSSession session : snapshot) {
                session.close();
            }
        } finally {
            releaseFromContainer();
        }
    }

    /**
     * Best-effort cleanup: unregisters this connection from the container and
     * unexports it. The two steps are independent so a failure in the first
     * does not skip the second; failures are reported but not propagated.
     */
    private void releaseFromContainer() {
        try {
            container.unregisterConnection(this.getClientID());
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            UnicastRemoteObject.unexportObject(this, false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Not supported; throws {@link JMSException} after the closed check. */
    public ConnectionConsumer createConnectionConsumer(Destination destination,
                                                       String messageSelector,
                                                       ServerSessionPool sessionPool,
                                                       int maxMessages) throws JMSException {
        checkClosed();
        throw new JMSException("not support now!");
    }

    /** Not supported; throws {@link JMSException} after the closed check. */
    public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
                                                              String subscriptionName,
                                                              String messageSelector,
                                                              ServerSessionPool sessionPool,
                                                              int maxMessages) throws JMSException {
        checkClosed();
        throw new JMSException("not support now!");
    }

    /**
     * Same as {@link #createSession(boolean, int)}, with the result cast to
     * {@link QueueSession}.
     */
    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
        return (QueueSession) createSession(transacted, acknowledgeMode);
    }

    /** Not supported; throws {@link JMSException} after the closed check. */
    public ConnectionConsumer createConnectionConsumer(Queue queue,
                                                       String messageSelector,
                                                       ServerSessionPool sessionPool,
                                                       int maxMessages) throws JMSException {
        checkClosed();
        throw new JMSException("not support now!");
    }

    /**
     * Same as {@link #createSession(boolean, int)}, with the result cast to
     * {@link TopicSession}.
     */
    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
        return (TopicSession) createSession(transacted, acknowledgeMode);
    }

    /** Not supported; throws {@link JMSException} after the closed check. */
    public ConnectionConsumer createConnectionConsumer(Topic topic,
                                                       String messageSelector,
                                                       ServerSessionPool sessionPool,
                                                       int maxMessages) throws JMSException {
        checkClosed();
        throw new JMSException("not support now!");
    }

    /**
     * Creates a transacted XA session and remembers it.
     * <p>
     * NOTE(review): unlike {@link #createSession(boolean, int)} this does not call
     * {@code container.registerSession(...)}; confirm whether XA sessions are
     * registered elsewhere or whether this is an omission.
     *
     * @return the newly created XA session
     * @throws JMSException if the connection is closed or is not an XA connection
     */
    public synchronized XASession createXASession() throws JMSException {
        checkClosed();
        if (!isXA) {
            throw new JMSException("current connection " + this + " is not an xa connection");
        }

        JMSSession session = new JMSSession(this, true, Session.SESSION_TRANSACTED, true);
        synchronized (sessions) {
            sessions.put(session.getSessionId(), session);
        }
        return session;
    }

    /** Same as {@link #createXASession()}, with the result cast to {@link XAQueueSession}. */
    public XAQueueSession createXAQueueSession() throws JMSException {
        return (XAQueueSession) createXASession();
    }

    /** Same as {@link #createXASession()}, with the result cast to {@link XATopicSession}. */
    public XATopicSession createXATopicSession() throws JMSException {
        return (XATopicSession) createXASession();
    }

    /**
     * Forwards a message to the session identified by {@code sessionId}.
     *
     * @param sessionId  id of the target session on this connection
     * @param consumerId id of the consumer, passed through to the session
     * @param msg        the message to deliver
     * @throws JMSException if no session with that id is known to this connection,
     *         or if the session rejects the message
     */
    public void onMessage(String sessionId, String consumerId, JMSMessage msg) throws RemoteException, JMSException {
        JMSSession session;
        synchronized (sessions) {
            session = sessions.get(sessionId);
        }
        if (session == null) {
            // Report an unknown or already closed session instead of a NullPointerException.
            throw new JMSException("no session with id " + sessionId + " on connection " + clientId);
        }
        // Deliver outside the map lock so the session's work never blocks map access.
        session.onMessage(consumerId, msg);
    }

    /** @return the container this connection delegates to */
    JMSContainer getContainer() {
        return container;
    }

    /**
     * @throws javax.jms.IllegalStateException if this connection has been closed
     */
    protected void checkClosed() throws javax.jms.IllegalStateException {
        if (closed) {
            throw new javax.jms.IllegalStateException("connection closed");
        }
    }

    /** @return whether the connection is currently started */
    boolean isStarted() {
        return started;
    }

    /**
     * Forgets the given session and tells the container to close it.
     *
     * @param sessionId id of the session to close
     * @throws JMSException if the container call fails
     */
    void closeSession(String sessionId) throws JMSException {
        synchronized (sessions) {
            sessions.remove(sessionId);
        }
        container.closeSession(clientId, sessionId);
    }

    /** Empty entry point, kept so existing launch configurations still resolve. */
    public static void main(String[] args) {

    }
}