KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > TransportConnection


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.broker;
16
17 import java.io.IOException JavaDoc;
18 import java.util.ArrayList JavaDoc;
19 import java.util.Collections JavaDoc;
20 import java.util.HashMap JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.LinkedList JavaDoc;
23 import java.util.List JavaDoc;
24 import java.util.Map JavaDoc;
25 import java.util.Properties JavaDoc;
26 import java.util.concurrent.ConcurrentHashMap JavaDoc;
27 import java.util.concurrent.CountDownLatch JavaDoc;
28 import java.util.concurrent.TimeUnit JavaDoc;
29 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
30 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
31 import java.util.concurrent.atomic.AtomicReference JavaDoc;
32
33 import org.apache.activemq.Service;
34 import org.apache.activemq.broker.ft.MasterBroker;
35 import org.apache.activemq.broker.region.ConnectionStatistics;
36 import org.apache.activemq.broker.region.RegionBroker;
37 import org.apache.activemq.command.BrokerInfo;
38 import org.apache.activemq.command.Command;
39 import org.apache.activemq.command.CommandTypes;
40 import org.apache.activemq.command.ConnectionControl;
41 import org.apache.activemq.command.ConnectionError;
42 import org.apache.activemq.command.ConnectionId;
43 import org.apache.activemq.command.ConnectionInfo;
44 import org.apache.activemq.command.ConsumerControl;
45 import org.apache.activemq.command.ConsumerId;
46 import org.apache.activemq.command.ConsumerInfo;
47 import org.apache.activemq.command.ControlCommand;
48 import org.apache.activemq.command.DataArrayResponse;
49 import org.apache.activemq.command.DestinationInfo;
50 import org.apache.activemq.command.ExceptionResponse;
51 import org.apache.activemq.command.FlushCommand;
52 import org.apache.activemq.command.IntegerResponse;
53 import org.apache.activemq.command.KeepAliveInfo;
54 import org.apache.activemq.command.Message;
55 import org.apache.activemq.command.MessageAck;
56 import org.apache.activemq.command.MessageDispatch;
57 import org.apache.activemq.command.MessageDispatchNotification;
58 import org.apache.activemq.command.MessagePull;
59 import org.apache.activemq.command.ProducerAck;
60 import org.apache.activemq.command.ProducerId;
61 import org.apache.activemq.command.ProducerInfo;
62 import org.apache.activemq.command.RemoveSubscriptionInfo;
63 import org.apache.activemq.command.Response;
64 import org.apache.activemq.command.SessionId;
65 import org.apache.activemq.command.SessionInfo;
66 import org.apache.activemq.command.ShutdownInfo;
67 import org.apache.activemq.command.TransactionId;
68 import org.apache.activemq.command.TransactionInfo;
69 import org.apache.activemq.command.WireFormatInfo;
70 import org.apache.activemq.network.DemandForwardingBridge;
71 import org.apache.activemq.network.NetworkBridgeConfiguration;
72 import org.apache.activemq.network.NetworkBridgeFactory;
73 import org.apache.activemq.security.MessageAuthorizationPolicy;
74 import org.apache.activemq.state.CommandVisitor;
75 import org.apache.activemq.state.ConsumerState;
76 import org.apache.activemq.state.ProducerState;
77 import org.apache.activemq.state.SessionState;
78 import org.apache.activemq.state.TransactionState;
79 import org.apache.activemq.thread.Task;
80 import org.apache.activemq.thread.TaskRunner;
81 import org.apache.activemq.thread.TaskRunnerFactory;
82 import org.apache.activemq.transport.DefaultTransportListener;
83 import org.apache.activemq.transport.Transport;
84 import org.apache.activemq.transport.TransportFactory;
85 import org.apache.activemq.util.IntrospectionSupport;
86 import org.apache.activemq.util.MarshallingSupport;
87 import org.apache.activemq.util.ServiceSupport;
88 import org.apache.commons.logging.Log;
89 import org.apache.commons.logging.LogFactory;
90
91 /**
92  * @version $Revision: 1.8 $
93  */

