1 25 26 package org.objectweb.petals.jbi.messaging; 27 28 import java.util.HashMap ; 29 import java.util.Map ; 30 import java.util.concurrent.LinkedBlockingQueue ; 31 import java.util.concurrent.TimeUnit ; 32 33 import javax.jbi.JBIException; 34 import javax.jbi.messaging.DeliveryChannel; 35 import javax.jbi.messaging.ExchangeStatus; 36 import javax.jbi.messaging.MessageExchange; 37 import javax.jbi.messaging.MessageExchangeFactory; 38 import javax.jbi.messaging.MessagingException; 39 import javax.jbi.servicedesc.ServiceEndpoint; 40 import javax.xml.namespace.QName ; 41 42 import org.objectweb.petals.jbi.component.context.ComponentContextImpl; 43 import org.objectweb.petals.jbi.routing.Router; 44 import org.objectweb.petals.util.LoggingUtil; 45 import org.objectweb.util.monolog.api.Logger; 46 47 54 public class DeliveryChannelImpl implements DeliveryChannel { 55 56 public static final String PROPERTY_SENDSYNC="javax.jbi.messaging.sendSync"; 57 58 public static final String PROPERTY_NOACK="org.objectweb.petals.messaging.noack"; 59 60 protected ComponentContextImpl context; 61 62 protected LoggingUtil log; 63 64 67 protected Logger logger; 68 69 73 protected MessageExchangeFactoryImpl messageExchangeFactory; 74 75 protected boolean opened = true; 76 77 protected LinkedBlockingQueue <MessageExchangeImpl> queue; 78 79 protected Router router; 80 81 84 protected Map <String , MessageExchangeDecorator> waitingExchanges; 85 86 91 public DeliveryChannelImpl(ComponentContextImpl context, Router router, 92 Logger logger) { 93 this.logger = logger; 94 95 log = new LoggingUtil(logger); 96 97 queue = new LinkedBlockingQueue <MessageExchangeImpl>(); 98 99 messageExchangeFactory = new MessageExchangeFactoryImpl(this, context 100 .getAddress(), logger); 101 102 this.router = router; 103 this.context = context; 104 105 waitingExchanges = new HashMap <String , MessageExchangeDecorator>(); 106 } 107 108 114 public MessageExchange accept() throws MessagingException { 115 log.call(); 116 117 return poll(true,0); 118 } 119 120 124 public MessageExchange accept(long timeoutMS) throws MessagingException { 125 log.call(); 126 127 return poll(false,timeoutMS); 128 } 129 130 public MessageExchange poll(boolean block, long timeoutMS) throws MessagingException { 131 log.start(); 132 133 checkDeliveryChannelIsOpened(); 134 135 MessageExchangeImpl msg=null; 136 137 try { 138 if(block){ 139 msg = queue.take(); 140 }else{ 141 msg = queue.poll(timeoutMS,TimeUnit.MILLISECONDS); 142 } 143 } catch (InterruptedException e) { 144 e.printStackTrace(); 146 } 147 148 log.end(); 149 150 MessageExchange result = null; 151 if (msg != null) { 152 result = messageExchangeFactory.createExchangeDecorator(msg, msg 153 .getPattern()); 154 } 155 156 return result; 157 } 158 159 164 public synchronized void checkDeliveryChannelIsOpened() 165 throws MessagingException { 166 if (!opened) { 167 throw new MessagingException("DeliveryChannel is closed."); 168 } 169 } 170 171 177 public void checkMessageExchange(MessageExchange exchange) 178 throws MessagingException { 179 if (exchange == null) { 180 throw new MessagingException("MessageExchange is null."); 181 } 182 MessageExchangeDecorator exchangeDecorator = (MessageExchangeDecorator) exchange; 183 184 if (exchangeDecorator.getMessageExchange().isTerminated()) { 185 throw new MessagingException("MessageExchange is terminated."); 186 } 187 } 188 189 197 public synchronized void close() throws MessagingException { 198 log.call(); 199 200 checkDeliveryChannelIsOpened(); 201 202 try { 203 context.deregisterAllEndpoints(); 204 } catch (JBIException e) { 205 throw new MessagingException(e); 206 } 207 208 opened = false; 209 } 210 211 214 public MessageExchangeFactory createExchangeFactory() { 215 log.call(); 216 217 MessageExchangeFactoryImpl mef = new MessageExchangeFactoryImpl(this, 218 context.getAddress(), logger); 219 220 return mef; 221 } 222 223 226 public MessageExchangeFactory createExchangeFactory(QName interfaceName) { 227 log.call(interfaceName); 228 229 MessageExchangeFactoryImpl mef = new MessageExchangeFactoryImpl(this, 230 context.getAddress(), logger); 231 232 mef.setDefaultInterfaceName(interfaceName); 233 234 return mef; 235 } 236 237 240 public MessageExchangeFactory createExchangeFactory(ServiceEndpoint endpoint) { 241 log.call(endpoint); 242 243 MessageExchangeFactoryImpl mef = new MessageExchangeFactoryImpl(this, 244 context.getAddress(), logger); 245 246 mef.setDefaultServiceEndpoint(endpoint); 247 248 return mef; 249 } 250 251 254 public MessageExchangeFactory createExchangeFactoryForService( 255 QName serviceName) { 256 log.call(serviceName); 257 258 MessageExchangeFactoryImpl mef = new MessageExchangeFactoryImpl(this, 259 context.getAddress(), logger); 260 261 mef.setDefaultServiceName(serviceName); 262 263 return mef; 264 } 265 266 public synchronized boolean isOpened() { 267 return opened; 268 } 269 270 274 287 public void push(MessageExchangeImpl msg) throws JBIException { 288 log.start(msg); 289 290 checkDeliveryChannelIsOpened(); 291 292 295 298 MessageExchangeDecorator decorator = waitingExchanges.get(waitingExchangeKey(msg)); 299 300 if (decorator != null) { 301 synchronized (decorator) { 304 decorator.setMessageExchange(msg); 305 decorator.setWaitingOnSynchronousSend(false); 306 decorator.notifyAll(); 307 } 308 } else { 309 try { 312 queue.put(msg); 313 } catch (InterruptedException e) { 314 log.error(e.getMessage(),e); 315 throw new JBIException(e); 316 } 317 } 318 319 log.end(); 320 } 321 322 325 public void send(MessageExchange exchange) throws MessagingException { 326 log.call(exchange); 327 328 exchange.setProperty(PROPERTY_SENDSYNC,null); 330 331 sendExchange(exchange,0); 332 } 333 334 337 public boolean sendSync(MessageExchange exchange) throws MessagingException { 338 log.call(); 339 return sendSync(exchange, 0); 340 } 341 342 346 public boolean sendSync(MessageExchange exchange, long timeoutMS) 347 throws MessagingException { 348 log.start(); 349 350 MessageExchangeDecorator exchangeDecorator = (MessageExchangeDecorator) exchange; 351 352 synchronized (exchangeDecorator) { 353 try { 354 exchangeDecorator.setWaitingOnSynchronousSend(true); 356 exchangeDecorator.setProperty(PROPERTY_SENDSYNC,"true"); 357 358 waitingExchanges.put(waitingExchangeKey(exchangeDecorator),exchangeDecorator); 362 363 sendExchange(exchange, timeoutMS); 364 365 exchangeDecorator.wait(timeoutMS); 367 368 371 waitingExchanges.remove(waitingExchangeKey(exchangeDecorator)); 373 374 if (exchangeDecorator.isWaitingOnSynchronousSend()) { 376 378 exchangeDecorator.getMessageExchange().setRole( 380 exchangeDecorator.getObserverRole()); 381 382 exchangeDecorator.setStatus(ExchangeStatus.ERROR); 383 384 exchangeDecorator.getMessageExchange().setTerminated(true); 385 } 386 387 } catch (InterruptedException e) { 388 log.error(e.getMessage(), e); 389 exchangeDecorator.getMessageExchange().setRole( 391 exchangeDecorator.getObserverRole()); 392 393 exchangeDecorator.setStatus(ExchangeStatus.ERROR); 394 395 exchangeDecorator.getMessageExchange().setTerminated(true); 396 397 throw new MessagingException(e); 398 } 399 } 400 log.end(); 401 402 return !exchangeDecorator.isWaitingOnSynchronousSend(); 403 } 404 405 414 protected void sendExchange(MessageExchange exchange, long timeOut) throws MessagingException { 415 log.start(exchange); 416 417 checkDeliveryChannelIsOpened(); 418 checkMessageExchange(exchange); 419 420 MessageExchangeDecorator exchangeDecorator = (MessageExchangeDecorator) exchange; 421 MessageExchangeImpl exchangeImpl = exchangeDecorator 422 .getMessageExchange(); 423 424 try { 425 exchangeImpl.cleanMessages(); 427 428 if (!ExchangeStatus.ACTIVE.equals(exchangeImpl.getStatus())) { 430 exchangeImpl.setTerminated(true); 431 } 432 433 boolean sendMessage = true; 435 if(exchangeImpl.isTerminated()){ 436 Object noAck = exchange.getProperty(PROPERTY_NOACK); 437 sendMessage= (noAck == null || ! noAck.toString().toLowerCase().equals("true")); 438 } 439 440 if(sendMessage){ 442 router.send(context, exchangeImpl,timeOut); 443 } 444 445 } catch (JBIException e) { 446 throw new MessagingException(e); 447 } 448 log.end(); 449 } 450 451 460 protected String waitingExchangeKey(MessageExchange exchange) 461 { 462 MessageExchangeImpl meImpl = null; 463 464 if (exchange instanceof MessageExchangeDecorator){ 465 meImpl = ((MessageExchangeDecorator)exchange).getMessageExchange(); 466 }else{ 467 meImpl = (MessageExchangeImpl)exchange; 468 } 469 return meImpl.getRole()+meImpl.getExchangeId(); 470 } 471 } 472 | Popular Tags |