1 18 package org.apache.activemq.network.jms; 19 20 import javax.jms.Connection ; 21 import javax.jms.Destination ; 22 import javax.jms.JMSException ; 23 import javax.jms.Session ; 24 import javax.jms.Topic ; 25 import javax.jms.TopicConnection ; 26 import javax.jms.TopicConnectionFactory ; 27 import javax.jms.TopicSession ; 28 import javax.naming.NamingException ; 29 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 33 40 public class JmsTopicConnector extends JmsConnector{ 41 private static final Log log=LogFactory.getLog(JmsTopicConnector.class); 42 private String outboundTopicConnectionFactoryName; 43 private String localConnectionFactoryName; 44 private TopicConnectionFactory outboundTopicConnectionFactory; 45 private TopicConnectionFactory localTopicConnectionFactory; 46 private TopicConnection outboundTopicConnection; 47 private TopicConnection localTopicConnection; 48 private InboundTopicBridge[] inboundTopicBridges; 49 private OutboundTopicBridge[] outboundTopicBridges; 50 51 public boolean init(){ 52 boolean result=super.init(); 53 if(result){ 54 try{ 55 initializeForeignTopicConnection(); 56 initializeLocalTopicConnection(); 57 initializeInboundJmsMessageConvertor(); 58 initializeOutboundJmsMessageConvertor(); 59 initializeInboundTopicBridges(); 60 initializeOutboundTopicBridges(); 61 }catch(Exception e){ 62 log.error("Failed to initialize the JMSConnector",e); 63 } 64 } 65 return result; 66 } 67 68 69 70 73 public InboundTopicBridge[] getInboundTopicBridges(){ 74 return inboundTopicBridges; 75 } 76 77 81 public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges){ 82 this.inboundTopicBridges=inboundTopicBridges; 83 } 84 85 88 public OutboundTopicBridge[] getOutboundTopicBridges(){ 89 return outboundTopicBridges; 90 } 91 92 96 public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges){ 97 this.outboundTopicBridges=outboundTopicBridges; 98 } 99 100 103 public TopicConnectionFactory getLocalTopicConnectionFactory(){ 104 return localTopicConnectionFactory; 105 } 106 107 111 public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory){ 112 this.localTopicConnectionFactory=localConnectionFactory; 113 } 114 115 118 public TopicConnectionFactory getOutboundTopicConnectionFactory(){ 119 return outboundTopicConnectionFactory; 120 } 121 122 125 public String getOutboundTopicConnectionFactoryName(){ 126 return outboundTopicConnectionFactoryName; 127 } 128 129 133 public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName){ 134 this.outboundTopicConnectionFactoryName=foreignTopicConnectionFactoryName; 135 } 136 137 140 public String getLocalConnectionFactoryName(){ 141 return localConnectionFactoryName; 142 } 143 144 148 public void setLocalConnectionFactoryName(String localConnectionFactoryName){ 149 this.localConnectionFactoryName=localConnectionFactoryName; 150 } 151 152 155 public TopicConnection getLocalTopicConnection(){ 156 return localTopicConnection; 157 } 158 159 163 public void setLocalTopicConnection(TopicConnection localTopicConnection){ 164 this.localTopicConnection=localTopicConnection; 165 } 166 167 170 public TopicConnection getOutboundTopicConnection(){ 171 return outboundTopicConnection; 172 } 173 174 178 public void setOutboundTopicConnection(TopicConnection foreignTopicConnection){ 179 this.outboundTopicConnection=foreignTopicConnection; 180 } 181 182 186 public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory){ 187 this.outboundTopicConnectionFactory=foreignTopicConnectionFactory; 188 } 189 190 191 public void restartProducerConnection() throws NamingException , JMSException { 192 outboundTopicConnection = null; 193 initializeForeignTopicConnection(); 194 } 195 196 protected void initializeForeignTopicConnection() throws NamingException ,JMSException { 197 if(outboundTopicConnection==null){ 198 if(outboundTopicConnectionFactory==null){ 200 if(outboundTopicConnectionFactoryName!=null){ 202 outboundTopicConnectionFactory=(TopicConnectionFactory ) jndiOutboundTemplate.lookup( 203 outboundTopicConnectionFactoryName,TopicConnectionFactory .class); 204 if(outboundUsername!=null){ 205 outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(outboundUsername, 206 outboundPassword); 207 }else{ 208 outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(); 209 } 210 }else { 211 throw new JMSException ("Cannot create localConnection - no information"); 212 } 213 }else { 214 if(outboundUsername!=null){ 215 outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(outboundUsername, 216 outboundPassword); 217 }else{ 218 outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(); 219 } 220 } 221 } 222 outboundTopicConnection.start(); 223 } 224 225 protected void initializeLocalTopicConnection() throws NamingException ,JMSException { 226 if(localTopicConnection==null){ 227 if(localTopicConnectionFactory==null){ 229 if(embeddedConnectionFactory==null){ 230 if(localConnectionFactoryName!=null){ 232 localTopicConnectionFactory=(TopicConnectionFactory ) jndiLocalTemplate.lookup( 233 localConnectionFactoryName,TopicConnectionFactory .class); 234 if(localUsername!=null){ 235 localTopicConnection=localTopicConnectionFactory.createTopicConnection(localUsername, 236 localPassword); 237 }else{ 238 localTopicConnection=localTopicConnectionFactory.createTopicConnection(); 239 } 240 }else { 241 throw new JMSException ("Cannot create localConnection - no information"); 242 } 243 }else{ 244 localTopicConnection = embeddedConnectionFactory.createTopicConnection(); 245 } 246 }else { 247 if(localUsername!=null){ 248 localTopicConnection=localTopicConnectionFactory.createTopicConnection(localUsername, 249 localPassword); 250 }else{ 251 localTopicConnection=localTopicConnectionFactory.createTopicConnection(); 252 } 253 } 254 } 255 localTopicConnection.start(); 256 } 257 258 protected void initializeInboundJmsMessageConvertor(){ 259 inboundMessageConvertor.setConnection(localTopicConnection); 260 } 261 262 protected void initializeOutboundJmsMessageConvertor(){ 263 outboundMessageConvertor.setConnection(outboundTopicConnection); 264 } 265 266 protected void initializeInboundTopicBridges() throws JMSException { 267 if(inboundTopicBridges!=null){ 268 TopicSession outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 269 TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 270 for(int i=0;i<inboundTopicBridges.length;i++){ 271 InboundTopicBridge bridge=inboundTopicBridges[i]; 272 String localTopicName=bridge.getLocalTopicName(); 273 Topic activemqTopic=createActiveMQTopic(localSession,localTopicName); 274 String topicName=bridge.getInboundTopicName(); 275 Topic foreignTopic=createForeignTopic(outboundSession,topicName); 276 bridge.setConsumerTopic(foreignTopic); 277 bridge.setProducerTopic(activemqTopic); 278 bridge.setProducerConnection(localTopicConnection); 279 bridge.setConsumerConnection(outboundTopicConnection); 280 if(bridge.getJmsMessageConvertor()==null){ 281 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 282 } 283 bridge.setJmsConnector(this); 284 addInboundBridge(bridge); 285 } 286 outboundSession.close(); 287 localSession.close(); 288 } 289 } 290 291 protected void initializeOutboundTopicBridges() throws JMSException { 292 if(outboundTopicBridges!=null){ 293 TopicSession outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 294 TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 295 for(int i=0;i<outboundTopicBridges.length;i++){ 296 OutboundTopicBridge bridge=outboundTopicBridges[i]; 297 String localTopicName=bridge.getLocalTopicName(); 298 Topic activemqTopic=createActiveMQTopic(localSession,localTopicName); 299 String topicName=bridge.getOutboundTopicName(); 300 Topic foreignTopic=createForeignTopic(outboundSession,topicName); 301 bridge.setConsumerTopic(activemqTopic); 302 bridge.setProducerTopic(foreignTopic); 303 bridge.setProducerConnection(outboundTopicConnection); 304 bridge.setConsumerConnection(localTopicConnection); 305 if(bridge.getJmsMessageConvertor()==null){ 306 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 307 } 308 bridge.setJmsConnector(this); 309 addOutboundBridge(bridge); 310 } 311 outboundSession.close(); 312 localSession.close(); 313 } 314 } 315 316 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){ 317 Topic replyToProducerTopic =(Topic )destination; 318 boolean isInbound = replyToProducerConnection.equals(localTopicConnection); 319 320 if(isInbound){ 321 InboundTopicBridge bridge = (InboundTopicBridge) replyToBridges.get(replyToProducerTopic); 322 if (bridge == null){ 323 bridge = new InboundTopicBridge(){ 324 protected Destination processReplyToDestination (Destination destination){ 325 return null; 326 } 327 }; 328 try{ 329 TopicSession replyToConsumerSession = ((TopicConnection )replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 330 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); 331 replyToConsumerSession.close(); 332 bridge.setConsumerTopic(replyToConsumerTopic); 333 bridge.setProducerTopic(replyToProducerTopic); 334 bridge.setProducerConnection((TopicConnection )replyToProducerConnection); 335 bridge.setConsumerConnection((TopicConnection )replyToConsumerConnection); 336 bridge.setDoHandleReplyTo(false); 337 if(bridge.getJmsMessageConvertor()==null){ 338 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 339 } 340 bridge.setJmsConnector(this); 341 bridge.start(); 342 log.info("Created replyTo bridge for " + replyToProducerTopic); 343 }catch(Exception e){ 344 log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e); 345 return null; 346 } 347 replyToBridges.put(replyToProducerTopic, bridge); 348 } 349 return bridge.getConsumerTopic(); 350 }else{ 351 OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(replyToProducerTopic); 352 if (bridge == null){ 353 bridge = new OutboundTopicBridge(){ 354 protected Destination processReplyToDestination (Destination destination){ 355 return null; 356 } 357 }; 358 try{ 359 TopicSession replyToConsumerSession = ((TopicConnection )replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 360 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); 361 replyToConsumerSession.close(); 362 bridge.setConsumerTopic(replyToConsumerTopic); 363 bridge.setProducerTopic(replyToProducerTopic); 364 bridge.setProducerConnection((TopicConnection )replyToProducerConnection); 365 bridge.setConsumerConnection((TopicConnection )replyToConsumerConnection); 366 bridge.setDoHandleReplyTo(false); 367 if(bridge.getJmsMessageConvertor()==null){ 368 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 369 } 370 bridge.setJmsConnector(this); 371 bridge.start(); 372 log.info("Created replyTo bridge for " + replyToProducerTopic); 373 }catch(Exception e){ 374 log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e); 375 return null; 376 } 377 replyToBridges.put(replyToProducerTopic, bridge); 378 } 379 return bridge.getConsumerTopic(); 380 } 381 } 382 383 protected Topic createActiveMQTopic(TopicSession session,String topicName) throws JMSException { 384 return session.createTopic(topicName); 385 } 386 387 protected Topic createForeignTopic(TopicSession session,String topicName) throws JMSException { 388 Topic result = null; 389 try{ 390 result = session.createTopic(topicName); 391 }catch(JMSException e){ 392 try{ 394 result = (Topic ) jndiOutboundTemplate.lookup(topicName, Topic .class); 395 }catch(NamingException e1){ 396 String errStr = "Failed to look-up Topic for name: " + topicName; 397 log.error(errStr,e); 398 JMSException jmsEx = new JMSException (errStr); 399 jmsEx.setLinkedException(e1); 400 throw jmsEx; 401 } 402 } 403 return result; 404 } 405 406 407 } 408 | Popular Tags |