package com.ubermq.jms.server.journal;

import EDU.oswego.cs.dl.util.concurrent.*;
import com.ubermq.jms.client.DeliveryMode;
import com.ubermq.jms.server.*;
import com.ubermq.jms.common.datagram.*;
import com.ubermq.jms.server.journal.impl.*;
import com.ubermq.jms.common.routing.*;
import com.ubermq.jms.common.routing.impl.*;
import com.ubermq.kernel.*;
import com.ubermq.kernel.overflow.*;
import java.io.*;

/**
 * A routing destination that stands in for a durable subscriber.
 * <p>
 * Every datagram passed to {@link #output(IDatagram, IOverflowHandler)} is
 * first written to a file-backed journal and then forwarded to the
 * {@link DurableConnectionArbiter} this proxy fronts, provided the arbiter
 * reports itself open. Acknowledgements handed to {@link #deliver(IDatagram)}
 * are passed on to the journal, and {@link #recover()} asks the journal to
 * replay its contents through {@link #process(IConnectionInfo, IDatagram)}.
 * <p>
 * The proxy is {@link Externalizable}: its name, subscription, arbiter,
 * factory holder and display name are written out, and the journal and the
 * periodic keep-alive task are re-created when it is read back.
 * <p>
 * Methods that touch the journal or the arbiter's connection set are
 * {@code synchronized} on this instance.
 */
