package net.walend.somnifugi;

import java.util.Set;
import java.util.HashSet;
import java.util.Iterator;

import javax.jms.MessageConsumer;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.Session;

import net.walend.somnifugi.channel.Takable;

/**
 * Base class for JMS MessageConsumers that pull their messages from a
 * {@link Takable} feed.
 *
 * <p>The class keeps four independent monitors:
 * <ul>
 *   <li>{@code threadsGuard} protects {@code waitingThreads}, the set of threads
 *       currently inside a blocking receive, so close() can interrupt them;</li>
 *   <li>{@code partsGuard} protects {@code messageListenerRunner} and calls on
 *       the exception listener;</li>
 *   <li>{@code feedGuard} serializes the receive methods, so only one thread at
 *       a time takes from the feed;</li>
 *   <li>{@code stateGuard} protects the {@code started} and {@code closed}
 *       flags, and is the monitor receivers wait on until the consumer is
 *       started.</li>
 * </ul>
 */
public abstract class SomniMessageConsumer
    implements MessageConsumer
{
    private final Takable<Message> feed;
    private SomniMessageListenerRunner messageListenerRunner = null;
    private final SomniExceptionListener exceptionListener;
    private final Set<Thread> waitingThreads = new HashSet<Thread>();
    private final SomniMessageSelector messageSelector;

    private boolean started = false;
    private boolean closed = false;

    /** Guards waitingThreads. */
    private final Object threadsGuard = new Object();
    /** Guards messageListenerRunner and use of exceptionListener. */
    private final Object partsGuard = new Object();
    /** Serializes access to the feed from the receive methods. */
    private final Object feedGuard = new Object();
    /** Guards started and closed; receivers wait on it until started. */
    private final Object stateGuard = new Object();

    private final SomniSession session;

    /**
     * Creates a consumer without a message selector.
     *
     * @param feed source of messages; must not be null
     * @param somniExceptionListener listener notified when redeliver() hits a JMSException
     * @param session the session consulted for the acknowledge mode
     * @throws NullPointerException if feed is null
     */
    protected SomniMessageConsumer(Takable<Message> feed,SomniExceptionListener somniExceptionListener,SomniSession session)
    {
        this(feed,somniExceptionListener,null,session);
    }

    /**
     * Creates a consumer with a message selector.
     *
     * @param feed source of messages; must not be null
     * @param somniExceptionListener listener notified when redeliver() hits a JMSException
     * @param messageSelector selector reported by getMessageSelector(); may be null
     * @param session the session consulted for the acknowledge mode
     * @throws NullPointerException if feed is null
     */
    protected SomniMessageConsumer(Takable<Message> feed,SomniExceptionListener somniExceptionListener,SomniMessageSelector messageSelector,SomniSession session)
    {
        if(feed==null)
        {
            throw new NullPointerException("feed can not be null.");
        }

        this.feed = feed;
        this.exceptionListener = somniExceptionListener;
        this.messageSelector = messageSelector;
        this.session = session;
    }

    /**
     * @return the selector's toString() form, or null if this consumer has no selector
     */
    public String getMessageSelector()
        throws JMSException
    {
        if(messageSelector == null)
        {
            return null;
        }
        return messageSelector.toString();
    }

    /**
     * @return the selector object itself, or null if this consumer has no selector
     */
    public SomniMessageSelector getSomniMessageSelector()
    {
        return messageSelector;
    }

    /**
     * @return the MessageListener held by the current listener runner, or null
     *         if no listener is set
     */
    public MessageListener getMessageListener()
        throws JMSException
    {
        synchronized(partsGuard)
        {
            if(messageListenerRunner==null)
            {
                return null;
            }
            return messageListenerRunner.getMessageListener();
        }
    }

    /**
     * Replaces the message listener. Any existing listener runner is closed
     * first. A non-null listener gets a new runner, started on its own thread;
     * a null listener simply leaves the consumer without a listener.
     *
     * @param listener the new listener, or null to remove the current one
     * @throws IllegalStateException if this consumer is closed
     */
    public void setMessageListener(MessageListener listener)
        throws JMSException
    {
        synchronized(partsGuard)
        {
            checkClosed();
            if(messageListenerRunner!=null)
            {
                messageListenerRunner.close();
                // Drop the closed runner so getMessageListener() does not keep
                // reporting a listener that was removed with setMessageListener(null).
                messageListenerRunner = null;
            }
            if(listener!=null)
            {
                SomniLogger.IT.fine(getName()+" MessageListener set to "+listener.toString());

                messageListenerRunner = new SomniMessageListenerRunner(this,listener,exceptionListener);
                Thread thread = new Thread(messageListenerRunner);
                thread.start();
            }
        }
    }

    /**
     * Tells whether a message's JMSExpiration has passed, logging a warning if so.
     * An expiration of 0 means the message never expires.
     *
     * @param message the message to check; null is treated as not expired
     * @return true if the message has a non-zero expiration earlier than now
     */
    private static boolean expired(Message message)
        throws JMSException
    {
        if(message==null)
        {
            return false;
        }
        long expiration = message.getJMSExpiration();
        if(expiration==0)
        {
            return false;
        }
        long now = System.currentTimeMillis();
        if(expiration>=now)
        {
            return false;
        }

        StringBuilder buffy = new StringBuilder();
        buffy.append("Message ");
        buffy.append(message.toString());
        buffy.append(" received "+(now-expiration));
        buffy.append(" ms late. Dropped due to timeout.");

        SomniLogger.IT.warning(buffy.toString());

        return true;
    }

    /** Logs, at finest level, that this consumer received the message; null is ignored. */
    private void logReceived(Message message)
    {
        if(message!=null)
        {
            StringBuilder buffy = new StringBuilder();

            buffy.append(getName());
            buffy.append(" received ");
            buffy.append(message.toString());

            SomniLogger.IT.finest(buffy.toString());
        }
    }

    /**
     * Applies the session's acknowledge mode to a freshly received message:
     * CLIENT_ACKNOWLEDGE registers the message with the session and records
     * this consumer on it; AUTO_ACKNOWLEDGE acknowledges it immediately.
     *
     * @param message the received message; must not be null
     * @return the same message
     */
    private SomniMessage processReceivedMessage(SomniMessage message)
        throws JMSException
    {
        SomniMessage result = message;

        if(session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
        {
            session.addMessageToAcknowledge(result);
            result.setConsumer(this);
        }

        if(session.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE)
        {
            result.acknowledge();
        }

        return result;
    }

    /**
     * Blocks until an unexpired message is available and returns it. If the
     * consumer is not started, waits until start() is called. Expired messages
     * are dropped and the take is repeated.
     *
     * @return the next unexpired message; never null
     * @throws IllegalStateException if this consumer is closed
     * @throws JMSException a SomniInterruptedException is raised if the calling
     *         thread is interrupted while waiting (close() interrupts waiters)
     */
    public Message receive()
        throws JMSException
    {
        synchronized(threadsGuard)
        {
            waitingThreads.add(Thread.currentThread());
        }
        try
        {
            synchronized(feedGuard)
            {
                synchronized(stateGuard)
                {
                    checkClosed();
                    // Loop rather than test once: Object.wait() may wake spuriously,
                    // and stop() may run again between start()'s notify and this
                    // thread re-acquiring the monitor.
                    while(!isStarted())
                    {
                        stateGuard.wait();
                        checkClosed();
                    }
                }
                SomniMessage result = null;
                while(result==null)
                {
                    result = (SomniMessage)feed.take();

                    if(expired(result))
                    {
                        result = null;
                    }
                    else
                    {
                        result = processReceivedMessage(result);
                    }
                }
                logReceived(result);
                return result;
            }
        }
        catch(InterruptedException ie)
        {
            throw new SomniInterruptedException(ie);
        }
        finally
        {
            synchronized(threadsGuard)
            {
                waitingThreads.remove(Thread.currentThread());
            }
        }
    }

    /**
     * Waits up to {@code timeout} milliseconds for an unexpired message. Time
     * spent waiting for the consumer to be started counts against the timeout.
     *
     * <p>NOTE(review): the JMS API says a timeout of zero blocks indefinitely;
     * this implementation treats zero as already elapsed and returns null.
     * Left unchanged so existing callers keep their behavior -- confirm.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @return the next unexpired message, or null if none arrived in time
     * @throws IllegalStateException if this consumer is closed
     * @throws JMSException a SomniInterruptedException is raised if the calling
     *         thread is interrupted while waiting (close() interrupts waiters)
     */
    public Message receive(long timeout)
        throws JMSException
    {
        long timesUp = System.currentTimeMillis()+timeout;
        synchronized(threadsGuard)
        {
            waitingThreads.add(Thread.currentThread());
        }
        try
        {
            synchronized(feedGuard)
            {
                synchronized(stateGuard)
                {
                    checkClosed();
                    while(!isStarted())
                    {
                        long untilStart = timesUp-System.currentTimeMillis();
                        if(untilStart<=0)
                        {
                            // Out of time. wait(0) would block forever and a
                            // negative argument throws IllegalArgumentException.
                            break;
                        }
                        stateGuard.wait(untilStart);
                        checkClosed();
                    }
                }

                long remaining = timesUp-System.currentTimeMillis();
                SomniLogger.IT.finest("polling for "+remaining);
                SomniMessage result = null;
                // remaining is computed once per pass so the value tested is the
                // value handed to poll(); poll() never sees a non-positive wait.
                while(result==null&&remaining>0)
                {
                    result = (SomniMessage)feed.poll(remaining);
                    if(expired(result))
                    {
                        result = null;
                    }
                    remaining = timesUp-System.currentTimeMillis();
                }
                if(result==null)
                {
                    SomniLogger.IT.finer("returned null after "+timeout+" ms.");
                }
                else
                {
                    result = processReceivedMessage(result);
                    logReceived(result);
                }
                return result;
            }
        }
        catch(InterruptedException ie)
        {
            throw new SomniInterruptedException(ie);
        }
        finally
        {
            synchronized(threadsGuard)
            {
                waitingThreads.remove(Thread.currentThread());
            }
        }
    }

    /**
     * Returns the next unexpired message if one is immediately available.
     * Expired messages found along the way are dropped.
     *
     * <p>NOTE(review): the started check comes before the closed check, and
     * close() also stops the consumer, so a closed consumer returns null here
     * instead of throwing unless start() is called after close(). Order kept
     * as-is to preserve behavior -- confirm whether that is intended.
     *
     * @return the next unexpired message, or null if the consumer is not
     *         started or the feed has nothing to offer
     * @throws IllegalStateException if this consumer is started but closed
     */
    public Message receiveNoWait()
        throws JMSException
    {
        synchronized(feedGuard)
        {
            if(!isStarted())
            {
                SomniLogger.IT.finer("returned null");
                return null;
            }
            checkClosed();

            SomniMessage result = (SomniMessage)feed.poll();
            while(result!=null&&expired(result))
            {
                result = (SomniMessage)feed.poll();
            }

            if(result==null)
            {
                SomniLogger.IT.finer("returned null");
            }
            else
            {
                result = processReceivedMessage(result);
                logReceived(result);
            }
            return result;
        }
    }

    /**
     * Closes this consumer: closes the listener runner, marks the consumer
     * closed and stopped, interrupts every thread blocked in a receive, joins
     * the listener runner's thread, and finally wakes any thread still waiting
     * on the state monitor.
     */
    public void close()
        throws JMSException
    {
        // Read the runner once, under the guard that protects it, so the
        // close() and joinThread() calls below act on the same runner even if
        // setMessageListener() runs concurrently. The lock is released before
        // joining so the listener thread can still use partsGuard.
        SomniMessageListenerRunner runner;
        synchronized(partsGuard)
        {
            runner = messageListenerRunner;
        }

        if(runner!=null)
        {
            runner.close();
        }

        synchronized(stateGuard)
        {
            closed = true;
            stop();
        }

        synchronized(threadsGuard)
        {
            Iterator<Thread> it = waitingThreads.iterator();
            while(it.hasNext())
            {
                Thread thread = it.next();
                thread.interrupt();
                it.remove();
            }
        }

        if(runner!=null)
        {
            runner.joinThread();
        }

        synchronized(stateGuard)
        {
            stateGuard.notifyAll();
        }
        SomniLogger.IT.finer(getName()+" closed");
    }

    /**
     * @return the feed's own estimate of how many messages it holds
     */
    public int guessSize()
    {
        return feed.guessSize();
    }

    /**
     * @return the JMSTimestamp of the message the feed would hand out next,
     *         or 0 if the feed's peek() returns null
     * @throws SomniRuntimeException if the timestamp can not be read
     */
    public long pendingMessageTimestamp()
    {
        Message pendingMessage = feed.peek();

        if(pendingMessage==null)
        {
            return 0;
        }
        try
        {
            return pendingMessage.getJMSTimestamp();
        }
        catch(JMSException jmse)
        {
            throw new SomniRuntimeException("Trouble getting the time stamp.",jmse);
        }
    }

    /**
     * @return a report built from guessSize() and pendingMessageTimestamp()
     */
    public SomniConsumerReport createSomniConsumerReport()
    {
        return new SomniConsumerReport(guessSize(),pendingMessageTimestamp());
    }

    /**
     * @throws IllegalStateException (java.lang) if this consumer has been closed
     */
    protected void checkClosed()
    {
        synchronized(stateGuard)
        {
            if(closed)
            {
                throw new IllegalStateException("This MessageConsumer is closed.");
            }
        }
    }

    /** Marks the consumer started and wakes receivers waiting for that. */
    protected void start()
    {
        synchronized(stateGuard)
        {
            started = true;
            stateGuard.notifyAll();
        }
        SomniLogger.IT.finer(getName()+" started");
    }

    /** Marks the consumer stopped; receivers arriving afterwards wait for start(). */
    protected void stop()
    {
        synchronized(stateGuard)
        {
            started = false;
        }
        SomniLogger.IT.finer(getName()+" stopped");
    }

    /**
     * @return true if start() has been called more recently than stop()
     */
    protected boolean isStarted()
    {
        synchronized(stateGuard)
        {
            return started;
        }
    }

    /**
     * @return the name used to identify this consumer in log messages
     */
    protected abstract String getName();

    protected abstract SomniDestination getDestination();

    /**
     * Tries to hand a message back to the feed for another delivery. A message
     * already flagged as redelivered is not pushed back again; that is only
     * logged. Otherwise the message is flagged redelivered and pushed back
     * onto the feed. A JMSException raised along the way is passed to the
     * exception listener rather than thrown.
     *
     * @param message the message to redeliver; null is ignored
     */
    protected void redeliver(Message message)
    {
        try
        {
            if(message!=null)
            {
                if(message.getJMSRedelivered())
                {
                    StringBuilder buffy = new StringBuilder();
                    buffy.append(getName());
                    buffy.append(" can not deliver ");
                    buffy.append(message.toString());

                    SomniLogger.IT.warning(buffy.toString());
                }
                else
                {
                    StringBuilder buffy = new StringBuilder();
                    buffy.append(getName());
                    buffy.append(" attempting to redeliver ");
                    buffy.append(message.toString());
                    SomniLogger.IT.warning(buffy.toString());

                    message.setJMSRedelivered(true);
                    try
                    {
                        feed.pushBack(message);
                    }
                    catch(InterruptedException ie)
                    {
                        throw new SomniInterruptedException(ie);
                    }
                }
            }
        }
        catch(JMSException jmse)
        {
            synchronized(partsGuard)
            {
                exceptionListener.onException(jmse);
            }
        }
    }

    /** Acknowledges through the owning session. */
    void acknowledge()
        throws JMSException
    {
        session.acknowledge();
    }
}