1 14 15 package org.apache.activemq.transport; 16 17 import java.io.IOException ; 18 import java.util.ArrayList ; 19 import java.util.HashMap ; 20 import java.util.Iterator ; 21 import java.util.Map ; 22 import org.apache.activemq.command.Command; 23 import org.apache.activemq.command.ExceptionResponse; 24 import org.apache.activemq.command.Response; 25 import org.apache.activemq.util.IntSequenceGenerator; 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 import java.util.concurrent.ConcurrentHashMap ; 29 30 36 public class ResponseCorrelator extends TransportFilter{ 37 38 private static final Log log=LogFactory.getLog(ResponseCorrelator.class); 39 private final Map requestMap=new HashMap (); 40 private IntSequenceGenerator sequenceGenerator; 41 private final boolean debug=log.isDebugEnabled(); 42 43 public ResponseCorrelator(Transport next){ 44 this(next,new IntSequenceGenerator()); 45 } 46 47 public ResponseCorrelator(Transport next,IntSequenceGenerator sequenceGenerator){ 48 super(next); 49 this.sequenceGenerator=sequenceGenerator; 50 } 51 52 public void oneway(Object o) throws IOException { 53 Command command=(Command)o; 54 command.setCommandId(sequenceGenerator.getNextSequenceId()); 55 command.setResponseRequired(false); 56 next.oneway(command); 57 } 58 59 public FutureResponse asyncRequest(Object o,ResponseCallback responseCallback) throws IOException { 60 Command command=(Command)o; 61 command.setCommandId(sequenceGenerator.getNextSequenceId()); 62 command.setResponseRequired(true); 63 FutureResponse future=new FutureResponse(responseCallback); 64 synchronized(requestMap){ 65 requestMap.put(new Integer (command.getCommandId()),future); 66 } 67 next.oneway(command); 68 return future; 69 } 70 71 public Object request(Object command) throws IOException { 72 FutureResponse response=asyncRequest(command,null); 73 return response.getResult(); 74 } 75 76 public Object request(Object command,int timeout) throws IOException { 77 FutureResponse response=asyncRequest(command,null); 78 return response.getResult(timeout); 79 } 80 81 public void onCommand(Object o){ 82 Command command=(Command)o; 83 if(command.isResponse()){ 84 Response response=(Response)command; 85 FutureResponse future=null; 86 synchronized(requestMap){ 87 future=(FutureResponse)requestMap.remove(new Integer (response.getCorrelationId())); 88 } 89 if(future!=null){ 90 future.set(response); 91 }else{ 92 if(debug) 93 log.debug("Received unexpected response for command id: "+response.getCorrelationId()); 94 } 95 }else{ 96 getTransportListener().onCommand(command); 97 } 98 } 99 100 104 public void onException(IOException error){ 105 ArrayList requests=new ArrayList (requestMap.values()); 107 requestMap.clear(); 108 for(Iterator iter=requests.iterator();iter.hasNext();){ 109 FutureResponse fr=(FutureResponse)iter.next(); 110 fr.set(new ExceptionResponse(error)); 111 } 112 super.onException(error); 113 } 114 115 public IntSequenceGenerator getSequenceGenerator(){ 116 return sequenceGenerator; 117 } 118 119 public String toString(){ 120 return next.toString(); 121 } 122 } 123 | Popular Tags |