KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > ResponseCorrelator


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.transport;
16
17 import java.io.IOException JavaDoc;
18 import java.util.ArrayList JavaDoc;
19 import java.util.HashMap JavaDoc;
20 import java.util.Iterator JavaDoc;
21 import java.util.Map JavaDoc;
22 import org.apache.activemq.command.Command;
23 import org.apache.activemq.command.ExceptionResponse;
24 import org.apache.activemq.command.Response;
25 import org.apache.activemq.util.IntSequenceGenerator;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import java.util.concurrent.ConcurrentHashMap JavaDoc;
29
30 /**
31  * Adds an incrementing sequence number to commands and correlates responses with their requests in order to
32  * provide blocking request-response semantics.
33  *
34  * @version $Revision: 1.4 $
35  */

36 public class ResponseCorrelator extends TransportFilter{
37
38     private static final Log log=LogFactory.getLog(ResponseCorrelator.class);
39     private final Map JavaDoc requestMap=new HashMap JavaDoc();
40     private IntSequenceGenerator sequenceGenerator;
41     private final boolean debug=log.isDebugEnabled();
42
43     public ResponseCorrelator(Transport next){
44         this(next,new IntSequenceGenerator());
45     }
46
47     public ResponseCorrelator(Transport next,IntSequenceGenerator sequenceGenerator){
48         super(next);
49         this.sequenceGenerator=sequenceGenerator;
50     }
51
52     public void oneway(Object JavaDoc o) throws IOException JavaDoc{
53         Command command=(Command)o;
54         command.setCommandId(sequenceGenerator.getNextSequenceId());
55         command.setResponseRequired(false);
56         next.oneway(command);
57     }
58
59     public FutureResponse asyncRequest(Object JavaDoc o,ResponseCallback responseCallback) throws IOException JavaDoc{
60         Command command=(Command)o;
61         command.setCommandId(sequenceGenerator.getNextSequenceId());
62         command.setResponseRequired(true);
63         FutureResponse future=new FutureResponse(responseCallback);
64         synchronized(requestMap){
65             requestMap.put(new Integer JavaDoc(command.getCommandId()),future);
66         }
67         next.oneway(command);
68         return future;
69     }
70
71     public Object JavaDoc request(Object JavaDoc command) throws IOException JavaDoc{
72         FutureResponse response=asyncRequest(command,null);
73         return response.getResult();
74     }
75
76     public Object JavaDoc request(Object JavaDoc command,int timeout) throws IOException JavaDoc{
77         FutureResponse response=asyncRequest(command,null);
78         return response.getResult(timeout);
79     }
80
81     public void onCommand(Object JavaDoc o){
82         Command command=(Command)o;
83         if(command.isResponse()){
84             Response response=(Response)command;
85             FutureResponse future=null;
86             synchronized(requestMap){
87                 future=(FutureResponse)requestMap.remove(new Integer JavaDoc(response.getCorrelationId()));
88             }
89             if(future!=null){
90                 future.set(response);
91             }else{
92                 if(debug)
93                     log.debug("Received unexpected response for command id: "+response.getCorrelationId());
94             }
95         }else{
96             getTransportListener().onCommand(command);
97         }
98     }
99
100     /**
101      * If an async exception occurs, then assume no responses will arrive for any of current requests. Lets let them
102      * know of the problem.
103      */

104     public void onException(IOException JavaDoc error){
105         // Copy and Clear the request Map
106
ArrayList JavaDoc requests=new ArrayList JavaDoc(requestMap.values());
107         requestMap.clear();
108         for(Iterator JavaDoc iter=requests.iterator();iter.hasNext();){
109             FutureResponse fr=(FutureResponse)iter.next();
110             fr.set(new ExceptionResponse(error));
111         }
112         super.onException(error);
113     }
114
115     public IntSequenceGenerator getSequenceGenerator(){
116         return sequenceGenerator;
117     }
118
119     public String JavaDoc toString(){
120         return next.toString();
121     }
122 }
123
Popular Tags