1 18 package org.apache.activemq.transport.reliable; 19 20 import java.io.IOException ; 21 import java.util.SortedSet ; 22 import java.util.TreeSet ; 23 24 import org.apache.activemq.command.Command; 25 import org.apache.activemq.command.ReplayCommand; 26 import org.apache.activemq.command.Response; 27 import org.apache.activemq.openwire.CommandIdComparator; 28 import org.apache.activemq.transport.FutureResponse; 29 import org.apache.activemq.transport.ResponseCorrelator; 30 import org.apache.activemq.transport.Transport; 31 import org.apache.activemq.transport.udp.UdpTransport; 32 import org.apache.commons.logging.Log; 33 import org.apache.commons.logging.LogFactory; 34 35 41 public class ReliableTransport extends ResponseCorrelator { 42 private static final Log log = LogFactory.getLog(ReliableTransport.class); 43 44 private ReplayStrategy replayStrategy; 45 private SortedSet commands = new TreeSet (new CommandIdComparator()); 46 private int expectedCounter = 1; 47 private int replayBufferCommandCount = 50; 48 private int requestTimeout = 2000; 49 private ReplayBuffer replayBuffer; 50 private Replayer replayer; 51 private UdpTransport udpTransport; 52 53 public ReliableTransport(Transport next, ReplayStrategy replayStrategy) { 54 super(next); 55 this.replayStrategy = replayStrategy; 56 } 57 58 public ReliableTransport(Transport next, UdpTransport udpTransport) 59 throws IOException { 60 super(next, udpTransport.getSequenceGenerator()); 61 this.udpTransport = udpTransport; 62 this.replayer = udpTransport.createReplayer(); 63 } 64 65 68 public void requestReplay(int fromCommandId, int toCommandId) { 69 ReplayCommand replay = new ReplayCommand(); 70 replay.setFirstNakNumber(fromCommandId); 71 replay.setLastNakNumber(toCommandId); 72 try { 73 oneway(replay); 74 } 75 catch (IOException e) { 76 getTransportListener().onException(e); 77 } 78 } 79 80 public Object request(Object o) throws IOException { 81 final Command command = (Command) o; 82 FutureResponse response = asyncRequest(command, null); 83 while (true) { 84 Response result = response.getResult(requestTimeout); 85 if (result != null) { 86 return result; 87 } 88 onMissingResponse(command, response); 89 } 90 } 91 92 public Object request(Object o, int timeout) throws IOException { 93 final Command command = (Command) o; 94 FutureResponse response = asyncRequest(command, null); 95 while (timeout > 0) { 96 int time = timeout; 97 if (timeout > requestTimeout) { 98 time = requestTimeout; 99 } 100 Response result = response.getResult(time); 101 if (result != null) { 102 return result; 103 } 104 onMissingResponse(command, response); 105 timeout -= time; 106 } 107 return response.getResult(0); 108 } 109 110 public void onCommand(Object o) { 111 Command command = (Command) o; 112 if (command.isWireFormatInfo()) { 114 super.onCommand(command); 115 return; 116 } 117 else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) { 118 replayCommands((ReplayCommand) command); 119 return; 120 } 121 122 int actualCounter = command.getCommandId(); 123 boolean valid = expectedCounter == actualCounter; 124 125 if (!valid) { 126 synchronized (commands) { 127 int nextCounter = actualCounter; 128 boolean empty = commands.isEmpty(); 129 if (!empty) { 130 Command nextAvailable = (Command) commands.first(); 131 nextCounter = nextAvailable.getCommandId(); 132 } 133 134 try { 135 boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter); 136 137 if (keep) { 138 if (log.isDebugEnabled()) { 140 log.debug("Received out of order command which is being buffered for later: " + command); 141 } 142 commands.add(command); 143 } 144 } 145 catch (IOException e) { 146 onException(e); 147 } 148 149 if (!empty) { 150 command = (Command) commands.first(); 153 valid = expectedCounter == command.getCommandId(); 154 if (valid) { 155 commands.remove(command); 156 } 157 } 158 } 159 } 160 161 while (valid) { 162 replayStrategy.onReceivedPacket(this, expectedCounter); 164 expectedCounter++; 165 super.onCommand(command); 166 167 synchronized (commands) { 168 valid = !commands.isEmpty(); 170 if (valid) { 171 command = (Command) commands.first(); 174 valid = expectedCounter == command.getCommandId(); 175 if (valid) { 176 commands.remove(command); 177 } 178 } 179 } 180 } 181 } 182 183 public int getBufferedCommandCount() { 184 synchronized (commands) { 185 return commands.size(); 186 } 187 } 188 189 public int getExpectedCounter() { 190 return expectedCounter; 191 } 192 193 197 public void setExpectedCounter(int expectedCounter) { 198 this.expectedCounter = expectedCounter; 199 } 200 201 public int getRequestTimeout() { 202 return requestTimeout; 203 } 204 205 209 public void setRequestTimeout(int requestTimeout) { 210 this.requestTimeout = requestTimeout; 211 } 212 213 public ReplayStrategy getReplayStrategy() { 214 return replayStrategy; 215 } 216 217 public ReplayBuffer getReplayBuffer() { 218 if (replayBuffer == null) { 219 replayBuffer = createReplayBuffer(); 220 } 221 return replayBuffer; 222 } 223 224 public void setReplayBuffer(ReplayBuffer replayBuffer) { 225 this.replayBuffer = replayBuffer; 226 } 227 228 public int getReplayBufferCommandCount() { 229 return replayBufferCommandCount; 230 } 231 232 235 public void setReplayBufferCommandCount(int replayBufferSize) { 236 this.replayBufferCommandCount = replayBufferSize; 237 } 238 239 public void setReplayStrategy(ReplayStrategy replayStrategy) { 240 this.replayStrategy = replayStrategy; 241 } 242 243 public Replayer getReplayer() { 244 return replayer; 245 } 246 247 public void setReplayer(Replayer replayer) { 248 this.replayer = replayer; 249 } 250 251 public String toString() { 252 return next.toString(); 253 } 254 255 public void start() throws Exception { 256 if (udpTransport != null) { 257 udpTransport.setReplayBuffer(getReplayBuffer()); 258 } 259 if (replayStrategy == null) { 260 throw new IllegalArgumentException ("Property replayStrategy not specified"); 261 } 262 super.start(); 263 } 264 265 268 protected void onMissingResponse(Command command, FutureResponse response) { 269 log.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message"); 270 271 int commandId = command.getCommandId(); 272 requestReplay(commandId, commandId); 273 } 274 275 protected ReplayBuffer createReplayBuffer() { 276 return new DefaultReplayBuffer(getReplayBufferCommandCount()); 277 } 278 279 protected void replayCommands(ReplayCommand command) { 280 try { 281 if (replayer == null) { 282 onException(new IOException ("Cannot replay commands. No replayer property configured")); 283 } 284 if (log.isDebugEnabled()) { 285 log.debug("Processing replay command: " + command); 286 } 287 getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer); 288 289 } 293 catch (IOException e) { 294 onException(e); 295 } 296 } 297 298 } 299 | Popular Tags |