package org.apache.axis.transport.jms;

import org.apache.axis.components.jms.JMSVendorAdapter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.util.HashMap;

/**
 * {@link JMSConnector} specialization for the JMS publish/subscribe (topic)
 * domain.
 *
 * <p>Every hook declared by the base connector is implemented here in terms
 * of the topic-specific JMS interfaces ({@link TopicConnection},
 * {@link TopicSession}, {@link TopicPublisher}, {@link TopicSubscriber}).
 * The inner classes supply the topic flavours of the connection, session,
 * endpoint and subscription types used by the base class.
 */
public class TopicConnector extends JMSConnector
{
    /**
     * Creates a topic connector. All arguments are forwarded unchanged to the
     * {@link JMSConnector} constructor, which defines their meaning.
     *
     * @throws JMSException if the superclass constructor fails
     */
    public TopicConnector(TopicConnectionFactory factory,
                          int numRetries,
                          int numSessions,
                          long connectRetryInterval,
                          long interactRetryInterval,
                          long timeoutTime,
                          boolean allowReceive,
                          String clientID,
                          String username,
                          String password,
                          JMSVendorAdapter adapter,
                          JMSURLHelper jmsurl)
        throws JMSException
    {
        super(factory, numRetries, numSessions, connectRetryInterval,
              interactRetryInterval, timeoutTime, allowReceive,
              clientID, username, password, adapter, jmsurl);
    }

    /**
     * Opens a topic connection from the given factory.
     *
     * @param connectionFactory must be a {@link TopicConnectionFactory};
     *                          it is cast without a prior type check
     * @param username          user to connect as, or {@code null} to use the
     *                          no-argument {@code createTopicConnection()}
     * @param password          password passed along with {@code username};
     *                          unused when {@code username} is {@code null}
     * @return the newly created {@link TopicConnection}
     * @throws JMSException if the provider cannot create the connection
     */
    protected Connection internalConnect(ConnectionFactory connectionFactory,
                                         String username, String password)
        throws JMSException
    {
        TopicConnectionFactory tcf = (TopicConnectionFactory) connectionFactory;
        if (username == null)
        {
            return tcf.createTopicConnection();
        }

        return tcf.createTopicConnection(username, password);
    }

    /**
     * Builds the topic-specific synchronous connection wrapper.
     * The factory and connection are cast to their topic types.
     */
    protected SyncConnection createSyncConnection(ConnectionFactory factory,
                                                  Connection connection,
                                                  int numSessions,
                                                  String threadName,
                                                  String clientID,
                                                  String username,
                                                  String password)
        throws JMSException
    {
        return new TopicSyncConnection((TopicConnectionFactory) factory,
                                       (TopicConnection) connection, numSessions,
                                       threadName, clientID, username,
                                       password);
    }

