KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > petals > jbi > messaging > DeliveryChannelImpl


1 /**
2  * PETALS: PETALS Services Platform
3  * Copyright (C) 2005 EBM WebSourcing
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): EBM WebSourcing
21  * --------------------------------------------------------------------------
22  * $Id: DeliveryChannelImpl.java,v 1.2 2005/07/22 10:24:27 alouis Exp $
23  * --------------------------------------------------------------------------
24  */

25
26 package org.objectweb.petals.jbi.messaging;
27
28 import java.util.HashMap JavaDoc;
29 import java.util.Map JavaDoc;
30 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
31 import java.util.concurrent.TimeUnit JavaDoc;
32
33 import javax.jbi.JBIException;
34 import javax.jbi.messaging.DeliveryChannel;
35 import javax.jbi.messaging.ExchangeStatus;
36 import javax.jbi.messaging.MessageExchange;
37 import javax.jbi.messaging.MessageExchangeFactory;
38 import javax.jbi.messaging.MessagingException;
39 import javax.jbi.servicedesc.ServiceEndpoint;
40 import javax.xml.namespace.QName JavaDoc;
41
42 import org.objectweb.petals.jbi.component.context.ComponentContextImpl;
43 import org.objectweb.petals.jbi.routing.Router;
44 import org.objectweb.petals.util.LoggingUtil;
45 import org.objectweb.util.monolog.api.Logger;
46
47 /**
48  * TODO DeliveryChannelImpl:check multi thread use
49  *
50  * @author Adrien LOUIS - EBM WebSourcing
51  * @author Gael BLONDELLE - EBM WebSourcing
52  * @author ofabre - EBM Websourcing
53  */

