1 16 17 package org.apache.axis.ime.internal; 18 19 import org.apache.axis.AxisFault; 20 import org.apache.axis.MessageContext; 21 import org.apache.axis.components.logger.LogFactory; 22 import org.apache.axis.components.uuid.UUIDGenFactory; 23 import org.apache.axis.ime.MessageExchange; 24 import org.apache.axis.ime.MessageExchangeConstants; 25 import org.apache.axis.ime.MessageExchangeCorrelator; 26 import org.apache.axis.ime.MessageExchangeEvent; 27 import org.apache.axis.ime.MessageExchangeEventListener; 28 import org.apache.axis.ime.MessageExchangeLifecycle; 29 import org.apache.axis.ime.event.MessageFaultEvent; 30 import org.apache.axis.ime.event.MessageReceiveEvent; 31 import org.apache.commons.logging.Log; 32 33 import java.util.Hashtable ; 34 35 39 public class MessageExchangeImpl 40 implements MessageExchange, MessageExchangeLifecycle { 41 42 protected static Log log = 43 LogFactory.getLog(MessageExchangeImpl.class.getName()); 44 45 public static final long NO_TIMEOUT = -1; 46 public static final long DEFAULT_TIMEOUT = 1000 * 30; 47 48 private MessageExchangeEventListener eventListener; 49 private MessageExchangeProvider provider; 50 protected Holder holder; 51 52 public MessageExchangeImpl( 53 MessageExchangeProvider provider) { 54 } 55 56 59 public MessageExchangeCorrelator send( 60 MessageContext context) 61 throws AxisFault { 62 return send(context,null); 63 } 64 65 68 public MessageExchangeCorrelator send( 69 MessageContext context, 70 MessageExchangeEventListener listener) 71 throws AxisFault { 72 if (log.isDebugEnabled()) { 73 log.debug("Enter: MessageExchangeImpl::send"); 74 } 75 MessageExchangeCorrelator correlator = 76 (MessageExchangeCorrelator) context.getProperty( 77 MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY); 78 if (correlator == null) { 79 correlator = new SimpleMessageExchangeCorrelator( 80 UUIDGenFactory.getUUIDGen().nextUUID()); 81 context.setProperty( 82 MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY, 83 correlator); 84 } 85 MessageExchangeSendContext sendContext = 86 MessageExchangeSendContext.newInstance( 87 correlator, 88 context, 89 listener); 90 if (listener != null) { 91 provider.processReceive(sendContext); 92 } 93 provider.processSend(sendContext); 94 if (log.isDebugEnabled()) { 95 log.debug("Exit: MessageExchangeImpl::send"); 96 } 97 return correlator; 98 } 99 100 103 public MessageContext receive() 104 throws AxisFault { 105 return receive(null,NO_TIMEOUT); 106 } 107 108 111 public MessageContext receive( 112 long timeout) 113 throws AxisFault { 114 return receive(null,timeout); 115 } 116 117 120 public MessageContext receive( 121 MessageExchangeCorrelator correlator) 122 throws AxisFault { 123 return receive(correlator,NO_TIMEOUT); 124 } 125 126 129 public MessageContext receive( 130 MessageExchangeCorrelator correlator, 131 long timeout) 132 throws AxisFault { 133 if (log.isDebugEnabled()) { 134 log.debug("Enter: MessageExchangeImpl::receive"); 135 } 136 holder = new Holder(); 137 MessageExchangeEventListener oldListener = 138 getMessageExchangeEventListener(); 139 Listener listener = new Listener(holder); 140 setMessageExchangeEventListener(listener); 141 try { 142 this.receive(correlator,listener); 143 if (timeout != NO_TIMEOUT) 144 holder.waitForNotify(timeout); 145 else 146 holder.waitForNotify(); 147 } catch (InterruptedException ie) { 148 throw AxisFault.makeFault(ie); 149 } finally { 150 setMessageExchangeEventListener(oldListener); 151 } 152 if (log.isDebugEnabled()) { 153 log.debug("Exit: MessageExchangeImpl::receive"); 154 } 155 if (holder.context != null) { 156 return holder.context; 157 } 158 if (holder.exception != null) { 159 throw AxisFault.makeFault((Exception ) holder.exception); 160 } 161 return null; 162 } 163 164 167 public void receive( 168 MessageExchangeEventListener listener) 169 throws AxisFault { 170 receive(null,listener); 171 } 172 173 176 public void receive( 177 MessageExchangeCorrelator correlator, 178 MessageExchangeEventListener listener) 179 throws AxisFault { 180 if (log.isDebugEnabled()) { 181 log.debug("Enter: MessageExchangeImpl::receive"); 182 } 183 provider.processReceive( 184 MessageExchangeReceiveContext.newInstance( 185 correlator, 186 listener)); 187 if (log.isDebugEnabled()) { 188 log.debug("Exit: MessageExchangeImpl::receive"); 189 } 190 191 } 192 193 196 public MessageContext sendAndReceive( 197 MessageContext context) 198 throws AxisFault { 199 return sendAndReceive(context,NO_TIMEOUT); 200 } 201 202 205 public MessageContext sendAndReceive( 206 MessageContext context, 207 long timeout) 208 throws AxisFault { 209 if (log.isDebugEnabled()) { 210 log.debug("Enter: MessageExchangeImpl::sendAndReceive"); 211 } 212 holder = new Holder(); 213 MessageExchangeEventListener oldListener = 214 getMessageExchangeEventListener(); 215 Listener listener = new Listener(holder); 216 setMessageExchangeEventListener(listener); 217 try { 218 this.send(context,listener); 219 if (timeout != NO_TIMEOUT) 220 holder.waitForNotify(timeout); 221 else 222 holder.waitForNotify(); 223 } catch (InterruptedException ie) { 224 throw AxisFault.makeFault(ie); 225 } finally { 226 setMessageExchangeEventListener(oldListener); 227 } 228 if (log.isDebugEnabled()) { 229 log.debug("Exit: MessageExchangeImpl::sendAndReceive"); 230 } 231 if (holder.context != null) { 232 return holder.context; 233 } 234 if (holder.exception != null) { 235 throw AxisFault.makeFault((Exception ) holder.exception); 236 } 237 return null; 238 } 239 240 243 public synchronized void setMessageExchangeEventListener( 244 MessageExchangeEventListener listener) { 245 this.eventListener = listener; 246 } 247 248 251 public synchronized MessageExchangeEventListener getMessageExchangeEventListener() { 252 return this.eventListener; 253 } 254 255 259 public void setOption( 260 String OptionId, 261 Object OptionValue) { 262 provider.setOption(OptionId, OptionValue); 263 } 264 265 269 public Object getOption( 270 String OptionId) { 271 return provider.getOption(OptionId); 272 } 273 274 278 public Object getOption( 279 String OptionId, 280 Object defaultValue) { 281 return provider.getOption(OptionId, defaultValue); 282 } 283 284 288 public Hashtable getOptions() { 289 return provider.getOptions(); 290 } 291 292 296 public void setOptions(Hashtable options) { 297 provider.setOptions(options); 298 } 299 300 304 public void clearOptions() { 305 provider.clearOptions(); 306 } 307 308 309 310 312 private class Holder { 313 private MessageExchangeCorrelator correlator; 314 private MessageContext context; 315 private Throwable exception; 316 private boolean done = false; 317 318 public synchronized void set( 319 MessageExchangeCorrelator correlator, 320 MessageContext context) { 321 this.correlator = correlator; 322 this.context = context; 323 done = true; 324 notifyAll(); 325 } 326 327 public synchronized void set( 328 MessageExchangeCorrelator correlator, 329 Throwable throwable) { 330 this.correlator = correlator; 331 this.exception = throwable; 332 done = true; 333 notifyAll(); 334 } 335 336 public synchronized void waitForNotify() 337 throws InterruptedException { 338 if (!done) wait(); 339 return; 340 } 341 342 public synchronized void waitForNotify(long timeout) 343 throws InterruptedException { 344 if (!done) wait(timeout); 345 return; 346 } 347 348 } 349 350 public class Listener 351 implements MessageExchangeEventListener { 352 353 protected Holder holder; 354 355 public Listener(Holder holder) { 356 this.holder = holder; 357 } 358 359 362 public void onEvent( 363 MessageExchangeEvent event) { 364 if (event instanceof MessageReceiveEvent) { 365 MessageReceiveEvent receiveEvent = (MessageReceiveEvent)event; 366 holder.set( 367 receiveEvent.getMessageExchangeCorrelator(), 368 receiveEvent.getMessageContext()); 369 } 370 else if (event instanceof MessageFaultEvent) { 371 MessageFaultEvent faultEvent = (MessageFaultEvent)event; 372 holder.set(faultEvent.getMessageExchangeCorrelator(), faultEvent.getException()); 373 } 374 } 375 } 376 377 378 379 381 384 public void awaitShutdown() 385 throws InterruptedException { 386 if (log.isDebugEnabled()) { 387 log.debug("Enter: MessageExchangeImpl::awaitShutdown"); 388 } 389 provider.awaitShutdown(); 390 if (log.isDebugEnabled()) { 391 log.debug("Exit: MessageExchangeImpl::awaitShutdown"); 392 } 393 } 394 395 398 public void cleanup() 399 throws InterruptedException { 400 if (log.isDebugEnabled()) { 401 log.debug("Enter: MessageExchangeImpl::cleanup"); 402 } 403 provider.cleanup(); 404 if (log.isDebugEnabled()) { 405 log.debug("Exit: MessageExchangeImpl::cleanup"); 406 } 407 } 408 409 412 public void awaitShutdown(long timeout) 413 throws InterruptedException { 414 if (log.isDebugEnabled()) { 415 log.debug("Enter: MessageExchangeImpl::awaitShutdown"); 416 } 417 provider.awaitShutdown(timeout); 418 if (log.isDebugEnabled()) { 419 log.debug("Exit: MessageExchangeImpl::awaitShutdown"); 420 } 421 } 422 423 426 public void init() { 427 if (log.isDebugEnabled()) { 428 log.debug("Enter: MessageExchangeImpl::init"); 429 } 430 provider.init(); 431 if (log.isDebugEnabled()) { 432 log.debug("Exit: MessageExchangeImpl::init"); 433 } 434 } 435 436 439 public void shutdown() { 440 if (log.isDebugEnabled()) { 441 log.debug("Enter: MessageExchangeImpl::shutdown"); 442 } 443 provider.shutdown(); 444 if (log.isDebugEnabled()) { 445 log.debug("Exit: MessageExchangeImpl::shutdown"); 446 } 447 } 448 449 452 public void shutdown(boolean force) { 453 if (log.isDebugEnabled()) { 454 log.debug("Enter: MessageExchangeImpl::shutdown"); 455 } 456 provider.shutdown(force); 457 if (log.isDebugEnabled()) { 458 log.debug("Exit: MessageExchangeImpl::shutdown"); 459 } 460 } 461 462 } 463 | Popular Tags |