KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > vm > VMTransport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.transport.vm;
16
17 import java.io.IOException JavaDoc;
18 import java.net.URI JavaDoc;
19 import java.util.Collections JavaDoc;
20 import java.util.Iterator JavaDoc;
21 import java.util.LinkedList JavaDoc;
22 import java.util.List JavaDoc;
23 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
24 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
25 import java.util.concurrent.atomic.AtomicLong JavaDoc;
26 import org.apache.activemq.command.Command;
27 import org.apache.activemq.thread.Task;
28 import org.apache.activemq.thread.TaskRunner;
29 import org.apache.activemq.thread.TaskRunnerFactory;
30 import org.apache.activemq.transport.FutureResponse;
31 import org.apache.activemq.transport.ResponseCallback;
32 import org.apache.activemq.transport.Transport;
33 import org.apache.activemq.transport.TransportDisposedIOException;
34 import org.apache.activemq.transport.TransportListener;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37
38 /**
39  * A Transport implementation that uses direct method invocations.
40  *
41  * @version $Revision$
42  */

43 public class VMTransport implements Transport,Task{
44
45     private static final Log log=LogFactory.getLog(VMTransport.class);
46     private static final AtomicLong JavaDoc nextId=new AtomicLong JavaDoc(0);
47     private static final TaskRunnerFactory taskRunnerFactory=new TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY,
48             true,1000);
49     protected VMTransport peer;
50     protected TransportListener transportListener;
51     protected boolean disposed;
52     protected boolean marshal;
53     protected boolean network;
54     protected boolean async=true;
55     protected AtomicBoolean JavaDoc started=new AtomicBoolean JavaDoc();
56     protected int asyncQueueDepth=2000;
57     protected List JavaDoc prePeerSetQueue=Collections.synchronizedList(new LinkedList JavaDoc());
58     protected LinkedBlockingQueue JavaDoc messageQueue=null;
59     protected final URI JavaDoc location;
60     protected final long id;
61     private TaskRunner taskRunner;
62     private final Object JavaDoc mutex=new Object JavaDoc();
63
64     public VMTransport(URI JavaDoc location){
65         this.location=location;
66         this.id=nextId.getAndIncrement();
67     }
68
69     public VMTransport getPeer(){
70         synchronized(mutex){
71             return peer;
72         }
73     }
74
75     public void setPeer(VMTransport peer){
76         synchronized(mutex){
77             this.peer=peer;
78         }
79     }
80
81     public void oneway(Object JavaDoc command) throws IOException JavaDoc{
82         if(disposed){
83             throw new TransportDisposedIOException("Transport disposed.");
84         }
85         if(peer==null)
86             throw new IOException JavaDoc("Peer not connected.");
87         if(!peer.disposed){
88             if(async){
89                 asyncOneWay(command);
90             }else{
91                 syncOneWay(command);
92             }
93         }else{
94             throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
95         }
96     }
97
98     protected void syncOneWay(Object JavaDoc command){
99         final TransportListener tl=peer.transportListener;
100         prePeerSetQueue=peer.prePeerSetQueue;
101         if(tl==null){
102             prePeerSetQueue.add(command);
103         }else{
104             tl.onCommand(command);
105         }
106     }
107
108     protected void asyncOneWay(Object JavaDoc command) throws IOException JavaDoc{
109         try{
110             synchronized(mutex){
111                 if(messageQueue==null){
112                     messageQueue=new LinkedBlockingQueue JavaDoc(this.asyncQueueDepth);
113                 }
114             }
115             messageQueue.put(command);
116             wakeup();
117         }catch(final InterruptedException JavaDoc e){
118             log.error("messageQueue interupted",e);
119             throw new IOException JavaDoc(e.getMessage());
120         }
121     }
122
123     public FutureResponse asyncRequest(Object JavaDoc command,ResponseCallback responseCallback) throws IOException JavaDoc{
124         throw new AssertionError JavaDoc("Unsupported Method");
125     }
126
127     public Object JavaDoc request(Object JavaDoc command) throws IOException JavaDoc{
128         throw new AssertionError JavaDoc("Unsupported Method");
129     }
130
131     public Object JavaDoc request(Object JavaDoc command,int timeout) throws IOException JavaDoc{
132         throw new AssertionError JavaDoc("Unsupported Method");
133     }
134
135     public TransportListener getTransportListener(){
136         synchronized(mutex){
137             return transportListener;
138         }
139     }
140
141     public void setTransportListener(TransportListener commandListener){
142         synchronized(mutex){
143             this.transportListener=commandListener;
144         }
145         wakeup();
146         peer.wakeup();
147     }
148
149     public void start() throws Exception JavaDoc{
150         if(started.compareAndSet(false,true)){
151             if(transportListener==null)
152                 throw new IOException JavaDoc("TransportListener not set.");
153             if(!async){
154                 for(Iterator JavaDoc iter=prePeerSetQueue.iterator();iter.hasNext();){
155                     Command command=(Command)iter.next();
156                     transportListener.onCommand(command);
157                     iter.remove();
158                 }
159             }else{
160                 peer.wakeup();
161                 wakeup();
162             }
163         }
164     }
165
166     public void stop() throws Exception JavaDoc{
167         if(started.compareAndSet(true,false)){
168             if(!disposed){
169                 disposed=true;
170             }
171             if(taskRunner!=null){
172                 taskRunner.shutdown(1000);
173                 taskRunner=null;
174             }
175         }
176     }
177
178     public Object JavaDoc narrow(Class JavaDoc target){
179         if(target.isAssignableFrom(getClass())){
180             return this;
181         }
182         return null;
183     }
184
185     public boolean isMarshal(){
186         return marshal;
187     }
188
189     public void setMarshal(boolean marshal){
190         this.marshal=marshal;
191     }
192
193     public boolean isNetwork(){
194         return network;
195     }
196
197     public void setNetwork(boolean network){
198         this.network=network;
199     }
200
201     public String JavaDoc toString(){
202         return location+"#"+id;
203     }
204
205     public String JavaDoc getRemoteAddress(){
206         if(peer!=null){
207             return peer.toString();
208         }
209         return null;
210     }
211
212     /**
213      * @see org.apache.activemq.thread.Task#iterate()
214      */

215     public boolean iterate(){
216         final TransportListener tl=peer.transportListener;
217         Command command=null;
218         synchronized(mutex){
219             if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null&&!messageQueue.isEmpty()){
220                 command=(Command)messageQueue.poll();
221             }
222         }
223         if(tl!=null&&command!=null){
224             tl.onCommand(command);
225         }
226         boolean result=messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
227         return result;
228     }
229
230     /**
231      * @return the async
232      */

233     public boolean isAsync(){
234         return async;
235     }
236
237     /**
238      * @param async the async to set
239      */

240     public void setAsync(boolean async){
241         this.async=async;
242     }
243
244     /**
245      * @return the asyncQueueDepth
246      */

247     public int getAsyncQueueDepth(){
248         return asyncQueueDepth;
249     }
250
251     /**
252      * @param asyncQueueDepth the asyncQueueDepth to set
253      */

254     public void setAsyncQueueDepth(int asyncQueueDepth){
255         this.asyncQueueDepth=asyncQueueDepth;
256     }
257
258     protected void wakeup(){
259         if(async){
260             synchronized(mutex){
261                 if(taskRunner==null){
262                     taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
263                 }
264             }
265             try{
266                 taskRunner.wakeup();
267             }catch(InterruptedException JavaDoc e){
268                 Thread.currentThread().interrupt();
269             }
270         }
271     }
272 }
273
Popular Tags