KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > reliable > ReliableTransport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.reliable;
19
20 import java.io.IOException JavaDoc;
21 import java.util.SortedSet JavaDoc;
22 import java.util.TreeSet JavaDoc;
23
24 import org.apache.activemq.command.Command;
25 import org.apache.activemq.command.ReplayCommand;
26 import org.apache.activemq.command.Response;
27 import org.apache.activemq.openwire.CommandIdComparator;
28 import org.apache.activemq.transport.FutureResponse;
29 import org.apache.activemq.transport.ResponseCorrelator;
30 import org.apache.activemq.transport.Transport;
31 import org.apache.activemq.transport.udp.UdpTransport;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34
35 /**
36  * This interceptor deals with out of order commands together with being able to
37  * handle dropped commands and the re-requesting dropped commands.
38  *
39  * @version $Revision: 464110 $
40  */

41 public class ReliableTransport extends ResponseCorrelator {
42     private static final Log log = LogFactory.getLog(ReliableTransport.class);
43
44     private ReplayStrategy replayStrategy;
45     private SortedSet JavaDoc commands = new TreeSet JavaDoc(new CommandIdComparator());
46     private int expectedCounter = 1;
47     private int replayBufferCommandCount = 50;
48     private int requestTimeout = 2000;
49     private ReplayBuffer replayBuffer;
50     private Replayer replayer;
51     private UdpTransport udpTransport;
52
53     public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
54         super(next);
55         this.replayStrategy = replayStrategy;
56     }
57
58     public ReliableTransport(Transport next, UdpTransport udpTransport)
59             throws IOException JavaDoc {
60         super(next, udpTransport.getSequenceGenerator());
61         this.udpTransport = udpTransport;
62         this.replayer = udpTransport.createReplayer();
63     }
64
65     /**
66      * Requests that a range of commands be replayed
67      */

68     public void requestReplay(int fromCommandId, int toCommandId) {
69         ReplayCommand replay = new ReplayCommand();
70         replay.setFirstNakNumber(fromCommandId);
71         replay.setLastNakNumber(toCommandId);
72         try {
73             oneway(replay);
74         }
75         catch (IOException JavaDoc e) {
76             getTransportListener().onException(e);
77         }
78     }
79
80     public Object JavaDoc request(Object JavaDoc o) throws IOException JavaDoc {
81         final Command command = (Command) o;
82         FutureResponse response = asyncRequest(command, null);
83         while (true) {
84             Response result = response.getResult(requestTimeout);
85             if (result != null) {
86                 return result;
87             }
88             onMissingResponse(command, response);
89         }
90     }
91
92     public Object JavaDoc request(Object JavaDoc o, int timeout) throws IOException JavaDoc {
93         final Command command = (Command) o;
94         FutureResponse response = asyncRequest(command, null);
95         while (timeout > 0) {
96             int time = timeout;
97             if (timeout > requestTimeout) {
98                 time = requestTimeout;
99             }
100             Response result = response.getResult(time);
101             if (result != null) {
102                 return result;
103             }
104             onMissingResponse(command, response);
105             timeout -= time;
106         }
107         return response.getResult(0);
108     }
109
110     public void onCommand(Object JavaDoc o) {
111         Command command = (Command) o;
112         // lets pass wireformat through
113
if (command.isWireFormatInfo()) {
114             super.onCommand(command);
115             return;
116         }
117         else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) {
118             replayCommands((ReplayCommand) command);
119             return;
120         }
121
122         int actualCounter = command.getCommandId();
123         boolean valid = expectedCounter == actualCounter;
124
125         if (!valid) {
126             synchronized (commands) {
127                 int nextCounter = actualCounter;
128                 boolean empty = commands.isEmpty();
129                 if (!empty) {
130                     Command nextAvailable = (Command) commands.first();
131                     nextCounter = nextAvailable.getCommandId();
132                 }
133                 
134                 try {
135                     boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter);
136
137                     if (keep) {
138                         // lets add it to the list for later on
139
if (log.isDebugEnabled()) {
140                             log.debug("Received out of order command which is being buffered for later: " + command);
141                         }
142                         commands.add(command);
143                     }
144                 }
145                 catch (IOException JavaDoc e) {
146                     onException(e);
147                 }
148
149                 if (!empty) {
150                     // lets see if the first item in the set is the next
151
// expected
152
command = (Command) commands.first();
153                     valid = expectedCounter == command.getCommandId();
154                     if (valid) {
155                         commands.remove(command);
156                     }
157                 }
158             }
159         }
160
161         while (valid) {
162             // we've got a valid header so increment counter
163
replayStrategy.onReceivedPacket(this, expectedCounter);
164             expectedCounter++;
165             super.onCommand(command);
166
167             synchronized (commands) {
168                 // we could have more commands left
169
valid = !commands.isEmpty();
170                 if (valid) {
171                     // lets see if the first item in the set is the next
172
// expected
173
command = (Command) commands.first();
174                     valid = expectedCounter == command.getCommandId();
175                     if (valid) {
176                         commands.remove(command);
177                     }
178                 }
179             }
180         }
181     }
182
183     public int getBufferedCommandCount() {
184         synchronized (commands) {
185             return commands.size();
186         }
187     }
188
189     public int getExpectedCounter() {
190         return expectedCounter;
191     }
192
193     /**
194      * This property should never really be set - but is mutable primarily for
195      * test cases
196      */

