package org.apache.activemq.broker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.activemq.Service;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.network.NetworkBridgeFactory;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Broker-side end of a single {@link Transport}.
 * <p>
 * Commands arriving on the transport are passed to {@link #service(Command)},
 * which dispatches them to the matching <code>processXxx</code> visitor method;
 * those methods in turn drive the {@link Broker}. Outbound commands are written
 * either directly ({@link #dispatchSync(Command)}) or through the dispatch queue
 * drained by {@link #iterate()} ({@link #dispatchAsync(Command)}).
 */
public class TransportConnection implements Service,Connection,Task,CommandVisitor{

    private static final Log log=LogFactory.getLog(TransportConnection.class);
    private static final Log transportLog=LogFactory.getLog(TransportConnection.class.getName()+".Transport");
    private static final Log serviceLog=LogFactory.getLog(TransportConnection.class.getName()+".Service");

    protected final Broker broker;
    private MasterBroker masterBroker;
    protected final TransportConnector connector;
    private final Transport transport;
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    // Connection states registered through this transport, keyed by ConnectionId.
    protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap();
    // Connection states shared with the RegionBroker, keyed by ConnectionId.
    protected final Map brokerConnectionStates;
    // Set by processBrokerInfo(); non-null once a BrokerInfo command has been received.
    protected BrokerInfo brokerInfo;
    private WireFormatInfo wireFormatInfo;
    // Commands waiting to be written by iterate().
    protected final List dispatchQueue=Collections.synchronizedList(new LinkedList());
    protected final TaskRunner taskRunner;
    // Holds the IOException that failed the transport, if any.
    protected final AtomicReference transportException=new AtomicReference();
    private boolean inServiceException=false;
    private ConnectionStatistics statistics=new ConnectionStatistics();
    private boolean manageable;
    private boolean slow;
    private boolean markedCandidate;
    private boolean blockedCandidate;
    private boolean blocked;
    private boolean connected;
    private boolean active;
    private boolean starting;
    private boolean pendingStop;
    private long timeStamp=0;
    private final AtomicBoolean stopped=new AtomicBoolean(false);
    private final AtomicBoolean transportDisposed=new AtomicBoolean();
    private final AtomicBoolean disposed=new AtomicBoolean(false);
    private CountDownLatch stopLatch=new CountDownLatch(1);
    private final AtomicBoolean asyncException=new AtomicBoolean(false);
    private final Map<ProducerId,ProducerBrokerExchange> producerExchanges=new HashMap<ProducerId,ProducerBrokerExchange>();
    private final Map<ConsumerId,ConsumerBrokerExchange> consumerExchanges=new HashMap<ConsumerId,ConsumerBrokerExchange>();
    private CountDownLatch dispatchStoppedLatch=new CountDownLatch(1);
    protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
    // Context of the command currently being serviced; reset at the end of service().
    private ConnectionContext context;
    private boolean networkConnection;
    private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
    private DemandForwardingBridge duplexBridge=null;

    /**
     * Connection state that additionally remembers the {@link ConnectionContext}
     * and the {@link TransportConnection} it was registered through.
     */
    static class ConnectionState extends org.apache.activemq.state.ConnectionState{

        private final ConnectionContext context;
        TransportConnection connection;

        public ConnectionState(ConnectionInfo info,ConnectionContext context,TransportConnection connection){
            super(info);
            this.context=context;
            this.connection=connection;
        }

        public ConnectionContext getContext(){
            return context;
        }

        public TransportConnection getConnection(){
            return connection;
        }
    }

    /**
     * Creates the connection and installs itself as the transport's listener.
     *
     * @param connector the connector that accepted the transport; when non-null its
     *        statistics become the parent of this connection's statistics and it is
     *        told the broker name
     * @param transport the transport to read commands from and write commands to
     * @param broker the broker that commands are applied to
     * @param taskRunnerFactory used to create the asynchronous dispatcher; when
     *        <code>null</code> all dispatching is done synchronously
     */
    public TransportConnection(TransportConnector connector,final Transport transport,Broker broker,
            TaskRunnerFactory taskRunnerFactory){
        this.connector=connector;
        this.broker=broker;
        RegionBroker rb=(RegionBroker)broker.getAdaptor(RegionBroker.class);
        brokerConnectionStates=rb.getConnectionStates();
        // Every use of the connector in the constructor is guarded, so a null
        // connector no longer fails half way through construction.
        if(connector!=null){
            this.statistics.setParent(connector.getStatistics());
            connector.setBrokerName(broker.getBrokerName());
        }
        if(taskRunnerFactory!=null){
            taskRunner=taskRunnerFactory.createTaskRunner(this,"ActiveMQ Connection Dispatcher: "
                    +System.identityHashCode(this));
        }else{
            taskRunner=null;
        }
        this.transport=transport;
        this.transport.setTransportListener(new DefaultTransportListener(){

            public void onCommand(Object o){
                Command command=(Command)o;
                Response response=service(command);
                if(response!=null){
                    dispatchSync(response);
                }
            }

            public void onException(IOException exception){
                serviceTransportException(exception);
            }
        });
        connected=true;
    }

    /**
     * @return the number of commands currently waiting in the dispatch queue
     */
    public int getDispatchQueueSize(){
        return dispatchQueue.size();
    }

    /**
     * Records a transport failure and disposes of this connection, unless it has
     * already been disposed.
     *
     * @param e the failure reported by the transport
     */
    public void serviceTransportException(IOException e){
        if(!disposed.get()){
            transportException.set(e);
            if(transportLog.isDebugEnabled()){
                transportLog.debug("Transport failed: "+e,e);
            }
            ServiceSupport.dispose(this);
        }
    }

    /**
     * Hands the exception to {@link #serviceException(Throwable)} on a new thread.
     * Only the first call has any effect; later calls are ignored.
     *
     * @param e the failure to handle
     */
    public void serviceExceptionAsync(final IOException e){
        if(asyncException.compareAndSet(false,true)){
            new Thread("Async Exception Handler"){

                public void run(){
                    serviceException(e);
                }
            }.start();
        }
    }

    /**
     * Reacts to a failure raised while servicing the connection.
     * <ul>
     * <li>An {@link IOException} is treated as a transport failure.</li>
     * <li>A {@link BrokerStoppedException} is reported to the peer, after which the
     * connection is disposed.</li>
     * <li>Anything else is logged and reported to the peer as a
     * {@link ConnectionError}; the connection stays up.</li>
     * </ul>
     *
     * @param e the failure to handle
     */
    public void serviceException(Throwable e){
        if(e instanceof IOException){
            serviceTransportException((IOException)e);
        }else if(e.getClass()==BrokerStoppedException.class){
            if(!disposed.get()){
                if(serviceLog.isDebugEnabled()){
                    serviceLog.debug("Broker has been stopped. Notifying client and closing his connection.");
                }
                ConnectionError ce=new ConnectionError();
                ce.setException(e);
                dispatchSync(ce);
                // Pause before tearing the connection down.
                // NOTE(review): presumably gives the peer time to read the error - confirm.
                try{
                    Thread.sleep(500);
                }catch(InterruptedException ie){
                    Thread.currentThread().interrupt();
                }
                ServiceSupport.dispose(this);
            }
        }else if(!disposed.get()&&!inServiceException){
            // inServiceException stops a failure raised while reporting from recursing.
            inServiceException=true;
            try{
                serviceLog.error("Async error occurred: "+e,e);
                ConnectionError ce=new ConnectionError();
                ce.setException(e);
                dispatchAsync(ce);
            }finally{
                inServiceException=false;
            }
        }
    }

    /**
     * Applies a command by visiting it and builds the reply, if one is wanted.
     *
     * @param command the command received from the transport
     * @return the response to send back, or <code>null</code> when the command does
     *         not require one or the context asked for the response to be withheld
     */
    public Response service(Command command){
        Response response=null;
        boolean responseRequired=command.isResponseRequired();
        int commandId=command.getCommandId();
        try{
            response=command.visit(this);
        }catch(Throwable e){
            if(responseRequired){
                if(serviceLog.isDebugEnabled()&&e.getClass()!=BrokerStoppedException.class){
                    serviceLog.debug("Error occured while processing sync command: "+e,e);
                }
                response=new ExceptionResponse(e);
            }else{
                serviceException(e);
            }
        }
        if(responseRequired){
            if(response==null){
                response=new Response();
            }
            response.setCorrelationId(commandId);
        }

        // The visitor may have recorded the context it worked with; honour its
        // request to suppress the response, then clear it for the next command.
        if(context!=null){
            if(context.isDontSendReponse()){
                context.setDontSendReponse(false);
                response=null;
            }
            context=null;
        }

        return response;
    }

    /**
     * @throws IllegalStateException if the consumer's connection is not registered here
     */
    protected ConnectionState lookupConnectionState(ConsumerId id){
        ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
        if(cs==null){
            throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "
                    +id.getParentId().getParentId());
        }
        return cs;
    }

    /**
     * @throws IllegalStateException if the producer's connection is not registered here
     */
    protected ConnectionState lookupConnectionState(ProducerId id){
        ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
        if(cs==null){
            throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "
                    +id.getParentId().getParentId());
        }
        return cs;
    }

    /**
     * @throws IllegalStateException if the session's connection is not registered here
     */
    protected ConnectionState lookupConnectionState(SessionId id){
        ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId());
        if(cs==null){
            throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "
                    +id.getParentId());
        }
        return cs;
    }

    /**
     * @throws IllegalStateException if the connection is not registered here
     */
    protected ConnectionState lookupConnectionState(ConnectionId connectionId){
        ConnectionState cs=(ConnectionState)localConnectionStates.get(connectionId);
        if(cs==null){
            throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
        }
        return cs;
    }

    /**
     * Resolves the connection state a transaction command refers to and makes its
     * context the current one.
     *
     * @param info the transaction command being processed
     * @param action verb used in the error message, e.g. "begin"
     * @return the registered connection state, never <code>null</code>
     * @throws IllegalStateException if the connection is not registered here
     */
    private ConnectionState requireTransactionConnectionState(TransactionInfo info,String action){
        context=null;
        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
        if(cs==null){
            throw new IllegalStateException("Cannot "+action
                    +" a transaction on a connection that had not been registered: "+info.getConnectionId());
        }
        context=cs.getContext();
        return cs;
    }

    /** Keep-alives need no handling here. */
    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
        return null;
    }

    /** Removes a subscription on behalf of the command's connection. */
    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(),info);
        return null;
    }

    /** Remembers the peer's wire format information and protocol version. */
    public Response processWireFormat(WireFormatInfo info) throws Exception {
        wireFormatInfo=info;
        protocolVersion.set(info.getVersion());
        return null;
    }

    /** Stops this connection at the peer's request. */
    public Response processShutdown(ShutdownInfo info) throws Exception {
        stop();
        return null;
    }

    /** Flush commands need no handling here. */
    public Response processFlush(FlushCommand command) throws Exception {
        return null;
    }

    /**
     * Begins a transaction, unless this connection already tracks the transaction id.
     *
     * @throws IllegalStateException if the connection is not registered here
     */
    synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception {
        ConnectionState cs=requireTransactionConnectionState(info,"begin");
        // A transaction id that is already tracked is not started a second time.
        if(cs.getTransactionState(info.getTransactionId())==null){
            cs.addTransactionState(info.getTransactionId());
            broker.beginTransaction(context,info.getTransactionId());
        }
        return null;
    }

    /** End-transaction commands need no handling here. */
    synchronized public Response processEndTransaction(TransactionInfo info) throws Exception {
        return null;
    }

    /**
     * Prepares a transaction once and returns the outcome; a repeated prepare
     * returns the stored outcome without asking the broker again.
     *
     * @throws IllegalStateException if the connection is not registered here or the
     *         transaction was never started on it
     */
    synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception {
        ConnectionState cs=requireTransactionConnectionState(info,"prepare");
        TransactionState transactionState=cs.getTransactionState(info.getTransactionId());
        if(transactionState==null){
            throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
                    +info.getTransactionId());
        }
        if(transactionState.isPrepared()){
            return new IntegerResponse(transactionState.getPreparedResult());
        }
        transactionState.setPrepared(true);
        int result=broker.prepareTransaction(context,info.getTransactionId());
        transactionState.setPreparedResult(result);
        return new IntegerResponse(result);
    }

    /**
     * Commits a transaction in one phase and stops tracking it.
     *
     * @throws IllegalStateException if the connection is not registered here
     */
    synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
        ConnectionState cs=requireTransactionConnectionState(info,"commit");
        cs.removeTransactionState(info.getTransactionId());
        broker.commitTransaction(context,info.getTransactionId(),true);
        return null;
    }

    /**
     * Commits a previously prepared transaction and stops tracking it.
     *
     * @throws IllegalStateException if the connection is not registered here
     */
    synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
        ConnectionState cs=requireTransactionConnectionState(info,"commit");
        cs.removeTransactionState(info.getTransactionId());
        broker.commitTransaction(context,info.getTransactionId(),false);
        return null;
    }

    /**
     * Rolls a transaction back and stops tracking it.
     *
     * @throws IllegalStateException if the connection is not registered here
     */
    synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception {
        ConnectionState cs=requireTransactionConnectionState(info,"rollback");
        cs.removeTransactionState(info.getTransactionId());
        broker.rollbackTransaction(context,info.getTransactionId());
        return null;
    }

    /**
     * Asks the broker to forget a transaction. The connection state is optional
     * here: when it is not registered the broker is called with a null context.
     */
    synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception {
        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
        context=null;
        if(cs!=null){
            context=cs.getContext();
        }
        broker.forgetTransaction(context,info.getTransactionId());
        return null;
    }

    /**
     * Returns the broker's prepared transactions. The connection state is optional
     * here: when it is not registered the broker is called with a null context.
     */
    synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception {
        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
        context=null;
        if(cs!=null){
            context=cs.getContext();
        }
        TransactionId[] preparedTransactions=broker.getPreparedTransactions(context);
        return new DataArrayResponse(preparedTransactions);
    }

    /**
     * Sends a message to the broker. When the message id was issued by the sending
     * producer, messages whose sequence id is not greater than the last one seen
     * from that producer are discarded as duplicates.
     */
    public Response processMessage(Message messageSend) throws Exception {
        ProducerId producerId=messageSend.getProducerId();
        ProducerBrokerExchange producerExchange=getProducerBrokerExchange(producerId);
        ProducerState producerState=null;
        // Sequence tracking only applies when the id belongs to the sending producer.
        if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){
            producerState=producerExchange.getProducerState();
        }
        if(producerState==null){
            broker.send(producerExchange,messageSend);
            return null;
        }
        long seq=messageSend.getMessageId().getProducerSequenceId();
        if(seq>producerState.getLastSequenceId()){
            producerState.setLastSequenceId(seq);
            broker.send(producerExchange,messageSend);
        }else if(log.isDebugEnabled()){
            log.debug("Discarding duplicate: " + messageSend);
        }
        return null;
    }

    /** Forwards an acknowledgement to the broker. */
    public Response processMessageAck(MessageAck ack) throws Exception {
        ConsumerBrokerExchange consumerExchange=getConsumerBrokerExchange(ack.getConsumerId());
        broker.acknowledge(consumerExchange,ack);
        return null;
    }

    /** Forwards a pull request to the broker and returns the broker's response. */
    public Response processMessagePull(MessagePull pull) throws Exception {
        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(),pull);
    }

    /** Forwards a dispatch notification to the broker. */
    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
        broker.processDispatchNotification(notification);
        return null;
    }

    /** Adds a destination; temporary destinations are also tracked on the connection state. */
    synchronized public Response processAddDestination(DestinationInfo info) throws Exception {
        ConnectionState cs=lookupConnectionState(info.getConnectionId());
        broker.addDestinationInfo(cs.getContext(),info);
        if(info.getDestination().isTemporary()){
            cs.addTempDestination(info);
        }
        return null;
    }

    /** Removes a destination; temporary destinations are also dropped from the connection state. */
    synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception {
        ConnectionState cs=lookupConnectionState(info.getConnectionId());
        broker.removeDestinationInfo(cs.getContext(),info);
        if(info.getDestination().isTemporary()){
            cs.removeTempDestination(info.getDestination());
        }
        return null;
    }

    /**
     * Registers a producer, unless its session already knows the producer id.
     *
     * @throws IllegalStateException if the connection or session is not registered
     */
    synchronized public Response processAddProducer(ProducerInfo info) throws Exception {
        SessionId sessionId=info.getProducerId().getParentId();
        ConnectionId connectionId=sessionId.getParentId();
        ConnectionState cs=lookupConnectionState(connectionId);
        SessionState ss=cs.getSessionState(sessionId);
        if(ss==null){
            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
                    +sessionId);
        }
        if(!ss.getProducerIds().contains(info.getProducerId())){
            broker.addProducer(cs.getContext(),info);
            try{
                ss.addProducer(info);
            }catch(IllegalStateException e){
                // The session refused the producer: undo the broker registration.
                broker.removeProducer(cs.getContext(),info);
            }
        }
        return null;
    }

    /**
     * Unregisters a producer from its session, this connection and the broker.
     *
     * @throws IllegalStateException if the connection, session or producer is not registered
     */
    synchronized public Response processRemoveProducer(ProducerId id) throws Exception {
        SessionId sessionId=id.getParentId();
        ConnectionId connectionId=sessionId.getParentId();
        ConnectionState cs=lookupConnectionState(connectionId);
        SessionState ss=cs.getSessionState(sessionId);
        if(ss==null){
            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
                    +sessionId);
        }
        ProducerState ps=ss.removeProducer(id);
        if(ps==null){
            throw new IllegalStateException("Cannot remove a producer that had not been registered: "+id);
        }
        removeProducerBrokerExchange(id);
        broker.removeProducer(cs.getContext(),ps.getInfo());
        return null;
    }

    /**
     * Registers a consumer, unless its session already knows the consumer id.
     *
     * @throws IllegalStateException if the connection or session is not registered
     */
    synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception {
        SessionId sessionId=info.getConsumerId().getParentId();
        ConnectionId connectionId=sessionId.getParentId();
        ConnectionState cs=lookupConnectionState(connectionId);
        SessionState ss=cs.getSessionState(sessionId);
        if(ss==null){
            throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "
                    +sessionId);
        }
        if(!ss.getConsumerIds().contains(info.getConsumerId())){
            broker.addConsumer(cs.getContext(),info);
            try{
                ss.addConsumer(info);
            }catch(IllegalStateException e){
                // The session refused the consumer: undo the broker registration.
                broker.removeConsumer(cs.getContext(),info);
            }
        }
        return null;
    }

    /**
     * Unregisters a consumer from its session, the broker and this connection.
     *
     * @throws IllegalStateException if the connection, session or consumer is not registered
     */
    synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception {
        SessionId sessionId=id.getParentId();
        ConnectionId connectionId=sessionId.getParentId();
        ConnectionState cs=lookupConnectionState(connectionId);
        SessionState ss=cs.getSessionState(sessionId);
        if(ss==null){
            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
                    +sessionId);
        }
        ConsumerState consumerState=ss.removeConsumer(id);
        if(consumerState==null){
            throw new IllegalStateException("Cannot remove a consumer that had not been registered: "+id);
        }
        broker.removeConsumer(cs.getContext(),consumerState.getInfo());
        removeConsumerBrokerExchange(id);
        return null;
    }

    /**
     * Registers a session, unless the connection already knows the session id.
     *
     * @throws IllegalStateException if the connection is not registered
     */
    synchronized public Response processAddSession(SessionInfo info) throws Exception {
        ConnectionId connectionId=info.getSessionId().getParentId();
        ConnectionState cs=lookupConnectionState(connectionId);
        if(!cs.getSessionIds().contains(info.getSessionId())){
            broker.addSession(cs.getContext(),info);
            try{
                cs.addSession(info);
            }catch(IllegalStateException e){
                // The connection refused the session: undo the broker registration.
                broker.removeSession(cs.getContext(),info);
            }
        }
        return null;
    }

    /**
     * Shuts a session down, removes its consumers and producers, and then removes
     * the session itself. A failure to remove an individual consumer or producer
     * is logged and does not stop the rest of the cleanup.
     *
     * @throws IllegalStateException if the connection or session is not registered
     */
    synchronized public Response processRemoveSession(SessionId id) throws Exception {
        ConnectionId connectionId=id.getParentId();
        ConnectionState cs=lookupConnectionState(connectionId);
        SessionState session=cs.getSessionState(id);
        if(session==null){
            throw new IllegalStateException("Cannot remove session that had not been registered: "+id);
        }
        session.shutdown();
        // NOTE(review): the removals below modify the session while its id
        // collections are being iterated - confirm those iterators tolerate it.
        for(Iterator iter=session.getConsumerIds().iterator();iter.hasNext();){
            ConsumerId consumerId=(ConsumerId)iter.next();
            try{
                processRemoveConsumer(consumerId);
            }catch(Throwable e){
                log.warn("Failed to remove consumer: "+consumerId+". Reason: "+e,e);
            }
        }
        for(Iterator iter=session.getProducerIds().iterator();iter.hasNext();){
            ProducerId producerId=(ProducerId)iter.next();
            try{
                processRemoveProducer(producerId);
            }catch(Throwable e){
                log.warn("Failed to remove producer: "+producerId+". Reason: "+e,e);
            }
        }
        cs.removeSession(id);
        broker.removeSession(cs.getContext(),session.getInfo());
        return null;
    }

    /**
     * Registers a new connection with the broker. If the broker already holds a
     * state for the same connection id that belongs to a different
     * {@link TransportConnection}, that connection is stopped first and given up
     * to 15 seconds to finish stopping.
     *
     * @throws Exception if the previous connection does not stop in time, or the
     *         broker rejects the connection
     */
    synchronized public Response processAddConnection(ConnectionInfo info) throws Exception {
        ConnectionState state=(ConnectionState)brokerConnectionStates.get(info.getConnectionId());
        if(state!=null){
            if(this!=state.getConnection()){
                log.debug("Killing previous stale connection: "+state.getConnection());
                state.getConnection().stop();
                if(!state.getConnection().stopLatch.await(15,TimeUnit.SECONDS)){
                    throw new Exception("Previous connection could not be clean up.");
                }
            }
        }
        log.debug("Setting up new connection: "+this);
        String clientId=info.getClientId();
        context=new ConnectionContext();
        context.setConnection(this);
        context.setBroker(broker);
        context.setConnector(connector);
        context.setTransactions(new ConcurrentHashMap());
        context.setClientId(clientId);
        context.setUserName(info.getUserName());
        context.setConnectionId(info.getConnectionId());
        context.setWireFormatInfo(wireFormatInfo);
        context.setNetworkConnection(networkConnection);
        context.incrementReference();
        this.manageable=info.isManageable();
        state=new ConnectionState(info,context,this);
        brokerConnectionStates.put(info.getConnectionId(),state);
        localConnectionStates.put(info.getConnectionId(),state);
        broker.addConnection(context,info);
        if(info.isManageable()&&broker.isFaultTolerantConfiguration()){
            // Tell a manageable client that the broker is fault tolerant.
            ConnectionControl command=new ConnectionControl();
            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
            dispatchAsync(command);
        }
        return null;
    }