94 public class TransportConnection implements Service,Connection,Task,CommandVisitor{
95
96     private static final Log log=LogFactory.getLog(TransportConnection.class);
97     private static final Log transportLog=LogFactory.getLog(TransportConnection.class.getName()+".Transport");
98     private static final Log serviceLog=LogFactory.getLog(TransportConnection.class.getName()+".Service");
99     // Keeps track of the broker and connector that created this connection.
100
protected final Broker broker;
101     private MasterBroker masterBroker;
102     protected final TransportConnector connector;
103     private final Transport transport;
104     private MessageAuthorizationPolicy messageAuthorizationPolicy;
105     // Keeps track of the state of the connections.
106
protected final ConcurrentHashMap JavaDoc localConnectionStates=new ConcurrentHashMap JavaDoc();
107     protected final Map JavaDoc brokerConnectionStates;
108     // The broker and wireformat info that was exchanged.
109
protected BrokerInfo brokerInfo;
110     private WireFormatInfo wireFormatInfo;
111     // Used to do async dispatch.. this should perhaps be pushed down into the transport layer..
112
protected final List JavaDoc dispatchQueue=Collections.synchronizedList(new LinkedList JavaDoc());
113     protected final TaskRunner taskRunner;
114     protected final AtomicReference JavaDoc transportException = new AtomicReference JavaDoc();
115     private boolean inServiceException=false;
116     private ConnectionStatistics statistics=new ConnectionStatistics();
117     private boolean manageable;
118     private boolean slow;
119     private boolean markedCandidate;
120     private boolean blockedCandidate;
121     private boolean blocked;
122     private boolean connected;
123     private boolean active;
124     private boolean starting;
125     private boolean pendingStop;
126     private long timeStamp=0;
127     private final AtomicBoolean JavaDoc stopped = new AtomicBoolean JavaDoc(false);
128     private final AtomicBoolean JavaDoc transportDisposed = new AtomicBoolean JavaDoc();
129     private final AtomicBoolean JavaDoc disposed=new AtomicBoolean JavaDoc(false);
130     private CountDownLatch JavaDoc stopLatch=new CountDownLatch JavaDoc(1);
131     private final AtomicBoolean JavaDoc asyncException=new AtomicBoolean JavaDoc(false);
132     private final Map JavaDoc<ProducerId,ProducerBrokerExchange>producerExchanges = new HashMap JavaDoc<ProducerId,ProducerBrokerExchange>();
133     private final Map JavaDoc<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap JavaDoc<ConsumerId,ConsumerBrokerExchange>();
134     private CountDownLatch JavaDoc dispatchStoppedLatch = new CountDownLatch JavaDoc(1);
135     protected AtomicBoolean JavaDoc dispatchStopped=new AtomicBoolean JavaDoc(false);
136     private ConnectionContext context;
137     private boolean networkConnection;
138     private AtomicInteger JavaDoc protocolVersion=new AtomicInteger JavaDoc(CommandTypes.PROTOCOL_VERSION);
139     private DemandForwardingBridge duplexBridge = null;
140     
141     static class ConnectionState extends org.apache.activemq.state.ConnectionState{
142
143         private final ConnectionContext context;
144         TransportConnection connection;
145
146         public ConnectionState(ConnectionInfo info,ConnectionContext context,TransportConnection connection){
147             super(info);
148             this.context=context;
149             this.connection=connection;
150         }
151
152         public ConnectionContext getContext(){
153             return context;
154         }
155
156         public TransportConnection getConnection(){
157             return connection;
158         }
159     }
160
161     /**
162      * @param connector
163      * @param transport
164      * @param broker
165      * @param taskRunnerFactory - can be null if you want direct dispatch to the transport else commands are sent async.
166      */

167     public TransportConnection(TransportConnector connector,final Transport transport,Broker broker,
168             TaskRunnerFactory taskRunnerFactory){
169         this.connector=connector;
170         this.broker=broker;
171         RegionBroker rb=(RegionBroker)broker.getAdaptor(RegionBroker.class);
172         brokerConnectionStates=rb.getConnectionStates();
173         if(connector!=null){
174             this.statistics.setParent(connector.getStatistics());
175         }
176         if(taskRunnerFactory!=null){
177             taskRunner=taskRunnerFactory.createTaskRunner(this,"ActiveMQ Connection Dispatcher: "
178                     +System.identityHashCode(this));
179         }else{
180             taskRunner=null;
181         }
182         connector.setBrokerName(broker.getBrokerName());
183         this.transport=transport;
184         this.transport.setTransportListener(new DefaultTransportListener(){
185
186             public void onCommand(Object JavaDoc o){
187                 Command command=(Command)o;
188                 Response response=service(command);
189                 if(response!=null){
190                     dispatchSync(response);
191                 }
192             }
193
194             public void onException(IOException JavaDoc exception){
195                 serviceTransportException(exception);
196             }
197         });
198         connected=true;
199     }
200
201     /**
202      * Returns the number of messages to be dispatched to this connection
203      */

204     public int getDispatchQueueSize(){
205         return dispatchQueue.size();
206     }
207
208     public void serviceTransportException(IOException JavaDoc e){
209         if(!disposed.get()){
210             transportException.set(e);
211             if(transportLog.isDebugEnabled())
212                 transportLog.debug("Transport failed: "+e,e);
213             ServiceSupport.dispose(this);
214         }
215     }
216
217     /**
218      * Calls the serviceException method in an async thread. Since handling a service exception closes a socket, we
219      * should not tie up broker threads since client sockets may hang or cause deadlocks.
220      *
221      * @param e
222      */

223     public void serviceExceptionAsync(final IOException JavaDoc e){
224         if(asyncException.compareAndSet(false,true)){
225             new Thread JavaDoc("Async Exception Handler"){
226
227                 public void run(){
228                     serviceException(e);
229                 }
230             }.start();
231         }
232     }
233
234     /**
235      * Closes a clients connection due to a detected error.
236      *
237      * Errors are ignored if: the client is closing or broker is closing. Otherwise, the connection error transmitted to
238      * the client before stopping it's transport.
239      */

240     public void serviceException(Throwable JavaDoc e){
241         // are we a transport exception such as not being able to dispatch
242
// synchronously to a transport
243
if(e instanceof IOException JavaDoc){
244             serviceTransportException((IOException JavaDoc)e);
245         }
246         // Handle the case where the broker is stopped
247
// But the client is still connected.
248
else if(e.getClass()==BrokerStoppedException.class){
249             if(!disposed.get()){
250                 if(serviceLog.isDebugEnabled())
251                     serviceLog.debug("Broker has been stopped. Notifying client and closing his connection.");
252                 ConnectionError ce=new ConnectionError();
253                 ce.setException(e);
254                 dispatchSync(ce);
255                 // Wait a little bit to try to get the output buffer to flush the exption notification to the client.
256
try{
257                     Thread.sleep(500);
258                 }catch(InterruptedException JavaDoc ie){
259                     Thread.currentThread().interrupt();
260                 }
261                 // Worst case is we just kill the connection before the notification gets to him.
262
ServiceSupport.dispose(this);
263             }
264         }else if(!disposed.get()&&!inServiceException){
265             inServiceException=true;
266             try{
267                 serviceLog.error("Async error occurred: "+e,e);
268                 ConnectionError ce=new ConnectionError();
269                 ce.setException(e);
270                 dispatchAsync(ce);
271             }finally{
272                 inServiceException=false;
273             }
274         }
275     }
276
277     public Response service(Command command){
278         Response response=null;
279         boolean responseRequired=command.isResponseRequired();
280         int commandId=command.getCommandId();
281         try{
282             response=command.visit(this);
283         }catch(Throwable JavaDoc e){
284             if(responseRequired){
285                 if(serviceLog.isDebugEnabled()&&e.getClass()!=BrokerStoppedException.class)
286                     serviceLog.debug("Error occured while processing sync command: "+e,e);
287                 response=new ExceptionResponse(e);
288             }else{
289                 serviceException(e);
290             }
291         }
292         if(responseRequired){
293             if(response==null){
294                 response=new Response();
295             }
296             response.setCorrelationId(commandId);
297         }
298         
299         // The context may have been flagged so that the response is not sent.
300
if( context!=null ) {
301             if( context.isDontSendReponse() ) {
302                 context.setDontSendReponse(false);
303                 response=null;
304             }
305             context=null;
306         }
307         
308         return response;
309     }
310
311     protected ConnectionState lookupConnectionState(ConsumerId id){
312         ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
313         if(cs==null)
314             throw new IllegalStateException JavaDoc("Cannot lookup a consumer from a connection that had not been registered: "
315                     +id.getParentId().getParentId());
316         return cs;
317     }
318
319     protected ConnectionState lookupConnectionState(ProducerId id){
320         ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
321         if(cs==null)
322             throw new IllegalStateException JavaDoc("Cannot lookup a producer from a connection that had not been registered: "
323                     +id.getParentId().getParentId());
324         return cs;
325     }
326
327     protected ConnectionState lookupConnectionState(SessionId id){
328         ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId());
329         if(cs==null)
330             throw new IllegalStateException JavaDoc("Cannot lookup a session from a connection that had not been registered: "
331                     +id.getParentId());
332         return cs;
333     }
334
335     protected ConnectionState lookupConnectionState(ConnectionId connectionId){
336         ConnectionState cs=(ConnectionState)localConnectionStates.get(connectionId);
337         if(cs==null)
338             throw new IllegalStateException JavaDoc("Cannot lookup a connection that had not been registered: "+connectionId);
339         return cs;
340     }
341
342     public Response processKeepAlive(KeepAliveInfo info) throws Exception JavaDoc{
343         return null;
344     }
345
346     public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception JavaDoc{
347         broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(),info);
348         return null;
349     }
350
351     public Response processWireFormat(WireFormatInfo info) throws Exception JavaDoc{
352         wireFormatInfo=info;
353         protocolVersion.set(info.getVersion());
354         return null;
355     }
356
357     public Response processShutdown(ShutdownInfo info) throws Exception JavaDoc{
358         stop();
359         return null;
360     }
361
362     public Response processFlush(FlushCommand command) throws Exception JavaDoc{
363         return null;
364     }
365
366     synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception JavaDoc{
367         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
368         context=null;
369         if(cs!=null){
370             context=cs.getContext();
371         }
372         // Avoid replaying dup commands
373
if(cs.getTransactionState(info.getTransactionId())==null){
374             cs.addTransactionState(info.getTransactionId());
375             broker.beginTransaction(context,info.getTransactionId());
376         }
377         return null;
378     }
379
380     synchronized public Response processEndTransaction(TransactionInfo info) throws Exception JavaDoc{
381         // No need to do anything. This packet is just sent by the client
382
// make sure he is synced with the server as commit command could
383
// come from a different connection.
384
return null;
385     }
386
387     synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception JavaDoc{
388         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
389         context=null;
390         if(cs!=null){
391             context=cs.getContext();
392         }
393         TransactionState transactionState=cs.getTransactionState(info.getTransactionId());
394         if(transactionState==null)
395             throw new IllegalStateException JavaDoc("Cannot prepare a transaction that had not been started: "
396                     +info.getTransactionId());
397         // Avoid dups.
398
if(!transactionState.isPrepared()){
399             transactionState.setPrepared(true);
400             int result=broker.prepareTransaction(context,info.getTransactionId());
401             transactionState.setPreparedResult(result);
402             IntegerResponse response=new IntegerResponse(result);
403             return response;
404         }else{
405             IntegerResponse response=new IntegerResponse(transactionState.getPreparedResult());
406             return response;
407         }
408     }
409
410     synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception JavaDoc{
411         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
412         context=null;
413         if(cs!=null){
414             context=cs.getContext();
415         }
416         cs.removeTransactionState(info.getTransactionId());
417         broker.commitTransaction(context,info.getTransactionId(),true);
418         return null;
419     }
420
421     synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception JavaDoc{
422         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
423         context=null;
424         if(cs!=null){
425             context=cs.getContext();
426         }
427         cs.removeTransactionState(info.getTransactionId());
428         broker.commitTransaction(context,info.getTransactionId(),false);
429         return null;
430     }
431
432     synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception JavaDoc{
433         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
434         context=null;
435         if(cs!=null){
436             context=cs.getContext();
437         }
438         cs.removeTransactionState(info.getTransactionId());
439         broker.rollbackTransaction(context,info.getTransactionId());
440         return null;
441     }
442
443     synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception JavaDoc{
444         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
445         context=null;
446         if(cs!=null){
447             context=cs.getContext();
448         }
449         broker.forgetTransaction(context,info.getTransactionId());
450         return null;
451     }
452
453     synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception JavaDoc{
454         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
455         context=null;
456         if(cs!=null){
457             context=cs.getContext();
458         }
459         TransactionId[] preparedTransactions=broker.getPreparedTransactions(context);
460         return new DataArrayResponse(preparedTransactions);
461     }
462
463     public Response processMessage(Message messageSend) throws Exception JavaDoc{
464         ProducerId producerId=messageSend.getProducerId();
465         ProducerBrokerExchange producerExchange=getProducerBrokerExchange(producerId);
466         ProducerState producerState = null;
467         if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){
468             producerState=producerExchange.getProducerState();
469         }
470         if(producerState!=null){
471             long seq=messageSend.getMessageId().getProducerSequenceId();
472             if(seq>producerState.getLastSequenceId()){
473                 producerState.setLastSequenceId(seq);
474                 broker.send(producerExchange,messageSend);
475             }else {
476                 if (log.isDebugEnabled()) {
477                     log.debug("Discarding duplicate: " + messageSend);
478                 }
479             }
480         }else{
481             // producer not local to this broker
482
broker.send(producerExchange,messageSend);
483         }
484         return null;
485     }
486
487     public Response processMessageAck(MessageAck ack) throws Exception JavaDoc{
488         ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
489         broker.acknowledge(consumerExchange,ack);
490         return null;
491     }
492
493     public Response processMessagePull(MessagePull pull) throws Exception JavaDoc{
494         return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(),pull);
495     }
496
497     public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception JavaDoc{
498         broker.processDispatchNotification(notification);
499         return null;
500     }
501
502     synchronized public Response processAddDestination(DestinationInfo info) throws Exception JavaDoc{
503         ConnectionState cs=lookupConnectionState(info.getConnectionId());
504         broker.addDestinationInfo(cs.getContext(),info);
505         if(info.getDestination().isTemporary()){
506             cs.addTempDestination(info);
507         }
508         return null;
509     }
510
511     synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception JavaDoc{
512         ConnectionState cs=lookupConnectionState(info.getConnectionId());
513         broker.removeDestinationInfo(cs.getContext(),info);
514         if(info.getDestination().isTemporary()){
515             cs.removeTempDestination(info.getDestination());
516         }
517         return null;
518     }
519
520     synchronized public Response processAddProducer(ProducerInfo info) throws Exception JavaDoc{
521         SessionId sessionId=info.getProducerId().getParentId();
522         ConnectionId connectionId=sessionId.getParentId();
523         ConnectionState cs=lookupConnectionState(connectionId);
524         SessionState ss=cs.getSessionState(sessionId);
525         if(ss==null)
526             throw new IllegalStateException JavaDoc("Cannot add a producer to a session that had not been registered: "
527                     +sessionId);
528         // Avoid replaying dup commands
529
if(!ss.getProducerIds().contains(info.getProducerId())){
530             broker.addProducer(cs.getContext(),info);
531             try{
532                 ss.addProducer(info);
533             }catch(IllegalStateException JavaDoc e){
534                 broker.removeProducer(cs.getContext(),info);
535             }
536         }
537         return null;
538     }
539
540     synchronized public Response processRemoveProducer(ProducerId id) throws Exception JavaDoc{
541         SessionId sessionId=id.getParentId();
542         ConnectionId connectionId=sessionId.getParentId();
543         ConnectionState cs=lookupConnectionState(connectionId);
544         SessionState ss=cs.getSessionState(sessionId);
545         if(ss==null)
546             throw new IllegalStateException JavaDoc("Cannot remove a producer from a session that had not been registered: "
547                     +sessionId);
548         ProducerState ps=ss.removeProducer(id);
549         if(ps==null)
550             throw new IllegalStateException JavaDoc("Cannot remove a producer that had not been registered: "+id);
551         removeProducerBrokerExchange(id);
552         broker.removeProducer(cs.getContext(),ps.getInfo());
553         return null;
554     }
555
556     synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception JavaDoc{
557         SessionId sessionId=info.getConsumerId().getParentId();
558         ConnectionId connectionId=sessionId.getParentId();
559         ConnectionState cs=lookupConnectionState(connectionId);
560         SessionState ss=cs.getSessionState(sessionId);
561         if(ss==null)
562             throw new IllegalStateException JavaDoc("Cannot add a consumer to a session that had not been registered: "
563                     +sessionId);
564         // Avoid replaying dup commands
565
if(!ss.getConsumerIds().contains(info.getConsumerId())){
566             broker.addConsumer(cs.getContext(),info);
567             try{
568                 ss.addConsumer(info);
569             }catch(IllegalStateException JavaDoc e){
570                 broker.removeConsumer(cs.getContext(),info);
571             }
572         }
573         return null;
574     }
575
576     synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception JavaDoc{
577         SessionId sessionId=id.getParentId();
578         ConnectionId connectionId=sessionId.getParentId();
579         ConnectionState cs=lookupConnectionState(connectionId);
580         SessionState ss=cs.getSessionState(sessionId);
581         if(ss==null)
582             throw new IllegalStateException JavaDoc("Cannot remove a consumer from a session that had not been registered: "
583                     +sessionId);
584         ConsumerState consumerState=ss.removeConsumer(id);
585         if(consumerState==null)
586             throw new IllegalStateException JavaDoc("Cannot remove a consumer that had not been registered: "+id);
587         broker.removeConsumer(cs.getContext(),consumerState.getInfo());
588         removeConsumerBrokerExchange(id);
589         return null;
590     }
591
592     synchronized public Response processAddSession(SessionInfo info) throws Exception JavaDoc{
593         ConnectionId connectionId=info.getSessionId().getParentId();
594         ConnectionState cs=lookupConnectionState(connectionId);
595         // Avoid replaying dup commands
596
if(!cs.getSessionIds().contains(info.getSessionId())){
597             broker.addSession(cs.getContext(),info);
598             try{
599                 cs.addSession(info);
600             }catch(IllegalStateException JavaDoc e){
601                 broker.removeSession(cs.getContext(),info);
602             }
603         }
604         return null;
605     }
606
607     synchronized public Response processRemoveSession(SessionId id) throws Exception JavaDoc{
608         ConnectionId connectionId=id.getParentId();
609         ConnectionState cs=lookupConnectionState(connectionId);
610         SessionState session=cs.getSessionState(id);
611         if(session==null)
612             throw new IllegalStateException JavaDoc("Cannot remove session that had not been registered: "+id);
613         // Don't let new consumers or producers get added while we are closing this down.
614
session.shutdown();
615         // Cascade the connection stop to the consumers and producers.
616
for(Iterator JavaDoc iter=session.getConsumerIds().iterator();iter.hasNext();){
617             ConsumerId consumerId=(ConsumerId)iter.next();
618             try{
619                 processRemoveConsumer(consumerId);
620             }catch(Throwable JavaDoc e){
621                 log.warn("Failed to remove consumer: "+consumerId+". Reason: "+e,e);
622             }
623         }
624         for(Iterator JavaDoc iter=session.getProducerIds().iterator();iter.hasNext();){
625             ProducerId producerId=(ProducerId)iter.next();
626             try{
627                 processRemoveProducer(producerId);
628             }catch(Throwable JavaDoc e){
629                 log.warn("Failed to remove producer: "+producerId+". Reason: "+e,e);
630             }
631         }
632         cs.removeSession(id);
633         broker.removeSession(cs.getContext(),session.getInfo());
634         return null;
635     }
636
637     synchronized public Response processAddConnection(ConnectionInfo info) throws Exception JavaDoc{
638         ConnectionState state=(ConnectionState)brokerConnectionStates.get(info.getConnectionId());
639         if(state!=null){
640             // ConnectionInfo replay?? Chances are that it's a client reconnecting,
641
// and we have not detected that that old connection died.. Kill the old connection
642
// to make sure our state is in sync with the client.
643
if(this!=state.getConnection()){
644                 log.debug("Killing previous stale connection: "+state.getConnection());
645                 state.getConnection().stop();
646                 if(!state.getConnection().stopLatch.await(15,TimeUnit.SECONDS)){
647                     throw new Exception JavaDoc("Previous connection could not be clean up.");
648                 }
649             }
650         }
651         log.debug("Setting up new connection: "+this);
652         // Setup the context.
653
String JavaDoc clientId=info.getClientId();
654         context=new ConnectionContext();
655         context.setConnection(this);
656         context.setBroker(broker);
657         context.setConnector(connector);
658         context.setTransactions(new ConcurrentHashMap JavaDoc());
659         context.setClientId(clientId);
660         context.setUserName(info.getUserName());
661         context.setConnectionId(info.getConnectionId());
662         context.setWireFormatInfo(wireFormatInfo);
663         context.setNetworkConnection(networkConnection);
664         context.incrementReference();
665         this.manageable=info.isManageable();
666         state=new ConnectionState(info,context,this);
667         brokerConnectionStates.put(info.getConnectionId(),state);
668         localConnectionStates.put(info.getConnectionId(),state);
669         broker.addConnection(context,info);
670         if(info.isManageable()&&broker.isFaultTolerantConfiguration()){
671             // send ConnectionCommand
672
ConnectionControl command=new ConnectionControl();
673             command.setFaultTolerant(broker.isFaultTolerantConfiguration());
674             dispatchAsync(command);
675         }
676         return null;
677     }
678
679     synchronized public Response processRemoveConnection(ConnectionId id){
680         ConnectionState cs=lookupConnectionState(id);
681         // Don't allow things to be added to the connection state while we are shutting down.
682
cs.shutdown();
683         // Cascade the connection stop to the sessions.
684
for(Iterator JavaDoc iter=cs.getSessionIds().iterator();iter.hasNext();){
685             SessionId sessionId=(SessionId)iter.next();
686             try{
687                 processRemoveSession(sessionId);
688             }catch(Throwable JavaDoc e){
689                 serviceLog.warn("Failed to remove session "+sessionId,e);
690             }
691         }
692         // Cascade the connection stop to temp destinations.
693
for(Iterator JavaDoc iter=cs.getTempDesinations().iterator();iter.hasNext();){
694             DestinationInfo di=(DestinationInfo)iter.next();
695             try{
696                 broker.removeDestination(cs.getContext(),di.getDestination(),0);
697             }catch(Throwable JavaDoc e){
698                 serviceLog.warn("Failed to remove tmp destination "+di.getDestination(),e);
699             }
700             iter.remove();
701         }
702         try{
703             broker.removeConnection(cs.getContext(),cs.getInfo(),null);
704         }catch(Throwable JavaDoc e){
705             serviceLog.warn("Failed to remove connection "+cs.getInfo(),e);
706         }
707         ConnectionState state=(ConnectionState)localConnectionStates.remove(id);
708         if(state!=null){
709             // If we are the last reference, we should remove the state
710
// from the broker.
711
if(state.getContext().decrementReference()==0){
712                 brokerConnectionStates.remove(id);
713             }
714         }
715         return null;
716     }
717
718     
719     public Response processProducerAck(ProducerAck ack) throws Exception JavaDoc {
720         // A broker should not get ProducerAck messages.
721
return null;
722     }
723
724     public Connector getConnector(){
725         return connector;
726     }
727
728     public void dispatchSync(Command message){
729         getStatistics().getEnqueues().increment();
730         try {
731             processDispatch(message);
732         } catch (IOException JavaDoc e) {
733             serviceExceptionAsync(e);
734         }
735     }
736
737     public void dispatchAsync(Command message){
738         if( !disposed.get() ) {
739             getStatistics().getEnqueues().increment();
740             if( taskRunner==null ) {
741                 dispatchSync( message );
742             } else {
743                 dispatchQueue.add(message);
744                 try {
745                     taskRunner.wakeup();
746                 } catch (InterruptedException JavaDoc e) {
747                     Thread.currentThread().interrupt();
748                 }
749             }
750         } else {
751             if(message.isMessageDispatch()) {
752                 MessageDispatch md=(MessageDispatch) message;
753                 Runnable JavaDoc sub=md.getTransmitCallback();
754                 broker.processDispatch(md);
755                 if(sub!=null){
756                     sub.run();
757                 }
758              }
759         }
760     }
761
762     protected void processDispatch(Command command) throws IOException JavaDoc {
763         try {
764             if( !disposed.get() ) {
765                  dispatch(command);
766             }
767        } finally {
768
769             if(command.isMessageDispatch()){
770                 MessageDispatch md=(MessageDispatch) command;
771                 Runnable JavaDoc sub=md.getTransmitCallback();
772                 broker.processDispatch(md);
773                 if(sub!=null){
774                     sub.run();
775                 }
776             }
777
778             getStatistics().getDequeues().increment();
779         }
780      }
781
782
783
784     public boolean iterate(){
785         try {
786             if( disposed.get() ) {
787                  if( dispatchStopped.compareAndSet(false, true)) {
788                      if( transportException.get()==null ) {
789                          try {
790                              dispatch(new ShutdownInfo());
791                          } catch (Throwable JavaDoc ignore) {
792                          }
793                      }
794                      dispatchStoppedLatch.countDown();
795                  }
796                  return false;
797              }
798
799              if( !dispatchStopped.get() ) {
800
801                  if( dispatchQueue.isEmpty() ) {
802                      return false;
803                  } else {
804                      Command command = (Command) dispatchQueue.remove(0);
805                      processDispatch( command );
806                      return true;
807                  }
808              } else {
809                  return false;
810              }
811
812          } catch (IOException JavaDoc e) {
813              if( dispatchStopped.compareAndSet(false, true)) {
814                  dispatchStoppedLatch.countDown();
815              }
816              serviceExceptionAsync(e);
817              return false;
818          }
819     }
820
821     /**
822      * Returns the statistics for this connection
823      */

