1 18 package org.apache.activemq.transport.failover; 19 20 import java.io.IOException ; 21 import java.io.InterruptedIOException ; 22 import java.net.URI ; 23 import java.util.ArrayList ; 24 import java.util.Iterator ; 25 import java.util.Random ; 26 27 import org.apache.activemq.command.BrokerInfo; 28 import org.apache.activemq.command.Command; 29 import org.apache.activemq.command.Response; 30 import org.apache.activemq.state.ConnectionStateTracker; 31 import org.apache.activemq.state.Tracked; 32 import org.apache.activemq.thread.DefaultThreadPools; 33 import org.apache.activemq.thread.Task; 34 import org.apache.activemq.thread.TaskRunner; 35 import org.apache.activemq.transport.CompositeTransport; 36 import org.apache.activemq.transport.FutureResponse; 37 import org.apache.activemq.transport.ResponseCallback; 38 import org.apache.activemq.transport.Transport; 39 import org.apache.activemq.transport.TransportFactory; 40 import org.apache.activemq.transport.TransportListener; 41 import org.apache.activemq.util.IOExceptionSupport; 42 import org.apache.activemq.util.ServiceSupport; 43 import org.apache.commons.logging.Log; 44 import org.apache.commons.logging.LogFactory; 45 46 import java.util.concurrent.ConcurrentHashMap ; 47 import java.util.concurrent.CopyOnWriteArrayList ; 48 49 55 public class FailoverTransport implements CompositeTransport { 56 57 private static final Log log = LogFactory.getLog(FailoverTransport.class); 58 59 private TransportListener transportListener; 60 private boolean disposed; 61 private final CopyOnWriteArrayList uris = new CopyOnWriteArrayList (); 62 63 private final Object reconnectMutex = new Object (); 64 private final Object sleepMutex = new Object (); 65 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 66 private final ConcurrentHashMap requestMap = new ConcurrentHashMap (); 67 68 private URI connectedTransportURI; 69 private Transport connectedTransport; 70 private final TaskRunner reconnectTask; 71 private boolean started; 72 73 private long initialReconnectDelay = 10; 74 private long maxReconnectDelay = 1000 * 30; 75 private long backOffMultiplier = 2; 76 private boolean useExponentialBackOff = true; 77 private boolean randomize = true; 78 private boolean initialized; 79 private int maxReconnectAttempts; 80 private int connectFailures; 81 private long reconnectDelay = initialReconnectDelay; 82 private Exception connectionFailure; 83 84 private final TransportListener myTransportListener = createTransportListener(); 85 86 TransportListener createTransportListener() { 87 return new TransportListener() { 88 public void onCommand(Object o) { 89 Command command = (Command) o; 90 if (command == null) { 91 return; 92 } 93 if (command.isResponse()) { 94 Object object = requestMap.remove(new Integer (((Response) command).getCorrelationId())); 95 if( object!=null && object.getClass() == Tracked.class ) { 96 ((Tracked)object).onResponses(); 97 } 98 } 99 if (!initialized){ 100 if (command.isBrokerInfo()){ 101 BrokerInfo info = (BrokerInfo)command; 102 BrokerInfo[] peers = info.getPeerBrokerInfos(); 103 if (peers!= null){ 104 for (int i =0; i < peers.length;i++){ 105 String brokerString = peers[i].getBrokerURL(); 106 add(brokerString); 107 } 108 } 109 initialized = true; 110 } 111 112 } 113 if (transportListener != null) { 114 transportListener.onCommand(command); 115 } 116 } 117 118 public void onException(IOException error) { 119 try { 120 handleTransportFailure(error); 121 } 122 catch (InterruptedException e) { 123 Thread.currentThread().interrupt(); 124 transportListener.onException(new InterruptedIOException ()); 125 } 126 } 127 128 public void transportInterupted(){ 129 if (transportListener != null){ 130 transportListener.transportInterupted(); 131 } 132 } 133 134 public void transportResumed(){ 135 if(transportListener != null){ 136 transportListener.transportResumed(); 137 } 138 } 139 }; 140 } 141 142 public FailoverTransport() throws InterruptedIOException { 143 144 stateTracker.setTrackTransactions(true); 145 146 reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { 148 149 public boolean iterate() { 150 151 Exception failure=null; 152 synchronized (reconnectMutex) { 153 154 if (disposed || connectionFailure!=null) { 155 reconnectMutex.notifyAll(); 156 } 157 158 if (connectedTransport != null || disposed || connectionFailure!=null) { 159 return false; 160 } else { 161 ArrayList connectList = getConnectList(); 162 if( connectList.isEmpty() ) { 163 failure = new IOException ("No uris available to connect to."); 164 } else { 165 if (!useExponentialBackOff){ 166 reconnectDelay = initialReconnectDelay; 167 } 168 Iterator iter = connectList.iterator(); 169 for (int i = 0; iter.hasNext() && connectedTransport == null && !disposed; i++) { 170 URI uri = (URI ) iter.next(); 171 try { 172 log.debug("Attempting connect to: " + uri); 173 Transport t = TransportFactory.compositeConnect(uri); 174 t.setTransportListener(myTransportListener); 175 t.start(); 176 177 if (started) { 178 restoreTransport(t); 179 } 180 181 log.debug("Connection established"); 182 reconnectDelay = initialReconnectDelay; 183 connectedTransportURI = uri; 184 connectedTransport = t; 185 reconnectMutex.notifyAll(); 186 connectFailures = 0; 187 if (transportListener != null){ 188 transportListener.transportResumed(); 189 } 190 log.info("Successfully reconnected to " + uri); 191 return false; 192 } 193 catch (Exception e) { 194 failure = e; 195 log.debug("Connect fail to: " + uri + ", reason: " + e); 196 } 197 } 198 } 199 } 200 201 if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) { 202 log.error("Failed to connect to transport after: " + connectFailures + " attempt(s)"); 203 connectionFailure = failure; 204 reconnectMutex.notifyAll(); 205 return false; 206 } 207 } 208 209 if(!disposed){ 210 211 log.debug("Waiting "+reconnectDelay+" ms before attempting connection. "); 212 synchronized(sleepMutex){ 213 try{ 214 sleepMutex.wait(reconnectDelay); 215 }catch(InterruptedException e){ 216 Thread.currentThread().interrupt(); 217 } 218 } 219 220 221 if(useExponentialBackOff){ 222 reconnectDelay*=backOffMultiplier; 224 if(reconnectDelay>maxReconnectDelay) 225 reconnectDelay=maxReconnectDelay; 226 } 227 } 228 return !disposed; 229 } 230 231 }, "ActiveMQ Failover Worker: "+System.identityHashCode(this)); 232 } 233 234 private void handleTransportFailure(IOException e) throws InterruptedException { 235 if (transportListener != null){ 236 transportListener.transportInterupted(); 237 } 238 synchronized (reconnectMutex) { 239 log.warn("Transport failed, attempting to automatically reconnect due to: " + e, e); 240 if (connectedTransport != null) { 241 initialized = false; 242 ServiceSupport.dispose(connectedTransport); 243 connectedTransport = null; 244 connectedTransportURI = null; 245 } 246 reconnectTask.wakeup(); 247 } 248 } 249 250 public void start() throws Exception { 251 synchronized (reconnectMutex) { 252 log.debug("Started."); 253 if (started) 254 return; 255 started = true; 256 if (connectedTransport != null) { 257 stateTracker.restore(connectedTransport); 258 } 259 } 260 } 261 262 public void stop() throws Exception { 263 synchronized (reconnectMutex) { 264 log.debug("Stopped."); 265 if (!started) 266 return; 267 started = false; 268 disposed = true; 269 270 if (connectedTransport != null) { 271 connectedTransport.stop(); 272 connectedTransport=null; 273 } 274 reconnectMutex.notifyAll(); 275 } 276 synchronized(sleepMutex){ 277 sleepMutex.notifyAll(); 278 } 279 reconnectTask.shutdown(); 280 } 281 282 public long getInitialReconnectDelay() { 283 return initialReconnectDelay; 284 } 285 286 public void setInitialReconnectDelay(long initialReconnectDelay) { 287 this.initialReconnectDelay = initialReconnectDelay; 288 } 289 290 public long getMaxReconnectDelay() { 291 return maxReconnectDelay; 292 } 293 294 public void setMaxReconnectDelay(long maxReconnectDelay) { 295 this.maxReconnectDelay = maxReconnectDelay; 296 } 297 298 public long getReconnectDelay() { 299 return reconnectDelay; 300 } 301 302 public void setReconnectDelay(long reconnectDelay) { 303 this.reconnectDelay = reconnectDelay; 304 } 305 306 public long getReconnectDelayExponent() { 307 return backOffMultiplier; 308 } 309 310 public void setReconnectDelayExponent(long reconnectDelayExponent) { 311 this.backOffMultiplier = reconnectDelayExponent; 312 } 313 314 public Transport getConnectedTransport() { 315 return connectedTransport; 316 } 317 318 public URI getConnectedTransportURI() { 319 return connectedTransportURI; 320 } 321 322 public int getMaxReconnectAttempts() { 323 return maxReconnectAttempts; 324 } 325 326 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 327 this.maxReconnectAttempts = maxReconnectAttempts; 328 } 329 330 333 public boolean isRandomize(){ 334 return randomize; 335 } 336 337 340 public void setRandomize(boolean randomize){ 341 this.randomize=randomize; 342 } 343 344 public void oneway(Object o) throws IOException { 345 Command command = (Command) o; 346 Exception error = null; 347 try { 348 349 synchronized (reconnectMutex) { 350 for (int i = 0;!disposed; i++) { 352 try { 353 354 while (connectedTransport == null && !disposed && connectionFailure==null ) { 356 log.trace("Waiting for transport to reconnect."); 357 try { 358 reconnectMutex.wait(1000); 359 } 360 catch (InterruptedException e) { 361 Thread.currentThread().interrupt(); 362 log.debug("Interupted: " + e, e); 363 } 364 } 365 366 if( connectedTransport==null ) { 367 if (disposed) { 370 error = new IOException ("Transport disposed."); 371 } else if (connectionFailure!=null) { 372 error = connectionFailure; 373 } else { 374 error = new IOException ("Unexpected failure."); 375 } 376 break; 377 } 378 379 Tracked tracked = stateTracker.track(command); 384 if( tracked!=null && tracked.isWaitingForResponse() ) { 385 requestMap.put(new Integer (command.getCommandId()), tracked); 386 } else if ( tracked==null && command.isResponseRequired()) { 387 requestMap.put(new Integer (command.getCommandId()), command); 388 } 389 390 try { 392 connectedTransport.oneway(command); 393 } catch (IOException e) { 394 395 if( tracked==null ) { 397 398 if( command.isResponseRequired() ) { 401 requestMap.remove(new Integer (command.getCommandId())); 402 } 403 404 throw e; 406 } 407 408 } 409 410 return; 411 412 } 413 catch (IOException e) { 414 log.debug("Send oneway attempt: " + i + " failed."); 415 handleTransportFailure(e); 416 } 417 } 418 } 419 } 420 catch (InterruptedException e) { 421 Thread.currentThread().interrupt(); 423 throw new InterruptedIOException (); 424 } 425 if(!disposed){ 426 if(error!=null){ 427 if(error instanceof IOException ) 428 throw (IOException ) error; 429 throw IOExceptionSupport.create(error); 430 } 431 } 432 } 433 434 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 435 throw new AssertionError ("Unsupported Method"); 436 } 437 438 public Object request(Object command) throws IOException { 439 throw new AssertionError ("Unsupported Method"); 440 } 441 442 public Object request(Object command,int timeout) throws IOException { 443 throw new AssertionError ("Unsupported Method"); 444 } 445 446 public void add(URI u[]) { 447 for (int i = 0; i < u.length; i++) { 448 if( !uris.contains(u[i]) ) 449 uris.add(u[i]); 450 } 451 reconnect(); 452 } 453 454 public void remove(URI u[]) { 455 for (int i = 0; i < u.length; i++) { 456 uris.remove(u[i]); 457 } 458 reconnect(); 459 } 460 461 public void add(String u){ 462 try { 463 URI uri = new URI (u); 464 if (!uris.contains(uri)) 465 uris.add(uri); 466 467 reconnect(); 468 }catch(Exception e){ 469 log.error("Failed to parse URI: " + u); 470 } 471 } 472 473 474 public void reconnect() { 475 log.debug("Waking up reconnect task"); 476 try { 477 reconnectTask.wakeup(); 478 } catch (InterruptedException e) { 479 Thread.currentThread().interrupt(); 480 } 481 } 482 483 private ArrayList getConnectList(){ 484 ArrayList l=new ArrayList (uris); 485 if (randomize){ 486 Random r=new Random (); 488 r.setSeed(System.currentTimeMillis()); 489 for (int i=0;i<l.size();i++){ 490 int p=r.nextInt(l.size()); 491 Object t=l.get(p); 492 l.set(p,l.get(i)); 493 l.set(i,t); 494 } 495 } 496 return l; 497 } 498 499 public TransportListener getTransportListener() { 500 return transportListener; 501 } 502 503 public void setTransportListener(TransportListener commandListener) { 504 this.transportListener = commandListener; 505 } 506 507 public Object narrow(Class target) { 508 509 if (target.isAssignableFrom(getClass())) { 510 return this; 511 } 512 synchronized (reconnectMutex) { 513 if (connectedTransport != null) { 514 return connectedTransport.narrow(target); 515 } 516 } 517 return null; 518 519 } 520 521 protected void restoreTransport(Transport t) throws Exception , IOException { 522 t.start(); 523 stateTracker.restore(t); 524 for (Iterator iter2 = requestMap.values().iterator(); iter2.hasNext();) { 525 Command command = (Command) iter2.next(); 526 t.oneway(command); 527 } 528 } 529 530 public boolean isUseExponentialBackOff() { 531 return useExponentialBackOff; 532 } 533 534 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 535 this.useExponentialBackOff = useExponentialBackOff; 536 } 537 538 public String toString() { 539 return connectedTransportURI==null ? "unconnected" : connectedTransportURI.toString(); 540 } 541 542 public String getRemoteAddress() { 543 if(connectedTransport != null){ 544 return connectedTransport.getRemoteAddress(); 545 } 546 return null; 547 } 548 549 } 550 | Popular Tags |