KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > rift > coad > daemon > messageservice > MessageProcessor


1 /*
2  * MessageService: The message service daemon
3  * Copyright (C) 2007 Rift IT Contracting
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18  *
19  * MessageProcessor.java
20  */

21
22 // package path
23
package com.rift.coad.daemon.messageservice;
24
25 // java imports
26
import com.rift.coad.daemon.messageservice.named.NamedQueueManagerImpl;
27 import java.lang.reflect.Method JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.Date JavaDoc;
30 import java.util.HashSet JavaDoc;
31 import java.util.List JavaDoc;
32 import java.util.Set JavaDoc;
33 import java.util.StringTokenizer JavaDoc;
34 import javax.naming.Context JavaDoc;
35 import javax.naming.InitialContext JavaDoc;
36
37 // logging import
38
import org.apache.log4j.Logger;
39
40 // coadunation imports
41
import com.rift.coad.daemon.messageservice.message.MessageImpl;
42 import com.rift.coad.daemon.messageservice.message.RPCMessageImpl;
43 import com.rift.coad.daemon.messageservice.message.MessageManagerImpl;
44 import com.rift.coad.daemon.messageservice.message.MessageManagerFactory;
45 import com.rift.coad.daemon.messageservice.named.NamedMemoryQueue;
46 import com.rift.coad.daemon.servicebroker.ServiceBroker;
47 import com.rift.coad.lib.common.RandomGuid;
48 import com.rift.coad.lib.configuration.Configuration;
49 import com.rift.coad.lib.configuration.ConfigurationFactory;
50 import com.rift.coad.lib.deployment.DeploymentMonitor;
51 import com.rift.coad.lib.naming.NamingDirector;
52 import com.rift.coad.lib.naming.NamingConstants;
53 import com.rift.coad.lib.interceptor.InterceptorWrapper;
54 import com.rift.coad.lib.interceptor.credentials.Session;
55 import com.rift.coad.lib.thread.pool.Task;
56 import com.rift.coad.lib.thread.pool.ThreadPoolManager;
57 import com.rift.coad.lib.thread.pool.PoolException;
58 import com.rift.coad.lib.deployment.BeanInfo;
59 import com.rift.coad.lib.deployment.bean.BeanConnector;
60 import com.rift.coad.lib.deployment.bean.BeanManager;
61 import com.rift.coad.lib.deployment.jmxbean.JMXBeanConnector;
62 import com.rift.coad.lib.deployment.jmxbean.JMXBeanManager;
63 import com.rift.coad.util.connection.ConnectionManager;
64 import com.rift.coad.util.transaction.UserTransactionWrapper;
65
66
67 /**
68  * This object is responsible for processing the message that are sent to the
69  * message service.
70  *
71  * @author Brett Chaldecott
72  */

