1 package com.ubermq.jms.client.impl; 2 3 import com.ubermq.*; 4 import com.ubermq.jms.client.*; 5 import com.ubermq.jms.common.datagram.*; 6 import com.ubermq.kernel.*; 7 import com.ubermq.kernel.event.*; 8 import com.ubermq.kernel.overflow.*; 9 import com.ubermq.util.*; 10 11 import java.io.*; 12 import java.util.*; 13 import javax.jms.*; 14 15 23 public class Connection 24 implements javax.jms.Connection , 25 javax.jms.QueueConnection , 26 javax.jms.TopicConnection , 27 IConnectionInfo, 28 ConnectionEventListener 29 { 30 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(Connection.class); 31 32 public static final int UBERMQ_MAJOR_VERSION = 2; 34 public static final int UBERMQ_MINOR_VERSION = 7; 35 public static final int UBERMQ_REVISION = 0; 36 public static final String UBERMQ_PROVIDER_NAME = "UberMQ"; 37 public static final String UBERMQ_PROVIDER_VERSION = UBERMQ_MAJOR_VERSION + "." + 38 UBERMQ_MINOR_VERSION + 39 ((UBERMQ_REVISION > 0) ? ("." + UBERMQ_REVISION) : ""); 40 public static final int JMS_MAJOR_VERSION = 1; 41 public static final int JMS_MINOR_VERSION = 1; 42 public static final String JMS_VERSION = "1.1"; 43 44 47 public static final long DEFAULT_TIMEOUT = 30000L; 48 49 52 final IClientSession theClient; 53 54 57 IConnectionInfo theConn; 58 59 62 final IClientProcessor clientProc; 63 64 67 final IDeliveryManager delivery; 68 69 73 ExceptionListener exceptionListener; 74 75 78 private List eventHandlers; 79 80 83 private String clientId; 84 85 88 private List sessions; 89 90 93 private Set localSenders; 94 95 98 ConnectionDescriptor connDescriptor; 99 100 103 private volatile boolean open; 104 105 108 private volatile boolean started; 109 110 113 final DatagramFactoryHolder factories; 114 115 124 public Connection(IClientSession theClient, 125 IClientProcessor clientProc, 126 IDeliveryManager delivery, 127 DatagramFactoryHolder factories, 128 ConnectionDescriptor conn) 129 throws IOException 130 { 131 this.theClient = theClient; 132 this.clientProc = clientProc; 133 this.delivery = delivery; 134 this.connDescriptor = conn; 135 this.factories = factories; 136 this.eventHandlers = new LinkedList(); 137 this.sessions = new ArrayList(); 138 this.localSenders = new HashSet(); 139 this.clientId = null; 140 141 this.theConn = theClient.connect(this, connDescriptor, clientProc); 142 theConn.addEventListener(this); 143 144 theClient.started(theConn); 145 146 open = true; 147 started = false; 148 } 149 150 public void close() 151 { 152 if (open) { 153 open = false; 154 started = false; 155 156 theConn.close(); 158 159 sendEvent(ConnectionEvent.CONNECTION_CLOSED); 161 } else { 162 } 165 } 166 167 171 public void reconnect() 172 throws IOException 173 { 174 reconnect(connDescriptor); 175 } 176 177 181 public synchronized void reconnect(ConnectionDescriptor conn) 182 throws IOException 183 { 184 theConn = theClient.connect(this, conn, clientProc); 185 theConn.addEventListener(this); 186 187 open = true; 188 189 theClient.started(theConn); 191 192 clientProc.reconnected(); 195 start(); 196 197 sendEvent(ConnectionEvent.CONNECTION_RECONNECTED); 199 } 200 201 public void setClientID(String id) 202 throws JMSException 203 { 204 if (clientId == null && 205 !started) 206 { 207 clientId = id; 208 } 209 else 210 throw new javax.jms.IllegalStateException ("Cannot set Client ID at this time."); 211 } 212 213 public String getClientID() 214 { 215 return clientId; 216 } 217 218 void addSession(Session s) 219 { 220 sessions.add(s); 221 } 222 223 void removeSession(Session s) 224 { 225 sessions.remove(s); 226 } 227 228 public void stop() 229 { 230 if (started) 231 { 232 started = false; 233 234 Iterator iter = sessions.iterator(); 236 while (iter.hasNext()) 237 { 238 Session ts = (Session)iter.next(); 239 ts.pause(); 240 } 241 242 theConn.stop(); 244 } 245 } 246 247 public void start() 248 { 249 if (!started) 250 { 251 theConn.start(); 252 253 Iterator iter = sessions.iterator(); 254 while (iter.hasNext()) 255 { 256 Session ts = (Session)iter.next(); 257 ts.resume(); 258 } 259 260 started = true; 261 } 262 } 263 264 public boolean isOpen() {return open;} 265 266 public ConnectionMetaData getMetaData() 267 { 268 return new ConnectionMetaData() { 269 public String getJMSProviderName() throws JMSException 270 { 271 return UBERMQ_PROVIDER_NAME; 272 } 273 274 public String getProviderVersion() throws JMSException 275 { 276 return UBERMQ_PROVIDER_VERSION; 277 } 278 279 public String getJMSVersion() throws JMSException 280 { 281 return JMS_VERSION; 282 } 283 284 public java.util.Enumeration getJMSXPropertyNames() throws JMSException 285 { 286 return new java.util.Vector ().elements(); 287 } 288 289 public int getProviderMinorVersion() throws JMSException 290 { 291 return UBERMQ_MINOR_VERSION; 292 } 293 294 public int getJMSMajorVersion() throws JMSException 295 { 296 return JMS_MAJOR_VERSION; 297 } 298 299 public int getProviderMajorVersion() throws JMSException 300 { 301 return UBERMQ_MAJOR_VERSION; 302 } 303 304 public int getJMSMinorVersion() throws JMSException 305 { 306 return JMS_MINOR_VERSION; 307 } 308 }; 309 } 310 311 public void setExceptionListener(ExceptionListener el) 312 { 313 exceptionListener = el; 314 } 315 316 public ExceptionListener getExceptionListener() {return exceptionListener;} 317 318 public void addEventListener(ConnectionEventListener l) 319 { 320 eventHandlers.add(l); 321 } 322 323 public void removeEventListener(ConnectionEventListener l) 324 { 325 eventHandlers.remove(l); 326 } 327 328 333 void sendEvent(ConnectionEvent event) 334 { 335 log.debug("sending connection event " + event); 336 337 Iterator iter = eventHandlers.iterator(); 338 while (iter.hasNext()) 339 { 340 ConnectionEventListener l = (ConnectionEventListener)iter.next(); 341 try { 342 l.connectionEvent(event); 343 } catch(RuntimeException x) { 344 } 347 } 348 } 349 350 354 void sendEvent(int eventCode) 355 { 356 sendEvent(new ConnectionEvent(this, eventCode)); 357 } 358 359 369 public void connectionEvent(ConnectionEvent e) 370 { 371 if (e.getEventCode() == ConnectionEvent.CONNECTION_IO_EXCEPTION && 374 exceptionListener != null) 375 { 376 exceptionListener.onException(new JMSIOException("")); 377 } 378 379 if (e.isFailure()) 382 { 383 open = false; 384 sendEvent(e); 385 386 new Thread (new Runnable () { 387 public void run() { 388 while(!Thread.interrupted()) 389 { 390 try 391 { 392 reconnect(); 393 break; 394 } 395 catch (IOException iox) 396 { 397 log.error("",iox); 400 try 401 { 402 Thread.sleep(DEFAULT_TIMEOUT); 403 } 404 catch (InterruptedException e) {} 405 } 406 } 407 } 408 }, 409 "Connection Reconnector").start(); 410 } 411 } 412 413 public void output(IDatagram d, IOverflowHandler h) 415 throws IOException 416 { 417 if (!isOpen()) 418 throw new java.lang.IllegalStateException (); 419 420 synchronized(this) { 421 theConn.output(d, h); 422 } 423 } 424 425 428 public boolean controlSequence(IControlDatagram d, IOverflowHandler h) 429 throws IOException 430 { 431 return clientProc.controlSequence(d, h); 432 } 433 434 439 public IClientProcessor getClientProcessor() 440 { 441 return clientProc; 442 } 443 444 public String getId() {return theConn.getId();} 445 446 448 public ConnectionConsumer createConnectionConsumer(Destination p0, String p1, ServerSessionPool p2, int p3) throws JMSException 449 { 450 throw new JMSUnsupportedOperationException(); 451 } 452 453 public javax.jms.Session createSession(boolean transacted, int ackMode) throws JMSException 454 { 455 return new Session(this, factories.messageFactory(), ackMode); 456 } 457 458 public javax.jms.QueueSession createQueueSession(boolean transacted, int ackMode) throws javax.jms.JMSException 459 { 460 if (transacted) 461 throw new UnsupportedOperationException (); 462 if (!open) 463 throw new javax.jms.IllegalStateException ("not open"); 464 465 QueueSession qs = new QueueSession(this, this.factories.messageFactory(), ackMode); 466 addSession(qs); 467 return qs; 468 } 469 470 public ConnectionConsumer createConnectionConsumer(javax.jms.Queue p0, 471 String p1, 472 ServerSessionPool p2, 473 int p3) 474 throws JMSException 475 { 476 throw new JMSUnsupportedOperationException(); 477 } 478 479 480 public ConnectionConsumer createConnectionConsumer(Topic topic, 481 java.lang.String messageSelector, 482 ServerSessionPool sessionPool, 483 int maxMessages) 484 throws JMSException 485 { 486 throw new JMSUnsupportedOperationException(); 487 } 488 489 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 490 java.lang.String subscriptionName, 491 java.lang.String messageSelector, 492 ServerSessionPool sessionPool, 493 int maxMessages) 494 throws JMSException 495 { 496 throw new JMSUnsupportedOperationException(); 497 } 498 499 public javax.jms.TopicSession createTopicSession(boolean transacted, 500 int acknowledgeMode) 501 throws JMSException 502 { 503 if (transacted) 505 throw new JMSUnsupportedOperationException(); 506 if (!open) 507 throw new javax.jms.IllegalStateException ("not open"); 508 509 TopicSession ts = new TopicSession(this, 510 factories.messageFactory(), 511 acknowledgeMode); 512 addSession(ts); 513 return ts; 514 } 515 516 518 522 void addLocalSender(long senderId) 523 { 524 localSenders.add(new Long (senderId)); 525 } 526 527 530 void removeLocalSender(long senderId) 531 { 532 localSenders.remove(new Long (senderId)); 533 } 534 535 541 boolean isSenderLocal(long senderId) 542 { 543 return localSenders.contains(new Long (senderId)); 544 } 545 546 public String toString() 547 { 548 return connDescriptor.toString(); 549 } 550 } 551 | Popular Tags |