824     public ConnectionStatistics getStatistics(){
825         return statistics;
826     }
827
828     public MessageAuthorizationPolicy getMessageAuthorizationPolicy(){
829         return messageAuthorizationPolicy;
830     }
831
832     public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy){
833         this.messageAuthorizationPolicy=messageAuthorizationPolicy;
834     }
835
836     public boolean isManageable(){
837         return manageable;
838     }
839
840     public synchronized void start() throws Exception JavaDoc{
841         starting=true;
842         try{
843             transport.start();
844             active=true;
845             this.processDispatch(connector.getBrokerInfo());
846             connector.onStarted(this);
847         }finally{
848             // stop() can be called from within the above block,
849
// but we want to be sure start() completes before
850
// stop() runs, so queue the stop until right now:
851
starting=false;
852             if(pendingStop){
853                 log.debug("Calling the delayed stop()");
854                 stop();
855             }
856         }
857     }
858
859     public void stop() throws Exception JavaDoc{
860         // If we're in the middle of starting
861
// then go no further... for now.
862
synchronized(this){
863             pendingStop=true;
864             if(starting){
865                 log.debug("stop() called in the middle of start(). Delaying...");
866                 return;
867             }
868         }
869         if(stopped.compareAndSet(false,true)){
870             log.debug("Stopping connection: "+transport.getRemoteAddress());
871             connector.onStopped(this);
872             try{
873                 if(masterBroker!=null){
874                     masterBroker.stop();
875                 }
876                 if (duplexBridge != null) {
877                     duplexBridge.stop();
878                 }
879                 // If the transport has not failed yet,
880
// notify the peer that we are doing a normal shutdown.
881
if(transportException==null){
882                     transport.oneway(new ShutdownInfo());
883                 }
884             }catch(Exception JavaDoc ignore){
885                 // ignore.printStackTrace();
886
}
887             transport.stop();
888             active=false;
889             if(disposed.compareAndSet(false,true)){
890
891                 // Let all the connection contexts know we are shutting down
892
// so that in progress operations can notice and unblock.
893
ArrayList JavaDoc l=new ArrayList JavaDoc(localConnectionStates.values());
894                  for(Iterator JavaDoc iter=l.iterator();iter.hasNext();){
895                      ConnectionState cs=(ConnectionState) iter.next();
896                      cs.getContext().getStopping().set(true);
897                  }
898                 
899                 if( taskRunner!=null ) {
900                     taskRunner.wakeup();
901                     // Give it a change to stop gracefully.
902
dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
903                     disposeTransport();
904                     taskRunner.shutdown();
905                 } else {
906                     disposeTransport();
907                 }
908
909                 if( taskRunner!=null )
910                     taskRunner.shutdown();
911                 
912                 // Run the MessageDispatch callbacks so that message references get cleaned up.
913
for (Iterator JavaDoc iter = dispatchQueue.iterator(); iter.hasNext();) {
914                     Command command = (Command) iter.next();
915                     if(command.isMessageDispatch()) {
916                         MessageDispatch md=(MessageDispatch) command;
917                         Runnable JavaDoc sub=md.getTransmitCallback();
918                         broker.processDispatch(md);
919                         if(sub!=null){
920                             sub.run();
921                         }
922                     }
923                 }
924                 //
925
// Remove all logical connection associated with this connection
926
// from the broker.
927
if(!broker.isStopped()){
928                     l=new ArrayList JavaDoc(localConnectionStates.keySet());
929                     for(Iterator JavaDoc iter=l.iterator();iter.hasNext();){
930                         ConnectionId connectionId=(ConnectionId)iter.next();
931                         try{
932                             log.debug("Cleaning up connection resources.");
933                             processRemoveConnection(connectionId);
934                         }catch(Throwable JavaDoc ignore){
935                             ignore.printStackTrace();
936                         }
937                     }
938                     if(brokerInfo!=null){
939                         broker.removeBroker(this,brokerInfo);
940                     }
941                 }
942                 stopLatch.countDown();
943             }
944         }
945     }
946
947     /**
948      * @return Returns the blockedCandidate.
949      */

