1 package com.ubermq.jms.client.proc; 2 3 import EDU.oswego.cs.dl.util.concurrent.*; 4 import com.ubermq.jms.client.*; 5 import com.ubermq.jms.client.impl.*; 6 import com.ubermq.jms.common.datagram.*; 7 import com.ubermq.jms.common.routing.*; 8 import com.ubermq.jms.common.routing.impl.*; 9 import com.ubermq.kernel.*; 10 import com.ubermq.kernel.overflow.*; 11 import java.io.*; 12 import java.util.*; 13 14 19 public class ClientProc 20 implements com.ubermq.jms.client.IClientProcessor 21 { 22 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(ClientProc.class); 23 24 27 private final IConfigurableRouter subRouter = new Router(), 28 ackRouter = new Router(); 29 30 33 private final Object controlAckNotifier; 34 35 38 private SynchronizedBoolean controlAckSuccess; 39 40 43 private SynchronizedBoolean controlAckWasNotified; 44 45 49 private final List replay; 50 51 54 private IConnectionInfo managedConn; 55 56 60 private final IOverflowHandler controlHandler; 61 62 65 private final IControlDatagramFactory controlFactory; 66 67 71 protected boolean fSendControlDgrams = true; 72 73 private static final boolean SHOULD_WAIT_FOR_ACK = 74 Boolean.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_SHOULD_WAIT_FOR_ACK, "true")).booleanValue(); 75 private static final int RPC_TIMEOUT = 10000; 77 82 public ClientProc(IControlDatagramFactory controlFactory) 83 { 84 this.controlFactory = controlFactory; 85 this.controlHandler = new ExponentialBackoff( 86 Long.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_INITIAL_TIMEOUT, "50")).longValue(), 87 Integer.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_BACKOFF_MULTIPLIER, "2")).intValue(), 88 Long.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_MAXIMUM_TIMEOUT, "5000")).longValue(), 89 false); 90 this.replay = new LinkedList(); 91 92 this.controlAckNotifier = new Object (); 93 this.controlAckSuccess = new SynchronizedBoolean(false); 94 this.controlAckWasNotified = new SynchronizedBoolean(false); 95 } 96 97 public void accept(IConnectionInfo conn) 98 { 99 this.managedConn = conn; 100 } 101 102 public void remove(IConnectionInfo conn) 103 { 104 this.managedConn = null; 105 106 } 108 109 113 public void reconnected() 114 { 115 Iterator iter = replay.iterator(); 117 while (iter.hasNext()) 118 { 119 IControlDatagram cd = (IControlDatagram)iter.next(); 120 controlSequence(cd, controlHandler, false); 121 } 122 } 123 124 public void process(IConnectionInfo conn, IDatagram d) 125 { 126 try { 127 if (d instanceof IAckDatagram) { 131 ack((IAckDatagram)d); 132 } 133 if (d instanceof IControlDatagram) { 134 ctl((IControlDatagram)d); 135 } 136 if (d instanceof IMessageDatagram) { 137 msg((IMessageDatagram)d); 138 } 139 } catch(Exception x) { 140 log.error("", x); 141 } 142 } 143 144 private void ack(IAckDatagram ad) 145 { 146 if (ad.getAckMessageId() == null) 149 { 150 controlAckSuccess.set(!ad.isNegativeAck()); 151 controlAckWasNotified.set(true); 152 synchronized(controlAckNotifier) { 153 controlAckNotifier.notifyAll(); 154 } 155 } 156 else 157 { 158 MessageIdSourceSpec ss = new MessageIdSourceSpec(ad.getAckMessageId()); 159 synchronized(ackRouter) 160 { 161 Iterator acks = ackRouter.getRoutes(ss).iterator(); 162 if (!acks.hasNext()) return; 163 164 EndpointDestNode edn = ((EndpointDestNode)acks.next()); 165 edn.getEndpoint().deliver(ad); 166 167 ackRouter.remove(ss, edn); 169 } 170 } 171 } 172 173 private void msg(IMessageDatagram md) 174 { 175 for(Iterator acks = subRouter.getRoutes(new StaticSourceDescriptor(md.getTopicName())).iterator();acks.hasNext();) 176 { 177 EndpointDestNode edn = ((EndpointDestNode)acks.next()); 178 edn.getEndpoint().deliver(md); 179 } 180 } 181 182 private void ctl(IControlDatagram cd) 183 { 184 } 187 188 public void registerSubscription(String spec, 189 String selector, 190 IDatagramEndpoint e) 191 { 192 TopicSourceSpec ss = new TopicSourceSpec(spec); 193 194 boolean alreadyRegistered = internalRegister(ss, e); 196 197 if (!alreadyRegistered) { 200 controlSequence(controlFactory.subscribe(spec, selector), controlHandler); 201 } 202 } 203 204 private boolean internalRegister(TopicSourceSpec ss, 205 IDatagramEndpoint e) 206 { 207 RouteDestNode rdn = new EndpointDestNode(e); 209 subRouter.addKnownNode(rdn); 210 subRouter.addRoute(ss, rdn); 211 return false; 212 } 213 214 public void registerDurableSubscription(String spec, 215 String name, 216 String selector, 217 IDatagramEndpoint e) 218 { 219 TopicSourceSpec ss = new TopicSourceSpec(spec); 220 221 boolean alreadyRegistered = internalRegister(ss, e); 223 224 if (!alreadyRegistered) { 227 controlSequence(controlFactory.durableSubscribe(name, spec, selector), controlHandler); 228 controlSequence(controlFactory.durableRecover(name), controlHandler); 229 } 230 231 } 232 233 234 public void unregisterSubscription(String spec, 235 IDatagramEndpoint e) 236 { 237 TopicSourceSpec ss = new TopicSourceSpec(spec); 238 RouteDestNode rdn = new EndpointDestNode(e); 239 subRouter.remove(ss, rdn); 240 241 if (!(subRouter.getRoutes(ss).size() > 0)) 244 controlSequence(controlFactory.unsubscribe(spec), controlHandler); 245 } 246 247 public void unregisterDurableSubscription(String name) 248 { 249 controlSequence(controlFactory.durableUnsubscribe(name), controlHandler); 250 } 251 252 public void durableGoingAway(String name) 253 { 254 controlSequence(controlFactory.durableGoingAway(name), controlHandler); 255 } 256 257 261 public void startQueue(String queue, String selector, IDatagramEndpoint e) 262 { 263 TopicSourceSpec ss = new TopicSourceSpec(new LocalQueue(queue).getInternalTopicName()); 264 265 boolean alreadyRegistered = internalRegister(ss, e); 267 if (!alreadyRegistered) 268 controlSequence(controlFactory.queueStart(queue, selector), controlHandler); 269 } 270 271 274 public void stopQueue(String queue, IDatagramEndpoint e) 275 { 276 SourceSpec ss = new TopicSourceSpec(new LocalQueue(queue).getInternalTopicName()); 277 278 subRouter.remove(ss, new EndpointDestNode(e)); 279 controlSequence(controlFactory.queueStop(queue), controlHandler); 280 } 281 282 public boolean shouldWaitForAck() 283 { 284 return SHOULD_WAIT_FOR_ACK; 285 } 286 287 public void registerNeedAck(MessageId msgId, 288 IDatagramEndpoint e) 289 { 290 RouteDestNode rdn = new EndpointDestNode(e); 291 292 synchronized(ackRouter) 293 { 294 ackRouter.addKnownNode(rdn); 295 ackRouter.addRoute(new MessageIdSourceSpec(msgId), rdn); 296 } 297 } 298 299 public boolean controlSequence(IControlDatagram d, 300 IOverflowHandler h) 301 { 302 return controlSequence(d, h, true); 303 } 304 305 private synchronized boolean controlSequence(IControlDatagram d, 306 IOverflowHandler h, 307 boolean saveInReplayLog) 308 { 309 if (!fSendControlDgrams || 311 managedConn == null) 312 return true; 313 314 if (saveInReplayLog) 316 replay.add(d); 317 318 return outputAndWait(d, h); 319 } 320 321 334 private synchronized boolean outputAndWait(IDatagram d, 335 IOverflowHandler h) 336 throws IllegalStateException 337 { 338 log.debug("Outputting RPC datagram " + d + " on conn " + this.managedConn); 339 340 controlAckSuccess.set(false); 342 controlAckWasNotified.set(false); 343 344 synchronized(controlAckNotifier) { 346 try 347 { 348 managedConn.output(d, h); 349 } 350 catch (IOException e) { 351 remove(managedConn); 354 return false; 355 } 356 357 try 359 { 360 log.debug("Waiting for RPC reply on conn " + this.managedConn); 361 controlAckNotifier.wait(RPC_TIMEOUT); 362 if (!controlAckWasNotified.get()) 363 { 364 RuntimeException x = new IllegalStateException ("Datagram was not acknowledged in the timeout period."); 365 log.debug("RPC reply timed out", x); 366 throw x; 367 } 368 } 369 catch (InterruptedException e) {} 370 } 371 372 return controlAckSuccess.get(); 373 } 374 375 public String toString() 376 { 377 return "ClientProc for " + this.managedConn.toString(); 378 } 379 380 private final static class TopicSourceSpec 381 extends com.ubermq.jms.common.routing.impl.RegexpSourceSpec 382 implements SourceDescriptor 383 { 384 private String spec; 385 386 public TopicSourceSpec(String spec) 387 { 388 super(com.ubermq.jms.common.routing.impl.RegexpHelper.xlat(spec), 389 spec); 390 this.spec = spec; 391 } 392 393 public String getMatchValue() 394 { 395 return spec; 396 } 397 } 398 399 private final static class MessageIdSourceSpec 400 implements SourceSpec, SourceDescriptor 401 { 402 MessageId msgId; 403 404 public MessageIdSourceSpec(MessageId msgId) {this.msgId = msgId;} 405 public boolean matches(SourceDescriptor ss) 406 { 407 try { 408 return (msgId.equals(((MessageIdSourceSpec)ss).msgId)); 409 } catch(ClassCastException x) {return false;} 410 } 411 public boolean isMoreSpecificThan(SourceSpec ss) {return false;} 412 public String getDisplayName() {return msgId.toString();} 413 public String getMatchValue() 414 { 415 return msgId.toString(); 416 } 417 public boolean isIdempotentForEqualDescriptors() 418 { 419 return true; 420 } 421 422 public boolean equals(Object obj) 423 { 424 return (((MessageIdSourceSpec)obj).msgId.equals(msgId)); 425 } 426 public int hashCode() {return msgId.hashCode();} 427 } 428 429 private final static class EndpointDestNode 430 implements RouteDestNode 431 { 432 IDatagramEndpoint e; 433 434 public EndpointDestNode(IDatagramEndpoint e) {this.e = e;} 435 public String getDisplayName() {return getNodeName();} 436 public String getNodeName() {return e.toString();} 437 public boolean equals(Object o) { 438 try { 439 return (e == ((EndpointDestNode)o).e); 440 } catch(ClassCastException cce ) {return false;} 441 } 442 public int compareTo(Object o) { 443 try { 444 boolean gr = (e.hashCode() > ((EndpointDestNode)o).e.hashCode()); 445 boolean eq = equals(o); 446 return (eq) ? 0 : (gr ? 1 : -1); 447 } catch(ClassCastException cce ) {return 1;} 448 } 449 public int hashCode() {return e.hashCode();} 450 public IDatagramEndpoint getEndpoint() {return e;} 451 } 452 } 453 | Popular Tags |