1 18 package org.apache.activemq.transport.discovery.simple; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 23 import org.apache.activemq.command.DiscoveryEvent; 24 import org.apache.activemq.transport.discovery.DiscoveryAgent; 25 import org.apache.activemq.transport.discovery.DiscoveryListener; 26 27 import java.util.concurrent.atomic.AtomicBoolean ; 28 29 34 public class SimpleDiscoveryAgent implements DiscoveryAgent { 35 36 private long initialReconnectDelay = 1000; 37 private long maxReconnectDelay = 1000 * 30; 38 private long backOffMultiplier = 2; 39 private boolean useExponentialBackOff = false; 40 private int maxReconnectAttempts; 41 private final Object sleepMutex = new Object (); 42 private long minConnectTime = 500; 43 44 private DiscoveryListener listener; 45 String services[] = new String [] {}; 46 String group = "DEFAULT"; 47 private final AtomicBoolean running = new AtomicBoolean (false); 48 49 class SimpleDiscoveryEvent extends DiscoveryEvent { 50 51 private int connectFailures; 52 private long reconnectDelay = initialReconnectDelay; 53 private long connectTime = System.currentTimeMillis(); 54 private AtomicBoolean failed = new AtomicBoolean (false); 55 56 public SimpleDiscoveryEvent(String service) { 57 super(service); 58 } 59 60 } 61 62 public void setDiscoveryListener(DiscoveryListener listener) { 63 this.listener = listener; 64 } 65 66 public void registerService(String name) throws IOException { 67 } 68 69 public void start() throws Exception { 70 running.set(true); 71 for (int i = 0; i < services.length; i++) { 72 listener.onServiceAdd(new SimpleDiscoveryEvent(services[i])); 73 } 74 } 75 76 public void stop() throws Exception { 77 running.set(false); 78 synchronized(sleepMutex) { 79 sleepMutex.notifyAll(); 80 } 81 } 82 83 public String [] getServices() { 84 return services; 85 } 86 87 public void setServices(String services) { 88 this.services = services.split(","); 89 } 90 91 public void setServices(String services[]) { 92 this.services = services; 93 } 94 95 public void setServices(URI services[]) { 96 this.services = new String [services.length]; 97 for (int i = 0; i < services.length; i++) { 98 this.services[i] = services[i].toString(); 99 } 100 } 101 102 public String getGroup() { 103 return group; 104 } 105 106 public void setGroup(String group) { 107 this.group = group; 108 } 109 110 public void setBrokerName(String brokerName) { 111 } 112 113 public void serviceFailed(DiscoveryEvent devent) throws IOException { 114 115 final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent; 116 if( event.failed.compareAndSet(false, true) ) { 117 118 listener.onServiceRemove(event); 119 Thread thread = new Thread () { 120 public void run() { 121 122 123 if( event.connectTime + minConnectTime > System.currentTimeMillis() ) { 126 127 event.connectFailures++; 128 129 if( maxReconnectAttempts>0 && event.connectFailures >= maxReconnectAttempts ) { 130 return; 132 } 133 134 synchronized(sleepMutex){ 135 try{ 136 if( !running.get() ) 137 return; 138 139 sleepMutex.wait(event.reconnectDelay); 140 }catch(InterruptedException ie){ 141 Thread.currentThread().interrupt(); 142 return; 143 } 144 } 145 146 if (!useExponentialBackOff) { 147 event.reconnectDelay = initialReconnectDelay; 148 } else { 149 event.reconnectDelay*=backOffMultiplier; 151 if(event.reconnectDelay>maxReconnectDelay) 152 event.reconnectDelay=maxReconnectDelay; 153 } 154 155 } else { 156 event.connectFailures = 0; 157 event.reconnectDelay = initialReconnectDelay; 158 } 159 160 if( !running.get() ) 161 return; 162 163 event.connectTime = System.currentTimeMillis(); 164 event.failed.set(false); 165 166 listener.onServiceAdd(event); 167 } 168 }; 169 thread.setDaemon(true); 170 thread.start(); 171 } 172 } 173 174 public long getBackOffMultiplier() { 175 return backOffMultiplier; 176 } 177 178 public void setBackOffMultiplier(long backOffMultiplier) { 179 this.backOffMultiplier = backOffMultiplier; 180 } 181 182 public long getInitialReconnectDelay() { 183 return initialReconnectDelay; 184 } 185 186 public void setInitialReconnectDelay(long initialReconnectDelay) { 187 this.initialReconnectDelay = initialReconnectDelay; 188 } 189 190 public int getMaxReconnectAttempts() { 191 return maxReconnectAttempts; 192 } 193 194 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 195 this.maxReconnectAttempts = maxReconnectAttempts; 196 } 197 198 public long getMaxReconnectDelay() { 199 return maxReconnectDelay; 200 } 201 202 public void setMaxReconnectDelay(long maxReconnectDelay) { 203 this.maxReconnectDelay = maxReconnectDelay; 204 } 205 206 public long getMinConnectTime() { 207 return minConnectTime; 208 } 209 210 public void setMinConnectTime(long minConnectTime) { 211 this.minConnectTime = minConnectTime; 212 } 213 214 public boolean isUseExponentialBackOff() { 215 return useExponentialBackOff; 216 } 217 218 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 219 this.useExponentialBackOff = useExponentialBackOff; 220 } 221 222 } 223 | Popular Tags |