950     public boolean isBlockedCandidate(){
951         return blockedCandidate;
952     }
953
954     /**
955      * @param blockedCandidate The blockedCandidate to set.
956      */

957     public void setBlockedCandidate(boolean blockedCandidate){
958         this.blockedCandidate=blockedCandidate;
959     }
960
961     /**
962      * @return Returns the markedCandidate.
963      */

964     public boolean isMarkedCandidate(){
965         return markedCandidate;
966     }
967
968     /**
969      * @param markedCandidate The markedCandidate to set.
970      */

971     public void setMarkedCandidate(boolean markedCandidate){
972         this.markedCandidate=markedCandidate;
973         if(!markedCandidate){
974             timeStamp=0;
975             blockedCandidate=false;
976         }
977     }
978
979     /**
980      * @param slow The slow to set.
981      */

982     public void setSlow(boolean slow){
983         this.slow=slow;
984     }
985
986     /**
987      * @return true if the Connection is slow
988      */

989     public boolean isSlow(){
990         return slow;
991     }
992
993     /**
994      * @return true if the Connection is potentially blocked
995      */

996     public boolean isMarkedBlockedCandidate(){
997         return markedCandidate;
998     }
999
1000    /**
1001     * Mark the Connection, so we can deem if it's collectable on the next sweep
1002     */

