1 22 package org.objectweb.petals.component.common.listener; 23 24 import java.util.logging.Level ; 25 import java.util.logging.Logger ; 26 27 import javax.jbi.messaging.DeliveryChannel; 28 import javax.jbi.messaging.ExchangeStatus; 29 import javax.jbi.messaging.MessageExchange; 30 import javax.jbi.messaging.MessagingException; 31 32 47 public class MessageExchangeListener extends Thread { 48 49 public enum IgnoredStatus { 50 NOTHING_IGNORED, DONE_IGNORED, ERROR_IGNORED, DONE_AND_ERROR_IGNORED 51 }; 52 53 protected DeliveryChannel channel; 54 55 protected boolean running = false; 56 57 protected IgnoredStatus ignoredStatus; 58 59 protected long period; 60 61 protected Logger logger; 62 63 protected IMessageExchangeProcessor processor; 64 65 72 public MessageExchangeListener(DeliveryChannel channel, 73 IMessageExchangeProcessor processor) { 74 this(channel, processor, IgnoredStatus.NOTHING_IGNORED); 75 } 76 77 86 public MessageExchangeListener(DeliveryChannel channel, 87 IMessageExchangeProcessor processor, IgnoredStatus ignoredStatus) { 88 this(channel, processor, IgnoredStatus.NOTHING_IGNORED, 0); 89 } 90 91 101 public MessageExchangeListener(DeliveryChannel channel, 102 IMessageExchangeProcessor processor, IgnoredStatus ignoredStatus, 103 long period) { 104 this.ignoredStatus = ignoredStatus; 105 this.channel = channel; 106 this.period = period; 107 this.processor = processor; 108 } 109 110 121 public MessageExchangeListener(DeliveryChannel channel, 122 IMessageExchangeProcessor processor, IgnoredStatus ignoredStatus, 123 long period,String componentName) { 124 super(componentName+"-JBI listener thread"); 125 this.ignoredStatus = ignoredStatus; 126 this.channel = channel; 127 this.period = period; 128 this.processor = processor; 129 } 130 131 136 public void close() { 137 terminate(); 138 interrupt(); 139 } 140 141 146 public void listen() { 147 start(); 148 } 149 150 153 public void run() { 154 running = true; 155 156 while (running) { 157 MessageExchange exchange = null; 158 159 try { 161 exchange = channel.accept(); 162 } catch (MessagingException e) { 163 logError(e); 164 } 165 166 if (!isIgnored(exchange)) { 168 process(exchange); 169 } 170 171 if (period > 0) { 173 try { 174 wait(period); 175 } catch (InterruptedException e) { 176 } 178 } 179 } 180 } 181 182 public void setLogger(Logger logger) { 183 this.logger = logger; 184 } 185 186 191 public void terminate() { 192 running = false; 193 } 194 195 202 protected boolean isIgnored(MessageExchange exchange) { 203 boolean result = false; 204 205 if (exchange != null) { 206 ExchangeStatus status = exchange.getStatus(); 207 if (ExchangeStatus.DONE.equals(status)) { 208 result = IgnoredStatus.DONE_IGNORED == ignoredStatus 209 || IgnoredStatus.DONE_AND_ERROR_IGNORED == ignoredStatus; 210 } else if (ExchangeStatus.ERROR.equals(status)) { 211 result = IgnoredStatus.ERROR_IGNORED == ignoredStatus 212 || IgnoredStatus.DONE_AND_ERROR_IGNORED == ignoredStatus; 213 } 214 } else { 215 result = true; 216 } 217 return result; 218 } 219 220 protected void logError(Throwable e) { 221 if (logger != null) 222 logger.log(Level.SEVERE, e.getMessage(), e); 223 else 224 e.printStackTrace(); 225 } 226 227 232 protected void process(MessageExchange exchange) { 233 if (processor != null) 234 try { 235 processor.process(exchange); 236 } catch (Exception e) { 237 logError(e); 238 } 239 } 240 } 241 | Popular Tags |