1 21 package com.presumo.jms.router; 22 23 import com.presumo.jms.message.JmsMessage; 24 import com.presumo.jms.message.SystemMessageConstants; 25 import com.presumo.jms.plugin.implementation.MemoryMessageQueue; 26 import com.presumo.jms.plugin.transport.Transport; 27 import com.presumo.jms.selector.JmsOperand; 28 import com.presumo.jms.selector.Parser; 29 30 31 import com.presumo.jms.resources.Resources; 32 import com.presumo.util.log.Logger; 33 import com.presumo.util.log.LoggerFactory; 34 35 import java.io.IOException ; 36 import java.util.ArrayList ; 37 38 import javax.jms.InvalidSelectorException ; 39 import javax.jms.JMSException ; 40 41 49 public class RemoteSession implements RoutingTarget 50 { 51 52 53 protected static final int MSG_BATCH_SIZE = 2000; 54 55 56 protected static final long latency = 500; 57 58 59 protected static final String FILTER_PROPERTY = "JMSX_FILTER"; 60 61 62 protected static final String ACKS_PROPERTY = "JMSX_ACKS"; 63 64 65 69 protected JmsOperand remoteFilter; 70 71 74 protected JmsOperand localFilter; 75 76 77 protected final Router router; 78 79 80 protected Transport transport; 81 82 83 protected ConnectionListener connxListener; 84 85 86 protected PersistentAckHandler persistentAckHandler; 87 88 89 protected MessageReader msgReader; 90 91 92 protected MessageWriter msgWriter; 93 94 95 protected int targetID; 96 97 98 private final Parser parser = Parser.getInstance(); 99 100 101 private final ArrayList outbox = new ArrayList (); 102 103 104 108 public RemoteSession(Router router, 109 Transport transport, 110 ConnectionListener listener) 111 { 112 logger.entry("RemoteSession<init>"); 113 114 this.router = router; 115 this.transport = transport; 116 this.connxListener = listener; 117 118 try { 119 remoteFilter = parser.parseFilter("true"); 120 } catch (InvalidSelectorException ise) {} 121 122 persistentAckHandler = new PersistentAckHandler(this); 123 router.addTarget(this); 124 start(); 125 126 logger.exit("RemoteSession<init>"); 127 } 128 129 130 134 public void setTransport(Transport t) 135 { 136 logger.entry("setTransport", t); 137 138 this.transport = t; 141 142 logger.exit("setTransport"); 143 } 144 145 146 public synchronized void start() 147 { 148 logger.entry("start"); 149 150 if (msgReader == null) { 151 msgReader = new MessageReader(); 152 msgReader.start(); 153 } 154 if (msgWriter == null) { 155 msgWriter = new MessageWriter(); 156 msgWriter.start(); 157 } 158 159 logger.exit("start"); 160 } 161 162 public synchronized void stop() 163 { 164 logger.entry("stop"); 165 166 if (msgReader != null) { 167 msgReader.stopReader(); 168 msgReader = null; 169 } 170 if (msgWriter != null) { 171 msgWriter.stopWriter(); 172 msgWriter = null; 173 } 174 logger.exit("stop"); 175 } 176 177 public void close() 178 { 179 logger.entry("close"); 180 this.stop(); 181 182 if (transport != null) { 183 transport.close(); 184 transport = null; 185 } 186 187 router.removeTarget(this); 188 logger.exit("close"); 189 } 190 191 195 public void setTargetID(int id) 196 { 197 logger.entry("setTargetID", new Integer (id)); 198 this.targetID = id; 199 logger.exit("setTargetID"); 200 } 201 202 public JmsOperand getRoutingFilter() 203 { 204 logger.entry("getRoutingFilter"); 205 JmsOperand retval = remoteFilter; 206 logger.exit("getRoutingFilter", retval); 207 return retval; 208 } 209 210 public void setRemoteRoutingFilter(JmsOperand filter, boolean add) 211 { 212 logger.entry("setRemoteRoutingFilter", filter, new Boolean (add)); 213 214 if (filter != localFilter) 215 { 216 JmsMessage msg = new JmsMessage(router.getName()); 217 try { 218 String newFilter = parser.unparse(filter); 219 msg.setJMSSystemMsgType(SystemMessageConstants.REMOTE_FILTER_CHANGE_TYPE); 220 msg.setStringProperty(FILTER_PROPERTY, parser.unparse(filter)); 221 localFilter = filter; 222 223 if (logger.isDebugEnabled()) { 224 logger.debug("setRemoteRoutingFilter() local filter changed to:\n"+newFilter); 225 } 226 queueMessage(msg); 227 } catch (Exception e) { 228 logger.exception(e); 230 } 231 } 232 logger.exit("setRemoteRoutingFilter"); 233 } 234 235 public boolean needsFilterUpdates() 236 { 237 return true; 238 } 239 240 246 public JmsMessage takeMessage(JmsMessage msg) 247 { 248 boolean taken = false; 249 if (msg.getSendingTargetID() != this.targetID) { 251 taken = parser.evaluate(remoteFilter, msg); 252 if (taken) { 253 persistentAckHandler.handleOutgoingMsg(msg); 254 255 synchronized(outbox) { 256 outbox.add(msg); 257 outbox.notifyAll(); 258 } 259 260 } 261 } 262 return msg; 263 } 264 265 266 270 271 274 public void receiveMessages(JmsMessage [] msgs) 275 { 276 int length = msgs.length; 277 int msgsRemoved = 0; 278 for (int i=0; i < length; ++i) 279 { 280 if (logger.isDebugEnabled()) 281 logger.debug("receiveMessages() \n" + msgs[i].toString()); 282 283 boolean systemMsg = false; 284 int type = msgs[i].getJMSSystemMsgType(); 285 switch (type) 286 { 287 case(SystemMessageConstants.REMOTE_FILTER_CHANGE_TYPE): 288 handleRemoteFilterMsg(msgs[i]); 289 systemMsg = true; 290 break; 291 case(SystemMessageConstants.ACKS_MSG_TYPE): 292 String acks = (String ) msgs[i].getObjectProperty(ACKS_PROPERTY); 293 persistentAckHandler.handleAcks(acks); 294 systemMsg = true; 295 break; 296 } 297 298 if (systemMsg) { 299 ++msgsRemoved; 300 msgs[i] = null; 301 } 302 else { 303 304 boolean duplicate = persistentAckHandler.isDuplicate(msgs[i]); 306 if (duplicate) { 307 msgs[i] = null; 308 ++msgsRemoved; 309 } 310 else { 311 msgs[i].setSendingTargetID( targetID ); 313 persistentAckHandler.setOriginator(msgs[i]); 314 } 315 } 316 } 317 318 if (msgsRemoved > 0) { 319 msgs = collapseMsgArray(msgs, length - msgsRemoved); 320 } 321 322 try { 323 router.routeMessages(msgs); 324 persistentAckHandler.handleIncomingMsgs(msgs); 325 } catch (IOException ioe) { 326 logger.error("An exception occurred while routing msgs: \n", ioe); 328 } 329 330 } 331 332 333 337 void acksAvailable() 338 { 339 synchronized(outbox) { 340 outbox.notifyAll(); 341 } 342 } 343 344 348 private void queueMessage(JmsMessage msg) 349 { 350 synchronized(outbox) { 351 outbox.add(msg); 352 outbox.notifyAll(); 353 } 354 } 355 356 357 private void handleRemoteFilterMsg(JmsMessage msg) 358 { 359 String filter = (String ) msg.getObjectProperty(FILTER_PROPERTY); 360 try { 361 remoteFilter = parser.parseFilter(filter); 362 router.recalculateFilters(true); 363 } catch (javax.jms.InvalidSelectorException ex) { 364 logger.error("Remote client sent invalid routing filter: " + 365 transport.getRemoteID() + " :: " + filter); 366 } 367 } 368 369 372 private JmsMessage [] collapseMsgArray(JmsMessage [] msgs, int size) 373 { 374 JmsMessage [] retval = new JmsMessage[size]; 375 for (int i=0, j=0; i < msgs.length; ++i) { 376 if (msgs[i] != null) { 377 retval[j] = msgs[i]; 378 ++j; 379 } 380 } 381 return retval; 382 } 383 384 385 393 private void connectionLost(IOException ioe) 394 { 395 Thread t = new Thread ("ConnectionLost Thread") { 396 public void run() { 397 logger.entry("connectionLostThread.run()"); 398 RemoteSession.this.stop(); 399 if (transport != null) { 400 transport.close(); 401 transport = null; 402 } 403 404 connxListener.connectionLost(RemoteSession.this); 405 logger.exit("connectionLostThread.run()"); 406 } 407 }; 408 t.start(); 409 410 } 411 412 413 private JmsMessage createAckMessage() 414 { 415 JmsMessage ackMessage = null; 416 String acks = persistentAckHandler.getAckString(); 417 418 if (acks != null) { 419 420 ackMessage = new JmsMessage(router.getName()); 421 422 try { 423 ackMessage.setJMSSystemMsgType(SystemMessageConstants.ACKS_MSG_TYPE); 424 ackMessage.setStringProperty(ACKS_PROPERTY, acks); 425 } catch (JMSException jmsex) {} 426 } 427 428 return ackMessage; 429 } 430 431 432 439 protected class MessageReader extends Thread 440 { 441 442 private volatile boolean stopped = false; 443 444 public MessageReader() { super("MessageReader"); } 445 446 public void stopReader() 447 { 448 if (! stopped) stopped = true; 449 } 450 451 public void run() 452 { 453 JmsMessage [] msgs; 454 while (stopped == false) { 455 try { 456 msgs = transport.receiveMessages(); 457 if (msgs != null && stopped == false) { 458 receiveMessages(msgs); 459 msgs = null; 460 } 461 } catch (IOException ioe) { 462 if (! stopped) { 463 stopped = true; 465 connectionLost(ioe); 466 } 467 } 468 } 469 } 470 } 471 475 479 482 protected class MessageWriter extends Thread 483 { 484 private volatile boolean stopped = false; 485 private volatile boolean sendMessages; 486 487 public MessageWriter() { super("MessageWriter"); } 488 489 public final void stopWriter() 490 { 491 if (! stopped) { 492 synchronized (outbox) { 493 stopped = true; 494 outbox.notifyAll(); 495 } 496 } 497 } 498 499 public final void sendMessagesNow() 500 { 501 sendMessages = true; 502 } 503 504 public final void run() 505 { 506 long nextSend = System.currentTimeMillis() + latency; 507 long sleepTime = 0; 508 509 while (!stopped) { 510 511 synchronized (outbox) { 512 sendMessages = false; 513 514 if (outbox.size() >= MSG_BATCH_SIZE) { 515 sendMessages = true; 516 } else if(outbox.size() > 0 || 517 persistentAckHandler.acksAvailable() ) { 518 long currentTime = System.currentTimeMillis(); 519 long diff = nextSend - currentTime; 520 if (diff <= 0) { 521 sendMessages = true; 522 } 523 else { 524 sleepTime = diff; 525 } 526 } else { 527 sleepTime = 0; 528 } 529 530 if (sendMessages == false && !stopped) { 531 try { 532 outbox.wait(sleepTime); 533 } catch (InterruptedException ie) {} 534 } 535 536 } 537 538 if (sendMessages == true) { 539 send(); 540 nextSend = System.currentTimeMillis() + latency; 541 sleepTime = 0; 542 } 543 } 544 } 545 546 547 548 private final void send() 549 { 550 try { 551 JmsMessage [] msgs = null; 552 JmsMessage ackMessage = createAckMessage(); 553 554 synchronized(outbox) { 555 if (ackMessage != null) { 556 outbox.add(ackMessage); 557 } 558 msgs = new JmsMessage[outbox.size()]; 559 msgs = (JmsMessage[]) outbox.toArray(msgs); 560 outbox.clear(); 561 } 562 563 if (msgs != null && msgs.length != 0 && transport != null) { 564 transport.sendMessages(msgs); 565 } 566 567 } catch (IOException ioe) { 568 connectionLost(ioe); 569 } 570 } 571 572 573 } 574 578 579 private static Logger logger = 581 LoggerFactory.getLogger(RemoteSession.class, Resources.getBundle()); 582 584 } 585 | Popular Tags |