    /**
     * Builds the topic-specific asynchronous connection wrapper.
     * The factory and connection are cast to their topic types.
     */
    protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
                                                    Connection connection,
                                                    String threadName,
                                                    String clientID,
                                                    String username,
                                                    String password)
        throws JMSException
    {
        return new TopicAsyncConnection((TopicConnectionFactory) factory,
                                        (TopicConnection) connection, threadName,
                                        clientID, username, password);
    }

    /**
     * Creates an endpoint identified by topic name. The name is resolved to a
     * {@link Topic} lazily, when the endpoint's destination is requested.
     *
     * @param destination the topic name
     * @return a new endpoint bound to this connector
     */
    public JMSEndpoint createEndpoint(String destination)
    {
        return new TopicEndpoint(destination);
    }

    /**
     * Creates an endpoint that wraps an already resolved {@link Topic}.
     *
     * @param destination the destination to wrap; must be a {@link Topic}
     * @return a new endpoint bound to this connector
     * @throws IllegalArgumentException if {@code destination} is not a
     *                                  {@link Topic} (this includes {@code null})
     * @throws JMSException             if the topic name cannot be read from
     *                                  the destination
     */
    public JMSEndpoint createEndpoint(Destination destination)
        throws JMSException
    {
        if (!(destination instanceof Topic))
        {
            throw new IllegalArgumentException("The input must be a topic for this connector");
        }
        return new TopicDestinationEndpoint((Topic) destination);
    }

    /**
     * Creates a non-transacted topic session with the given acknowledge mode.
     */
    private TopicSession createTopicSession(TopicConnection connection, int ackMode)
        throws JMSException
    {
        // First argument 'false' == not transacted.
        return connection.createTopicSession(false, ackMode);
    }

    /**
     * Resolves a topic name to a {@link Topic} through the vendor adapter.
     */
    private Topic createTopic(TopicSession session, String subject)
        throws Exception
    {
        return m_adapter.getTopic(session, subject);
    }

    /**
     * Creates the subscriber described by a subscription: a durable subscriber
     * when the subscription carries a subscription name, a plain one otherwise.
     */
    private TopicSubscriber createSubscriber(TopicSession session,
                                             TopicSubscription subscription)
        throws Exception
    {
        if (subscription.isDurable())
        {
            return createDurableSubscriber(session,
                        (Topic) subscription.m_endpoint.getDestination(session),
                        subscription.m_subscriptionName,
                        subscription.m_messageSelector,
                        subscription.m_noLocal);
        }
        else
        {
            return createSubscriber(session,
                        (Topic) subscription.m_endpoint.getDestination(session),
                        subscription.m_messageSelector,
                        subscription.m_noLocal);
        }
    }

    /**
     * Thin wrapper around {@code TopicSession.createDurableSubscriber}.
     */
    private TopicSubscriber createDurableSubscriber(TopicSession session,
                                                    Topic topic,
                                                    String subscriptionName,
                                                    String messageSelector,
                                                    boolean noLocal)
        throws JMSException
    {
        return session.createDurableSubscriber(topic, subscriptionName,
                                               messageSelector, noLocal);
    }

    /**
     * Thin wrapper around {@code TopicSession.createSubscriber}.
     */
    private TopicSubscriber createSubscriber(TopicSession session,
                                             Topic topic,
                                             String messageSelector,
                                             boolean noLocal)
        throws JMSException
    {
        return session.createSubscriber(topic, messageSelector, noLocal);
    }

    /**
     * Asynchronous (listener-driven) connection for topics.
     */
    private final class TopicAsyncConnection extends AsyncConnection
    {
        TopicAsyncConnection(TopicConnectionFactory connectionFactory,
                             TopicConnection connection,
                             String threadName,
                             String clientID,
                             String username,
                             String password)
            throws JMSException
        {
            super(connectionFactory, connection, threadName,
                  clientID, username, password);
        }

        /**
         * Opens a new topic session using the subscription's acknowledge mode,
         * creates the matching subscriber on it and wraps both in a
         * {@link TopicListenerSession}.
         *
         * @param connection   must be a {@link TopicConnection}
         * @param subscription must be a {@link TopicSubscription}
         */
        protected ListenerSession createListenerSession(javax.jms.Connection connection,
                                                        Subscription subscription)
            throws Exception
        {
            TopicSession session = createTopicSession((TopicConnection) connection,
                                                      subscription.m_ackMode);
            TopicSubscriber subscriber = createSubscriber(session,
                                                (TopicSubscription) subscription);
            return new TopicListenerSession(session, subscriber,
                                            (TopicSubscription) subscription);
        }

        /**
         * Listener session that additionally removes a durable subscription
         * on cleanup when the subscription asks for it.
         */
        private final class TopicListenerSession extends ListenerSession
        {
            TopicListenerSession(TopicSession session,
                                 TopicSubscriber subscriber,
                                 TopicSubscription subscription)
                throws Exception
            {
                super(session, subscriber, subscription);
            }

            /**
             * Releases the session's resources in order: closes the consumer,
             * unsubscribes a durable subscription if requested, then closes
             * the session. Each step runs in its own try block so that a
             * failure in one does not prevent the following steps.
             */
            void cleanup()
            {
                try
                {
                    m_consumer.close();
                }
                catch (Exception ignore)
                {
                    // best-effort cleanup: keep going so the session still gets closed
                }

                try
                {
                    TopicSubscription sub = (TopicSubscription) m_subscription;
                    if (sub.isDurable() && sub.m_unsubscribe)
                    {
                        ((TopicSession) m_session).unsubscribe(sub.m_subscriptionName);
                    }
                }
                catch (Exception ignore)
                {
                    // best-effort cleanup: a failed unsubscribe must not block session close
                }

                try
                {
                    m_session.close();
                }
                catch (Exception ignore)
                {
                    // best-effort cleanup: nothing further to release
                }
            }
        }
    }

    /**
     * Synchronous (send / request-reply) connection for topics.
     */
    private final class TopicSyncConnection extends SyncConnection
    {
        TopicSyncConnection(TopicConnectionFactory connectionFactory,
                            TopicConnection connection,
                            int numSessions,
                            String threadName,
                            String clientID,
                            String username,
                            String password)
            throws JMSException
        {
            super(connectionFactory, connection, numSessions, threadName,
                  clientID, username, password);
        }

        /**
         * Opens a topic session with the default acknowledge mode and a
         * publisher that is not bound to any topic; the target topic is
         * supplied per message in {@link TopicSendSession#send}.
         *
         * @param connection must be a {@link TopicConnection}
         */
        protected SendSession createSendSession(javax.jms.Connection connection)
            throws JMSException
        {
            TopicSession session = createTopicSession((TopicConnection) connection,
                                        JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
            TopicPublisher publisher = session.createPublisher(null);
            return new TopicSendSession(session, publisher);
        }

        /**
         * Send session whose destination-related hooks are implemented with
         * the topic-specific JMS calls.
         */
        private final class TopicSendSession extends SendSession
        {
            TopicSendSession(TopicSession session,
                             TopicPublisher publisher)
                throws JMSException
            {
                super(session, publisher);
            }

            /**
             * Creates a non-durable subscriber on the given topic with no
             * message selector and the default no-local setting.
             */
            protected MessageConsumer createConsumer(Destination destination)
                throws JMSException
            {
                return createSubscriber((TopicSession) m_session, (Topic) destination,
                                        null, JMSConstants.DEFAULT_NO_LOCAL);
            }

            /**
             * Deletes a temporary topic.
             *
             * @param destination must be a {@link TemporaryTopic}
             */
            protected void deleteTemporaryDestination(Destination destination)
                throws JMSException
            {
                ((TemporaryTopic) destination).delete();
            }

            /**
             * Creates a temporary topic on this session.
             */
            protected Destination createTemporaryDestination()
                throws JMSException
            {
                return ((TopicSession) m_session).createTemporaryTopic();
            }

            /**
             * Publishes the message to the given topic using this session's
             * publisher.
             *
             * @param destination must be a {@link Topic}
             */
            protected void send(Destination destination, Message message,
                                int deliveryMode, int priority, long timeToLive)
                throws JMSException
            {
                ((TopicPublisher) m_producer).publish((Topic) destination, message,
                                                      deliveryMode, priority, timeToLive);
            }
        }
    }

    /**
     * Endpoint identified by a topic name; the {@link Topic} object is looked
     * up through the connector each time the destination is requested.
     */
    private class TopicEndpoint
        extends JMSEndpoint
    {
        String m_topicName;

        TopicEndpoint(String topicName)
        {
            super(TopicConnector.this);
            m_topicName = topicName;
        }

        /**
         * Resolves the topic name against the given session.
         *
         * @param session must be a {@link TopicSession}
         */
        Destination getDestination(Session session)
            throws Exception
        {
            return createTopic((TopicSession) session, m_topicName);
        }

        /**
         * Creates a {@link TopicSubscription} for this endpoint.
         */
        protected Subscription createSubscription(MessageListener listener,
                                                  HashMap properties)
        {
            return new TopicSubscription(listener, this, properties);
        }

        /**
         * @return {@code "TopicEndpoint:"} followed by the topic name
         */
        public String toString()
        {
            StringBuffer buffer = new StringBuffer("TopicEndpoint:");
            buffer.append(m_topicName);
            return buffer.toString();
        }

        /**
         * Two topic endpoints are equal when the superclass considers them
         * equal and their topic names match.
         */
        // NOTE(review): no hashCode() override is visible in this class; confirm
        // that JMSEndpoint.hashCode() is consistent with this equals().
        public boolean equals(Object object)
        {
            if (!super.equals(object))
            {
                return false;
            }

            if (!(object instanceof TopicEndpoint))
            {
                return false;
            }

            return m_topicName.equals(((TopicEndpoint) object).m_topicName);
        }
    }

    /**
     * Subscription with the topic-only options: durable subscription name,
     * unsubscribe-on-cleanup flag and no-local flag.
     */
    private final class TopicSubscription extends Subscription
    {
        String m_subscriptionName;
        boolean m_unsubscribe;
        boolean m_noLocal;

        /**
         * Initializes the base subscription, then reads the topic-specific
         * options from {@code properties} through the {@code MapUtils}
         * remove-property helpers, falling back to {@code null} (no durable
         * name) and the {@code JMSConstants} defaults.
         */
        TopicSubscription(MessageListener listener,
                          JMSEndpoint endpoint,
                          HashMap properties)
        {
            super(listener, endpoint, properties);
            m_subscriptionName = MapUtils.removeStringProperty(properties,
                                                JMSConstants.SUBSCRIPTION_NAME,
                                                null);
            m_unsubscribe = MapUtils.removeBooleanProperty(properties,
                                                JMSConstants.UNSUBSCRIBE,
                                                JMSConstants.DEFAULT_UNSUBSCRIBE);
            m_noLocal = MapUtils.removeBooleanProperty(properties,
                                                JMSConstants.NO_LOCAL,
                                                JMSConstants.DEFAULT_NO_LOCAL);
        }

        /**
         * @return {@code true} when a subscription name was supplied
         */
        boolean isDurable()
        {
            return m_subscriptionName != null;
        }

        /**
         * Equal when the superclass considers the subscriptions equal, both
         * flags match, and either both are non-durable or both are durable
         * with the same subscription name.
         */
        // NOTE(review): no hashCode() override is visible in this class; confirm
        // that Subscription.hashCode() is consistent with this equals().
        public boolean equals(Object obj)
        {
            if (!super.equals(obj))
            {
                return false;
            }
            if (!(obj instanceof TopicSubscription))
            {
                return false;
            }

            TopicSubscription other = (TopicSubscription) obj;
            if (other.m_unsubscribe != m_unsubscribe || other.m_noLocal != m_noLocal)
            {
                return false;
            }

            if (isDurable())
            {
                return other.isDurable() && other.m_subscriptionName.equals(m_subscriptionName);
            }
            else if (other.isDurable())
            {
                return false;
            }
            else
            {
                return true;
            }
        }

        /**
         * @return the superclass string followed by {@code :noLocal:unsubscribe}
         *         and, for durable subscriptions, {@code :subscriptionName}
         */
        public String toString()
        {
            StringBuffer buffer = new StringBuffer(super.toString());
            buffer.append(":").append(m_noLocal).append(":").append(m_unsubscribe);
            if (isDurable())
            {
                buffer.append(":");
                buffer.append(m_subscriptionName);
            }
            return buffer.toString();
        }
    }

    /**
     * Endpoint that wraps an already resolved {@link Topic}. The topic's name
     * is handed to {@link TopicEndpoint} (so naming and equality behave as for
     * a name-based endpoint), while the destination lookup returns the wrapped
     * topic directly.
     */
    private final class TopicDestinationEndpoint
        extends TopicEndpoint
    {
        Topic m_topic;

        /**
         * @throws JMSException if {@code topic.getTopicName()} fails
         */
        TopicDestinationEndpoint(Topic topic)
            throws JMSException
        {
            super(topic.getTopicName());
            m_topic = topic;
        }

        /**
         * @return the wrapped topic; the session argument is not used
         */
        Destination getDestination(Session session)
        {
            return m_topic;
        }
    }
}