public final class DurableSubscriptionProxy
    implements RouteDestNode,
               IDatagramEndpoint,
               DatagramSink,
               Externalizable,
               IMessageProcessor
{
    private static final org.apache.log4j.Logger log =
        org.apache.log4j.Logger.getLogger(DurableSubscriptionProxy.class);

    /**
     * The arbiter that this proxy forwards datagrams to.
     */
    private DurableConnectionArbiter proxyFor;

    private String name, displayName, subscription;

    /**
     * Source of the datagram factories used for the journal and for
     * building the no-op control datagram.
     */
    private DatagramFactoryHolder factories;

    /**
     * The no-op control datagram sent periodically to the arbiter.
     */
    private IControlDatagram noopDatagram;

    /**
     * The journal that records every datagram output through this proxy.
     */
    private IJournal journal;

    /**
     * Identifier returned by the clock daemon for this proxy's periodic
     * task, kept so that the task can be cancelled when the proxy is closed
     * or unsubscribed. {@code null} when no task is scheduled.
     */
    private Object periodicTaskId;

    /**
     * Overflow handler used when replaying journaled datagrams.
     */
    private static final IOverflowHandler recoveryHandler = new ExponentialBackoff();

    /**
     * Shared clock daemon that runs the periodic task of every proxy on a
     * single named thread.
     */
    private static final ClockDaemon cd;
    static
    {
        cd = new ClockDaemon();
        cd.setThreadFactory(new ThreadFactory()
        {
            public Thread newThread(Runnable p0)
            {
                return new Thread(p0, "DurableSubscription Periodic Tasks");
            }
        });
    }

    /**
     * Datagram flag bit set by {@link #output(IDatagram, IOverflowHandler)}
     * on datagrams that were journaled while the arbiter was not open.
     * {@link #process(IConnectionInfo, IDatagram)} does not mark such
     * datagrams as redeliveries.
     */
    private static final int DGF_LOGGED_DISCONNECTED = 1 << 8;

    private static final String LOG_FILE_PATH =
        Configurator.getProperty(ServerConfig.DURABLE_LOG_PATH, ".");
    private static final String LOG_FILE_PREFIX =
        Configurator.getProperty(ServerConfig.DURABLE_LOG_FILE_PREFIX, "durable-");
    private static final String LOG_FILE_EXTENSION = ".log";
    private static final long LOG_FILE_FIXED_SIZE =
        Long.parseLong(Configurator.getProperty(ServerConfig.DURABLE_LOG_SIZE, "10485760"));

    public static final long serialVersionUID = 1L;

    /**
     * Period, in milliseconds, between runs of the periodic no-op task.
     */
    private static final long AUTO_RECOVER_INTERVAL = 10000L;

    /**
     * Creates a proxy and immediately connects the given node to the arbiter.
     *
     * @param e            the connection to attach to the arbiter
     * @param factories    holder of the datagram factories to use
     * @param arbiter      the arbiter this proxy forwards to
     * @param name         the subscription's internal name; also used to
     *                     derive the journal file name
     * @param displayName  the name returned by {@link #toString()}
     * @param subscription the subscription specification
     * @throws FileNotFoundException if the journal cannot be created
     * @throws IOException           if the journal cannot be created
     */
    public DurableSubscriptionProxy(ConnectionDestNode e,
                                    DatagramFactoryHolder factories,
                                    DurableConnectionArbiter arbiter,
                                    String name,
                                    String displayName,
                                    String subscription)
        throws FileNotFoundException, IOException
    {
        this.name = name;
        this.displayName = displayName;
        this.subscription = subscription;
        this.factories = factories;
        this.noopDatagram = factories.controlFactory().noop();
        this.proxyFor = arbiter;
        proxyFor.connect(e);

        init();
    }

    /**
     * Creates a proxy without attaching any connection to the arbiter.
     *
     * @param factories    holder of the datagram factories to use
     * @param arbiter      the arbiter this proxy forwards to
     * @param name         the subscription's internal name; also used to
     *                     derive the journal file name
     * @param displayName  the name returned by {@link #toString()}
     * @param subscription the subscription specification
     * @throws FileNotFoundException if the journal cannot be created
     * @throws IOException           if the journal cannot be created
     */
    public DurableSubscriptionProxy(DatagramFactoryHolder factories,
                                    DurableConnectionArbiter arbiter,
                                    String name,
                                    String displayName,
                                    String subscription)
        throws FileNotFoundException, IOException
    {
        this.name = name;
        this.displayName = displayName;
        this.subscription = subscription;
        this.factories = factories;
        this.noopDatagram = factories.controlFactory().noop();
        this.proxyFor = arbiter;

        init();
    }

    /**
     * Public no-argument constructor required by {@link Externalizable}.
     * The instance is not usable until
     * {@link #readExternal(ObjectInput)} has populated it.
     */
    public DurableSubscriptionProxy()
    {
    }

    /**
     * Two proxies are equal when their names are equal.
     */
    public boolean equals(Object o)
    {
        if (o instanceof DurableSubscriptionProxy)
        {
            return ((DurableSubscriptionProxy)o).getName().equals(getName());
        }
        else return false;
    }

    public int hashCode() {return getName().hashCode();}

    /**
     * Opens the journal file for this subscription and schedules the
     * periodic task that sends a no-op datagram to the arbiter while it is
     * open. The task id is retained so the task can be cancelled later.
     */
    private void init()
        throws FileNotFoundException, IOException
    {
        this.journal = new SimpleJournal(createFile(name),
                                         factories.datagramFactory(),
                                         LOG_FILE_FIXED_SIZE);

        final IOverflowHandler handler = new DropIncoming();
        this.periodicTaskId = cd.executePeriodically(
            AUTO_RECOVER_INTERVAL,
            new Runnable()
            {
                public void run()
                {
                    try
                    {
                        if (proxyFor.isOpen())
                            outputToProxy(noopDatagram, handler);
                    }
                    catch (Throwable ise)
                    {
                        // never let a failure escape into the shared daemon thread
                        log.error("", ise);
                    }
                }
            },
            false);
    }

    /**
     * Cancels this proxy's periodic task, if one is scheduled. Safe to call
     * more than once.
     */
    private void cancelPeriodicTask()
    {
        if (periodicTaskId != null)
        {
            ClockDaemon.cancel(periodicTaskId);
            periodicTaskId = null;
        }
    }

    /**
     * Writes, in order: name, subscription, the arbiter, the factory holder
     * and the display name. The display name is written last; see
     * {@link #readExternal(ObjectInput)}.
     */
    public void writeExternal(ObjectOutput out) throws IOException
    {
        out.writeUTF(name);
        out.writeUTF(subscription);
        out.writeObject(proxyFor);
        out.writeObject(factories);

        out.writeUTF(displayName);
    }

    /**
     * Reads the fields in the order written by
     * {@link #writeExternal(ObjectOutput)}, then re-creates the journal and
     * the periodic task. If the trailing display name cannot be read, the
     * failure is logged and the display name falls back to the name.
     */
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
    {
        this.name = in.readUTF();
        this.subscription = in.readUTF();
        this.proxyFor = (DurableConnectionArbiter)in.readObject();
        this.factories = (DatagramFactoryHolder)in.readObject();
        this.noopDatagram = factories.controlFactory().noop();

        try
        {
            this.displayName = in.readUTF();
        }
        catch (IOException e)
        {
            // NOTE(review): presumably tolerates streams written before the
            // display name was added to the format - confirm.
            log.error("", e);
            this.displayName = name;
        }

        init();
    }

    /**
     * Attaches a connection to the underlying arbiter.
     */
    public synchronized void connect(ConnectionDestNode cdn)
    {
        this.proxyFor.connect(cdn);
    }

    /**
     * Detaches a connection from the underlying arbiter and then attempts a
     * recovery, which only does work if the arbiter is still open.
     */
    public synchronized void disconnect(ConnectionDestNode cdn)
    {
        this.proxyFor.disconnect(cdn);
        recover();
    }

    /**
     * Cancels the periodic task, disconnects every connection from the
     * arbiter and closes the journal.
     */
    public synchronized void close()
    {
        // stop the keep-alive first so the shared clock daemon no longer
        // holds a reference to this proxy or fires against a closed journal
        cancelPeriodicTask();
        proxyFor.disconnectAll();
        journal.close();
    }

    /**
     * Cancels the periodic task, destroys the journal and deletes its
     * backing file.
     */
    public synchronized void unsubscribe()
    {
        cancelPeriodicTask();
        journal.destroy();
        createFile(name).delete();
    }

    /**
     * No-op.
     */
    public void accept(IConnectionInfo conn)
    {
    }

    /**
     * No-op.
     */
    public void remove(IConnectionInfo conn)
    {
    }

    /**
     * Journals the datagram and then forwards it to the arbiter if it is
     * open.
     * <p>
     * When the arbiter is not open, the datagram is flagged with
     * {@link #DGF_LOGGED_DISCONNECTED} before being journaled. A message
     * datagram whose delivery mode is
     * {@code DeliveryMode.GUARANTEED_PROCESSING} is rejected in that state.
     *
     * @param d the datagram to journal and forward
     * @param h the overflow handler passed to both the journal and the arbiter
     * @throws IOException           if the journal reports an I/O failure
     * @throws IllegalStateException if guaranteed processing is requested
     *                               while the arbiter is not open
     */
    public synchronized void output(IDatagram d, IOverflowHandler h)
        throws IOException
    {
        if (!proxyFor.isOpen())
        {
            if (d instanceof IMessageDatagram)
            {
                // A missing or non-numeric delivery mode is treated as "not
                // guaranteed" instead of failing with NPE/CCE before the
                // datagram reaches the journal.
                Object deliveryMode =
                    ((IMessageDatagram)d).getStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE);

                if (deliveryMode instanceof Number &&
                    ((Number)deliveryMode).intValue() == DeliveryMode.GUARANTEED_PROCESSING)
                {
                    throw new IllegalStateException("Guaranteed processing required, but the subscriber is disconnected.");
                }
            }

            d.setDatagramFlagBits(DGF_LOGGED_DISCONNECTED);
        }
        journal.output(d, h);

        outputToProxy(d, h);
    }

    /**
     * Handles datagrams coming back from the subscriber. Only
     * acknowledgements are acted upon; everything else is ignored.
     */
    public void deliver(IDatagram d)
    {
        if (d instanceof IAckDatagram)
            ack(((IAckDatagram)d).getAckMessageId());
    }

    /**
     * Passes an acknowledged message id on to the journal.
     */
    private void ack(MessageId id)
    {
        log.debug("ack " + id);
        journal.ack(id);
    }

    /**
     * Forwards a datagram to the arbiter using the recovery overflow
     * handler. This proxy passes itself to {@code journal.recover(..)}, so
     * this is presumably the callback the journal invokes per replayed
     * datagram.
     * <p>
     * Message datagrams that do not carry
     * {@link #DGF_LOGGED_DISCONNECTED} are marked as redeliveries.
     */
    public void process(IConnectionInfo conn, IDatagram read)
    {
        log.debug("recovering " + read);

        if (read instanceof IMessageDatagram)
        {
            IMessageDatagram md = (IMessageDatagram)read;

            if ((md.getDatagramFlags() & DGF_LOGGED_DISCONNECTED) == 0)
            {
                md.setStandardProperty(IMessageDatagram.STDPROP_REDELIVERY,
                                       Boolean.TRUE);
            }
        }

        outputToProxy(read, recoveryHandler);
    }

    /**
     * Sends a datagram to the arbiter if one is set and open. On an I/O
     * failure while the arbiter is still open, a recovery is triggered and
     * the send is retried; if the arbiter has closed in the meantime the
     * failure is dropped.
     */
    private void outputToProxy(IDatagram d, IOverflowHandler h)
    {
        if (proxyFor != null &&
            proxyFor.isOpen())
        {
            try
            {
                proxyFor.output(d, h);
            }
            catch (IOException ise)
            {
                log.debug("output to durable subscriber failed", ise);

                if (proxyFor.isOpen())
                {
                    // NOTE(review): recover() replays through process(), which
                    // calls back into this method; a persistently failing but
                    // open arbiter could recurse without bound - confirm.
                    recover();
                    outputToProxy(d, h);
                }
                // otherwise the arbiter closed underneath us; nothing further
                // is attempted here.
            }
        }
        else
        {
            log.debug("journaled message in disconnected mode");
        }
    }

    /**
     * @return the arbiter this proxy forwards to
     */
    public DurableConnectionArbiter getProxyFor()
    {
        return proxyFor;
    }

    public String getSubscription()
    {
        return subscription;
    }

    public String getName()
    {
        return name;
    }

    public String getDisplayName()
    {
        return displayName;
    }

    /**
     * Orders route destination nodes by their node names.
     */
    public int compareTo(Object o)
    {
        return getNodeName().compareTo(((RouteDestNode)o).getNodeName());
    }

    /**
     * @return the name prefixed with {@code "$D$"}
     */
    public String getNodeName()
    {
        return "$D$" + name;
    }

    public String toString()
    {
        return getDisplayName();
    }

    /**
     * Always reports open, regardless of the arbiter's state.
     */
    public boolean isOpen()
    {
        return true;
    }

    /**
     * Asks the journal to replay its contents through this proxy, but only
     * while the arbiter is open.
     */
    public synchronized void recover()
    {
        if (proxyFor.isOpen())
            journal.recover(this);
    }

    /**
     * Builds the journal file location for a subscription name from the
     * configured directory, prefix and the fixed {@code ".log"} extension.
     */
    private static File createFile(String name)
    {
        return new File(LOG_FILE_PATH, LOG_FILE_PREFIX + name + LOG_FILE_EXTENSION);
    }

}