1 18 package org.apache.activemq.network.jms; 19 20 import org.apache.commons.logging.Log; 21 import org.apache.commons.logging.LogFactory; 22 23 import javax.jms.Connection ; 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.Queue ; 27 import javax.jms.QueueConnection ; 28 import javax.jms.QueueConnectionFactory ; 29 import javax.jms.QueueSession ; 30 import javax.jms.Session ; 31 import javax.naming.NamingException ; 32 39 public class JmsQueueConnector extends JmsConnector{ 40 private static final Log log=LogFactory.getLog(JmsQueueConnector.class); 41 private String outboundQueueConnectionFactoryName; 42 private String localConnectionFactoryName; 43 private QueueConnectionFactory outboundQueueConnectionFactory; 44 private QueueConnectionFactory localQueueConnectionFactory; 45 private QueueConnection outboundQueueConnection; 46 private QueueConnection localQueueConnection; 47 private InboundQueueBridge[] inboundQueueBridges; 48 private OutboundQueueBridge[] outboundQueueBridges; 49 50 public boolean init(){ 51 boolean result=super.init(); 52 if(result){ 53 try{ 54 initializeForeignQueueConnection(); 55 initializeLocalQueueConnection(); 56 initializeInboundJmsMessageConvertor(); 57 initializeOutboundJmsMessageConvertor(); 58 initializeInboundQueueBridges(); 59 initializeOutboundQueueBridges(); 60 }catch(Exception e){ 61 log.error("Failed to initialize the JMSConnector",e); 62 } 63 } 64 return result; 65 } 66 67 68 69 72 public InboundQueueBridge[] getInboundQueueBridges(){ 73 return inboundQueueBridges; 74 } 75 76 80 public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges){ 81 this.inboundQueueBridges=inboundQueueBridges; 82 } 83 84 87 public OutboundQueueBridge[] getOutboundQueueBridges(){ 88 return outboundQueueBridges; 89 } 90 91 95 public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges){ 96 this.outboundQueueBridges=outboundQueueBridges; 97 } 98 99 102 public QueueConnectionFactory getLocalQueueConnectionFactory(){ 103 return localQueueConnectionFactory; 104 } 105 106 110 public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory){ 111 this.localQueueConnectionFactory=localConnectionFactory; 112 } 113 114 117 public QueueConnectionFactory getOutboundQueueConnectionFactory(){ 118 return outboundQueueConnectionFactory; 119 } 120 121 124 public String getOutboundQueueConnectionFactoryName(){ 125 return outboundQueueConnectionFactoryName; 126 } 127 128 132 public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName){ 133 this.outboundQueueConnectionFactoryName=foreignQueueConnectionFactoryName; 134 } 135 136 139 public String getLocalConnectionFactoryName(){ 140 return localConnectionFactoryName; 141 } 142 143 147 public void setLocalConnectionFactoryName(String localConnectionFactoryName){ 148 this.localConnectionFactoryName=localConnectionFactoryName; 149 } 150 151 154 public QueueConnection getLocalQueueConnection(){ 155 return localQueueConnection; 156 } 157 158 162 public void setLocalQueueConnection(QueueConnection localQueueConnection){ 163 this.localQueueConnection=localQueueConnection; 164 } 165 166 169 public QueueConnection getOutboundQueueConnection(){ 170 return outboundQueueConnection; 171 } 172 173 177 public void setOutboundQueueConnection(QueueConnection foreignQueueConnection){ 178 this.outboundQueueConnection=foreignQueueConnection; 179 } 180 181 185 public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory){ 186 this.outboundQueueConnectionFactory=foreignQueueConnectionFactory; 187 } 188 189 public void restartProducerConnection() throws NamingException , JMSException { 190 outboundQueueConnection = null; 191 initializeForeignQueueConnection(); 192 } 193 194 protected void initializeForeignQueueConnection() throws NamingException ,JMSException { 195 if(outboundQueueConnection==null){ 196 if(outboundQueueConnectionFactory==null){ 198 if(outboundQueueConnectionFactoryName!=null){ 200 outboundQueueConnectionFactory=(QueueConnectionFactory ) jndiOutboundTemplate.lookup( 201 outboundQueueConnectionFactoryName,QueueConnectionFactory .class); 202 if(outboundUsername!=null){ 203 outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(outboundUsername, 204 outboundPassword); 205 }else{ 206 outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(); 207 } 208 }else { 209 throw new JMSException ("Cannot create localConnection - no information"); 210 } 211 }else { 212 if(outboundUsername!=null){ 213 outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(outboundUsername, 214 outboundPassword); 215 }else{ 216 outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(); 217 } 218 } 219 } 220 outboundQueueConnection.start(); 221 } 222 223 protected void initializeLocalQueueConnection() throws NamingException ,JMSException { 224 if(localQueueConnection==null){ 225 if(localQueueConnectionFactory==null){ 227 if(embeddedConnectionFactory==null){ 228 if(localConnectionFactoryName!=null){ 230 localQueueConnectionFactory=(QueueConnectionFactory ) jndiLocalTemplate.lookup( 231 localConnectionFactoryName,QueueConnectionFactory .class); 232 if(localUsername!=null){ 233 localQueueConnection=localQueueConnectionFactory.createQueueConnection(localUsername, 234 localPassword); 235 }else{ 236 localQueueConnection=localQueueConnectionFactory.createQueueConnection(); 237 } 238 }else { 239 throw new JMSException ("Cannot create localConnection - no information"); 240 } 241 }else{ 242 localQueueConnection = embeddedConnectionFactory.createQueueConnection(); 243 } 244 }else { 245 if(localUsername!=null){ 246 localQueueConnection=localQueueConnectionFactory.createQueueConnection(localUsername, 247 localPassword); 248 }else{ 249 localQueueConnection=localQueueConnectionFactory.createQueueConnection(); 250 } 251 } 252 } 253 localQueueConnection.start(); 254 } 255 256 protected void initializeInboundJmsMessageConvertor(){ 257 inboundMessageConvertor.setConnection(localQueueConnection); 258 } 259 260 protected void initializeOutboundJmsMessageConvertor(){ 261 outboundMessageConvertor.setConnection(outboundQueueConnection); 262 } 263 264 protected void initializeInboundQueueBridges() throws JMSException { 265 if(inboundQueueBridges!=null){ 266 QueueSession outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 267 QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 268 for(int i=0;i<inboundQueueBridges.length;i++){ 269 InboundQueueBridge bridge=inboundQueueBridges[i]; 270 String localQueueName=bridge.getLocalQueueName(); 271 Queue activemqQueue=createActiveMQQueue(localSession,localQueueName); 272 String queueName = bridge.getInboundQueueName(); 273 Queue foreignQueue=createForeignQueue(outboundSession,queueName); 274 bridge.setConsumerQueue(foreignQueue); 275 bridge.setProducerQueue(activemqQueue); 276 bridge.setProducerConnection(localQueueConnection); 277 bridge.setConsumerConnection(outboundQueueConnection); 278 if(bridge.getJmsMessageConvertor()==null){ 279 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 280 } 281 bridge.setJmsConnector(this); 282 addInboundBridge(bridge); 283 } 284 outboundSession.close(); 285 localSession.close(); 286 } 287 } 288 289 protected void initializeOutboundQueueBridges() throws JMSException { 290 if(outboundQueueBridges!=null){ 291 QueueSession outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 292 QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 293 for(int i=0;i<outboundQueueBridges.length;i++){ 294 OutboundQueueBridge bridge=outboundQueueBridges[i]; 295 String localQueueName=bridge.getLocalQueueName(); 296 Queue activemqQueue=createActiveMQQueue(localSession,localQueueName); 297 String queueName=bridge.getOutboundQueueName(); 298 Queue foreignQueue=createForeignQueue(outboundSession,queueName); 299 bridge.setConsumerQueue(activemqQueue); 300 bridge.setProducerQueue(foreignQueue); 301 bridge.setProducerConnection(outboundQueueConnection); 302 bridge.setConsumerConnection(localQueueConnection); 303 if(bridge.getJmsMessageConvertor()==null){ 304 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 305 } 306 bridge.setJmsConnector(this); 307 addOutboundBridge(bridge); 308 } 309 outboundSession.close(); 310 localSession.close(); 311 } 312 } 313 314 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){ 315 Queue replyToProducerQueue =(Queue )destination; 316 boolean isInbound = replyToProducerConnection.equals(localQueueConnection); 317 318 if(isInbound){ 319 InboundQueueBridge bridge = (InboundQueueBridge) replyToBridges.get(replyToProducerQueue); 320 if (bridge == null){ 321 bridge = new InboundQueueBridge(){ 322 protected Destination processReplyToDestination (Destination destination){ 323 return null; 324 } 325 }; 326 try{ 327 QueueSession replyToConsumerSession = ((QueueConnection )replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 328 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 329 replyToConsumerSession.close(); 330 bridge.setConsumerQueue(replyToConsumerQueue); 331 bridge.setProducerQueue(replyToProducerQueue); 332 bridge.setProducerConnection((QueueConnection )replyToProducerConnection); 333 bridge.setConsumerConnection((QueueConnection )replyToConsumerConnection); 334 bridge.setDoHandleReplyTo(false); 335 if(bridge.getJmsMessageConvertor()==null){ 336 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 337 } 338 bridge.setJmsConnector(this); 339 bridge.start(); 340 log.info("Created replyTo bridge for " + replyToProducerQueue); 341 }catch(Exception e){ 342 log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e); 343 return null; 344 } 345 replyToBridges.put(replyToProducerQueue, bridge); 346 } 347 return bridge.getConsumerQueue(); 348 }else{ 349 OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(replyToProducerQueue); 350 if (bridge == null){ 351 bridge = new OutboundQueueBridge(){ 352 protected Destination processReplyToDestination (Destination destination){ 353 return null; 354 } 355 }; 356 try{ 357 QueueSession replyToConsumerSession = ((QueueConnection )replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 358 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 359 replyToConsumerSession.close(); 360 bridge.setConsumerQueue(replyToConsumerQueue); 361 bridge.setProducerQueue(replyToProducerQueue); 362 bridge.setProducerConnection((QueueConnection )replyToProducerConnection); 363 bridge.setConsumerConnection((QueueConnection )replyToConsumerConnection); 364 bridge.setDoHandleReplyTo(false); 365 if(bridge.getJmsMessageConvertor()==null){ 366 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 367 } 368 bridge.setJmsConnector(this); 369 bridge.start(); 370 log.info("Created replyTo bridge for " + replyToProducerQueue); 371 }catch(Exception e){ 372 log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e); 373 return null; 374 } 375 replyToBridges.put(replyToProducerQueue, bridge); 376 } 377 return bridge.getConsumerQueue(); 378 } 379 } 380 381 protected Queue createActiveMQQueue(QueueSession session,String queueName) throws JMSException { 382 return session.createQueue(queueName); 383 } 384 385 protected Queue createForeignQueue(QueueSession session,String queueName) throws JMSException { 386 Queue result = null; 387 try{ 388 result = session.createQueue(queueName); 389 }catch(JMSException e){ 390 try{ 392 result = (Queue ) jndiOutboundTemplate.lookup(queueName, Queue .class); 393 }catch(NamingException e1){ 394 String errStr = "Failed to look-up Queue for name: " + queueName; 395 log.error(errStr,e); 396 JMSException jmsEx = new JMSException (errStr); 397 jmsEx.setLinkedException(e1); 398 throw jmsEx; 399 } 400 } 401 return result; 402 } 403 404 405 } 406 | Popular Tags |