1 7 package org.jboss.jms.client.p2p; 8 9 import java.io.Serializable ; 10 import java.net.URL ; 11 import java.util.ArrayList ; 12 import java.util.Collection ; 13 import java.util.Enumeration ; 14 import java.util.Iterator ; 15 import java.util.List ; 16 17 import javax.jms.ExceptionListener ; 18 import javax.jms.JMSException ; 19 20 import org.jgroups.Address; 21 import org.jgroups.Channel; 22 import org.jgroups.ChannelException; 23 import org.jgroups.ChannelListener; 24 import org.jgroups.JChannelFactory; 25 import org.jgroups.Message; 26 import org.jgroups.MessageListener; 27 import org.jgroups.blocks.PullPushAdapter; 28 import org.jboss.jms.MessageImpl; 29 import org.jboss.jms.client.ConnectionDelegate; 30 import org.jboss.jms.client.SessionDelegate; 31 import org.jboss.jms.destination.JBossTemporaryDestination; 32 import org.jboss.util.id.GUID; 33 34 41 public class P2PConnectionDelegate 42 implements ConnectionDelegate, ChannelListener, MessageListener 43 { 44 46 48 private String clientId = null; 49 private ExceptionListener exceptionListener = null; 50 private boolean closed = false; 51 private String password = null; 52 private String username = null; 53 private List sessions = new ArrayList (); 54 55 private Channel channel = null; 56 private PullPushAdapter connection = null; 57 private boolean started = false; 58 59 61 63 public P2PConnectionDelegate(String username, String password) 64 throws JMSException 65 { 66 this.username = username; 67 this.password = password; 68 69 try 70 { 71 URL url = Thread.currentThread().getContextClassLoader().getResource("org/jboss/jms/p2p/jgroups-config.xml"); 72 this.channel = new JChannelFactory().createChannel(url); 73 this.channel.setChannelListener(this); 74 this.channel.connect("org.jboss.jms.p2p"); 75 this.connection = new PullPushAdapter(this.channel, this); 76 this.connection.start(); 77 } 78 catch (ChannelException exception) 79 { 80 throw new JMSException (exception.getMessage()); 81 } 82 83 } 84 85 87 89 public void close() throws JMSException 90 { 91 Iterator iterator = this.sessions.iterator(); 92 while (iterator.hasNext()) 93 { 94 ((SessionDelegate) iterator.next()).close(); 95 iterator.remove(); 96 } 97 this.closed = true; 98 this.connection.stop(); 99 this.channel.disconnect(); 100 this.channel.close(); 101 } 102 103 public void closing() throws JMSException 104 { 105 } 106 107 public SessionDelegate createSession(boolean isXA, boolean transacted, int acknowledgeMode) throws JMSException 108 { 109 this.throwExceptionIfClosed(); 110 this.generateClientIDIfNull(); 111 SessionDelegate session = new P2PSessionDelegate(this, transacted, acknowledgeMode); 112 this.sessions.add(session); 113 return session; 114 } 115 116 public String getClientID() throws JMSException 117 { 118 this.throwExceptionIfClosed(); 119 this.generateClientIDIfNull(); 120 return this.clientId; 121 } 122 123 public Enumeration getJMSXPropertyNames() throws JMSException 124 { 125 return null; 127 } 128 129 public void deleteTempDestination(JBossTemporaryDestination destination) 130 { 131 } 133 134 public void setClientID(String id) throws JMSException 135 { 136 this.throwExceptionIfClosed(); 137 if (this.clientId != null) 138 { 139 throw new IllegalStateException ("The client Id has already been set by the provider. To supply your own value, you must set the client ID immediatly after creating the connection. See section 4.3.2 of the JMS specification for more information."); 140 } 141 this.clientId = id; 142 } 143 144 public void setExceptionListener(ExceptionListener listener) throws JMSException 145 { 146 this.throwExceptionIfClosed(); 147 this.generateClientIDIfNull(); 148 this.exceptionListener = listener; 149 } 150 151 public void start() throws JMSException 152 { 153 this.throwExceptionIfClosed(); 154 this.generateClientIDIfNull(); 155 this.started = true; 156 } 157 158 public void stop() throws JMSException 159 { 160 this.throwExceptionIfClosed(); 161 this.generateClientIDIfNull(); 162 this.started = false; 163 } 164 165 167 public void channelClosed(Channel arg0) 168 { 169 if (this.closed != false && this.exceptionListener != null) 170 { 171 this.exceptionListener.onException(new JMSException ("We were unexpectedly disconnected")); 172 } 173 } 174 175 public void channelConnected(Channel arg0) 176 { 177 } 178 179 public void channelDisconnected(Channel arg0) 180 { 181 this.channelClosed(channel); 182 } 183 184 public void channelReconnected(Address arg0) 185 { 186 } 187 188 public void channelShunned() 189 { 190 if (this.exceptionListener != null) 191 { 192 this.exceptionListener.onException(new JMSException ("We were shunned.")); 193 } 194 } 195 196 198 public byte[] getState() 199 { 200 return new byte[0]; 201 } 202 203 public void receive(Message message) 204 { 205 if (this.started) 206 { 207 Object object = message.getObject(); 208 if (object instanceof List ) 209 { 210 List theList = (List ) object; 211 Iterator iterator = theList.iterator(); 212 while (iterator.hasNext()) 213 { 214 Object listObject = iterator.next(); 215 if (listObject instanceof MessageImpl) 216 { 217 MessageImpl currentMessage = (MessageImpl)listObject; 218 if (currentMessage.getOrigianClientID().equals(this.clientId)) 219 { 220 currentMessage.setIsLocal(true); 221 } 222 Iterator sessionIterator = this.sessions.iterator(); 223 while (sessionIterator.hasNext()) 224 { 225 ((P2PSessionDelegate) sessionIterator.next()).deliver(currentMessage); 226 } 227 } 228 } 229 } 230 else if (object instanceof MessageImpl) 231 { 232 MessageImpl theMessage = (MessageImpl) object; 233 if (theMessage.getOrigianClientID().equals(this.clientId)) 234 { 235 theMessage.setIsLocal(true); 236 } 237 Iterator iterator = this.sessions.iterator(); 238 while (iterator.hasNext()) 239 { 240 ((P2PSessionDelegate) iterator.next()).deliver(theMessage); 241 } 242 } 243 } 244 } 245 246 public void setState(byte[] arg0) 247 { 248 } 249 250 252 public void finalize() throws Throwable 253 { 254 if (!this.closed) 255 { 256 this.close(); 257 } 258 } 259 260 262 264 268 void send(MessageImpl message) throws JMSException 269 { 270 try 271 { 272 message.setOriginClientID(this.clientId); 273 this.connection.send(new Message(null, null, (Serializable ) message)); 274 } 275 catch (Exception exception) 276 { 277 throw new JMSException (exception.getMessage()); 278 } 279 } 280 281 void send(Collection messages) throws JMSException 282 { 283 try 284 { 285 Iterator iterator = messages.iterator(); 286 while (iterator.hasNext()) 287 { 288 ((MessageImpl)iterator.next()).setOriginClientID(this.clientId); 289 } 290 this.connection.send(new Message(null, null, (Serializable ) messages)); 291 } 292 catch (Exception exception) 293 { 294 throw new JMSException (exception.getMessage()); 295 } 296 } 297 298 300 private void throwExceptionIfClosed() 301 { 302 if (this.closed) 303 { 304 throw new IllegalStateException ("The connection is closed."); 305 } 306 } 307 308 private synchronized void generateClientIDIfNull() throws JMSException 309 { 310 if (this.clientId == null) 311 { 312 this.setClientID(new GUID().toString().toUpperCase()); 313 } 314 } 315 316 318 } 319 | Popular Tags |