1 55 56 package org.jboss.axis.transport.jms; 57 58 import org.jboss.axis.components.jms.JMSVendorAdapter; 59 60 import javax.jms.Connection ; 61 import javax.jms.ConnectionFactory ; 62 import javax.jms.Destination ; 63 import javax.jms.JMSException ; 64 import javax.jms.Message ; 65 import javax.jms.MessageConsumer ; 66 import javax.jms.MessageListener ; 67 import javax.jms.Session ; 68 import javax.jms.TemporaryTopic ; 69 import javax.jms.Topic ; 70 import javax.jms.TopicConnection ; 71 import javax.jms.TopicConnectionFactory ; 72 import javax.jms.TopicPublisher ; 73 import javax.jms.TopicSession ; 74 import javax.jms.TopicSubscriber ; 75 import java.util.HashMap ; 76 77 85 public class TopicConnector extends JMSConnector 86 { 87 public TopicConnector(TopicConnectionFactory factory, 88 int numRetries, 89 int numSessions, 90 long connectRetryInterval, 91 long interactRetryInterval, 92 long timeoutTime, 93 boolean allowReceive, 94 String clientID, 95 String username, 96 String password, 97 JMSVendorAdapter adapter) 98 throws JMSException 99 { 100 super(factory, numRetries, numSessions, connectRetryInterval, 101 interactRetryInterval, timeoutTime, allowReceive, 102 clientID, username, password, adapter); 103 } 104 105 protected Connection internalConnect(ConnectionFactory connectionFactory, 106 String username, String password) 107 throws JMSException 108 { 109 TopicConnectionFactory tcf = (TopicConnectionFactory )connectionFactory; 110 if (username == null) 111 return tcf.createTopicConnection(); 112 113 return tcf.createTopicConnection(username, password); 114 } 115 116 117 protected SyncConnection createSyncConnection(ConnectionFactory factory, 118 Connection connection, 119 int numSessions, 120 String threadName, 121 String clientID, 122 String username, 123 String password) 124 throws JMSException 125 { 126 return new TopicSyncConnection((TopicConnectionFactory )factory, 127 (TopicConnection )connection, numSessions, 128 threadName, clientID, username, password); 129 } 130 131 protected AsyncConnection createAsyncConnection(ConnectionFactory factory, 132 Connection connection, 133 String threadName, 134 String clientID, 135 String username, 136 String password) 137 throws JMSException 138 { 139 return new TopicAsyncConnection((TopicConnectionFactory )factory, 140 (TopicConnection )connection, threadName, 141 clientID, username, password); 142 } 143 144 public JMSEndpoint createEndpoint(String destination) 145 { 146 return new TopicEndpoint(destination); 147 } 148 149 156 public JMSEndpoint createEndpoint(Destination destination) 157 throws JMSException 158 { 159 if (!(destination instanceof Topic )) 160 throw new IllegalArgumentException ("The input be a topic for this connector"); 161 return new TopicDestinationEndpoint((Topic )destination); 162 } 163 164 private TopicSession createTopicSession(TopicConnection connection, int ackMode) 165 throws JMSException 166 { 167 return connection.createTopicSession(false, 168 ackMode); 169 } 170 171 private Topic createTopic(TopicSession session, String subject) 172 throws Exception 173 { 174 return m_adapter.getTopic(session, subject); 175 } 176 177 private TopicSubscriber createSubscriber(TopicSession session, 178 TopicSubscription subscription) 179 throws Exception 180 { 181 if (subscription.isDurable()) 182 return createDurableSubscriber(session, 183 (Topic )subscription.m_endpoint.getDestination(session), 184 subscription.m_subscriptionName, 185 subscription.m_messageSelector, 186 subscription.m_noLocal); 187 else 188 return createSubscriber(session, 189 (Topic )subscription.m_endpoint.getDestination(session), 190 subscription.m_messageSelector, 191 subscription.m_noLocal); 192 } 193 194 private TopicSubscriber createDurableSubscriber(TopicSession session, 195 Topic topic, 196 String subscriptionName, 197 String messageSelector, 198 boolean noLocal) 199 throws JMSException 200 { 201 return session.createDurableSubscriber(topic, subscriptionName, 202 messageSelector, noLocal); 203 } 204 205 private TopicSubscriber createSubscriber(TopicSession session, 206 Topic topic, 207 String messageSelector, 208 boolean noLocal) 209 throws JMSException 210 { 211 return session.createSubscriber(topic, messageSelector, noLocal); 212 } 213 214 215 private final class TopicAsyncConnection extends AsyncConnection 216 { 217 218 TopicAsyncConnection(TopicConnectionFactory connectionFactory, 219 TopicConnection connection, 220 String threadName, 221 String clientID, 222 String username, 223 String password) 224 225 throws JMSException 226 { 227 super(connectionFactory, connection, threadName, 228 clientID, username, password); 229 } 230 231 protected ListenerSession createListenerSession(javax.jms.Connection connection, 232 Subscription subscription) 233 throws Exception 234 { 235 TopicSession session = createTopicSession((TopicConnection )connection, 236 subscription.m_ackMode); 237 TopicSubscriber subscriber = createSubscriber(session, 238 (TopicSubscription)subscription); 239 return new TopicListenerSession(session, subscriber, 240 (TopicSubscription)subscription); 241 } 242 243 private final class TopicListenerSession extends ListenerSession 244 { 245 246 TopicListenerSession(TopicSession session, 247 TopicSubscriber subscriber, 248 TopicSubscription subscription) 249 throws Exception 250 { 251 super(session, subscriber, subscription); 252 } 253 254 void cleanup() 255 { 256 try 257 { 258 m_consumer.close(); 259 } 260 catch (Exception ignore) 261 { 262 } 263 try 264 { 265 TopicSubscription sub = (TopicSubscription)m_subscription; 266 if (sub.isDurable() && sub.m_unsubscribe) 267 { 268 ((TopicSession )m_session).unsubscribe(sub.m_subscriptionName); 269 } 270 } 271 catch (Exception ignore) 272 { 273 } 274 try 275 { 276 m_session.close(); 277 } 278 catch (Exception ignore) 279 { 280 } 281 282 } 283 } 284 } 285 286 private final class TopicSyncConnection extends SyncConnection 287 { 288 TopicSyncConnection(TopicConnectionFactory connectionFactory, 289 TopicConnection connection, 290 int numSessions, 291 String threadName, 292 String clientID, 293 String username, 294 String password) 295 296 throws JMSException 297 { 298 super(connectionFactory, connection, numSessions, threadName, 299 clientID, username, password); 300 } 301 302 protected SendSession createSendSession(javax.jms.Connection connection) 303 throws JMSException 304 { 305 TopicSession session = createTopicSession((TopicConnection )connection, 306 JMSConstants.DEFAULT_ACKNOWLEDGE_MODE); 307 TopicPublisher publisher = session.createPublisher(null); 308 return new TopicSendSession(session, publisher); 309 } 310 311 private final class TopicSendSession extends SendSession 312 { 313 TopicSendSession(TopicSession session, 314 TopicPublisher publisher) 315 throws JMSException 316 { 317 super(session, publisher); 318 } 319 320 321 protected MessageConsumer createConsumer(Destination destination) 322 throws JMSException 323 { 324 return createSubscriber((TopicSession )m_session, (Topic )destination, 325 null, JMSConstants.DEFAULT_NO_LOCAL); 326 } 327 328 protected void deleteTemporaryDestination(Destination destination) 329 throws JMSException 330 { 331 ((TemporaryTopic )destination).delete(); 332 } 333 334 335 protected Destination createTemporaryDestination() 336 throws JMSException 337 { 338 return ((TopicSession )m_session).createTemporaryTopic(); 339 } 340 341 protected void send(Destination destination, Message message, 342 int deliveryMode, int priority, long timeToLive) 343 throws JMSException 344 { 345 ((TopicPublisher )m_producer).publish((Topic )destination, message, 346 deliveryMode, priority, timeToLive); 347 } 348 349 } 350 } 351 352 353 private class TopicEndpoint 354 extends JMSEndpoint 355 { 356 String m_topicName; 357 358 TopicEndpoint(String topicName) 359 { 360 super(TopicConnector.this); 361 m_topicName = topicName; 362 } 363 364 Destination getDestination(Session session) 365 throws Exception 366 { 367 return createTopic((TopicSession )session, m_topicName); 368 } 369 370 protected Subscription createSubscription(MessageListener listener, 371 HashMap properties) 372 { 373 return new TopicSubscription(listener, this, properties); 374 } 375 376 public String toString() 377 { 378 StringBuffer buffer = new StringBuffer ("TopicEndpoint:"); 379 buffer.append(m_topicName); 380 return buffer.toString(); 381 } 382 383 public boolean equals(Object object) 384 { 385 if (!super.equals(object)) 386 return false; 387 388 if (!(object instanceof TopicEndpoint)) 389 return false; 390 391 return m_topicName.equals(((TopicEndpoint)object).m_topicName); 392 } 393 } 394 395 private final class TopicSubscription extends Subscription 396 { 397 String m_subscriptionName; 398 boolean m_unsubscribe; 399 boolean m_noLocal; 400 401 TopicSubscription(MessageListener listener, 402 JMSEndpoint endpoint, 403 HashMap properties) 404 { 405 super(listener, endpoint, properties); 406 m_subscriptionName = MapUtils.removeStringProperty(properties, 407 JMSConstants.SUBSCRIPTION_NAME, 408 null); 409 m_unsubscribe = MapUtils.removeBooleanProperty(properties, 410 JMSConstants.UNSUBSCRIBE, 411 JMSConstants.DEFAULT_UNSUBSCRIBE); 412 m_noLocal = MapUtils.removeBooleanProperty(properties, 413 JMSConstants.NO_LOCAL, 414 JMSConstants.DEFAULT_NO_LOCAL); 415 } 416 417 boolean isDurable() 418 { 419 return m_subscriptionName != null; 420 } 421 422 public boolean equals(Object obj) 423 { 424 if (!super.equals(obj)) 425 return false; 426 if (!(obj instanceof TopicSubscription)) 427 return false; 428 429 TopicSubscription other = (TopicSubscription)obj; 430 if (other.m_unsubscribe != m_unsubscribe || other.m_noLocal != m_noLocal) 431 return false; 432 433 if (isDurable()) 434 { 435 return other.isDurable() && other.m_subscriptionName.equals(m_subscriptionName); 436 } 437 else if (other.isDurable()) 438 return false; 439 else 440 return true; 441 } 442 443 public String toString() 444 { 445 StringBuffer buffer = new StringBuffer (super.toString()); 446 buffer.append(":").append(m_noLocal).append(":").append(m_unsubscribe); 447 if (isDurable()) 448 { 449 buffer.append(":"); 450 buffer.append(m_subscriptionName); 451 } 452 return buffer.toString(); 453 } 454 455 } 456 457 private final class TopicDestinationEndpoint 458 extends TopicEndpoint 459 { 460 Topic m_topic; 461 462 TopicDestinationEndpoint(Topic topic) 463 throws JMSException 464 { 465 super(topic.getTopicName()); 466 m_topic = topic; 467 } 468 469 Destination getDestination(Session session) 470 { 471 return m_topic; 472 } 473 474 } 475 476 477 } | Popular Tags |