73 public class MessageProcessor extends InterceptorWrapper implements Task {
74     
75     // class constants
76
private final static long BACK_OFF_PERIOD = 1000;
77     private final static String JavaDoc RETRY_DELAY = "retry_delay";
78     private final static long DEFAULT_RETRY_DELAY = 60000;
79     private final static String JavaDoc PARENT_INSTANCE = "../";
80     
81     // the logger reference
82
protected static Logger log =
83             Logger.getLogger(MessageProcessor.class.getName());
84     
85     // private member variables
86
private Context JavaDoc context = null;
87     private UserTransactionWrapper utw = null;
88     private MessageProcessInfo messageProcessInfo = null;
89     private NamingDirector namingDirector = null;
90     private long delay = 0;
91     
92     /**
93      * Creates a new instance of MessageProcessor
94      *
95      * @exception Exception
96      */

97     public MessageProcessor() throws Exception JavaDoc {
98         try {
99             context = new InitialContext JavaDoc();
100             utw = new UserTransactionWrapper();
101             namingDirector = NamingDirector.getInstance();
102             Configuration config = ConfigurationFactory.getInstance().
103                     getConfig(MessageProcessor.class);
104             delay = config.getLong(RETRY_DELAY,DEFAULT_RETRY_DELAY);
105         } catch (Exception JavaDoc ex) {
106             log.error("Failed init the message processor : " + ex.getMessage(),
107                     ex);
108             throw new Exception JavaDoc("Failed init the message processor : " +
109                     ex.getMessage(),ex);
110         }
111     }
112     
113     
114     /**
115      * The implementation of the process method used by the coadunation
116      * threading pool.
117      *
118      * @param poolManager The reference to the thread pool manager.
119      * @exception Exception
120      */

121     public void process(ThreadPoolManager poolManager) throws Exception JavaDoc {
122         DeploymentMonitor.getInstance().waitUntilInitDeployComplete();
123         if (DeploymentMonitor.getInstance().isTerminated()) {
124             return;
125         }
126         boolean foundMessage = getMessageManager();
127         poolManager.releaseThread();
128         if (foundMessage) {
129             processMessage();
130         }
131     }
132     
133     
134     /**
135      * This method retrieves the next message from the message queue manager
136      * for processing.
137      *
138      * @return True if a message was found false if not.
139      */

140     private boolean getMessageManager() {
141         try {
142             Date JavaDoc currentTime = new Date JavaDoc();
143             Date JavaDoc delayTime = new Date JavaDoc();
144             messageProcessInfo = MessageQueueManager.
145                     getInstance().getNextMessage(currentTime);
146             if (messageProcessInfo != null) {
147                 return true;
148             }
149             long difference = delayTime.getTime() - currentTime.getTime();
150             if ((delayTime.getTime() == currentTime.getTime()) ||
151                     (difference > BACK_OFF_PERIOD) || (difference < 0)) {
152                 ProcessMonitor.getInstance().monitor(BACK_OFF_PERIOD);
153             } else {
154                 ProcessMonitor.getInstance().monitor(difference);
155             }
156         } catch (Exception JavaDoc ex) {
157             log.error("Failed to retrieve a message : " + ex.getMessage(),ex);
158         }
159         return false;
160     }
161     
162     
163     /**
164      * This method is called to process a message.
165      */

166     private void processMessage() {
167         Message message = null;
168         try {
169             message = getMessage();
170             if (message.getState() == Message.UNDELIVERED) {
171                 processUndelivered(message);
172             } else if (message.getState() == Message.DELIVERED) {
173                 processDelivered(message);
174             } else if (message.getState() == Message.UNDELIVERABLE) {
175                 processUndeliverable(message);
176             }
177         } catch (Exception JavaDoc ex) {
178             log.error("Failed to process the message : " + ex.getMessage(),ex);
179             if (message == null) {
180                 pushMessage(messageProcessInfo);
181             } else {
182                 pushMessage(message,messageProcessInfo);
183             }
184         }
185     }
186     
187     
188     /**
189      * This method returns the message contained within.
190      *
191      * @return Message The message.
192      * @exception MessageServiceException
193      */

194     private Message getMessage() throws MessageServiceException {
195         Message message = null;
196         try {
197             utw.begin();
198             MessageManager messageManager = messageProcessInfo.
199                     getMessageManager();
200             message = messageManager.getMessage();
201             utw.commit();
202         } catch (Exception JavaDoc ex) {
203             log.error("Failed to retrieve the message : " + ex.getMessage(),ex);
204             throw new MessageServiceException(
205                     "Failed to retrieve the message : " + ex.getMessage(),ex);
206         } finally {
207             utw.release();
208         }
209         
210         return message;
211     }
212     
213     
214     /**
215      * This method will process the message.
216      *
217      * @param message The message to process.
218      * @exception MessageServiceException
219      */

220     private void processUndelivered(Message message) throws
221             MessageServiceException {
222         try {
223             if (message.getMessageType() == Message.POINT_TO_POINT) {
224                 if (checkIfTargetLocal(message) &&
225                         checkIfMessageInQueue(message)) {
226                     deliverMessage(message);
227                 }
228             } else if (message.getMessageType() == Message.POINT_TO_SERVICE) {
229                 if (checkIfServiceLocal(message) &&
230                         checkIfMessageInQueue(message)) {
231                     deliverMessage(message);
232                 }
233             } else if (message.getMessageType() ==
234                     Message.POINT_TO_MULTI_SERVICE) {
235                 if (!namingDirector.isPrimary()) {
236                     deliverToParent(message);
237                 } else {
238                     cloneMessageForServices(message);
239                 }
240             }
241         } catch (Exception JavaDoc ex) {
242             log.error("Failed to process the undelivered message : " +
243                     ex.getMessage(),ex);
244             throw new MessageServiceException(
245                     "Failed to process the undelivered message : " +
246                     ex.getMessage(),ex);
247         }
248     }
249     
250     
251     /**
252      * This method will process the message.
253      *
254      * @param message The message to process.
255      * @exception MessageServiceException
256      */

257     private void processDelivered(Message message) throws
258             MessageServiceException {
259         try {
260             if (checkIfReplyLocal(message) &&
261                     checkIfReplyMessageInQueue(message)) {
262                 deliverReplyMessage(message);
263             }
264         } catch (Exception JavaDoc ex) {
265             log.error("Failed to process the undelivered message : " +
266                     ex.getMessage(),ex);
267             throw new MessageServiceException(
268                     "Failed to process the undelivered message : " +
269                     ex.getMessage(),ex);
270         }
271     }
272     
273     
274     /**
275      * This method will process the undeliverable message.
276      *
277      * @param message The message to process.
278      * @exception MessageServiceException
279      */

280     private void processUndeliverable(Message message) throws
281             MessageServiceException {
282         try {
283             if (checkIfReplyLocal(message)) {
284                 utw.begin();
285                 if (NamedQueueManagerImpl.getInstance().checkForNamedQueue(
286                         MessageQueueManager.DEAD_LETTER,true)) {
287                     log.info("Assign message to dead letter queue.");
288                     messageProcessInfo.getMessageQueue().removeMessage(
289                             message.getMessageId());
290                     ((MessageManagerImpl)messageProcessInfo.getMessageManager()).
291                             assignToQueue(MessageQueueManager.DEAD_LETTER);
292                     NamedMemoryQueue.getInstance(
293                             MessageQueueManager.DEAD_LETTER).
294                             addMessage(messageProcessInfo.getMessageManager());
295                     log.info("Added the value to the dead letter queue");
296                 } else {
297                     log.error("Failed to add to the dead letter queue.");
298                 }
299                 utw.commit();
300             }
301         } catch (Exception JavaDoc ex) {
302             log.error("Failed to process the Undeliverable message : " +
303                     ex.getMessage(),ex);
304             throw new MessageServiceException(
305                     "Failed to process the Undeliverable message : " +
306                     ex.getMessage(),ex);
307         } finally {
308             utw.release();
309         }
310     }
311     
312     
313     /**
314      * This method pushes the message back onto the queue from which it was
315      * retrieved.
316      *
317      * @param messageProcessInfo The processing information for this thread.
318      */

319     private void pushMessage(MessageProcessInfo messageProcessInfo) {
320         try {
321             messageProcessInfo.getMessageQueue().pushBackMessage(
322                     messageProcessInfo.getMessageManager());
323             ProcessMonitor.getInstance().notifyProcessor();
324         } catch (Exception JavaDoc ex) {
325             log.error("Failed to push the message manager onto a queue : " +
326                     ex.getMessage(),ex);
327         }
328     }
329     
330     
331     /**
332      * This method pushes the message back onto the queue from which it was
333      * retrieved.
334      *
335      * @param message The message to push back.
336      * @param messageProcessInfo The processing information for this thread.
337      */

338     private void pushMessage(Message message,
339             MessageProcessInfo messageProcessInfo) {
340         try {
341             try {
342                 utw.begin();
343                 Date JavaDoc nextDate = new Date JavaDoc();
344                 nextDate.setTime(nextDate.getTime() + delay);
345                 ((MessageImpl)message).setNextProcessDate(nextDate);
346                 messageProcessInfo.getMessageManager().updateMessage(message);
347                 utw.commit();
348             } catch (Exception JavaDoc ex) {
349                 log.error("Failed to process the message : " +
350                         ex.getMessage(),ex);
351             } finally {
352                 utw.release();
353             }
354             messageProcessInfo.getMessageQueue().pushBackMessage(
355                     messageProcessInfo.getMessageManager());
356             ProcessMonitor.getInstance().notifyProcessor();
357         } catch (Exception JavaDoc ex) {
358             log.error("Failed to push the message manager onto a queue : " +
359                     ex.getMessage(),ex);
360         }
361     }
362     
363     
364     /**
365      * This method checks if the target this message is going to is local to
366      * this Coadunation Instance.
367      *
368      * @return TRUE if local, FALSE if not.
369      * @param message The message to perform the test on.
370      */

371     private boolean checkIfTargetLocal(Message message) throws
372             MessageServiceException {
373         try {
374             String JavaDoc target = message.getTarget();
375             if (target == null) {
376                 message.addError(Message.ERROR,
377                         "There is no target for this message");
378                 initUndeliverableProcess(message);
379                 return false;
380             }
381             
382             // check if this fall withing this part of the tree or below
383
String JavaDoc jndiBase = NamingDirector.getInstance().getJNDIBase() + "/";
384             String JavaDoc parentUrl = NamingDirector.getInstance().getPrimaryJNDIUrl();
385             String JavaDoc instanceURL = NamingDirector.getInstance().
386                     getInstanceId() + "/";
387             int pos = target.indexOf(jndiBase);
388             int instancePos = target.indexOf(instanceURL);
389             if ((((target.indexOf(parentUrl)) != -1) &&
390                     (target.indexOf(jndiBase) == -1)) ||
391                     (target.indexOf(PARENT_INSTANCE) == 0) ||
392                     ((target.indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0) &&
393                     !NamingDirector.getInstance().isPrimary())) {
394                 deliverToParent(message);
395                 return false;
396             } else if (((instancePos != -1) && (
397                     target.indexOf(NamingConstants.SUBCONTEXT,(instancePos +
398                     instanceURL.length())) != -1)) ||
399                     (target.indexOf(NamingConstants.SUBCONTEXT) == 0)) {
400                 deliverToChild(message.getTarget(),message);
401                 return false;
402             } else if (instancePos != -1) {
403                 utw.begin();
404                 target = target.substring(instancePos + instanceURL.length());
405                 message.setTarget(target);
406                 messageProcessInfo.getMessageManager().updateMessage(message);
407                 utw.commit();
408                 messageProcessInfo.getMessageQueue().pushBackMessage(
409                         messageProcessInfo.getMessageManager());
410                 ProcessMonitor.getInstance().notifyProcessor();
411                 return false;
412             }
413             return true;
414         } catch (MessageServiceException ex) {
415             throw ex;
416         } catch (Exception JavaDoc ex) {
417             log.error("Failed to check the message is local : " +
418                     ex.getMessage(),ex);
419             throw new MessageServiceException(
420                     "Failed to check the message is local : " +
421                     ex.getMessage(),ex);
422         } finally {
423             utw.release();
424         }
425     }
426     
427     
428     /**
429      * This method checks if the target this message is going to is local to
430      * this Coadunation Instance.
431      *
432      * @return TRUE if local, FALSE if not.
433      * @param message The message to perform the test on.
434      */

435     private boolean checkIfReplyLocal(Message message) throws
436             MessageServiceException {
437         try {
438             String JavaDoc reply = message.getReplyTo();
439             if (reply == null) {
440                 reply = message.getFrom();
441                 if (reply == null) {
442                     message.addError(Message.ERROR,
443                             "There is no reply for this message");
444                     initUndeliverableProcess(message);
445                     return false;
446                 }
447             }
448             
449             // check if this fall withing this part of the tree or below
450
String JavaDoc jndiBase = NamingDirector.getInstance().getJNDIBase() + "/";
451             String JavaDoc parentUrl = NamingDirector.getInstance().getPrimaryJNDIUrl();
452             String JavaDoc instanceURL = NamingDirector.getInstance().
453                     getInstanceId() + "/";
454             int pos = reply.indexOf(jndiBase);
455             int instancePos = reply.indexOf(instanceURL);
456             if ((((reply.indexOf(parentUrl)) != -1) &&
457                     (reply.indexOf(jndiBase) == -1)) ||
458                     (reply.indexOf(PARENT_INSTANCE) == 0) ||
459                     ((reply.indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0) &&
460                     !NamingDirector.getInstance().isPrimary())) {
461                 deliverToParent(message);
462                 return false;
463             } else if (((instancePos != -1) && (
464                     reply.indexOf(NamingConstants.SUBCONTEXT,(instancePos +
465                     instanceURL.length())) != -1)) ||
466                     (reply.indexOf(NamingConstants.SUBCONTEXT) == 0)) {
467                 deliverToChild(reply,message);
468                 return false;
469             } else if (instancePos != -1) {
470                 utw.begin();
471                 reply = reply.substring(instancePos + instanceURL.length());
472                 if (message.getReplyTo() != null) {
473                     message.setReplyTo(reply);
474                 } else {
475                     message.setFrom(reply);
476                 }
477                 messageProcessInfo.getMessageManager().updateMessage(message);
478                 utw.commit();
479                 messageProcessInfo.getMessageQueue().pushBackMessage(
480                         messageProcessInfo.getMessageManager());
481                 ProcessMonitor.getInstance().notifyProcessor();
482                 return false;
483             }
484             return true;
485         } catch (MessageServiceException ex) {
486             throw ex;
487         } catch (Exception JavaDoc ex) {
488             log.error("Failed to check the message is local : " +
489                     ex.getMessage(),ex);
490             throw new MessageServiceException(
491                     "Failed to check the message is local : " +
492                     ex.getMessage(),ex);
493         }
494     }
495     
496     
497     /**
498      * This method checks if the target this message is going to is local to
499      * this Coadunation Instance.
500      *
501      * @return TRUE if local, FALSE if not.
502      * @param message The message to perform the test on.
503      */

504     private boolean checkIfServiceLocal(Message message) throws
505             MessageServiceException {
506         try {
507             String JavaDoc target = message.getTarget();
508             if (target != null) {
509                 return checkIfTargetLocal(message);
510             }
511             
512             String JavaDoc[] services = message.getServices();
513             if (services == null) {
514                 message.addError(Message.ERROR,
515                         "There are no services for this message");
516                 initUndeliverableProcess(message);
517                 return false;
518             }
519             List JavaDoc serviceList = new ArrayList JavaDoc();
520             for (int index = 0; index < services.length; index++) {
521                 serviceList.add(services[index]);
522             }
523             ServiceBroker serviceBroker = (ServiceBroker)ConnectionManager.
524                     getInstance().getConnection(ServiceBroker.class,
525                     ServiceBroker.JNDI_URL);
526             String JavaDoc service = serviceBroker.getServiceProvider(serviceList);
527             if (service.length() != 0) {
528                 message.setTarget(service);
529                 utw.begin();
530                 messageProcessInfo.getMessageManager().updateMessage(message);
531                 utw.commit();
532                 messageProcessInfo.getMessageQueue().pushBackMessage(
533                         messageProcessInfo.getMessageManager());
534             } else {
535                 deliverToParent(message);
536             }
537         } catch (MessageServiceException ex) {
538             throw ex;
539         } catch (Exception JavaDoc ex) {
540             log.error("Failed to check if the services are local : " +
541                     ex.getMessage(),ex);
542             throw new MessageServiceException(
543                     "Failed to check if the services are local : " +
544                     ex.getMessage(),ex);
545         } finally {
546             utw.release();
547         }
548         return false;
549     }
550     
551     
552     /**
553      * This method checks if the target this message is going to is local to
554      * this Coadunation Instance.
555      *
556      * @return TRUE if local, FALSE if not.
557      * @param message The message to perform the test on.
558      */

559     private boolean checkIfMessageInQueue(Message message) throws
560             MessageServiceException {
561         try {
562             if (messageProcessInfo.getMessageQueue().getName().equals(
563                     message.getTarget())) {
564                 return true;
565             }
566             utw.begin();
567             if (message.getTarget().equals(MessageService.JNDI_URL) &&
568                     (message.getTargetNamedQueue() != null)) {
569                 if (false == NamedQueueManagerImpl.getInstance().
570                         checkForNamedQueue(message.getTargetNamedQueue(),false)){
571                     utw.release();
572                     message.addError(Message.ERROR,"The named queue [" +
573                             message.getTargetNamedQueue() + "] does not exist.");
574                     initUndeliverableProcess(message);
575                     return false;
576                 }
577                 messageProcessInfo.getMessageQueue().removeMessage(
578                         message.getMessageId());
579                 ((MessageManagerImpl)messageProcessInfo.getMessageManager()).
580                         assignToQueue(message.getTargetNamedQueue());
581                 NamedMemoryQueue.getInstance(
582                         message.getTargetNamedQueue()).
583                         addMessage(messageProcessInfo.getMessageManager());
584             } else {
585                 MessageQueue messageQueue = MessageQueueManager.getInstance().
586                         getQueue(message.getTarget());
587                 messageProcessInfo.getMessageQueue().removeMessage(
588                         message.getMessageId());
589                 ((MessageManagerImpl)messageProcessInfo.getMessageManager()).
590                         assignToQueue(message.getTarget());
591                 messageQueue.addMessage(messageProcessInfo.getMessageManager());
592             }
593             utw.commit();
594             return false;
595         } catch (Exception JavaDoc ex) {
596             log.error("Failed to check the target : " + ex.getMessage(),ex);
597             throw new MessageServiceException(
598                     "Failed to check the target : " + ex.getMessage(),ex);
599         } finally {
600             utw.release();
601         }
602     }
603     
604     
605     /**
606      * Check if this message is in the correct queue.
607      *
608      * @return TRUE if local, FALSE if not.
609      * @param message The message to perform the test on.
610      * @exception MessageServiceException
611      */

612     private boolean checkIfReplyMessageInQueue(Message message) throws
613             MessageServiceException {
614         try {
615             String JavaDoc reply = message.getReplyTo();
616             if (reply == null) {
617                 reply = message.getFrom();
618                 if (reply == null) {
619                     message.addError(Message.ERROR,
620                             "There is no reply for this message");
621                     initUndeliverableProcess(message);
622                     return false;
623                 }
624             }
625             
626             if (messageProcessInfo.getMessageQueue().getName().equals(
627                     reply)) {
628                 return true;
629             }
630             utw.begin();
631             if (reply.equals(MessageService.JNDI_URL) &&
632                     (message.getTargetNamedQueue() != null)) {
633                 if (false == NamedQueueManagerImpl.getInstance().
634                         checkForNamedQueue(message.getReplyNamedQueue(),false)){
635                     utw.release();
636                     message.addError(Message.ERROR,"The named queue [" +
637                             message.getReplyNamedQueue() + "] does not exist.");
638                     initUndeliverableProcess(message);
639                     return false;
640                 }
641                 messageProcessInfo.getMessageQueue().removeMessage(
642                         message.getMessageId());
643                 ((MessageManagerImpl)messageProcessInfo.getMessageManager()).
644                         assignToQueue(message.getReplyNamedQueue());
645                 NamedMemoryQueue.getInstance(
646                         message.getTargetNamedQueue()).
647                         addMessage(messageProcessInfo.getMessageManager());
648             } else {
649                 MessageQueue messageQueue = MessageQueueManager.getInstance().
650                         getQueue(reply);
651                 messageProcessInfo.getMessageQueue().removeMessage(
652                         message.getMessageId());
653                 ((MessageManagerImpl)messageProcessInfo.getMessageManager()).
654                         assignToQueue(reply);
655                 messageQueue.addMessage(messageProcessInfo.getMessageManager());
656             }
657             utw.commit();
658             return false;
659         } catch (Exception JavaDoc ex) {
660             log.error("Failed to check the reply queue : " + ex.getMessage(),ex);
661             throw new MessageServiceException(
662                     "Failed to check the reply queue : " + ex.getMessage(),ex);
663         } finally {
664             utw.release();
665         }
666     }
667     
668     
669     /**
670      * This method clones the original message so that it can be sent to all
671      * the daemons suppliung the services.
672      *
673      * @param message The message to clone.
674      * @exception MessageServiceException
675      */

676     private void cloneMessageForServices(Message message) throws
677             MessageServiceException {
678         try {
679             String JavaDoc[] services = message.getServices();
680             List JavaDoc serviceList = new ArrayList JavaDoc();
681             for (int index = 0; index < services.length; index++) {
682                 serviceList.add(services[index]);
683             }
684             ServiceBroker serviceBroker = (ServiceBroker)ConnectionManager.
685                     getInstance().getConnection(ServiceBroker.class,
686                     ServiceBroker.JNDI_URL);
687             List JavaDoc daemonList = serviceBroker.getServiceProviders(serviceList);
688             if (daemonList.size() == 0) {
689                 message.addError(Message.ERROR,
690                         "There are no daemon providing these services.");
691                 initUndeliverableProcess(message);
692                 return;
693             }
694             
695             utw.begin();
696             for (int index = 0; index < daemonList.size(); index++) {
697                 MessageImpl newMessage =
698                         (MessageImpl)((MessageImpl)message).clone();
699                 newMessage.setMessageId(RandomGuid.getInstance().getGuid());
700                 newMessage.setTarget((String JavaDoc)daemonList.get(index));
701                 newMessage.setMessageType(Message.POINT_TO_POINT);
702                 newMessage.setNextProcessDate(new Date JavaDoc());
703                 MessageManager messageManager = MessageManagerFactory.getInstance().
704                         getMessageManager(newMessage);
705                 MessageQueue messageQueue = MessageQueueManager.getInstance().
706                         getQueue(MessageQueueManager.UNSORTED);
707                 ((MessageManagerImpl)messageManager).assignToQueue(
708                         MessageQueueManager.UNSORTED);
709                 messageQueue.addMessage(messageManager);
710             }
711             messageProcessInfo.getMessageManager().remove();
712             messageProcessInfo.getMessageQueue().removeMessage(
713                     message.getMessageId());
714             utw.commit();
715         } catch (Exception JavaDoc ex) {
716             log.error("Failed to clone the messages : " + ex.getMessage(),ex);
717             throw new MessageServiceException(
718                     "Failed to clone the messages : " + ex.getMessage(),ex);
719         } finally {
720             utw.release();
721         }
722     }
723     
724     
725     /**
726      * This method delivers the message to another coadunation instance.
727      *
728      * @param message The reference to the message object.
729      * @exception MessageServiceException
730      */

731     private void deliverToParent(Message message) throws
732             MessageServiceException {
733         try {
734             if (namingDirector.isPrimary()) {
735                 message.addError(Message.ERROR,
736                         "The primary has no parent cannot go further.");
737                 initUndeliverableProcess(message);
738                 return;
739             }
740             Message messageCopy = (Message)((MessageImpl)message).clone();
741             if (message.getTarget() != null) {
742                 messageCopy.setFrom(downJNDIUrl(message.getTarget()));
743             }
744             if (message.getReplyTo() != null) {
745                 messageCopy.setReplyTo(downJNDIUrl(message.getReplyTo()));
746             }
747             if (message.getFrom() != null) {
748                 messageCopy.setFrom(downJNDIUrl(message.getFrom()));
749             }
750             
751             log.debug("Deliver message to parent : " + message.getMessageId());
752             utw.begin();
753             MessageStore messageStore = (MessageStore)ConnectionManager.
754                     getInstance().getConnection(MessageStore.class,
755                     namingDirector.getPrimaryJNDIUrl() + "/" +
756                     MessageStore.JNDI_URL);
757             messageProcessInfo.getMessageManager().remove();
758             messageProcessInfo.getMessageQueue().removeMessage(
759                     message.getMessageId());
760             log.debug("The message has been deliverd to parent committing : " +
761                     message.getMessageId());
762             IDLock.getInstance().lock(message.getMessageId());
763             messageStore.addMessage(messageCopy);
764             utw.commit();
765             log.debug("Delivered message to parent : " + message.getMessageId());
766         } catch (Exception JavaDoc ex) {
767             log.error("Failed to deliver to a the parent : " +
768                     ex.getMessage(),ex);
769             throw new MessageServiceException(
770                     "Failed to deliver to a the parent : " +
771                     ex.getMessage(),ex);
772         } finally {
773             utw.release();
774         }
775     }
776     
777     
778     /**
779      * This method delivers the message to another coadunation instance.
780      *
781      * @param target The target of the message.
782      * @param message The reference to the message object.
783      */

784     private void deliverToChild(String JavaDoc target, Message message) throws
785             MessageServiceException {
786         try {
787             Message messageCopy = (Message)((MessageImpl)message).clone();
788             if (message.getTarget() != null) {
789                 messageCopy.setTarget(upJNDIUrl(target,message.getTarget()));
790             }
791             if (message.getReplyTo() != null) {
792                 messageCopy.setReplyTo(upJNDIUrl(target,message.getReplyTo()));
793             }
794             if (message.getFrom() != null) {
795                 messageCopy.setFrom(upJNDIUrl(target,message.getFrom()));
796             }
797             
798             String JavaDoc subContextUrl = NamingConstants.SUBCONTEXT + "/";
799             if (target.contains(namingDirector.getInstanceId())) {
800                 subContextUrl = namingDirector.getInstanceId() + "/" +
801                         NamingConstants.SUBCONTEXT + "/";
802             }
803             int pos = target.indexOf(subContextUrl);
804             if (pos == -1) {
805                 message.addError(Message.ERROR,
806                         "Cannot find the sub reference information : " + target);
807                 initUndeliverableProcess(message);
808                 return;
809             }
810             String JavaDoc subContext = target.substring(pos + subContextUrl.length());
811             subContext = NamingConstants.SUBCONTEXT + "/" +
812                     subContext.substring(0,subContext.indexOf('/')) + "/" +
813                     MessageStore.JNDI_URL;
814             log.debug("Deliver message to child : " + message.getMessageId());
815             utw.begin();
816             MessageStore messageStore = (MessageStore)ConnectionManager.
817                     getInstance().getConnection(MessageStore.class,
818                     subContext);
819             messageProcessInfo.getMessageManager().remove();
820             messageProcessInfo.getMessageQueue().removeMessage(
821                     message.getMessageId());
822             IDLock.getInstance().lock(message.getMessageId());
823             messageStore.addMessage(messageCopy);
824             log.debug("The message has been deliverd to child committing : " +
825                     message.getMessageId());
826             utw.commit();
827             log.debug("Delivered message to child : " + message.getMessageId());
828         } catch (Exception JavaDoc ex) {
829             log.error("Failed to deliver to a the child : " +
830                     ex.getMessage(),ex);
831             throw new MessageServiceException(
832                     "Failed to deliver to a the child : " +
833                     ex.getMessage(),ex);
834         } finally {
835             utw.release();
836         }
837     }
838     
839     
840     /**
841      * This method delivers the message.
842      *
843      * @param message The message to deliver
844      * @exception MessageServiceException
845      */

846     private void deliverMessage(Message message) throws
847             MessageServiceException {
848         initUserSession(message);
849         try {
850             if (message instanceof RPCMessage) {
851                 deliverRPCMessage(message.getTarget(),message);
852             } else if (message instanceof TextMessage) {
853                 deliverTextMessage(message.getTarget(),message);
854             }
855         } finally {
856             releaseUserSession();
857         }
858     }
859     
860     
861     /**
862      * This method delivers the reply message.
863      *
864      * @param message The message to deliver
865      * @exception MessageServiceException
866      */

867     private void deliverReplyMessage(Message message) throws
868             MessageServiceException {
869         initUserSession(message);
870         try {
871             String JavaDoc reply = message.getReplyTo();
872             if (reply == null) {
873                 reply = message.getFrom();
874                 if (reply == null) {
875                     message.addError(Message.ERROR,
876                             "There is no reply for this message");
877                     initUndeliverableProcess(message);
878                     return;
879                 }
880             }
881             if (message instanceof RPCMessage) {
882                 deliverReplyRPCMessage(reply,message);
883             } else if (message instanceof TextMessage) {
884                 deliverReplyTextMessage(reply,message);
885             }
886         } finally {
887             releaseUserSession();
888         }
889     }
890     
891     
892     /**
893      * This method delivers the rpc message to its target.
894      *
895      * @param message The message to deliver.
896      * @exception MessageServiceException
897      */

898     private void deliverRPCMessage(String JavaDoc target, Message message) throws
899             MessageServiceException {
900         ClassLoader JavaDoc original = null;
901         try {
902             Object JavaDoc ref = null;
903             if (((ref = BeanConnector.getInstance().getBean(target)) == null) &&
904                     ((ref = JMXBeanConnector.getInstance().getJMXBean(target))
905                     == null)) {
906                 message.addError(Message.ERROR,"The target [" + target
907                         + "] does not exist.");
908                 initUndeliverableProcess(message);
909                 return;
910             }
911             original = Thread.currentThread().getContextClassLoader();
912             Thread.currentThread().setContextClassLoader(
913                     ref.getClass().getClassLoader());
914             
915             
916             RPCMessageImpl rpcMessageImpl =
917                     (RPCMessageImpl)((RPCMessageImpl)message).clone();
918             Method JavaDoc method = null;
919             try {
920                 method = ref.getClass().getMethod(rpcMessageImpl.getMethodName(),
921                         rpcMessageImpl.getArgumentTypes());
922             } catch (Exception JavaDoc ex) {
923                 log.error("Failed to find the method on the target : "
924                         + ex.getMessage(),ex);
925                 if (original != null) {
926                     Thread.currentThread().setContextClassLoader(original);
927                     original = null;
928                 }
929                 message.addError(Message.ERROR,
930                         "Failed to find the method on the target : "
931                         + ex.getMessage());
932                 initUndeliverableProcess(message);
933                 return;
934             }
935             utw.begin();
936             
937             try {
938                 Object JavaDoc result = method.invoke(ref,rpcMessageImpl.getArguments());
939                 ((RPCMessage)message).setResult(result);
940             } catch (Throwable JavaDoc ex) {
941                 log.error("Caught an exception : "
942                         + ex.getMessage(),ex);
943                 ((RPCMessage)message).setThrowable(ex);
944             }
945             if (original != null) {
946                 Thread.currentThread().setContextClassLoader(original);
947                 original = null;
948             }
949             if (message.getReply()) {
950                 log.info("Init the process to deliver to the sender : " +
951                         message.getMessageId());
952                 ((RPCMessageImpl)message).setState(Message.DELIVERED);
953                 messageProcessInfo.getMessageManager().updateMessage(message);
954                 messageProcessInfo.getMessageQueue().removeMessage(
955                         message.getMessageId());
956                 MessageQueue messageQueue = MessageQueueManager.getInstance().
957                         getQueue(MessageQueueManager.UNSORTED);
958                 ((MessageManagerImpl)messageProcessInfo.getMessageManager()).
959                         assignToQueue(MessageQueueManager.UNSORTED);
960                 messageQueue.addMessage(messageProcessInfo.getMessageManager());
961             } else {
962                 log.info("Removing the completed rpc message : " +
963                         message.getMessageId());
964                 messageProcessInfo.getMessageManager().remove();
965                 messageProcessInfo.getMessageQueue().removeMessage(
966                         message.getMessageId());
967             }
968             utw.commit();
969         } catch (Exception JavaDoc ex) {
970             log.error("Failed to deliver the RPC Message : "
971                     + ex.getMessage(),ex);
972             throw new MessageServiceException(
973                     "Failed to deliver the RPC Message : "
974                     + ex.getMessage(),ex);
975         } finally {
976             utw.release();
977             if (original != null) {
978                 Thread.currentThread().setContextClassLoader(original);
979             }
980         }
981     }
982     
983     
984     /**
985      * This method delivers the rpc reply message.
986      *
987      * @param reply The reply address for the message.
988      * @param message The message to deliver.
989      * @exception MessageServiceException
990      */

991     private void deliverReplyRPCMessage(String JavaDoc reply, Message message) throws
992             MessageServiceException {
993         ClassLoader JavaDoc original = null;
994         try {
995             Object JavaDoc ref = null;
996             if (((ref = BeanConnector.getInstance().getBean(reply)) == null) &&
997                     ((ref = JMXBeanConnector.getInstance().getJMXBean(reply))
998                     == null)) {
999                 message.addError(Message.ERROR,"The reply [" + reply
1000                        + "] does not exist.");
1001                initUndeliverableProcess(message);
1002                return;
1003            }
1004            original = Thread.currentThread().getContextClassLoader();
1005            Thread.currentThread().setContextClassLoader(
1006                    ref.getClass().getClassLoader());
1007            
1008            utw.begin();
1009            RPCMessage rpcMessage = (RPCMessage)message;
1010            try {
1011                if (rpcMessage.generatedException()) {
1012                    Method JavaDoc method = ref.getClass().getMethod("onFailure",
1013                            new Class JavaDoc[] {String JavaDoc.class,String JavaDoc.class,
1014                            Throwable JavaDoc.class});
1015                    Throwable JavaDoc ex = rpcMessage.getThrowable();
1016                    if (ex instanceof
1017                            java.lang.reflect.InvocationTargetException JavaDoc) {
1018                        ex = ((java.lang.reflect.InvocationTargetException JavaDoc)ex).
1019                                getCause();
1020                    }
1021                    method.invoke(ref,new Object JavaDoc[] {rpcMessage.getMessageId(),
1022                    rpcMessage.getCorrelationId(),ex});
1023                } else {
1024                    Method JavaDoc method = ref.getClass().getMethod("onSuccess",
1025                            new Class JavaDoc[] {String JavaDoc.class,String JavaDoc.class,
1026                            Object JavaDoc.class});
1027                    method.invoke(ref,new Object JavaDoc[] {rpcMessage.getMessageId(),
1028                    rpcMessage.getCorrelationId(),
1029                    rpcMessage.getResult()});
1030                }
1031            } catch (Exception JavaDoc ex) {
1032                log.error("Failed to deliver the message ["
1033                        + reply + "] to the AsyncCallbackHandler method : " +
1034                        ex.getMessage(),ex);
1035                utw.release();
1036                message.addError(Message.ERROR,"Failed to deliver the message ["
1037                        + reply + "] to the AsyncCallbackHandler method : " +
1038                        ex.getMessage());
1039                initUndeliverableProcess(message);
1040                return;
1041            }
1042            if (original != null) {
1043                Thread.currentThread().setContextClassLoader(original);
1044                original = null;
1045            }
1046            log.info("Removing the completed rpc message : " +
1047                    rpcMessage.getMessageId());
1048            messageProcessInfo.getMessageManager().remove();
1049            messageProcessInfo.getMessageQueue().removeMessage(
1050                    rpcMessage.getMessageId());
1051            
1052            utw.commit();
1053        } catch (Exception JavaDoc ex) {
1054            log.error("Failed to deliver the reply RPC Message : "
1055                    + ex.getMessage(),ex);
1056            throw new MessageServiceException(
1057                    "Failed to deliver the reply RPC Message : "
1058                    + ex.getMessage(),ex);
1059        } finally {
1060            utw.release();
1061            if (original != null) {
1062                Thread.currentThread().setContextClassLoader(original);
1063            }
1064        }
1065    }
1066    
1067    
1068    /**
1069     * This method delivers the text message to its target.
1070     *
1071     * @param target The target to deliver the message to.
1072     * @param message The message to deliver.
1073     * @exception MessageServiceException
1074     */

1075    private void deliverTextMessage(String JavaDoc target, Message message) throws
1076            MessageServiceException {
1077        try {
1078            MessageHandler messageHandler = (MessageHandler)ConnectionManager.
1079                    getInstance().getConnection(MessageHandler.class,
1080                    target);
1081            utw.begin();
1082            Message result = messageHandler.processMessage(message);
1083            result.incrementRetries();
1084            if (result.isAcknowledged() && result.getReply() &&
1085                    (message.getState() == Message.UNDELIVERED)) {
1086                log.info("Init the process to deliver to the sender : " +
1087                        message.getMessageId());
1088                ((MessageImpl)result).setState(Message.DELIVERED);
1089                messageProcessInfo.getMessageManager().updateMessage(result);
1090                messageProcessInfo.getMessageQueue().removeMessage(
1091                        message.getMessageId());
1092                MessageQueue messageQueue = MessageQueueManager.getInstance().
1093                        getQueue(MessageQueueManager.UNSORTED);
1094                ((MessageManagerImpl)messageProcessInfo.getMessageManager()).
1095                        assignToQueue(MessageQueueManager.UNSORTED);
1096                messageQueue.addMessage(messageProcessInfo.getMessageManager());
1097                utw.commit();
1098            } else if ((result.isAcknowledged() && !result.getReply()) ||
1099                    (result.isAcknowledged() &&
1100                    (message.getState() == Message.DELIVERED))){
1101                log.info("Removing the completed text message : " +
1102                        message.getMessageId());
1103                messageProcessInfo.getMessageManager().remove();
1104                messageProcessInfo.getMessageQueue().removeMessage(
1105                        message.getMessageId());
1106                utw.commit();
1107            } else {
1108                Date JavaDoc nextDate = new Date JavaDoc();
1109                nextDate.setTime(nextDate.getTime() + delay);
1110                ((MessageImpl)result).setNextProcessDate(nextDate);
1111                messageProcessInfo.getMessageManager().updateMessage(result);
1112                utw.commit();
1113                messageProcessInfo.getMessageQueue().pushBackMessage(
1114                        messageProcessInfo.getMessageManager());
1115            }
1116        } catch (java.lang.ClassCastException JavaDoc ex) {
1117            log.error("Failed to deliver the text message ["+
1118                    message.getMessageId()+ "], " +
1119                    "init the undeliverable process, as the target cannot be " +
1120                    "spoken to correctly : " + ex.getMessage(),ex);
1121            message.addError(Message.ERROR,"Failed to deliver the text message : "
1122                    + ex.getMessage());
1123            initUndeliverableProcess(message);
1124        } catch (com.rift.coad.util.connection.NameNotFound ex) {
1125            log.error("Failed to deliver the text message ["+
1126                    message.getMessageId()+ "], " +
1127                    "init the undeliverable process, " +
1128                    "as the target name cannot be found : "
1129                    + ex.getMessage(),ex);
1130            message.addError(Message.ERROR,"Failed to deliver the text message : "
1131                    + ex.getMessage());
1132            initUndeliverableProcess(message);
1133        } catch (Exception JavaDoc ex) {
1134            log.error("Failed to deliver the text message ["+
1135                    message.getMessageId()+ "] : " + ex.getMessage(),ex);
1136            throw new MessageServiceException(
1137                    "Failed to deliver the text message : "
1138                    + ex.getMessage(),ex);
1139        } finally {
1140            utw.release();
1141        }
1142    }
1143    
1144    
1145    /**
1146     * This method delivers the reply text message to its target.
1147     *
1148     * @param reply The reply address for the message.
1149     * @param message The message to deliver.
1150     * @exception MessageServiceException
1151     */

1152    private void deliverReplyTextMessage(String JavaDoc reply, Message message) throws
1153            MessageServiceException {
1154        try {
1155            MessageHandler messageHandler = (MessageHandler)ConnectionManager.
1156                    getInstance().getConnection(MessageHandler.class,
1157                    reply);
1158            utw.begin();
1159            Message result = messageHandler.processMessage(message);
1160            result.incrementRetries();
1161            if (result.isAcknowledged()){
1162                log.info("Removing the completed text message : " +
1163                        message.getMessageId());
1164                messageProcessInfo.getMessageManager().remove();
1165                messageProcessInfo.getMessageQueue().removeMessage(
1166                        message.getMessageId());
1167                utw.commit();
1168            } else {
1169                Date JavaDoc nextDate = new Date JavaDoc();
1170                nextDate.setTime(nextDate.getTime() + delay);
1171                ((MessageImpl)result).setNextProcessDate(nextDate);
1172                messageProcessInfo.getMessageManager().updateMessage(result);
1173                utw.commit();
1174                messageProcessInfo.getMessageQueue().pushBackMessage(
1175                        messageProcessInfo.getMessageManager());
1176            }
1177        } catch (java.lang.ClassCastException JavaDoc ex) {
1178            log.error("Failed to deliver the text message : "
1179                    + ex.getMessage(),ex);
1180            message.addError(Message.ERROR,"Failed to deliver the text message : "
1181                    + ex.getMessage());
1182            initUndeliverableProcess(message);
1183        } catch (com.rift.coad.util.connection.NameNotFound ex) {
1184            log.error("Failed to deliver the text message : "
1185                    + ex.getMessage(),ex);
1186            message.addError(Message.ERROR,"Failed to deliver the text message : "
1187                    + ex.getMessage());
1188            initUndeliverableProcess(message);
1189        } catch (Exception JavaDoc ex) {
1190            log.error("Failed to deliver the text message : "
1191                    + ex.getMessage(),ex);
1192            throw new MessageServiceException(
1193                    "Failed to deliver the text message : "
1194                    + ex.getMessage(),ex);
1195        } finally {
1196            utw.release();
1197        }
1198    }
1199    
1200    
1201    /**
1202     * This method prepends the JNDI URL base.
1203     *
1204     * @return The modified url.
1205     * @param url The url to modify.
1206     */

1207    private String JavaDoc downJNDIUrl(String JavaDoc url) throws
1208            MessageServiceException {
1209        try {
1210            String JavaDoc instanceBase = NamingConstants.SUBCONTEXT + "/"
1211                    + namingDirector.getInstanceId() + "/";
1212            if (url.indexOf(PARENT_INSTANCE) == 0) {
1213                return url.substring(PARENT_INSTANCE.length());
1214            } else if (url.contains(namingDirector.getPrimaryJNDIUrl()) ||
1215                    url.contains(namingDirector.getJNDIBase()) ||
1216                    url.contains(instanceBase) ||
1217                    (url.indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0)) {
1218                return url;
1219            } else if (!url.contains(namingDirector.getInstanceId())) {
1220                return instanceBase + url;
1221            } else {
1222                return NamingConstants.SUBCONTEXT + "/" + url;
1223            }
1224        } catch (Exception JavaDoc ex) {
1225            log.error("Failed to move down the url : "
1226                    + ex.getMessage(),ex);
1227            throw new MessageServiceException("Failed to move down the url : "
1228                    + ex.getMessage(),ex);
1229        }
1230    }
1231    
1232    
1233    /**
1234     * This method prepends the JNDI URL base.
1235     *
1236     * @return The modified url.
1237     * @param url The url to modify.
1238     */

1239    private String JavaDoc upJNDIUrl(String JavaDoc target,String JavaDoc url) throws
1240            MessageServiceException {
1241        try {
1242            String JavaDoc updatedURL = url;
1243            String JavaDoc instanceBase = namingDirector.getInstanceId() + "/" +
1244                    NamingConstants.SUBCONTEXT + "/";
1245            String JavaDoc subContextUrl = NamingConstants.SUBCONTEXT + "/";
1246            int pos = url.indexOf(subContextUrl);
1247            if (url.contains(instanceBase)) {
1248                updatedURL = url.substring(url.indexOf(instanceBase) +
1249                        instanceBase.length());
1250                pos = updatedURL.indexOf(subContextUrl);
1251                if (url.equals(target) && (pos == 0)) {
1252                    updatedURL = updatedURL.substring(updatedURL.indexOf("/",
1253                            pos + subContextUrl.length()) + 1);
1254                }
1255            } else if (url.equals(target) && (pos == 0)) {
1256                updatedURL = url.substring(url.indexOf("/",
1257                        pos + subContextUrl.length()) + 1);
1258            } else {
1259                updatedURL = PARENT_INSTANCE + url;
1260            }
1261            return updatedURL;
1262        } catch (Exception JavaDoc ex) {
1263            log.error("Failed to modify the url to set it relative to the next " +
1264                    "coadunation intance : " + ex.getMessage(),ex);
1265            throw new MessageServiceException("Failed to modify the url to " +
1266                    "set it relative to the next coadunation intance : " +
1267                    ex.getMessage(),ex);
1268        }
1269    }
1270    
1271    
1272    /**
1273     * The message has been deemed undeliverable for some reason.
1274     *
1275     * @param message The message set as undeliverable
1276     * @excption MessageServiceException
1277     */

1278    private void initUndeliverableProcess(Message message) throws
1279            MessageServiceException {
1280        try {
1281            int currentState = message.getState();
1282            utw.begin();
1283            ((MessageImpl)message).setState(Message.UNDELIVERABLE);
1284            messageProcessInfo.getMessageManager().updateMessage(message);
1285            utw.commit();
1286            utw.release();
1287            if (currentState == Message.DELIVERED) {
1288                processUndeliverable(message);
1289            } else if (messageProcessInfo.getMessageQueue().getName().equals(
1290                    MessageQueueManager.UNSORTED)) {
1291                messageProcessInfo.getMessageQueue().pushBackMessage(
1292                        messageProcessInfo.getMessageManager());
1293            } else {
1294                utw.begin();
1295                messageProcessInfo.getMessageQueue().removeMessage(
1296                        message.getMessageId());
1297                MessageQueue messageQueue = MessageQueueManager.getInstance().
1298                        getQueue(MessageQueueManager.UNSORTED);
1299                ((MessageManagerImpl)messageProcessInfo.getMessageManager()).
1300                        assignToQueue(MessageQueueManager.UNSORTED);
1301                messageQueue.addMessage(messageProcessInfo.getMessageManager());
1302                utw.commit();
1303            }
1304            
1305        } catch (Exception JavaDoc ex) {
1306            log.error("Failed to init the undeliverable process :" +
1307                    ex.getMessage(),ex);
1308            throw new MessageServiceException("Failed to init the " +
1309                    "undeliverable process :" + ex.getMessage(),ex);
1310        } finally {
1311            utw.release();
1312        }
1313    }
1314    
1315    
1316    /**
1317     * This method is responsible for initializing the user session.
1318     *
1319     * @param message The message containing the user session information.
1320     * @exception MessageServiceException
1321     */

1322    private void initUserSession(Message message) throws MessageServiceException {
1323        try {
1324            Session session = new Session(message.getMessageCreater(),
1325                    message.getSessionId(),
1326                    new HashSet JavaDoc(message.getMessagePrincipals()));
1327            getServerInterceptor().createSession(session);
1328        } catch (Exception JavaDoc ex) {
1329            log.error("Failed to setup the user session : " +
1330                    ex.getMessage(),ex);
1331            throw new MessageServiceException(
1332                    "Failed to setup the user session :" + ex.getMessage(),ex);
1333        }
1334    }
1335    
1336    
1337    /**
1338     * This method is responsible for initializing the user session.
1339     */

1340    private void releaseUserSession() {
1341        try {
1342            getServerInterceptor().release();
1343        } catch (Exception JavaDoc ex) {
1344            log.error("Failed to release the user session : " +
1345                    ex.getMessage(),ex);
1346        }
1347    }
1348}
1349
Popular Tags