197     public void setExpectedCounter(int expectedCounter) {
198         this.expectedCounter = expectedCounter;
199     }
200
201     public int getRequestTimeout() {
202         return requestTimeout;
203     }
204
205     /**
206      * Sets the default timeout of requests before starting to request commands
207      * are replayed
208      */

209     public void setRequestTimeout(int requestTimeout) {
210         this.requestTimeout = requestTimeout;
211     }
212
213     public ReplayStrategy getReplayStrategy() {
214         return replayStrategy;
215     }
216
217     public ReplayBuffer getReplayBuffer() {
218         if (replayBuffer == null) {
219             replayBuffer = createReplayBuffer();
220         }
221         return replayBuffer;
222     }
223
224     public void setReplayBuffer(ReplayBuffer replayBuffer) {
225         this.replayBuffer = replayBuffer;
226     }
227
228     public int getReplayBufferCommandCount() {
229         return replayBufferCommandCount;
230     }
231
232     /**
233      * Sets the default number of commands which are buffered
234      */

235     public void setReplayBufferCommandCount(int replayBufferSize) {
236         this.replayBufferCommandCount = replayBufferSize;
237     }
238
239     public void setReplayStrategy(ReplayStrategy replayStrategy) {
240         this.replayStrategy = replayStrategy;
241     }
242
243     public Replayer getReplayer() {
244         return replayer;
245     }
246
247     public void setReplayer(Replayer replayer) {
248         this.replayer = replayer;
249     }
250
251     public String JavaDoc toString() {
252         return next.toString();
253     }
254
255     public void start() throws Exception JavaDoc {
256         if (udpTransport != null) {
257             udpTransport.setReplayBuffer(getReplayBuffer());
258         }
259         if (replayStrategy == null) {
260             throw new IllegalArgumentException JavaDoc("Property replayStrategy not specified");
261         }
262         super.start();
263     }
264
265     /**
266      * Lets attempt to replay the request as a command may have disappeared
267      */

268     protected void onMissingResponse(Command command, FutureResponse response) {
269         log.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
270
271         int commandId = command.getCommandId();
272         requestReplay(commandId, commandId);
273     }
274
275     protected ReplayBuffer createReplayBuffer() {
276         return new DefaultReplayBuffer(getReplayBufferCommandCount());
277     }
278
279     protected void replayCommands(ReplayCommand command) {
280         try {
281             if (replayer == null) {
282                 onException(new IOException JavaDoc("Cannot replay commands. No replayer property configured"));
283             }
284             if (log.isDebugEnabled()) {
285                 log.debug("Processing replay command: " + command);
286             }
287             getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
288
289             // TODO we could proactively remove ack'd stuff from the replay
290
// buffer
291
// if we only have a single client talking to us
292
}
293         catch (IOException JavaDoc e) {
294             onException(e);
295         }
296     }
297
298 }
299
Popular Tags