1 16 17 package org.apache.axis.ime.internal; 18 19 import org.apache.axis.AxisFault; 20 import org.apache.axis.Handler; 21 import org.apache.axis.components.logger.LogFactory; 22 import org.apache.axis.components.threadpool.ThreadPool; 23 import org.apache.axis.i18n.Messages; 24 import org.apache.axis.ime.MessageExchange; 25 import org.apache.axis.ime.MessageExchangeEventListener; 26 import org.apache.axis.ime.MessageExchangeFactory; 27 import org.apache.axis.ime.event.MessageSendEvent; 28 import org.apache.axis.ime.internal.util.KeyedBuffer; 29 import org.apache.axis.ime.internal.util.NonPersistentKeyedBuffer; 30 import org.apache.commons.logging.Log; 31 32 import java.util.Hashtable ; 33 34 38 public abstract class MessageExchangeProvider 39 implements MessageExchangeFactory { 40 41 protected static Log log = 42 LogFactory.getLog(MessageExchangeProvider.class.getName()); 43 44 public static final long SELECT_TIMEOUT = 1000 * 30; 45 public static final long DEFAULT_THREAD_COUNT = 5; 46 47 protected final ThreadPool WORKERS = new ThreadPool(); 48 protected final KeyedBuffer SEND = new NonPersistentKeyedBuffer(WORKERS); 49 protected final KeyedBuffer RECEIVE = new NonPersistentKeyedBuffer(WORKERS); 50 protected final KeyedBuffer RECEIVE_REQUESTS = new NonPersistentKeyedBuffer(WORKERS); 51 protected Handler sendHandler = null; 52 protected Handler receiveHandler = null; 53 54 protected boolean initialized = false; 55 56 public Handler getSendHandler() { 57 return sendHandler; 58 } 59 60 public Handler getReceiveHandler() { 61 return receiveHandler; 62 } 63 64 public void setSendHandler(Handler handler) { 65 this.sendHandler = handler; 66 } 67 68 public void setReceiveHandler(Handler handler) { 69 this.receiveHandler = handler; 70 } 71 72 protected abstract MessageExchangeEventListener getMessageExchangeEventListener(); 73 74 protected abstract ReceivedMessageDispatchPolicy getReceivedMessageDispatchPolicy(); 75 76 public MessageExchange createMessageExchange() 77 throws AxisFault { 78 return new MessageExchangeImpl(this); 79 } 80 81 public MessageExchange createMessageExchange( 82 Hashtable options) 83 throws AxisFault { 84 MessageExchange msgex = new MessageExchangeImpl(this); 85 msgex.setOptions(options); 86 return msgex; 87 } 88 89 public void cleanup() 90 throws InterruptedException { 91 if (log.isDebugEnabled()) { 92 log.debug("Enter: MessageExchangeProvider::cleanup"); 93 } 94 WORKERS.cleanup(); 95 if (log.isDebugEnabled()) { 96 log.debug("Exit: MessageExchangeProvider::cleanup"); 97 } 98 } 99 100 public void init() { 101 init(DEFAULT_THREAD_COUNT); 102 } 103 104 public void init(long THREAD_COUNT) { 105 if (log.isDebugEnabled()) { 106 log.debug("Enter: MessageExchangeProvider::init"); 107 } 108 if (initialized) 109 throw new IllegalStateException (Messages.getMessage("illegalStateException00")); 110 for (int n = 0; n < THREAD_COUNT; n++) { 111 WORKERS.addWorker(new MessageSender(WORKERS, SEND, getMessageExchangeEventListener(), getSendHandler())); 112 WORKERS.addWorker(new MessageReceiver(WORKERS, RECEIVE, getReceivedMessageDispatchPolicy(), getReceiveHandler())); 113 } 114 initialized = true; 115 if (log.isDebugEnabled()) { 116 log.debug("Exit: MessageExchangeProvider::init"); 117 } 118 } 119 120 public void processReceive( 121 MessageExchangeReceiveContext context) { 122 if (log.isDebugEnabled()) { 123 log.debug("Enter: MessageExchangeProvider::processReceive"); 124 } 125 if (context.getMessageExchangeCorrelator() != null) { 126 RECEIVE_REQUESTS.put( 127 context.getMessageExchangeCorrelator(), 128 context); 129 } else { 130 RECEIVE_REQUESTS.put( 131 SimpleMessageExchangeCorrelator.NULL_CORRELATOR, context); 132 } 133 if (log.isDebugEnabled()) { 134 log.debug("Exit: MessageExchangeProvider::processReceive"); 135 } 136 } 137 138 public void processSend( 139 MessageExchangeSendContext context) { 140 if (log.isDebugEnabled()) { 141 log.debug("Enter: MessageExchangeProvider::processSend"); 142 } 143 SEND.put( 144 context.getMessageExchangeCorrelator(), 145 context); 146 if (log.isDebugEnabled()) { 147 log.debug("Exit: MessageExchangeProvider::processSend"); 148 } 149 } 150 151 public void shutdown() { 152 shutdown(false); 153 } 154 155 public void shutdown(boolean force) { 156 if (log.isDebugEnabled()) { 157 log.debug("Enter: MessageExchangeProvider::shutdown"); 158 } 159 if (!force) { 160 WORKERS.safeShutdown(); 161 } else { 162 WORKERS.shutdown(); 163 } 164 if (log.isDebugEnabled()) { 165 log.debug("Exit: MessageExchangeProvider::shutdown"); 166 } 167 } 168 169 public void awaitShutdown() 170 throws InterruptedException { 171 if (log.isDebugEnabled()) { 172 log.debug("Enter: MessageExchangeProvider::awaitShutdown"); 173 } 174 WORKERS.awaitShutdown(); 175 if (log.isDebugEnabled()) { 176 log.debug("Exit: MessageExchangeProvider::awaitShutdown"); 177 } 178 } 179 180 public void awaitShutdown(long shutdown) 181 throws InterruptedException { 182 if (log.isDebugEnabled()) { 183 log.debug("Enter: MessageExchangeProvider::awaitShutdown"); 184 } 185 WORKERS.awaitShutdown(shutdown); 186 if (log.isDebugEnabled()) { 187 log.debug("Exit: MessageExchangeProvider::awaitShutdown"); 188 } 189 } 190 191 195 public void setOption( 196 String propertyId, 197 Object propertyValue) { 198 throw new UnsupportedOperationException (Messages.getMessage("unsupportedOperationException00")); 199 } 200 201 205 public Object getOption( 206 String propertyId) { 207 throw new UnsupportedOperationException (Messages.getMessage("unsupportedOperationException00")); 208 } 209 210 214 public Object getOption( 215 String propertyId, 216 Object defaultValue) { 217 throw new UnsupportedOperationException (Messages.getMessage("unsupportedOperationException00")); 218 } 219 220 224 public Hashtable getOptions() { 225 throw new UnsupportedOperationException (Messages.getMessage("unsupportedOperationException00")); 226 } 227 228 232 public void setOptions(Hashtable properties) { 233 throw new UnsupportedOperationException (Messages.getMessage("unsupportedOperationException00")); 234 } 235 236 240 public void clearOptions() { 241 throw new UnsupportedOperationException (Messages.getMessage("unsupportedOperationException00")); 242 } 243 244 public static class MessageReceiver 246 implements Runnable { 247 248 protected static Log log = 249 LogFactory.getLog(MessageReceiver.class.getName()); 250 251 protected ThreadPool pool; 252 protected KeyedBuffer channel; 253 protected ReceivedMessageDispatchPolicy policy; 254 protected Handler handler; 255 256 protected MessageReceiver( 257 ThreadPool pool, 258 KeyedBuffer channel, 259 ReceivedMessageDispatchPolicy policy, 260 Handler handler) { 261 this.pool = pool; 262 this.channel = channel; 263 this.policy = policy; 264 this.handler = handler; 265 } 266 267 270 public void run() { 271 if (log.isDebugEnabled()) { 272 log.debug("Enter: MessageExchangeProvider.MessageReceiver::run"); 273 } 274 try { 275 while (!pool.isShuttingDown()) { 276 MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT); 277 if (context != null) { 278 if (handler != null) 279 handler.invoke(context.getMessageContext()); 280 policy.dispatch(context); 281 } 282 } 283 } catch (Throwable t) { 284 log.error(Messages.getMessage("fault00"), t); 285 } finally { 286 pool.workerDone(this,true); 287 if (log.isDebugEnabled()) { 288 log.debug("Exit: MessageExchangeProvider.MesageReceiver::run"); 289 } 290 } 291 } 292 293 } 294 295 296 297 public static class MessageSender 298 implements Runnable { 299 300 protected static Log log = 301 LogFactory.getLog(MessageReceiver.class.getName()); 302 303 protected ThreadPool pool; 304 protected KeyedBuffer channel; 305 protected MessageExchangeEventListener listener; 306 protected Handler handler; 307 308 protected MessageSender( 309 ThreadPool pool, 310 KeyedBuffer channel, 311 MessageExchangeEventListener listener, 312 Handler handler) { 313 this.pool = pool; 314 this.channel = channel; 315 this.listener = listener; 316 this.handler = handler; 317 } 318 319 322 public void run() { 323 if (log.isDebugEnabled()) { 324 log.debug("Enter: MessageExchangeProvider.MessageSender::run"); 325 } 326 try { 327 while (!pool.isShuttingDown()) { 328 MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT); 329 if (context != null) { 330 if (handler != null) 331 handler.invoke(context.getMessageContext()); 332 333 MessageSendEvent sendEvent = new MessageSendEvent( 334 context.getMessageExchangeCorrelator(), 335 context, 336 context.getMessageContext()); 337 listener.onEvent(sendEvent); 338 } 339 } 340 } catch (Throwable t) { 341 log.error(Messages.getMessage("fault00"), t); 342 } finally { 343 pool.workerDone(this,true); 344 if (log.isDebugEnabled()) { 345 log.debug("Exit: MessageExchangeProvider.MessageSender::run"); 346 } 347 } 348 } 349 350 } 351 352 } 353 | Popular Tags |