package org.apache.activemq.transport.vm;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.Command;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link Transport} that hands commands to a peer {@code VMTransport} by
 * invoking the peer's {@link TransportListener} directly.
 *
 * <p>Two delivery modes are supported:
 * <ul>
 * <li><b>async</b> (the default): commands are put on a bounded queue and
 * delivered to the peer's listener from {@link #iterate()}, which is driven by
 * a {@link TaskRunner};</li>
 * <li><b>sync</b>: commands are delivered on the calling thread, or parked on
 * the peer's pending list until the peer is started.</li>
 * </ul>
 */
public class VMTransport implements Transport, Task {

    private static final Log log = LogFactory.getLog(VMTransport.class);
    private static final AtomicLong nextId = new AtomicLong(0);
    private static final TaskRunnerFactory taskRunnerFactory =
            new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000);

    // peer, transportListener, disposed and messageQueue are read by threads
    // other than the one that writes them (the task runner thread and the
    // peer's threads), hence volatile.
    protected volatile VMTransport peer;
    protected volatile TransportListener transportListener;
    protected volatile boolean disposed;
    protected boolean marshal;
    protected boolean network;
    protected boolean async = true;
    protected AtomicBoolean started = new AtomicBoolean();
    protected int asyncQueueDepth = 2000;
    /** Commands received in sync mode before a listener was available; replayed by {@link #start()}. */
    protected List<Object> prePeerSetQueue = Collections.synchronizedList(new LinkedList<Object>());
    /** Outbound queue used in async mode; created lazily on the first async send. */
    protected volatile LinkedBlockingQueue<Object> messageQueue = null;
    protected final URI location;
    protected final long id;
    /** Guarded by {@link #mutex}. */
    private TaskRunner taskRunner;
    private final Object mutex = new Object();

    /**
     * Creates a transport for the given location and assigns it the next
     * value of a class-wide id counter.
     *
     * @param location the URI reported by {@link #toString()}
     */
    public VMTransport(URI location) {
        this.location = location;
        this.id = nextId.getAndIncrement();
    }

    /**
     * @return the transport commands are delivered to, or {@code null} if none has been set
     */
    public VMTransport getPeer() {
        synchronized (mutex) {
            return peer;
        }
    }

    /**
     * @param peer the transport commands should be delivered to
     */
    public void setPeer(VMTransport peer) {
        synchronized (mutex) {
            this.peer = peer;
        }
    }

    /**
     * Sends a command to the peer, using the async or sync path depending on
     * {@link #isAsync()}.
     *
     * @param command the command to deliver
     * @throws TransportDisposedIOException if this transport or its peer is disposed
     * @throws IOException if no peer is set, or the async send is interrupted
     */
    public void oneway(Object command) throws IOException {
        if (disposed) {
            throw new TransportDisposedIOException("Transport disposed.");
        }
        // Read the field once so the null check and the uses below agree.
        final VMTransport target = peer;
        if (target == null) {
            throw new IOException("Peer not connected.");
        }
        if (target.disposed) {
            throw new TransportDisposedIOException("Peer (" + target.toString() + ") disposed.");
        }
        if (async) {
            asyncOneWay(command);
        } else {
            syncOneWay(command);
        }
    }

    /**
     * Delivers the command on the calling thread. If the peer has no listener
     * yet, the command is appended to the peer's pending list instead.
     *
     * @param command the command to deliver
     */
    protected void syncOneWay(Object command) {
        final VMTransport target = peer;
        final TransportListener tl = target.transportListener;
        if (tl == null) {
            // Add to the peer's list without overwriting our own field:
            // aliasing the two lists would let start() replay commands meant
            // for the peer into this transport's listener.
            target.prePeerSetQueue.add(command);
        } else {
            tl.onCommand(command);
        }
    }

    /**
     * Queues the command for delivery by {@link #iterate()} and wakes the task
     * runner. Blocks while the queue is full.
     *
     * @param command the command to queue
     * @throws IOException if the thread is interrupted while waiting for queue
     *         space; the interrupt flag is restored and the
     *         {@link InterruptedException} is attached as the cause
     */
    protected void asyncOneWay(Object command) throws IOException {
        final LinkedBlockingQueue<Object> queue;
        synchronized (mutex) {
            if (messageQueue == null) {
                messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
            }
            queue = messageQueue;
        }
        try {
            queue.put(command);
        } catch (final InterruptedException e) {
            // Restore the flag so callers further up can observe the interrupt.
            Thread.currentThread().interrupt();
            log.error("messageQueue interupted", e);
            final IOException ioe = new IOException(e.getMessage());
            ioe.initCause(e);
            throw ioe;
        }
        wakeup();
    }

    /**
     * Not supported by this transport.
     *
     * @throws AssertionError always
     */
    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    /**
     * Not supported by this transport.
     *
     * @throws AssertionError always
     */
    public Object request(Object command) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    /**
     * Not supported by this transport.
     *
     * @throws AssertionError always
     */
    public Object request(Object command, int timeout) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    /**
     * @return the listener that receives commands sent by the peer, or {@code null}
     */
    public TransportListener getTransportListener() {
        synchronized (mutex) {
            return transportListener;
        }
    }

    /**
     * Sets the listener and wakes both this transport and its peer (if one is
     * set), so that commands queued while no listener was available can be
     * delivered.
     *
     * @param commandListener the listener that should receive commands sent by the peer
     */
    public void setTransportListener(TransportListener commandListener) {
        synchronized (mutex) {
            this.transportListener = commandListener;
        }
        wakeup();
        final VMTransport target = peer;
        if (target != null) {
            target.wakeup();
        }
    }

    /**
     * Starts the transport; calls after the first have no effect until
     * {@link #stop()} is called. In sync mode the pending commands are replayed
     * to the listener; in async mode the peer (if set) and this transport are
     * woken.
     *
     * @throws IOException if no transport listener has been set
     */
    public void start() throws Exception {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        final TransportListener tl = transportListener;
        if (tl == null) {
            throw new IOException("TransportListener not set.");
        }
        if (!async) {
            final List<Object> pending = prePeerSetQueue;
            // Iterating a Collections.synchronizedList requires holding the
            // list's own lock for the duration of the traversal.
            synchronized (pending) {
                for (Iterator<Object> iter = pending.iterator(); iter.hasNext();) {
                    // No cast: oneway() accepts any Object, so replay must too.
                    final Object command = iter.next();
                    tl.onCommand(command);
                    iter.remove();
                }
            }
        } else {
            final VMTransport target = peer;
            if (target != null) {
                target.wakeup();
            }
            wakeup();
        }
    }

    /**
     * Stops a started transport: marks it disposed and shuts down its task
     * runner, if one was created. Has no effect if the transport is not started.
     */
    public void stop() throws Exception {
        if (!started.compareAndSet(true, false)) {
            return;
        }
        // Set before taking the mutex so a concurrent wakeup() cannot create
        // a fresh runner after we have detached the current one.
        disposed = true;
        final TaskRunner runner;
        synchronized (mutex) {
            runner = taskRunner;
            taskRunner = null;
        }
        // Shut down outside the mutex: iterate() takes the same lock.
        if (runner != null) {
            runner.shutdown(1000);
        }
    }

    /**
     * @param target the type the caller is looking for
     * @return this transport if it is an instance of {@code target}, otherwise {@code null}
     */
    public Object narrow(Class target) {
        if (target.isAssignableFrom(getClass())) {
            return this;
        }
        return null;
    }

    public boolean isMarshal() {
        return marshal;
    }

    public void setMarshal(boolean marshal) {
        this.marshal = marshal;
    }

    public boolean isNetwork() {
        return network;
    }

    public void setNetwork(boolean network) {
        this.network = network;
    }

    /**
     * @return the location followed by {@code #} and this transport's id
     */
    @Override
    public String toString() {
        return location + "#" + id;
    }

    /**
     * @return the peer's string form, or {@code null} if no peer is set
     */
    public String getRemoteAddress() {
        final VMTransport target = peer;
        if (target != null) {
            return target.toString();
        }
        return null;
    }

    /**
     * Delivers at most one queued command to the peer's listener.
     *
     * @return {@code true} if another command is queued and can currently be
     *         delivered, {@code false} otherwise
     */
    public boolean iterate() {
        final VMTransport target = peer;
        if (target == null) {
            return false;
        }
        final TransportListener tl = target.transportListener;
        Object command = null;
        synchronized (mutex) {
            if (messageQueue != null && !disposed && !target.disposed && tl != null && !messageQueue.isEmpty()) {
                command = messageQueue.poll();
            }
        }
        if (tl != null && command != null) {
            tl.onCommand(command);
        }
        // Report more work only when it can actually be delivered; otherwise
        // the runner would spin on a non-empty queue while the peer has no
        // listener. setTransportListener() on the peer wakes this transport.
        final LinkedBlockingQueue<Object> queue = messageQueue;
        return tl != null && !disposed && !target.disposed && queue != null && !queue.isEmpty();
    }

    /**
     * @return {@code true} if commands are queued and delivered by the task runner
     */
    public boolean isAsync() {
        return async;
    }

    /**
     * @param async {@code true} to queue commands for the task runner,
     *        {@code false} to deliver them on the calling thread
     */
    public void setAsync(boolean async) {
        this.async = async;
    }

    /**
     * @return the capacity used when the async queue is created
     */
    public int getAsyncQueueDepth() {
        return asyncQueueDepth;
    }

    /**
     * @param asyncQueueDepth the capacity to use when the async queue is
     *        created; has no effect on a queue that already exists
     */
    public void setAsyncQueueDepth(int asyncQueueDepth) {
        this.asyncQueueDepth = asyncQueueDepth;
    }

    /**
     * In async mode, creates the task runner on first use (unless this
     * transport is disposed) and wakes it. Does nothing in sync mode.
     */
    protected void wakeup() {
        if (!async) {
            return;
        }
        final TaskRunner runner;
        synchronized (mutex) {
            // Never create a runner once disposed: stop() has already run and
            // nothing would shut the new runner down.
            if (taskRunner == null && !disposed) {
                taskRunner = taskRunnerFactory.createTaskRunner(this, "VMTransport: " + toString());
            }
            runner = taskRunner;
        }
        if (runner == null) {
            return;
        }
        try {
            runner.wakeup();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}