1 18 package org.apache.activemq.transport.http; 19 20 import java.util.Queue ; 21 import java.util.concurrent.BlockingQueue ; 22 import java.util.concurrent.TimeUnit ; 23 24 import org.apache.activemq.transport.TransportSupport; 25 import org.apache.activemq.util.ServiceStopper; 26 27 import java.io.IOException ; 28 29 36 public class BlockingQueueTransport extends TransportSupport { 37 public static final long MAX_TIMEOUT = 30000L; 38 39 private BlockingQueue queue; 40 41 public BlockingQueueTransport(BlockingQueue channel) { 42 this.queue = channel; 43 } 44 45 public BlockingQueue getQueue() { 46 return queue; 47 } 48 49 public void oneway(Object command) throws IOException { 50 try { 51 boolean success = queue.offer(command, MAX_TIMEOUT, TimeUnit.MILLISECONDS); 52 if (!success) 53 throw new IOException ("Fail to add to BlockingQueue. Add timed out after " + MAX_TIMEOUT + "ms: size=" + queue.size()); 54 } catch (InterruptedException e) { 55 throw new IOException ("Fail to add to BlockingQueue. Interrupted while waiting for space: size=" + queue.size()); 56 } 57 } 58 59 60 public String getRemoteAddress() { 61 return "blockingQueue"; 62 } 63 64 protected void doStart() throws Exception { 65 } 66 67 protected void doStop(ServiceStopper stopper) throws Exception { 68 } 69 } 70 | Popular Tags |