1 18 package org.apache.activemq.transport; 19 20 import java.io.IOException ; 21 import java.io.InterruptedIOException ; 22 23 import org.apache.activemq.command.Response; 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 27 import java.util.concurrent.ArrayBlockingQueue ; 28 import java.util.concurrent.TimeUnit ; 29 30 public class FutureResponse { 31 private static final Log log = LogFactory.getLog(FutureResponse.class); 32 33 private final ResponseCallback responseCallback; 34 private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue (1); 35 36 public FutureResponse(ResponseCallback responseCallback) { 37 this.responseCallback = responseCallback; 38 } 39 40 public Response getResult() throws IOException { 41 try { 42 return (Response) responseSlot.take(); 43 } 44 catch (InterruptedException e) { 45 Thread.currentThread().interrupt(); 46 if (log.isDebugEnabled()) { 47 log.debug("Operation interupted: " + e, e); 48 } 49 throw new InterruptedIOException ("Interrupted."); 50 } 51 } 52 53 public Response getResult(int timeout) throws IOException { 54 try { 55 return (Response) responseSlot.poll(timeout,TimeUnit.MILLISECONDS); 56 } catch (InterruptedException e) { 57 throw new InterruptedIOException ("Interrupted."); 58 } 59 } 60 61 public void set(Response result) { 62 if( responseSlot.offer(result) ) { 63 if( responseCallback !=null ) { 64 responseCallback.onCompletion(this); 65 } 66 } 67 } 68 } 69 | Popular Tags |