1003    public void doMark(){
1004        if(timeStamp==0){
1005            timeStamp=System.currentTimeMillis();
1006        }
1007    }
1008
1009    /**
1010     * @return if after being marked, the Connection is still writing
1011     */

1012    public boolean isBlocked(){
1013        return blocked;
1014    }
1015
1016    /**
1017     * @return true if the Connection is connected
1018     */

1019    public boolean isConnected(){
1020        return connected;
1021    }
1022
1023    /**
1024     * @param blocked The blocked to set.
1025     */

1026    public void setBlocked(boolean blocked){
1027        this.blocked=blocked;
1028    }
1029
1030    /**
1031     * @param connected The connected to set.
1032     */

1033    public void setConnected(boolean connected){
1034        this.connected=connected;
1035    }
1036
1037    /**
1038     * @return true if the Connection is active
1039     */

1040    public boolean isActive(){
1041        return active;
1042    }
1043
1044    /**
1045     * @param active The active to set.
1046     */

1047    public void setActive(boolean active){
1048        this.active=active;
1049    }
1050
1051    /**
1052     * @return true if the Connection is starting
1053     */

1054    public synchronized boolean isStarting(){
1055        return starting;
1056    }
1057
1058    synchronized protected void setStarting(boolean starting){
1059        this.starting=starting;
1060    }
1061
1062    /**
1063     * @return true if the Connection needs to stop
1064     */