678 679 synchronized public Response processRemoveConnection(ConnectionId id){ 680 ConnectionState cs=lookupConnectionState(id); 681 cs.shutdown(); 683 for(Iterator iter=cs.getSessionIds().iterator();iter.hasNext();){ 685 SessionId sessionId=(SessionId)iter.next(); 686 try{ 687 processRemoveSession(sessionId); 688 }catch(Throwable e){ 689 serviceLog.warn("Failed to remove session "+sessionId,e); 690 } 691 } 692 for(Iterator iter=cs.getTempDesinations().iterator();iter.hasNext();){ 694 DestinationInfo di=(DestinationInfo)iter.next(); 695 try{ 696 broker.removeDestination(cs.getContext(),di.getDestination(),0); 697 }catch(Throwable e){ 698 serviceLog.warn("Failed to remove tmp destination "+di.getDestination(),e); 699 } 700 iter.remove(); 701 } 702 try{ 703 broker.removeConnection(cs.getContext(),cs.getInfo(),null); 704 }catch(Throwable e){ 705 serviceLog.warn("Failed to remove connection "+cs.getInfo(),e); 706 } 707 ConnectionState state=(ConnectionState)localConnectionStates.remove(id); 708 if(state!=null){ 709 if(state.getContext().decrementReference()==0){ 712 brokerConnectionStates.remove(id); 713 } 714 } 715 return null; 716 } 717 718 719 public Response processProducerAck(ProducerAck ack) throws Exception { 720 return null; 722 } 723 724 public Connector getConnector(){ 725 return connector; 726 } 727 728 public void dispatchSync(Command message){ 729 getStatistics().getEnqueues().increment(); 730 try { 731 processDispatch(message); 732 } catch (IOException e) { 733 serviceExceptionAsync(e); 734 } 735 } 736 737 public void dispatchAsync(Command message){ 738 if( !disposed.get() ) { 739 getStatistics().getEnqueues().increment(); 740 if( taskRunner==null ) { 741 dispatchSync( message ); 742 } else { 743 dispatchQueue.add(message); 744 try { 745 taskRunner.wakeup(); 746 } catch (InterruptedException e) { 747 Thread.currentThread().interrupt(); 748 } 749 } 750 } else { 751 if(message.isMessageDispatch()) { 752 MessageDispatch md=(MessageDispatch) message; 753 
Runnable sub=md.getTransmitCallback(); 754 broker.processDispatch(md); 755 if(sub!=null){ 756 sub.run(); 757 } 758 } 759 } 760 } 761 762 protected void processDispatch(Command command) throws IOException { 763 try { 764 if( !disposed.get() ) { 765 dispatch(command); 766 } 767 } finally { 768 769 if(command.isMessageDispatch()){ 770 MessageDispatch md=(MessageDispatch) command; 771 Runnable sub=md.getTransmitCallback(); 772 broker.processDispatch(md); 773 if(sub!=null){ 774 sub.run(); 775 } 776 } 777 778 getStatistics().getDequeues().increment(); 779 } 780 } 781 782 783 784 public boolean iterate(){ 785 try { 786 if( disposed.get() ) { 787 if( dispatchStopped.compareAndSet(false, true)) { 788 if( transportException.get()==null ) { 789 try { 790 dispatch(new ShutdownInfo()); 791 } catch (Throwable ignore) { 792 } 793 } 794 dispatchStoppedLatch.countDown(); 795 } 796 return false; 797 } 798 799 if( !dispatchStopped.get() ) { 800 801 if( dispatchQueue.isEmpty() ) { 802 return false; 803 } else { 804 Command command = (Command) dispatchQueue.remove(0); 805 processDispatch( command ); 806 return true; 807 } 808 } else { 809 return false; 810 } 811 812 } catch (IOException e) { 813 if( dispatchStopped.compareAndSet(false, true)) { 814 dispatchStoppedLatch.countDown(); 815 } 816 serviceExceptionAsync(e); 817 return false; 818 } 819 } 820 821 824 public ConnectionStatistics getStatistics(){ 825 return statistics; 826 } 827 828 public MessageAuthorizationPolicy getMessageAuthorizationPolicy(){ 829 return messageAuthorizationPolicy; 830 } 831 832 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy){ 833 this.messageAuthorizationPolicy=messageAuthorizationPolicy; 834 } 835 836 public boolean isManageable(){ 837 return manageable; 838 } 839 840 public synchronized void start() throws Exception { 841 starting=true; 842 try{ 843 transport.start(); 844 active=true; 845 this.processDispatch(connector.getBrokerInfo()); 846 
connector.onStarted(this); 847 }finally{ 848 starting=false; 852 if(pendingStop){ 853 log.debug("Calling the delayed stop()"); 854 stop(); 855 } 856 } 857 } 858 859 public void stop() throws Exception { 860 synchronized(this){ 863 pendingStop=true; 864 if(starting){ 865 log.debug("stop() called in the middle of start(). Delaying..."); 866 return; 867 } 868 } 869 if(stopped.compareAndSet(false,true)){ 870 log.debug("Stopping connection: "+transport.getRemoteAddress()); 871 connector.onStopped(this); 872 try{ 873 if(masterBroker!=null){ 874 masterBroker.stop(); 875 } 876 if (duplexBridge != null) { 877 duplexBridge.stop(); 878 } 879 if(transportException==null){ 882 transport.oneway(new ShutdownInfo()); 883 } 884 }catch(Exception ignore){ 885 } 887 transport.stop(); 888 active=false; 889 if(disposed.compareAndSet(false,true)){ 890 891 ArrayList l=new ArrayList (localConnectionStates.values()); 894 for(Iterator iter=l.iterator();iter.hasNext();){ 895 ConnectionState cs=(ConnectionState) iter.next(); 896 cs.getContext().getStopping().set(true); 897 } 898 899 if( taskRunner!=null ) { 900 taskRunner.wakeup(); 901 dispatchStoppedLatch.await(5, TimeUnit.SECONDS); 903 disposeTransport(); 904 taskRunner.shutdown(); 905 } else { 906 disposeTransport(); 907 } 908 909 if( taskRunner!=null ) 910 taskRunner.shutdown(); 911 912 for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) { 914 Command command = (Command) iter.next(); 915 if(command.isMessageDispatch()) { 916 MessageDispatch md=(MessageDispatch) command; 917 Runnable sub=md.getTransmitCallback(); 918 broker.processDispatch(md); 919 if(sub!=null){ 920 sub.run(); 921 } 922 } 923 } 924 if(!broker.isStopped()){ 928 l=new ArrayList (localConnectionStates.keySet()); 929 for(Iterator iter=l.iterator();iter.hasNext();){ 930 ConnectionId connectionId=(ConnectionId)iter.next(); 931 try{ 932 log.debug("Cleaning up connection resources."); 933 processRemoveConnection(connectionId); 934 }catch(Throwable ignore){ 935 
ignore.printStackTrace(); 936 } 937 } 938 if(brokerInfo!=null){ 939 broker.removeBroker(this,brokerInfo); 940 } 941 } 942 stopLatch.countDown(); 943 } 944 } 945 } 946 947 950 public boolean isBlockedCandidate(){ 951 return blockedCandidate; 952 } 953 954 957 public void setBlockedCandidate(boolean blockedCandidate){ 958 this.blockedCandidate=blockedCandidate; 959 } 960 961 964 public boolean isMarkedCandidate(){ 965 return markedCandidate; 966 } 967 968 971 public void setMarkedCandidate(boolean markedCandidate){ 972 this.markedCandidate=markedCandidate; 973 if(!markedCandidate){ 974 timeStamp=0; 975 blockedCandidate=false; 976 } 977 } 978 979 982 public void setSlow(boolean slow){ 983 this.slow=slow; 984 } 985 986 989 public boolean isSlow(){ 990 return slow; 991 } 992 993 996 public boolean isMarkedBlockedCandidate(){ 997 return markedCandidate; 998 } 999 1000 1003 public void doMark(){ 1004 if(timeStamp==0){ 1005 timeStamp=System.currentTimeMillis(); 1006 } 1007 } 1008 1009 1012 public boolean isBlocked(){ 1013 return blocked; 1014 } 1015 1016 1019 public boolean isConnected(){ 1020 return connected; 1021 } 1022 1023 1026 public void setBlocked(boolean blocked){ 1027 this.blocked=blocked; 1028 } 1029 1030 1033 public void setConnected(boolean connected){ 1034 this.connected=connected; 1035 } 1036 1037 1040 public boolean isActive(){ 1041 return active; 1042 } 1043 1044 1047 public void setActive(boolean active){ 1048 this.active=active; 1049 } 1050 1051 1054 public synchronized boolean isStarting(){ 1055 return starting; 1056 } 1057 1058 synchronized protected void setStarting(boolean starting){ 1059 this.starting=starting; 1060 } 1061 1062 1065 public synchronized boolean isPendingStop(){ 1066 return pendingStop; 1067 } 1068 1069 protected synchronized void setPendingStop(boolean pendingStop){ 1070 this.pendingStop=pendingStop; 1071 } 1072 1073 public Response processBrokerInfo(BrokerInfo info){ 1074 if(info.isSlaveBroker()){ 1075 MutableBrokerFilter 
parent=(MutableBrokerFilter)broker.getAdaptor(MutableBrokerFilter.class); 1078 masterBroker=new MasterBroker(parent,transport); 1079 masterBroker.startProcessing(); 1080 log.info("Slave Broker "+info.getBrokerName()+" is attached"); 1081 }else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1082 try{ 1085 Properties props = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1086 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1087 IntrospectionSupport.setProperties(config,props,""); 1088 config.setBrokerName(broker.getBrokerName()); 1089 Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI()); 1090 duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,transport); 1091 duplexBridge.setCreatedByDuplex(true); 1093 duplexBridge.start(); 1094 log.info("Created Duplex Bridge back to " + info.getBrokerName()); 1095 }catch(Exception e){ 1096 log.error("Creating duplex network bridge",e); 1097 } 1098 } 1099 if(this.brokerInfo!=null){ 1101 log.warn("Unexpected extra broker info command received: "+info); 1102 Thread.dumpStack(); 1103 } 1104 this.brokerInfo=info; 1105 broker.addBroker(this,info); 1106 networkConnection = true; 1107 for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) { 1108 ConnectionState cs = (ConnectionState) iter.next(); 1109 cs.getContext().setNetworkConnection(true); 1110 } 1111 1112 return null; 1113 } 1114 1115 protected void dispatch(Command command) throws IOException { 1116 try{ 1117 setMarkedCandidate(true); 1118 transport.oneway(command); 1119 }finally{ 1120 setMarkedCandidate(false); 1121 } 1122 } 1123 1124 public String getRemoteAddress(){ 1125 return transport.getRemoteAddress(); 1126 } 1127 1128 public String getConnectionId() { 1129 Iterator iterator = localConnectionStates.values().iterator(); 1130 ConnectionState object = (ConnectionState) iterator.next(); 1131 if( object == null ) { 1132 return null; 1133 } 1134 if( 
object.getInfo().getClientId() !=null ) 1135 return object.getInfo().getClientId(); 1136 return object.getInfo().getConnectionId().toString(); 1137 } 1138 1139 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){ 1140 ProducerBrokerExchange result=producerExchanges.get(id); 1141 if(result==null){ 1142 synchronized(producerExchanges){ 1143 result=new ProducerBrokerExchange(); 1144 ConnectionState state=lookupConnectionState(id); 1145 context=state.getContext(); 1146 result.setConnectionContext(context); 1147 SessionState ss=state.getSessionState(id.getParentId()); 1148 if(ss!=null){ 1149 result.setProducerState(ss.getProducerState(id)); 1150 ProducerState producerState=ss.getProducerState(id); 1151 if(producerState!=null&&producerState.getInfo()!=null){ 1152 ProducerInfo info=producerState.getInfo(); 1153 result.setMutable(info.getDestination()==null||info.getDestination().isComposite()); 1154 } 1155 } 1156 producerExchanges.put(id,result); 1157 } 1158 } else { 1159 context = result.getConnectionContext(); 1160 } 1161 return result; 1162 } 1163 1164 private void removeProducerBrokerExchange(ProducerId id) { 1165 synchronized(producerExchanges) { 1166 producerExchanges.remove(id); 1167 } 1168 } 1169 1170 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id){ 1171 ConsumerBrokerExchange result=consumerExchanges.get(id); 1172 if(result==null){ 1173 synchronized(consumerExchanges){ 1174 result=new ConsumerBrokerExchange(); 1175 ConnectionState state=lookupConnectionState(id); 1176 context=state.getContext(); 1177 result.setConnectionContext(context); 1178 SessionState ss=state.getSessionState(id.getParentId()); 1179 if(ss!=null){ 1180 ConsumerState cs=ss.getConsumerState(id); 1181 if(cs!=null){ 1182 ConsumerInfo info=cs.getInfo(); 1183 if(info!=null){ 1184 if(info.getDestination()!=null&&info.getDestination().isPattern()){ 1185 result.setWildcard(true); 1186 } 1187 } 1188 } 1189 } 1190 consumerExchanges.put(id,result); 1191 } 1192 } 
1193 return result; 1194 } 1195 1196 private void removeConsumerBrokerExchange(ConsumerId id) { 1197 synchronized(consumerExchanges) { 1198 consumerExchanges.remove(id); 1199 } 1200 } 1201 1202 protected void disposeTransport() { 1203 if( transportDisposed.compareAndSet(false, true) ) { 1204 try { 1205 transport.stop(); 1206 active = false; 1207 log.debug("Stopped connection: "+transport.getRemoteAddress()); 1208 } catch (Exception e) { 1209 log.debug("Could not stop transport: "+e,e); 1210 } 1211 } 1212 } 1213 1214 public int getProtocolVersion() { 1215 return protocolVersion.get(); 1216 } 1217 1218 public Response processControlCommand(ControlCommand command) throws Exception { 1219 if (command.equals("shutdown")) 1220 System.exit(0); 1221 return null; 1222 } 1223 1224 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1225 return null; 1226 } 1227 1228 public Response processConnectionControl(ConnectionControl control) throws Exception { 1229 return null; 1230 } 1231 1232 public Response processConnectionError(ConnectionError error) throws Exception { 1233 return null; 1234 } 1235 1236 public Response processConsumerControl(ConsumerControl control) throws Exception { 1237 return null; 1238 } 1239 1240} 1241 | Popular Tags |