54 public class DeliveryChannelImpl implements DeliveryChannel {
55
56     public static final String JavaDoc PROPERTY_SENDSYNC="javax.jbi.messaging.sendSync";
57     
58     public static final String JavaDoc PROPERTY_NOACK="org.objectweb.petals.messaging.noack";
59     
60     protected ComponentContextImpl context;
61
62     protected LoggingUtil log;
63
64     /**
65      * Monolog Logger instance.
66      */

67     protected Logger logger;
68
69     /**
70      * This messageExchangeFactory is used to return
71      * <code>MessageExchangeDecorator</code> on accept() methods.
72      */

73     protected MessageExchangeFactoryImpl messageExchangeFactory;
74
75     protected boolean opened = true;
76
77     protected LinkedBlockingQueue JavaDoc<MessageExchangeImpl> queue;
78
79     protected Router router;
80
81     /**
82      * Map of MsgExchange that are blocked during a Synchronous send
83      */

84     protected Map JavaDoc<String JavaDoc, MessageExchangeDecorator> waitingExchanges;
85
86     /**
87      * Constructor
88      *
89      * @param c
90      */

91     public DeliveryChannelImpl(ComponentContextImpl context, Router router,
92         Logger logger) {
93         this.logger = logger;
94
95         log = new LoggingUtil(logger);
96
97         queue = new LinkedBlockingQueue JavaDoc<MessageExchangeImpl>();
98
99         messageExchangeFactory = new MessageExchangeFactoryImpl(this, context
100             .getAddress(), logger);
101
102         this.router = router;
103         this.context = context;
104
105         waitingExchanges = new HashMap JavaDoc<String JavaDoc, MessageExchangeDecorator>();
106     }
107
108     /**
109      * return a MessageExchangeDecorator with 'consumer' or 'provider' observer
110      * role (depending on the messageExchange getRole())
111      *
112      * @see javax.jbi.messaging.DeliveryChannel#accept()
113      */

114     public MessageExchange accept() throws MessagingException {
115         log.call();
116
117         return poll(true,0);
118     }
119
120     /**
121      * @see javax.jbi.messaging.DeliveryChannel#accept(long) TODO
122      * accept:synchronize
123      */

124     public MessageExchange accept(long timeoutMS) throws MessagingException {
125         log.call();
126
127         return poll(false,timeoutMS);
128     }
129     
130     public MessageExchange poll(boolean block, long timeoutMS) throws MessagingException {
131         log.start();
132
133         checkDeliveryChannelIsOpened();
134
135         MessageExchangeImpl msg=null;
136
137             try {
138                 if(block){
139                     msg = queue.take();
140                 }else{
141                     msg = queue.poll(timeoutMS,TimeUnit.MILLISECONDS);
142                 }
143             } catch (InterruptedException JavaDoc e) {
144                 // TODO Auto-generated catch block
145
e.printStackTrace();
146             }
147
148         log.end();
149
150         MessageExchange result = null;
151         if (msg != null) {
152             result = messageExchangeFactory.createExchangeDecorator(msg, msg
153                 .getPattern());
154         }
155
156         return result;
157     }
158
159     /**
160      * Check if the <code>DeliveryChannel</code> is opened
161      *
162      * @throws MessagingException
163      */

164     public synchronized void checkDeliveryChannelIsOpened()
165         throws MessagingException {
166         if (!opened) {
167             throw new MessagingException("DeliveryChannel is closed.");
168         }
169     }
170
171     /**
172      * Check if the <code>MessageExchange</code> is Ok (not null and not
173      * terminated)
174      *
175      * @throws MessagingException
176      */

177     public void checkMessageExchange(MessageExchange exchange)
178         throws MessagingException {
179         if (exchange == null) {
180             throw new MessagingException("MessageExchange is null.");
181         }
182         MessageExchangeDecorator exchangeDecorator = (MessageExchangeDecorator) exchange;
183
184         if (exchangeDecorator.getMessageExchange().isTerminated()) {
185             throw new MessagingException("MessageExchange is terminated.");
186         }
187     }
188
189     /**
190      * This method wake up all threads that are waiting for a message. This
191      * method deregister all endpoints for this component.
192      * FIXME this method does not wakeup waiting threads anymore
193      * @see javax.jbi.messaging.DeliveryChannel#close() TODO close:manage the
194      * received messages which are not consummed and are still in the
195      * incoming message queue.
196      */

197     public synchronized void close() throws MessagingException {
198         log.call();
199
200         checkDeliveryChannelIsOpened();
201
202         try {
203             context.deregisterAllEndpoints();
204         } catch (JBIException e) {
205             throw new MessagingException(e);
206         }
207        
208         opened = false;
209     }
210
211     /**
212      * @see javax.jbi.messaging.DeliveryChannel#createExchangeFactory()
213      */

214     public MessageExchangeFactory createExchangeFactory() {
215         log.call();
216
217         MessageExchangeFactoryImpl mef = new MessageExchangeFactoryImpl(this,
218             context.getAddress(), logger);
219
220         return mef;
221     }
222
223     /**
224      * @see javax.jbi.messaging.DeliveryChannel#createExchangeFactory(javax.xml.namespace.QName)
225      */

226     public MessageExchangeFactory createExchangeFactory(QName JavaDoc interfaceName) {
227         log.call(interfaceName);
228
229         MessageExchangeFactoryImpl mef = new MessageExchangeFactoryImpl(this,
230             context.getAddress(), logger);
231
232         mef.setDefaultInterfaceName(interfaceName);
233
234         return mef;
235     }
236
237     /**
238      * @see javax.jbi.messaging.DeliveryChannel#createExchangeFactory(javax.jbi.servicedesc.ServiceEndpoint)
239      */

240     public MessageExchangeFactory createExchangeFactory(ServiceEndpoint endpoint) {
241         log.call(endpoint);
242
243         MessageExchangeFactoryImpl mef = new MessageExchangeFactoryImpl(this,
244             context.getAddress(), logger);
245
246         mef.setDefaultServiceEndpoint(endpoint);
247
248         return mef;
249     }
250
251     /**
252      * @see javax.jbi.messaging.DeliveryChannel#createExchangeFactoryForService(javax.xml.namespace.QName)
253      */

254     public MessageExchangeFactory createExchangeFactoryForService(
255         QName JavaDoc serviceName) {
256         log.call(serviceName);
257
258         MessageExchangeFactoryImpl mef = new MessageExchangeFactoryImpl(this,
259             context.getAddress(), logger);
260
261         mef.setDefaultServiceName(serviceName);
262
263         return mef;
264     }
265
266     public synchronized boolean isOpened() {
267         return opened;
268     }
269
270     // ///////////
271
// non jbi
272
// //////////
273

274     /**
275      * Used by the <code>Router</code> when a message is received by the
276      * <code>Transporter</code> and the recepient of the message is the
277      * component attached to this channel. The message is put in the channel
278      * queue, and will be consummed when the component calls an <i>accept</i>
279      * mehod; or if its a response of a sendSync() method, unblock it.
280      *
281      * @param msg
282      * the message received by the <code>Transporter</code>
283      * @throws JBIException
284      * the channel has been closed by the component. The message can
285      * not be accepted
286      */

287     public void push(MessageExchangeImpl msg) throws JBIException {
288         log.start(msg);
289
290         checkDeliveryChannelIsOpened();
291
292         // check if this ME is in a synchronized state
293
// so we must unblock the sendSync() method,
294

295         // or if this ME is asynchronized,
296
// and we just do the accept().
297

298         MessageExchangeDecorator decorator = waitingExchanges.get(waitingExchangeKey(msg));
299
300         if (decorator != null) {
301             // we have to notify the ProviderMEDecorator
302
// that its response is received.
303
synchronized (decorator) {
304                 decorator.setMessageExchange(msg);
305                 decorator.setWaitingOnSynchronousSend(false);
306                 decorator.notifyAll();
307             }
308         } else {
309             // the providerMEDecorator is null
310
// or was not waiting in a synchronously way
311
try {
312                 queue.put(msg);
313             } catch (InterruptedException JavaDoc e) {
314                 log.error(e.getMessage(),e);
315                 throw new JBIException(e);
316             }
317         }
318
319         log.end();
320     }
321
322     /**
323      * @see javax.jbi.messaging.DeliveryChannel#send(javax.jbi.messaging.MessageExchange)
324      */

325     public void send(MessageExchange exchange) throws MessagingException {
326         log.call(exchange);
327         
328         // set the 'sendSync' property to nothing
329
exchange.setProperty(PROPERTY_SENDSYNC,null);
330         
331         sendExchange(exchange,0);
332     }
333     
334     /**
335      * @see javax.jbi.messaging.DeliveryChannel#sendSynch(javax.jbi.messaging.MessageExchange)
336      */

337     public boolean sendSync(MessageExchange exchange) throws MessagingException {
338         log.call();
339         return sendSync(exchange, 0);
340     }
341
342     /**
343      * @see javax.jbi.messaging.DeliveryChannel#sendSynch(javax.jbi.messaging.MessageExchange,
344      * long)
345      */

346     public boolean sendSync(MessageExchange exchange, long timeoutMS)
347         throws MessagingException {
348         log.start();
349
350         MessageExchangeDecorator exchangeDecorator = (MessageExchangeDecorator) exchange;
351
352         synchronized (exchangeDecorator) {
353             try {
354                 // Prepare the send
355
exchangeDecorator.setWaitingOnSynchronousSend(true);
356                 exchangeDecorator.setProperty(PROPERTY_SENDSYNC,"true");
357                 
358                 // Reference this exchange in the Map of waitingExchanges
359
// It will be used to wake up the exchange when the response
360
// will arrive
361
waitingExchanges.put(waitingExchangeKey(exchangeDecorator),exchangeDecorator);
362
363                 sendExchange(exchange, timeoutMS);
364
365                 // wait for response or timeout
366
exchangeDecorator.wait(timeoutMS);
367
368                 // Now, the response has been sent (wait unblocked),
369
// or timeout occured.
370

371                 // First, unreference the exchange
372
waitingExchanges.remove(waitingExchangeKey(exchangeDecorator));
373
374                 // Next, check if timeout or response
375
if (exchangeDecorator.isWaitingOnSynchronousSend()) {
376                     // we were always waiting for the response
377

378                     // get the ownership of the ME
379
exchangeDecorator.getMessageExchange().setRole(
380                         exchangeDecorator.getObserverRole());
381
382                     exchangeDecorator.setStatus(ExchangeStatus.ERROR);
383
384                     exchangeDecorator.getMessageExchange().setTerminated(true);
385                 }
386
387             } catch (InterruptedException JavaDoc e) {
388                 log.error(e.getMessage(), e);
389                 // get the ownership of the ME
390
exchangeDecorator.getMessageExchange().setRole(
391                     exchangeDecorator.getObserverRole());
392
393                 exchangeDecorator.setStatus(ExchangeStatus.ERROR);
394
395                 exchangeDecorator.getMessageExchange().setTerminated(true);
396
397                 throw new MessagingException(e);
398             }
399         }
400         log.end();
401
402         return !exchangeDecorator.isWaitingOnSynchronousSend();
403     }
404
405     /**
406      * Send the exchange to the underlying layer.<br>
407      * Unused data in the exchange are cleaned before.<br>
408      * If the message to exchange is a mere status information, and the exchange
409      * is set to not send acknowledgment, the message is not sent.<br>
410      *
411      * @see javax.jbi.messaging.DeliveryChannel#send(javax.jbi.messaging.MessageExchange)
412      * @see org.objectweb.petals.jbi.messaging.MessageExchangeImpl#cleanMessages()
413      */

414     protected void sendExchange(MessageExchange exchange, long timeOut) throws MessagingException {
415         log.start(exchange);
416
417         checkDeliveryChannelIsOpened();
418         checkMessageExchange(exchange);
419
420         MessageExchangeDecorator exchangeDecorator = (MessageExchangeDecorator) exchange;
421         MessageExchangeImpl exchangeImpl = exchangeDecorator
422             .getMessageExchange();
423
424         try {
425             // clean unused messages
426
exchangeImpl.cleanMessages();
427             
428             // check if the the exchange has to become terminated
429
if (!ExchangeStatus.ACTIVE.equals(exchangeImpl.getStatus())) {
430                 exchangeImpl.setTerminated(true);
431             }
432             
433             // check is status information message have to be sent
434
boolean sendMessage = true;
435             if(exchangeImpl.isTerminated()){
436                 Object JavaDoc noAck = exchange.getProperty(PROPERTY_NOACK);
437                 sendMessage= (noAck == null || ! noAck.toString().toLowerCase().equals("true"));
438             }
439             
440             // send the message
441
if(sendMessage){
442                 router.send(context, exchangeImpl,timeOut);
443             }
444             
445         } catch (JBIException e) {
446             throw new MessagingException(e);
447         }
448         log.end();
449     }
450
451     /**
452      * Return the string key associated with this exchange :
453      * role+exchangeId. It is used to store synchronized exchanges.
454      * <br>Note: if the exchange object is <code>MessageExchangeDecorator</code>,
455      * we retreive the underlying <code>MessageExchangeImpl</code> object,
456      * to avoid the checks made on getExchangeId() or getRole().
457      * @param exchange
458      * @return
459      */

460     protected String JavaDoc waitingExchangeKey(MessageExchange exchange)
461     {
462         MessageExchangeImpl meImpl = null;
463         
464         if (exchange instanceof MessageExchangeDecorator){
465             meImpl = ((MessageExchangeDecorator)exchange).getMessageExchange();
466         }else{
467             meImpl = (MessageExchangeImpl)exchange;
468         }
469         return meImpl.getRole()+meImpl.getExchangeId();
470     }
471 }
472
Popular Tags