1065    public synchronized boolean isPendingStop(){
1066        return pendingStop;
1067    }
1068
1069    protected synchronized void setPendingStop(boolean pendingStop){
1070        this.pendingStop=pendingStop;
1071    }
1072
1073    public Response processBrokerInfo(BrokerInfo info){
1074        if(info.isSlaveBroker()){
1075            // stream messages from this broker (the master) to
1076
// the slave
1077
MutableBrokerFilter parent=(MutableBrokerFilter)broker.getAdaptor(MutableBrokerFilter.class);
1078            masterBroker=new MasterBroker(parent,transport);
1079            masterBroker.startProcessing();
1080            log.info("Slave Broker "+info.getBrokerName()+" is attached");
1081        }else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1082            //so this TransportConnection is the rear end of a network bridge
1083
//We have been requested to create a two way pipe ...
1084
try{
1085                Properties JavaDoc props = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1086                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1087                IntrospectionSupport.setProperties(config,props,"");
1088                config.setBrokerName(broker.getBrokerName());
1089                Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI());
1090                duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,transport);
1091                //now turn duplex off this side
1092
duplexBridge.setCreatedByDuplex(true);
1093                duplexBridge.start();
1094                log.info("Created Duplex Bridge back to " + info.getBrokerName());
1095            }catch(Exception JavaDoc e){
1096               log.error("Creating duplex network bridge",e);
1097            }
1098        }
1099        // We only expect to get one broker info command per connection
1100
if(this.brokerInfo!=null){
1101            log.warn("Unexpected extra broker info command received: "+info);
1102            Thread.dumpStack();
1103        }
1104        this.brokerInfo=info;
1105        broker.addBroker(this,info);
1106        networkConnection = true;
1107        for (Iterator JavaDoc iter = localConnectionStates.values().iterator(); iter.hasNext();) {
1108            ConnectionState cs = (ConnectionState) iter.next();
1109            cs.getContext().setNetworkConnection(true);
1110        }
1111        
1112        return null;
1113    }
1114
1115    protected void dispatch(Command command) throws IOException JavaDoc{
1116        try{
1117            setMarkedCandidate(true);
1118            transport.oneway(command);
1119        }finally{
1120            setMarkedCandidate(false);
1121        }
1122    }
1123
1124    public String JavaDoc getRemoteAddress(){
1125        return transport.getRemoteAddress();
1126    }
1127    
1128    public String JavaDoc getConnectionId() {
1129        Iterator JavaDoc iterator = localConnectionStates.values().iterator();
1130        ConnectionState object = (ConnectionState) iterator.next();
1131        if( object == null ) {
1132            return null;
1133        }
1134        if( object.getInfo().getClientId() !=null )
1135            return object.getInfo().getClientId();
1136        return object.getInfo().getConnectionId().toString();
1137    }
1138    
1139    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){
1140        ProducerBrokerExchange result=producerExchanges.get(id);
1141        if(result==null){
1142            synchronized(producerExchanges){
1143                result=new ProducerBrokerExchange();
1144                ConnectionState state=lookupConnectionState(id);
1145                context=state.getContext();
1146                result.setConnectionContext(context);
1147                SessionState ss=state.getSessionState(id.getParentId());
1148                if(ss!=null){
1149                    result.setProducerState(ss.getProducerState(id));
1150                    ProducerState producerState=ss.getProducerState(id);
1151                    if(producerState!=null&&producerState.getInfo()!=null){
1152                        ProducerInfo info=producerState.getInfo();
1153                        result.setMutable(info.getDestination()==null||info.getDestination().isComposite());
1154                    }
1155                }
1156                producerExchanges.put(id,result);
1157            }
1158        } else {
1159            context = result.getConnectionContext();
1160        }
1161        return result;
1162    }
1163    
1164    private void removeProducerBrokerExchange(ProducerId id) {
1165        synchronized(producerExchanges) {
1166            producerExchanges.remove(id);
1167        }
1168    }
1169    
1170    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id){
1171        ConsumerBrokerExchange result=consumerExchanges.get(id);
1172        if(result==null){
1173            synchronized(consumerExchanges){
1174                result=new ConsumerBrokerExchange();
1175                ConnectionState state=lookupConnectionState(id);
1176                context=state.getContext();
1177                result.setConnectionContext(context);
1178                SessionState ss=state.getSessionState(id.getParentId());
1179                if(ss!=null){
1180                    ConsumerState cs=ss.getConsumerState(id);
1181                    if(cs!=null){
1182                        ConsumerInfo info=cs.getInfo();
1183                        if(info!=null){
1184                            if(info.getDestination()!=null&&info.getDestination().isPattern()){
1185                                result.setWildcard(true);
1186                            }
1187                        }
1188                    }
1189                }
1190                consumerExchanges.put(id,result);
1191            }
1192        }
1193        return result;
1194    }
1195    
1196    private void removeConsumerBrokerExchange(ConsumerId id) {
1197        synchronized(consumerExchanges) {
1198            consumerExchanges.remove(id);
1199        }
1200    }
1201    
1202    protected void disposeTransport() {
1203        if( transportDisposed.compareAndSet(false, true) ) {
1204        try {
1205            transport.stop();
1206            active = false;
1207            log.debug("Stopped connection: "+transport.getRemoteAddress());
1208        } catch (Exception JavaDoc e) {
1209            log.debug("Could not stop transport: "+e,e);
1210        }
1211        }
1212    }
1213    
1214    public int getProtocolVersion() {
1215        return protocolVersion.get();
1216    }
1217
1218    public Response processControlCommand(ControlCommand command) throws Exception JavaDoc {
1219        if (command.equals("shutdown"))
1220            System.exit(0);
1221        return null;
1222    }
1223
1224    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception JavaDoc {
1225        return null;
1226    }
1227
1228    public Response processConnectionControl(ConnectionControl control) throws Exception JavaDoc {
1229        return null;
1230    }
1231
1232    public Response processConnectionError(ConnectionError error) throws Exception JavaDoc {
1233        return null;
1234    }
1235
1236    public Response processConsumerControl(ConsumerControl control) throws Exception JavaDoc {
1237        return null;
1238    }
1239
1240}
1241
Popular Tags