1 package net.walend.somnifugi; 2 3 import javax.jms.MessageListener ; 4 import javax.jms.JMSException ; 5 import javax.jms.Message ; 6 7 16 17 class SomniMessageListenerRunner 18 implements Runnable 19 { 20 private final SomniMessageConsumer consumer; 21 private final MessageListener listener; 22 private Thread thread; 23 private boolean closed = false; 24 private final SomniExceptionListener exceptionListener; 25 26 27 private final Object guard = new Object (); 28 29 protected SomniMessageListenerRunner(SomniMessageConsumer consumer,MessageListener listener,SomniExceptionListener exceptionListener) 30 { 31 this.consumer = consumer; 32 this.listener = listener; 33 this.exceptionListener = exceptionListener; 34 } 35 36 protected MessageListener getMessageListener() 37 { 38 return listener; 39 } 40 41 void close() 42 { 43 synchronized(guard) 44 { 45 closed = true; 46 } 47 SomniLogger.IT.finer(consumer.getName()+"'s MessageListener closed."); 48 } 49 50 void joinThread() 51 throws SomniCannotCloseException 52 { 53 if((thread!=null)&&(thread!=Thread.currentThread())) 54 { 55 try 56 { 57 SomniLogger.IT.fine(consumer.getName()+"'s thread, "+thread.getName()+" is joining "+Thread.currentThread().getName()); 58 thread.interrupt(); 59 thread.join(); 60 SomniLogger.IT.fine(consumer.getName()+"'s thread, "+thread.getName()+" joined "+Thread.currentThread().getName()); 61 } 62 catch(InterruptedException ie) 63 { 64 throw new SomniCannotCloseException(ie); 65 } 66 } 67 } 68 69 private boolean isClosed() 70 { 71 synchronized(guard) 72 { 73 return closed; 74 } 75 } 76 77 public void run() 79 { 80 synchronized(guard) 81 { 82 thread = Thread.currentThread(); 83 } 84 85 Message message = null; 86 try 87 { 88 while(!thread.isInterrupted()&&!isClosed()) 89 { 90 try 91 { 92 message = null; 93 message = consumer.receive(); 94 listener.onMessage(message); 95 } 96 catch(SomniInterruptedException aie) 97 { 98 SomniLogger.IT.fine(consumer.getName()+"'s MessageListener reinterrupting thread "+thread.getName()); 99 thread.interrupt(); 100 } 101 catch(JMSException jmse) 102 { 103 exceptionListener.onException(jmse); 104 } 105 } 106 } 107 catch(RuntimeException re) 108 { 109 exceptionListener.onException(re); 110 consumer.redeliver(message); 111 } 112 } 113 } 114 115 135 | Popular Tags |