package org.apache.activemq.transport.fanout;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A composite transport that keeps one underlying {@link Transport} per
 * configured URI and sends "fanout" commands to every connected transport,
 * while all other commands go to a single primary transport.
 *
 * <p>A background reconnect task (see {@link #doConnect()}) connects and
 * re-connects the underlying transports. {@link #oneway(Object)} blocks until
 * at least {@code minAckCount} transports are connected. For commands that
 * require a response, the response is passed to the listener only once the
 * expected number of transports have answered.
 *
 * <p>All mutable connection state ({@code transports}, {@code connectedCount},
 * {@code primary}, {@code started}, {@code disposed}, {@code connectionFailure})
 * is guarded by {@code reconnectMutex}.
 */
public class FanoutTransport implements CompositeTransport {

    private static final Log log = LogFactory.getLog(FanoutTransport.class);

    private TransportListener transportListener;
    private boolean disposed;

    private final Object reconnectMutex = new Object();
    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
    /** Outstanding requests keyed by command id, replayed onto transports that (re)connect. */
    private final ConcurrentHashMap<Integer, RequestCounter> requestMap =
            new ConcurrentHashMap<Integer, RequestCounter>();

    private final TaskRunner reconnectTask;
    private boolean started;

    private ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
    /** Number of handlers whose {@code transport} is currently non-null. */
    private int connectedCount = 0;

    /** Minimum number of connected transports required before a command is sent. */
    private int minAckCount = 2;

    private long initialReconnectDelay = 10;
    private long maxReconnectDelay = 1000 * 30;
    private long backOffMultiplier = 2;
    private boolean useExponentialBackOff = true;
    private int maxReconnectAttempts;
    private Exception connectionFailure;
    /** Handler that receives non-fanout commands; {@code null} while no handler holds that role. */
    private FanoutTransportHandler primary;

    /**
     * Tracks how many more responses are expected for a command before the
     * response is handed to the transport listener.
     */
    static class RequestCounter {

        final Command command;
        final AtomicInteger ackCount;

        RequestCounter(Command command, int count) {
            this.command = command;
            this.ackCount = new AtomicInteger(count);
        }

        @Override
        public String toString() {
            return command.getCommandId() + "=" + ackCount.get();
        }
    }

    /**
     * Per-URI state: the live transport (if any) plus reconnect bookkeeping.
     * Also acts as the listener installed on the underlying transport.
     */
    class FanoutTransportHandler extends DefaultTransportListener {

        private final URI uri;
        private Transport transport;

        private int connectFailures;
        private long reconnectDelay = initialReconnectDelay;
        private long reconnectDate;

        public FanoutTransportHandler(URI uri) {
            this.uri = uri;
        }

        /**
         * Forwards inbound commands to the outer listener. A response whose
         * correlation id is tracked in {@code requestMap} is forwarded only
         * when its counter reaches zero; earlier responses are dropped.
         */
        public void onCommand(Object o) {
            Command command = (Command) o;
            if (command.isResponse()) {
                Integer id = Integer.valueOf(((Response) command).getCorrelationId());
                RequestCounter rc = requestMap.get(id);
                if (rc != null) {
                    if (rc.ackCount.decrementAndGet() <= 0) {
                        requestMap.remove(id);
                        transportListener.onCommand(command);
                    }
                    return;
                }
            }
            transportListener.onCommand(command);
        }

        /**
         * Marks this handler as disconnected, disposes its transport and
         * wakes the reconnect task. Does nothing if already disconnected.
         */
        public void onException(IOException error) {
            try {
                synchronized (reconnectMutex) {
                    if (transport == null) {
                        return;
                    }

                    log.debug("Transport failed, starting up reconnect task", error);

                    // Clear the reference before disposing so that a failure
                    // reported while disposing cannot decrement the count twice.
                    Transport failed = transport;
                    transport = null;
                    connectedCount--;
                    if (primary == this) {
                        primary = null;
                    }
                    ServiceSupport.dispose(failed);
                    reconnectTask.wakeup();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                transportListener.onException(new InterruptedIOException());
            }
        }
    }

    /**
     * Creates the transport and its reconnect task; the task runs
     * {@link #doConnect()} each time it is woken.
     */
    public FanoutTransport() throws InterruptedIOException {
        reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
            public boolean iterate() {
                return doConnect();
            }
        }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
    }

    /**
     * One pass of the reconnect task: tries to connect every handler that is
     * currently disconnected and whose back-off period has elapsed.
     *
     * @return {@code true} if the task should run again (some handler is still
     *         disconnected), {@code false} if all handlers are connected, the
     *         transport is disposed, or a terminal connection failure was recorded
     */
    private boolean doConnect() {
        long closestReconnectDate = 0;
        synchronized (reconnectMutex) {

            if (disposed || connectionFailure != null) {
                // Let senders blocked in oneway() observe the terminal state.
                reconnectMutex.notifyAll();
            }

            if (transports.size() == connectedCount || disposed || connectionFailure != null) {
                return false;
            }

            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext() && !disposed;) {

                long now = System.currentTimeMillis();

                FanoutTransportHandler fanoutHandler = iter.next();
                if (fanoutHandler.transport != null) {
                    continue;
                }

                // Still backing off: remember when it becomes eligible and move on.
                if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
                    closestReconnectDate = earliest(closestReconnectDate, fanoutHandler.reconnectDate);
                    continue;
                }

                URI uri = fanoutHandler.uri;
                boolean counted = false;
                try {
                    log.debug("Attempting connect to: " + uri);
                    Transport t = TransportFactory.compositeConnect(uri);
                    log.debug("Connection established");
                    fanoutHandler.transport = t;
                    if (primary == null) {
                        primary = fanoutHandler;
                    }
                    t.setTransportListener(fanoutHandler);
                    connectedCount++;
                    counted = true;
                    if (started) {
                        restoreTransport(fanoutHandler);
                    }
                    // Only a fully restored connection resets the back-off state.
                    fanoutHandler.reconnectDelay = initialReconnectDelay;
                    fanoutHandler.connectFailures = 0;
                } catch (Exception e) {
                    log.debug("Connect fail to: " + uri + ", reason: " + e);

                    // Roll back a half-established connection so the handler is
                    // not left counted as connected while also scheduled for retry.
                    // If onException() already cleaned up, transport is null here.
                    if (fanoutHandler.transport != null) {
                        Transport broken = fanoutHandler.transport;
                        fanoutHandler.transport = null;
                        if (counted) {
                            connectedCount--;
                        }
                        if (primary == fanoutHandler) {
                            primary = null;
                        }
                        ServiceSupport.dispose(broken);
                    }

                    if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
                        log.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
                        connectionFailure = e;
                        reconnectMutex.notifyAll();
                        return false;
                    }

                    if (useExponentialBackOff) {
                        fanoutHandler.reconnectDelay *= backOffMultiplier;
                        if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
                            fanoutHandler.reconnectDelay = maxReconnectDelay;
                        }
                    }

                    fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
                    closestReconnectDate = earliest(closestReconnectDate, fanoutHandler.reconnectDate);
                }
            }

            if (transports.size() == connectedCount || disposed) {
                reconnectMutex.notifyAll();
                return false;
            }
        }

        // Sleep outside the mutex until the next handler becomes eligible.
        try {
            long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
            if (reconnectDelay > 0) {
                log.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
                Thread.sleep(reconnectDelay);
            }
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        }
        return true;
    }

    /** Returns the earlier of two reconnect dates, treating {@code current == 0} as "unset". */
    private static long earliest(long current, long candidate) {
        return (current == 0 || candidate < current) ? candidate : current;
    }

    /**
     * Marks the transport as started and starts/restores every handler that
     * is already connected. Calling it again while started is a no-op.
     */
    public void start() throws Exception {
        synchronized (reconnectMutex) {
            log.debug("Started.");
            if (started) {
                return;
            }
            started = true;
            for (FanoutTransportHandler th : transports) {
                if (th.transport != null) {
                    restoreTransport(th);
                }
            }
        }
    }

    /**
     * Stops every connected underlying transport and marks this transport as
     * disposed. The reconnect task is always shut down, even if this transport
     * was never started or stopping an underlying transport fails.
     *
     * @throws Exception the first exception raised while stopping an underlying transport
     */
    public void stop() throws Exception {
        try {
            synchronized (reconnectMutex) {
                if (!started) {
                    return;
                }
                started = false;
                disposed = true;

                ServiceStopper ss = new ServiceStopper();
                for (FanoutTransportHandler th : transports) {
                    if (th.transport != null) {
                        ss.stop(th.transport);
                    }
                }

                log.debug("Stopped: " + this);
                // Release senders waiting in oneway(); they re-check 'disposed'.
                reconnectMutex.notifyAll();
                ss.throwFirstException();
            }
        } finally {
            reconnectTask.shutdown();
        }
    }

    public long getInitialReconnectDelay() {
        return initialReconnectDelay;
    }

    public void setInitialReconnectDelay(long initialReconnectDelay) {
        this.initialReconnectDelay = initialReconnectDelay;
    }

    public long getMaxReconnectDelay() {
        return maxReconnectDelay;
    }

    public void setMaxReconnectDelay(long maxReconnectDelay) {
        this.maxReconnectDelay = maxReconnectDelay;
    }

    /** Returns the multiplier applied to a handler's reconnect delay after each failed attempt. */
    public long getReconnectDelayExponent() {
        return backOffMultiplier;
    }

    public void setReconnectDelayExponent(long reconnectDelayExponent) {
        this.backOffMultiplier = reconnectDelayExponent;
    }

    /** Returns the per-handler failure limit; values {@code <= 0} disable the limit. */
    public int getMaxReconnectAttempts() {
        return maxReconnectAttempts;
    }

    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
        this.maxReconnectAttempts = maxReconnectAttempts;
    }

    /** Returns the minimum number of connected transports required before a command is sent. */
    public int getMinAckCount() {
        return minAckCount;
    }

    /**
     * Sets the minimum number of connected transports required before a
     * command is sent; it is also the number of responses awaited for fanout
     * commands that require a response.
     */
    public void setMinAckCount(int minAckCount) {
        this.minAckCount = minAckCount;
    }

    /**
     * Sends a command. Fanout commands (see {@link #isFanoutCommand(Command)})
     * go to every connected transport; all others go to the primary transport.
     * Blocks until at least {@code minAckCount} transports are connected, the
     * transport is disposed, or a terminal connection failure is recorded.
     *
     * @throws IOException if the transport is disposed or failed before enough
     *         transports were connected, or no primary transport is available
     *         for a non-fanout command
     * @throws InterruptedIOException if the calling thread is interrupted while waiting
     */
    public void oneway(Object o) throws IOException {
        final Command command = (Command) o;
        try {
            synchronized (reconnectMutex) {

                boolean fanout = isFanoutCommand(command);
                // Commands the state tracker does not track but that need a response
                // are remembered so the right number of responses can be awaited.
                if (stateTracker.track(command) == null && command.isResponseRequired()) {
                    int size = fanout ? minAckCount : 1;
                    requestMap.put(Integer.valueOf(command.getCommandId()), new RequestCounter(command, size));
                }

                // Wait until enough transports are connected. Having more than
                // minAckCount connected is fine, hence '<' rather than '!='.
                while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
                    log.debug("Waiting for at least " + minAckCount + " transports to be connected.");
                    reconnectMutex.wait(1000);
                }

                if (connectedCount < minAckCount) {
                    throw connectivityFailure();
                }

                if (fanout) {
                    for (FanoutTransportHandler th : transports) {
                        if (th.transport != null) {
                            try {
                                th.transport.oneway(command);
                            } catch (IOException e) {
                                log.debug("Send attempt: failed.");
                                th.onException(e);
                            }
                        }
                    }
                } else {
                    // The primary is cleared when its transport fails or is removed and is
                    // only re-elected on the next successful connect, so it may be absent.
                    FanoutTransportHandler target = primary;
                    if (target == null || target.transport == null) {
                        throw new IOException("No primary transport is currently connected.");
                    }
                    try {
                        target.transport.oneway(command);
                    } catch (IOException e) {
                        log.debug("Send attempt: failed.");
                        target.onException(e);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new InterruptedIOException();
        }
    }

    /** Builds the exception explaining why not enough transports are connected. */
    private IOException connectivityFailure() {
        if (disposed) {
            return new IOException("Transport disposed.");
        }
        if (connectionFailure == null) {
            return new IOException("Unexpected failure.");
        }
        if (connectionFailure instanceof IOException) {
            return (IOException) connectionFailure;
        }
        return IOExceptionSupport.create(connectionFailure);
    }

    /**
     * Decides whether a command goes to every transport. Messages fan out only
     * when their destination is a topic; {@link ConsumerInfo} never fans out;
     * every other command does.
     */
    private boolean isFanoutCommand(Command command) {
        if (command.isMessage()) {
            return ((Message) command).getDestination().isTopic();
        }
        return command.getDataStructureType() != ConsumerInfo.DATA_STRUCTURE_TYPE;
    }

    /** Not supported by this transport; always throws {@link AssertionError}. */
    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    /** Not supported by this transport; always throws {@link AssertionError}. */
    public Object request(Object command) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    /** Not supported by this transport; always throws {@link AssertionError}. */
    public Object request(Object command, int timeout) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    /** Wakes the reconnect task so that disconnected handlers are retried. */
    public void reconnect() {
        log.debug("Waking up reconnect task");
        try {
            reconnectTask.wakeup();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public TransportListener getTransportListener() {
        return transportListener;
    }

    public void setTransportListener(TransportListener commandListener) {
        this.transportListener = commandListener;
    }

    /**
     * Returns this transport if it is assignable to {@code target}; otherwise
     * the first non-null result of narrowing a connected underlying transport,
     * or {@code null} if none matches.
     */
    public Object narrow(Class target) {

        if (target.isAssignableFrom(getClass())) {
            return this;
        }

        synchronized (reconnectMutex) {
            for (FanoutTransportHandler th : transports) {
                if (th.transport != null) {
                    Object rc = th.transport.narrow(target);
                    if (rc != null) {
                        return rc;
                    }
                }
            }
        }

        return null;
    }

    /**
     * Starts the handler's transport, replays tracked connection state onto it
     * and re-sends every outstanding request. Consumers are restored only when
     * the handler is the current primary.
     */
    protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
        th.transport.start();
        // Compare handlers: 'primary' is a FanoutTransportHandler, not a Transport.
        stateTracker.setRestoreConsumers(th == primary);
        stateTracker.restore(th.transport);
        for (RequestCounter rc : requestMap.values()) {
            th.transport.oneway(rc.command);
        }
    }

    /**
     * Registers a handler for each URI that is not already known and wakes the
     * reconnect task for every newly added one.
     */
    public void add(URI uris[]) {

        synchronized (reconnectMutex) {
            for (int i = 0; i < uris.length; i++) {
                URI uri = uris[i];

                boolean match = false;
                for (FanoutTransportHandler th : transports) {
                    if (th.uri.equals(uri)) {
                        match = true;
                        break;
                    }
                }
                if (!match) {
                    transports.add(new FanoutTransportHandler(uri));
                    reconnect();
                }
            }
        }
    }

    /**
     * Removes the handler for each given URI, disposing its transport if it is
     * connected. URIs that are not registered are ignored.
     */
    public void remove(URI uris[]) {

        synchronized (reconnectMutex) {
            for (int i = 0; i < uris.length; i++) {
                URI uri = uris[i];

                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
                    FanoutTransportHandler th = iter.next();
                    if (th.uri.equals(uri)) {
                        if (th.transport != null) {
                            // Clear before disposing so that a late onException()
                            // on this handler cannot decrement the count again.
                            Transport removed = th.transport;
                            th.transport = null;
                            connectedCount--;
                            ServiceSupport.dispose(removed);
                        }
                        if (primary == th) {
                            // Never leave the primary pointing at a removed handler.
                            primary = null;
                        }
                        iter.remove();
                        break;
                    }
                }
            }
        }
    }

    /**
     * Returns the remote address of the primary transport, or {@code null}
     * when there is no connected primary.
     */
    public String getRemoteAddress() {
        FanoutTransportHandler current = primary;
        if (current != null) {
            Transport t = current.transport;
            if (t != null) {
                return t.getRemoteAddress();
            }
        }
        return null;
    }

}