1 18 package org.apache.activemq.network; 19 20 import java.io.IOException ; 21 22 import org.apache.activemq.advisory.AdvisorySupport; 23 import org.apache.activemq.command.ActiveMQDestination; 24 import org.apache.activemq.command.ActiveMQTempDestination; 25 import org.apache.activemq.command.ActiveMQTopic; 26 import org.apache.activemq.command.BrokerId; 27 import org.apache.activemq.command.BrokerInfo; 28 import org.apache.activemq.command.Command; 29 import org.apache.activemq.command.ConnectionError; 30 import org.apache.activemq.command.ConnectionId; 31 import org.apache.activemq.command.ConnectionInfo; 32 import org.apache.activemq.command.ConsumerId; 33 import org.apache.activemq.command.ConsumerInfo; 34 import org.apache.activemq.command.DataStructure; 35 import org.apache.activemq.command.DestinationInfo; 36 import org.apache.activemq.command.ExceptionResponse; 37 import org.apache.activemq.command.KeepAliveInfo; 38 import org.apache.activemq.command.Message; 39 import org.apache.activemq.command.MessageAck; 40 import org.apache.activemq.command.MessageDispatch; 41 import org.apache.activemq.command.NetworkBridgeFilter; 42 import org.apache.activemq.command.ProducerInfo; 43 import org.apache.activemq.command.RemoveInfo; 44 import org.apache.activemq.command.Response; 45 import org.apache.activemq.command.SessionInfo; 46 import org.apache.activemq.command.ShutdownInfo; 47 import org.apache.activemq.command.WireFormatInfo; 48 import org.apache.activemq.filter.DestinationFilter; 49 import org.apache.activemq.transport.DefaultTransportListener; 50 import org.apache.activemq.transport.FutureResponse; 51 import org.apache.activemq.transport.ResponseCallback; 52 import org.apache.activemq.transport.Transport; 53 import org.apache.activemq.transport.TransportDisposedIOException; 54 import org.apache.activemq.transport.TransportListener; 55 import org.apache.activemq.util.IdGenerator; 56 import org.apache.activemq.util.IntrospectionSupport; 57 import org.apache.activemq.util.LongSequenceGenerator; 58 import org.apache.activemq.util.MarshallingSupport; 59 import org.apache.activemq.util.ServiceStopper; 60 import org.apache.activemq.util.ServiceSupport; 61 import org.apache.commons.logging.Log; 62 import org.apache.commons.logging.LogFactory; 63 64 import java.security.GeneralSecurityException ; 65 import java.util.Properties ; 66 import java.util.concurrent.ConcurrentHashMap ; 67 import java.util.concurrent.CountDownLatch ; 68 import java.util.concurrent.atomic.AtomicBoolean ; 69 70 75 public abstract class DemandForwardingBridgeSupport implements NetworkBridge { 76 protected static final Log log = LogFactory.getLog(DemandForwardingBridge.class); 77 protected final Transport localBroker; 78 protected final Transport remoteBroker; 79 protected final IdGenerator idGenerator = new IdGenerator(); 80 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 81 protected ConnectionInfo localConnectionInfo; 82 protected ConnectionInfo remoteConnectionInfo; 83 protected SessionInfo localSessionInfo; 84 protected ProducerInfo producerInfo; 85 protected String remoteBrokerName = "Unknown"; 86 protected String localClientId; 87 protected ConsumerInfo demandConsumerInfo; 88 protected int demandConsumerDispatched; 89 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean (false); 90 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean (false); 91 protected boolean disposed = false; 92 protected BrokerId localBrokerId; 93 protected ActiveMQDestination[] excludedDestinations; 94 protected ActiveMQDestination[] dynamicallyIncludedDestinations; 95 protected ActiveMQDestination[] staticallyIncludedDestinations; 96 protected ActiveMQDestination[] durableDestinations; 97 protected final ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap (); 98 protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap (); 99 protected final BrokerId localBrokerPath[] = new BrokerId[] { null }; 100 protected CountDownLatch startedLatch = new CountDownLatch (2); 101 protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch (1); 102 protected final AtomicBoolean remoteInterupted = new AtomicBoolean (false); 103 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean (false); 104 protected NetworkBridgeConfiguration configuration; 105 private NetworkBridgeFailedListener bridgeFailedListener; 106 private boolean createdByDuplex; 107 108 109 public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 110 this.configuration=configuration; 111 this.localBroker = localBroker; 112 this.remoteBroker = remoteBroker; 113 } 114 115 public void start() throws Exception { 116 localBroker.setTransportListener(new DefaultTransportListener(){ 117 public void onCommand(Object o){ 118 Command command = (Command) o; 119 serviceLocalCommand(command); 120 } 121 122 public void onException(IOException error){ 123 serviceLocalException(error); 124 } 125 }); 126 remoteBroker.setTransportListener(new TransportListener(){ 127 public void onCommand(Object o){ 128 Command command = (Command) o; 129 serviceRemoteCommand(command); 130 } 131 132 public void onException(IOException error){ 133 serviceRemoteException(error); 134 } 135 136 public void transportInterupted(){ 137 if( remoteInterupted.compareAndSet(false, true) ) { 139 140 log.info("Outbound transport to " + remoteBrokerName + " interrupted."); 141 142 if( localBridgeStarted.get() ) { 143 clearDownSubscriptions(); 144 synchronized( DemandForwardingBridgeSupport.this ) { 145 try{ 146 localBroker.oneway(localConnectionInfo.createRemoveCommand()); 147 }catch(TransportDisposedIOException td){ 148 log.debug("local broker is now disposed",td); 149 } 150 catch(IOException e){ 151 log.warn("Caught exception from local start",e); 152 } 153 } 154 } 155 156 localBridgeStarted.set(false); 157 remoteBridgeStarted.set(false); 158 startedLatch = new CountDownLatch (2); 159 } 160 161 } 162 163 public void transportResumed(){ 164 if( remoteInterupted.compareAndSet(true, false) ) { 165 166 if( !lastConnectSucceeded.get() ) { 169 try { 170 log.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop."); 171 Thread.sleep(1000); 172 } catch (InterruptedException e) { 173 Thread.currentThread().interrupt(); 174 } 175 } 176 lastConnectSucceeded.set(false); 177 try { 178 startLocalBridge(); 179 remoteBridgeStarted.set(true); 180 startedLatch.countDown(); 181 log.info("Outbound transport to " + remoteBrokerName + " resumed"); 182 }catch(Exception e) { 183 log.error("Caught exception from local start in resume transport",e ); 184 } 185 186 } 187 } 188 }); 189 localBroker.start(); 190 remoteBroker.start(); 191 192 try{ 193 triggerRemoteStartBridge(); 194 }catch(IOException e){ 195 log.warn("Caught exception from remote start",e); 196 } 197 } 198 199 protected void triggerLocalStartBridge() throws IOException { 200 Thread thead=new Thread (){ 201 public void run(){ 202 try{ 203 startLocalBridge(); 204 }catch(Exception e){ 205 serviceLocalException(e); 206 } 207 } 208 }; 209 thead.start(); 210 } 211 212 protected void triggerRemoteStartBridge() throws IOException { 213 Thread thead=new Thread (){ 214 public void run(){ 215 try{ 216 startRemoteBridge(); 217 }catch(Exception e){ 218 serviceRemoteException(e); 219 } 220 } 221 }; 222 thead.start(); 223 } 224 225 protected void startLocalBridge() throws Exception { 226 if(localBridgeStarted.compareAndSet(false,true)){ 227 synchronized( this ) { 228 229 remoteBrokerNameKnownLatch.await(); 230 231 localConnectionInfo=new ConnectionInfo(); 232 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 233 localClientId="NC_"+remoteBrokerName+"_inbound"+configuration.getBrokerName(); 234 localConnectionInfo.setClientId(localClientId); 235 localConnectionInfo.setUserName(configuration.getUserName()); 236 localConnectionInfo.setPassword(configuration.getPassword()); 237 localBroker.oneway(localConnectionInfo); 238 239 localSessionInfo=new SessionInfo(localConnectionInfo,1); 240 localBroker.oneway(localSessionInfo); 241 242 log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName 243 +") has been established."); 244 245 startedLatch.countDown(); 246 setupStaticDestinations(); 247 } 248 } 249 } 250 251 protected void startRemoteBridge() throws Exception { 252 if(remoteBridgeStarted.compareAndSet(false,true)) { 253 254 synchronized (this) { 255 256 if( remoteConnectionInfo!=null ) { 257 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); 258 } 259 260 remoteConnectionInfo=new ConnectionInfo(); 261 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 262 remoteConnectionInfo.setClientId("NC_"+configuration.getBrokerName()+"_outbound"); 263 remoteConnectionInfo.setUserName(configuration.getUserName()); 264 remoteConnectionInfo.setPassword(configuration.getPassword()); 265 remoteBroker.oneway(remoteConnectionInfo); 266 if (isCreatedByDuplex()==false) { 267 BrokerInfo brokerInfo=new BrokerInfo(); 268 brokerInfo.setBrokerName(configuration.getBrokerName()); 269 brokerInfo.setNetworkConnection(true); 270 brokerInfo.setDuplexConnection(configuration.isDuplex()); 271 272 Properties props = new Properties (); 274 IntrospectionSupport.getProperties(this,props,null); 275 String str = MarshallingSupport.propertiesToString(props); 276 brokerInfo.setNetworkProperties(str); 277 remoteBroker.oneway(brokerInfo); 278 } 279 280 SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1); 281 remoteBroker.oneway(remoteSessionInfo); 282 283 producerInfo=new ProducerInfo(remoteSessionInfo,1); 284 producerInfo.setResponseRequired(false); 285 remoteBroker.oneway(producerInfo); 286 287 demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1); 289 demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync()); 290 String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+configuration.getDestinationFilter(); 291 if( configuration.isBridgeTempDestinations() ) { 292 advisoryTopic += ","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; 293 } 294 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); 295 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize()); 296 remoteBroker.oneway(demandConsumerInfo); 297 startedLatch.countDown(); 298 299 if (!disposed){ 300 triggerLocalStartBridge(); 301 } 302 303 } 304 } 305 } 306 307 public void stop() throws Exception { 308 log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed); 309 boolean wasDisposedAlready=disposed; 310 if(!disposed){ 311 try{ 312 disposed=true; 313 remoteBridgeStarted.set(false); 314 localBroker.oneway(new ShutdownInfo()); 315 remoteBroker.oneway(new ShutdownInfo()); 316 }catch(IOException e){ 317 log.debug("Caught exception stopping",e); 318 }finally{ 319 ServiceStopper ss=new ServiceStopper(); 320 ss.stop(localBroker); 321 ss.stop(remoteBroker); 322 ss.throwFirstException(); 323 } 324 } 325 if(wasDisposedAlready){ 326 log.debug(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped"); 327 }else{ 328 log.info(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped"); 329 } 330 } 331 332 public void serviceRemoteException(Throwable error){ 333 if(!disposed){ 334 if(error instanceof SecurityException ||error instanceof GeneralSecurityException ){ 335 log.error("Network connection between "+localBroker+" and "+remoteBroker 336 +" shutdown due to a remote error: "+error); 337 }else{ 338 log.warn("Network connection between "+localBroker+" and "+remoteBroker 339 +" shutdown due to a remote error: "+error); 340 } 341 log.debug("The remote Exception was: "+error,error); 342 new Thread (){ 343 344 public void run(){ 345 ServiceSupport.dispose(DemandForwardingBridgeSupport.this); 346 } 347 }.start(); 348 fireBridgeFailed(); 349 } 350 } 351 352 protected void serviceRemoteCommand(Command command) { 353 if(!disposed){ 354 try{ 355 if(command.isMessageDispatch()){ 356 waitStarted(); 357 MessageDispatch md=(MessageDispatch) command; 358 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); 359 demandConsumerDispatched++; 360 if(demandConsumerDispatched>(demandConsumerInfo.getPrefetchSize()*.75)){ 361 remoteBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,demandConsumerDispatched)); 362 demandConsumerDispatched=0; 363 } 364 }else if(command.isBrokerInfo()){ 365 366 lastConnectSucceeded.set(true); 367 serviceRemoteBrokerInfo(command); 368 localBroker.oneway(command); 370 371 }else if(command.getClass() == ConnectionError.class ) { 372 ConnectionError ce = (ConnectionError) command; 373 serviceRemoteException(ce.getException()); 374 }else{ 375 switch(command.getDataStructureType()){ 376 case KeepAliveInfo.DATA_STRUCTURE_TYPE: 377 case WireFormatInfo.DATA_STRUCTURE_TYPE: 378 case ShutdownInfo.DATA_STRUCTURE_TYPE: 379 break; 380 default: 381 log.warn("Unexpected remote command: "+command); 382 } 383 } 384 }catch(Exception e){ 385 serviceRemoteException(e); 386 } 387 } 388 } 389 390 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { 391 392 final int networkTTL = configuration.getNetworkTTL(); 393 if(data.getClass()==ConsumerInfo.class){ 394 ConsumerInfo info=(ConsumerInfo) data; 396 BrokerId[] path=info.getBrokerPath(); 397 if((path!=null&&path.length>= networkTTL)){ 398 if(log.isDebugEnabled()) 399 log.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only"); 400 return; 401 } 402 if(contains(info.getBrokerPath(),localBrokerPath[0])){ 403 if(log.isDebugEnabled()) 405 log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already routed through this broker once"); 406 return; 407 } 408 if (!isPermissableDestination(info.getDestination())){ 409 if(log.isDebugEnabled()) 411 log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited"); 412 return; 413 } 414 info=info.copy(); 416 addRemoteBrokerToBrokerPath(info); 417 DemandSubscription sub=createDemandSubscription(info); 418 if (sub != null){ 419 addSubscription(sub); 420 if(log.isDebugEnabled()) 421 log.debug(configuration.getBrokerName() + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info); 422 }else { 423 if(log.isDebugEnabled()) 424 log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination"); 425 } 426 } 427 else if (data.getClass()==DestinationInfo.class){ 428 DestinationInfo destInfo = (DestinationInfo) data; 431 BrokerId[] path=destInfo.getBrokerPath(); 432 if((path!=null&&path.length>= networkTTL)){ 433 if(log.isDebugEnabled()) 434 log.debug("Ignoring Subscription " + destInfo + " restricted to " + networkTTL + " network hops only"); 435 return; 436 } 437 if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){ 438 if(log.isDebugEnabled()) 440 log.debug("Ignoring sub " + destInfo + " already routed through this broker once"); 441 return; 442 } 443 444 destInfo.setConnectionId(localConnectionInfo.getConnectionId()); 445 if (destInfo.getDestination() instanceof ActiveMQTempDestination){ 446 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); 448 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); 449 } 450 451 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath())); 452 453 log.debug("Replying destination control command: "+destInfo); 454 localBroker.oneway(destInfo); 455 456 } 457 else if(data.getClass()==RemoveInfo.class){ 458 ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId(); 459 removeDemandSubscription(id); 460 } 461 } 462 463 public void serviceLocalException(Throwable error) { 464 if( !disposed ) { 465 log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error); 466 log.debug("The local Exception was:"+error,error); 467 new Thread () { 468 public void run() { 469 ServiceSupport.dispose(DemandForwardingBridgeSupport.this); 470 } 471 }.start(); 472 fireBridgeFailed(); 473 } 474 } 475 476 protected void addSubscription(DemandSubscription sub) throws IOException { 477 if (sub != null){ 478 localBroker.oneway(sub.getLocalInfo()); 479 } 480 } 481 482 protected void removeSubscription(DemandSubscription sub) throws IOException { 483 if(sub!=null){ 484 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 485 localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); 486 } 487 } 488 489 protected DemandSubscription getDemandSubscription(MessageDispatch md) { 490 return (DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId()); 491 } 492 493 protected Message configureMessage(MessageDispatch md) { 494 Message message=md.getMessage().copy(); 495 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath)); 497 message.setProducerId(producerInfo.getProducerId()); 498 message.setDestination(md.getDestination()); 499 if(message.getOriginalTransactionId()==null) 500 message.setOriginalTransactionId(message.getTransactionId()); 501 message.setTransactionId(null); 502 return message; 503 } 504 505 protected void serviceLocalCommand(Command command) { 506 if(!disposed){ 507 final boolean trace=log.isTraceEnabled(); 508 try{ 509 if(command.isMessageDispatch()){ 510 waitStarted(); 511 final MessageDispatch md=(MessageDispatch) command; 512 DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId()); 513 if(sub!=null){ 514 Message message= configureMessage(md); 515 if(trace) 516 log.trace("bridging "+configuration.getBrokerName()+" -> "+remoteBrokerName+": "+message); 517 518 519 520 if( !message.isResponseRequired() ) { 521 522 remoteBroker.oneway(message); 525 localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); 526 527 } else { 528 529 ResponseCallback callback = new ResponseCallback() { 532 public void onCompletion(FutureResponse future) { 533 try { 534 Response response = future.getResult(); 535 if(response.isException()){ 536 ExceptionResponse er=(ExceptionResponse) response; 537 serviceLocalException(er.getException()); 538 } else { 539 localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); 540 } 541 } catch (IOException e) { 542 serviceLocalException(e); 543 } 544 } 545 }; 546 547 remoteBroker.asyncRequest(message, callback); 548 } 549 550 }else{ 551 if (trace)log.trace("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage()); 552 } 553 }else if(command.isBrokerInfo()){ 554 serviceLocalBrokerInfo(command); 555 }else if(command.isShutdownInfo()){ 556 log.info(configuration.getBrokerName()+" Shutting down"); 557 if( !remoteInterupted.get() ) { 561 stop(); 562 } 563 }else if(command.getClass() == ConnectionError.class ) { 564 ConnectionError ce = (ConnectionError) command; 565 serviceLocalException(ce.getException()); 566 }else{ 567 switch(command.getDataStructureType()){ 568 case WireFormatInfo.DATA_STRUCTURE_TYPE: 569 break; 570 default: 571 log.warn("Unexpected local command: "+command); 572 } 573 } 574 }catch(Exception e){ 575 serviceLocalException(e); 576 } 577 } 578 } 579 580 583 public ActiveMQDestination[] getDynamicallyIncludedDestinations() { 584 return dynamicallyIncludedDestinations; 585 } 586 587 590 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { 591 this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations; 592 } 593 594 597 public ActiveMQDestination[] getExcludedDestinations() { 598 return excludedDestinations; 599 } 600 601 604 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { 605 this.excludedDestinations=excludedDestinations; 606 } 607 608 611 public ActiveMQDestination[] getStaticallyIncludedDestinations() { 612 return staticallyIncludedDestinations; 613 } 614 615 618 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { 619 this.staticallyIncludedDestinations=staticallyIncludedDestinations; 620 } 621 622 625 public ActiveMQDestination[] getDurableDestinations() { 626 return durableDestinations; 627 } 628 629 632 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { 633 this.durableDestinations=durableDestinations; 634 } 635 636 639 public Transport getLocalBroker() { 640 return localBroker; 641 } 642 643 646 public Transport getRemoteBroker() { 647 return remoteBroker; 648 } 649 650 653 public boolean isCreatedByDuplex(){ 654 return this.createdByDuplex; 655 } 656 657 660 public void setCreatedByDuplex(boolean createdByDuplex){ 661 this.createdByDuplex=createdByDuplex; 662 } 663 664 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 665 if(brokerPath!=null){ 666 for(int i=0;i<brokerPath.length;i++){ 667 if(brokerId.equals(brokerPath[i])) 668 return true; 669 } 670 } 671 return false; 672 } 673 674 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) { 675 if (brokerPath == null || brokerPath.length == 0) 676 return pathsToAppend; 677 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length]; 678 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 679 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length); 680 return rc; 681 } 682 683 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { 684 if (brokerPath == null || brokerPath.length == 0) 685 return new BrokerId[] { idToAppend }; 686 BrokerId rc[] = new BrokerId[brokerPath.length + 1]; 687 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 688 rc[brokerPath.length] = idToAppend; 689 return rc; 690 } 691 692 protected boolean isPermissableDestination(ActiveMQDestination destination) { 693 if( destination.isTemporary() && !configuration.isBridgeTempDestinations() ) { 695 return false; 696 } 697 698 DestinationFilter filter=DestinationFilter.parseFilter(destination); 699 ActiveMQDestination[] dests = excludedDestinations; 700 if(dests!=null&&dests.length>0){ 701 for(int i=0;i<dests.length;i++){ 702 ActiveMQDestination match=dests[i]; 703 if(match!=null&&filter.matches(match)){ 704 return false; 705 } 706 } 707 } 708 dests = dynamicallyIncludedDestinations; 709 if(dests!=null&&dests.length>0){ 710 for(int i=0;i<dests.length;i++){ 711 ActiveMQDestination match=dests[i]; 712 if(match!=null&&filter.matches(match)){ 713 return true; 714 } 715 } 716 return false; 717 } 718 719 return true; 720 } 721 722 726 protected void setupStaticDestinations() { 727 ActiveMQDestination[] dests = staticallyIncludedDestinations; 728 if (dests != null){ 729 for(int i=0;i<dests.length;i++){ 730 ActiveMQDestination dest=dests[i]; 731 DemandSubscription sub = createDemandSubscription(dest); 732 try{ 733 addSubscription(sub); 734 }catch(IOException e){ 735 log.error("Failed to add static destination " + dest,e); 736 } 737 if(log.isTraceEnabled()) 738 log.trace("Forwarding messages for static destination: " + dest); 739 } 740 } 741 } 742 743 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 744 return doCreateDemandSubscription(info); 745 } 746 747 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { 748 DemandSubscription result=new DemandSubscription(info); 749 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator 750 .getNextSequenceId())); 751 752 if( configuration.isDecreaseNetworkConsumerPriority() ) { 753 byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY; 754 if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){ 755 priority-=info.getBrokerPath().length+1; 757 } 758 result.getLocalInfo().setPriority(priority); 759 } 760 configureDemandSubscription(info, result); 761 return result; 762 } 763 764 protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { 765 ConsumerInfo info = new ConsumerInfo(); 766 info.setDestination(destination); 767 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator 770 .getNextSequenceId())); 771 DemandSubscription result=new DemandSubscription(info); 772 result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); 773 return result; 774 } 775 776 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { 777 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); 778 sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize()); 779 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(),sub); 780 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(),sub); 781 782 sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info)); 785 } 786 787 protected void removeDemandSubscription(ConsumerId id) throws IOException { 788 DemandSubscription sub=(DemandSubscription) subscriptionMapByRemoteId.remove(id); 789 if (sub != null){ 790 removeSubscription(sub); 791 if(log.isTraceEnabled()) 792 log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" : "+sub.getRemoteInfo()); 793 } 794 } 795 796 protected void waitStarted() throws InterruptedException { 797 startedLatch.await(); 798 } 799 800 protected void clearDownSubscriptions() { 801 subscriptionMapByLocalId.clear(); 802 subscriptionMapByRemoteId.clear(); 803 } 804 805 protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException ; 806 807 protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException ; 808 809 protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException ; 810 811 protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException ; 812 813 protected abstract BrokerId[] getRemoteBrokerPath(); 814 815 public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){ 816 this.bridgeFailedListener=listener; 817 } 818 819 private void fireBridgeFailed() { 820 NetworkBridgeFailedListener l = this.bridgeFailedListener; 821 if (l!=null) { 822 l.bridgeFailed(); 823 } 824 } 825 } 826 | Popular Tags |