KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > rift > coad > daemon > messageservice > message > MessageManagerImpl


1 /*
2  * MessageService: The message service daemon
3  * Copyright (C) 2006 Rift IT Contracting
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18  *
19  * MessageManagerImpl.java
20  */

21
22 // the package path
23
package com.rift.coad.daemon.messageservice.message;
24
25 // java imports
26
import java.sql.Timestamp JavaDoc;
27 import java.util.ArrayList JavaDoc;
28 import java.util.Date JavaDoc;
29 import java.util.Enumeration JavaDoc;
30 import java.util.HashSet JavaDoc;
31 import java.util.Iterator JavaDoc;
32 import java.util.List JavaDoc;
33 import java.util.Set JavaDoc;
34 import javax.transaction.xa.XAException JavaDoc;
35 import javax.transaction.xa.XAResource JavaDoc;
36 import javax.transaction.xa.Xid JavaDoc;
37
38 // hibernate imports
39
import org.hibernate.*;
40 import org.hibernate.cfg.*;
41
42 // logging import
43
import org.apache.log4j.Logger;
44
45 // coadunation imports
46
import com.rift.coad.daemon.messageservice.Message;
47 import com.rift.coad.daemon.messageservice.MessageServiceImpl;
48 import com.rift.coad.daemon.messageservice.RPCMessage;
49 import com.rift.coad.daemon.messageservice.TextMessage;
50 import com.rift.coad.daemon.messageservice.MessageManager;
51 import com.rift.coad.daemon.messageservice.MessageServiceException;
52 import com.rift.coad.daemon.messageservice.InvalidProperty;
53 import com.rift.coad.daemon.messageservice.db.*;
54 import com.rift.coad.daemon.messageservice.named.NamedQueueManagerImpl;
55 import com.rift.coad.hibernate.util.HibernateUtil;
56 import com.rift.coad.lib.common.ObjectSerializer;
57 import com.rift.coad.util.transaction.TransactionManager;
58 import com.rift.coad.util.change.Change;
59 import com.rift.coad.util.change.ChangeException;
60 import com.rift.coad.util.change.ChangeLog;
61
62
63 /**
64  * This object implements the message manager interface.
65  *
66  * @author Brett Chaldecott
67  */

68 public class MessageManagerImpl implements MessageManager {
69     
70     
71     /**
72      * This object represents an add message change
73      */

74     public static class AddMessageChange implements Change {
75         // private member variables
76
private MessageImpl newMessage = null;
77         
78         /**
79          * The constructor of the add message change object.
80          *
81          * @param message The message that represents this change.
82          * @exception MessageServiceException
83          */

84         public AddMessageChange(MessageImpl newMessage) throws
85                 MessageServiceException {
86             try {
87                 this.newMessage = (MessageImpl)newMessage.clone();
88             } catch (Exception JavaDoc ex) {
89                 log.error("Failed to create a new add message change : " +
90                         ex.getMessage(),ex);
91                 throw new MessageServiceException(
92                         "Failed to create a new add message change : " +
93                         ex.getMessage(),ex);
94             }
95         }
96         
97         /**
98          * The definition of the apply method.
99          */

100         public void applyChanges() throws ChangeException {
101             try {
102                 //MessageTransactionLock.getInstance().lock();
103
Session session = HibernateUtil.getInstance(
104                         MessageServiceImpl.class).getSession();
105                 
106                 
107                 // set the reply flag
108
int reply = 0;
109                 if (newMessage.getReply()) {
110                     reply = 1;
111                 }
112                 int acknowledged = 0;
113                 if (newMessage.isAcknowledged()) {
114                     acknowledged = 1;
115                 }
116                 
117                 // check the type of messge
118
int messageType = TEXT_MESSAGE;
119                 if (newMessage instanceof RPCMessage) {
120                     messageType = RPC_MESSAGE;
121                 }
122                 
123                 // instanciate the basic message
124
com.rift.coad.daemon.messageservice.db.Message message =
125                         new com.rift.coad.daemon.messageservice.db.Message(
126                         newMessage.getMessageId(),newMessage.getMessageCreater(),
127                         newMessage.getSessionId(),messageType,
128                         newMessage.getMessageType(),newMessage.getPriority(),
129                         reply, newMessage.getFrom(), acknowledged, 0,
130                         newMessage.getRetries());
131                 if ((newMessage.getTarget() != null) &&
132                         (newMessage.getTarget().length() != 0 )) {
133                     message.setTarget(newMessage.getTarget());
134                 }
135                 if (newMessage.getFrom() == null) {
136                     throw new InvalidProperty("The from address must be set");
137                 }
138                 message.setFromUrl(newMessage.getFrom());
139                 if ((newMessage.getReplyTo() != null) &&
140                         (newMessage.getReplyTo().length() != 0)) {
141                     message.setReplyUrl(newMessage.getReplyTo());
142                 }
143                 if ((newMessage.getTargetNamedQueue() != null) &&
144                         (newMessage.getTargetNamedQueue().length() != 0)) {
145                     message.setTargetNamedQueue(newMessage.getTargetNamedQueue());
146                 }
147                 if ((newMessage.getReplyNamedQueue() != null) &&
148                         (newMessage.getReplyNamedQueue().length() != 0)) {
149                     message.setReplyNamedQueue(newMessage.getReplyNamedQueue());
150                 }
151                 if ((newMessage.getCorrelationId() != null) &&
152                         (newMessage.getCorrelationId().length() != 0)) {
153                     message.setCorrelationId(newMessage.getCorrelationId());
154                 }
155                 message.setCreated(new Timestamp JavaDoc(newMessage.getCreated().
156                         getTime()));
157                 message.setProcessed(new Timestamp JavaDoc(newMessage.getProcessedDate().
158                         getTime()));
159                 message.setNextProcess(new Timestamp JavaDoc(
160                         ((MessageImpl)newMessage).getNextProcessDate().getTime()));
161                 message.setMessageState(newMessage.getState());
162                 session.persist(message);
163                 
164                 if (newMessage instanceof RPCMessage) {
165                     RPCMessage rpcMessage = (RPCMessage)newMessage;
166                     MessageRpcBody rpcBody = new MessageRpcBody();
167                     rpcBody.setMessage(message);
168                     rpcBody.setMessageId(message.getId());
169                     rpcBody.setXml(rpcMessage.getMethodBodyXML());
170                     if (rpcMessage.generatedException()) {
171                         rpcBody.setExceptionValue(
172                                 ((RPCMessageImpl)rpcMessage).getThrowableBytes().
173                                 clone());
174                     }
175                     if (rpcMessage.getResult() != null) {
176                         rpcBody.setResultValue(
177                                 ((RPCMessageImpl)rpcMessage).getResultBytes().
178                                 clone());
179                     }
180                     session.persist(rpcBody);
181                 } else if (newMessage instanceof TextMessage) {
182                     TextMessage textMessage = (TextMessage)newMessage;
183                     MessageTxtBody txtBody = new MessageTxtBody();
184                     txtBody.setMessage(message);
185                     txtBody.setMessageId(message.getId());
186                     txtBody.setBody(textMessage.getTextBody());
187                     session.persist(txtBody);
188                 } else {
189                     log.error("The message type [" + newMessage.getClass().getName()
190                     + "] is not recognised.");
191                     throw new MessageServiceException("The message type [" +
192                             newMessage.getClass().getName() +
193                             "] is not recognised.");
194                 }
195                 
196                 // the message information
197
if (newMessage.getServices() != null) {
198                     String JavaDoc[] services = newMessage.getServices();
199                     for (int index = 0; index < services.length; index++) {
200                         MessageService messageServices = new MessageService();
201                         messageServices.setMessage(message);
202                         messageServices.setService(services[index]);
203                         session.persist(messageServices);
204                     }
205                 }
206                 
207                 // the message properties
208
for (Enumeration JavaDoc enumerat = newMessage.getPropertyNames();
209                 enumerat.hasMoreElements();) {
210                     String JavaDoc key = (String JavaDoc)enumerat.nextElement();
211                     Object JavaDoc value = newMessage.getPropertyValue(key);
212                     MessageProperty property = new MessageProperty();
213                     property.setMessage(message);
214                     property.setName(key);
215                     if (value instanceof Boolean JavaDoc) {
216                         if (((Boolean JavaDoc)value).booleanValue()) {
217                             property.setBoolValue(new Integer JavaDoc(1));
218                         } else {
219                             property.setBoolValue(new Integer JavaDoc(0));
220                         }
221                     } else if (value instanceof Byte JavaDoc) {
222                         property.setByteValue(new Integer JavaDoc(((Byte JavaDoc)value).intValue()));
223                     } else if (value instanceof Integer JavaDoc) {
224                         property.setIntValue((Integer JavaDoc)value);
225                     } else if (value instanceof Long JavaDoc) {
226                         property.setLongValue((Long JavaDoc)value);
227                     } else if (value instanceof Double JavaDoc) {
228                         property.setDoubleValue((Double JavaDoc)value);
229                     } else if (value instanceof Float JavaDoc) {
230                         property.setFloatValue((Float JavaDoc)value);
231                     } else if (value instanceof String JavaDoc) {
232                         property.setStringValue((String JavaDoc)value);
233                     } else if (value instanceof byte[]) {
234                         property.setObjectValue((byte[])value);
235                     }
236                     session.persist(property);
237                 }
238                 
239                 // the message information
240
if (newMessage.getMessagePrincipals() == null) {
241                     throw new InvalidProperty("Must supply principals");
242                 }
243                 List JavaDoc principals = newMessage.getMessagePrincipals();
244                 for (Iterator JavaDoc iter = principals.iterator(); iter.hasNext();) {
245                     MessagePrincipal principal = new MessagePrincipal();
246                     principal.setMessage(message);
247                     String JavaDoc principalStr = (String JavaDoc)iter.next();
248                     //log.info("Add principal : " + principalStr);
249
principal.setPrincipalValue(principalStr);
250                     session.persist(principal);
251                 }
252                 List JavaDoc errors = newMessage.getErrors();
253                 for (Iterator JavaDoc iter = errors.iterator(); iter.hasNext();) {
254                     com.rift.coad.daemon.messageservice.MessageError messageError =
255                             (com.rift.coad.daemon.messageservice.MessageError)
256                             iter.next();
257                     com.rift.coad.daemon.messageservice.db.MessageError
258                             dbMessageError = new
259                             com.rift.coad.daemon.messageservice.db.MessageError();
260                     dbMessageError.setMessage(message);
261                     dbMessageError.setErrorDate(new java.sql.Timestamp JavaDoc(
262                             messageError.getErrorDate().getTime()));
263                     dbMessageError.setErrorLevel(messageError.getLevel());
264                     dbMessageError.setMsg(messageError.getMSG());
265                     session.persist(dbMessageError);
266                 }
267                 
268             } catch (Exception JavaDoc ex) {
269                 log.error("Failed to apply the changes : " + ex.getMessage(),ex);
270                 throw new ChangeException(
271                         "Failed to apply the changes : " + ex.getMessage(),ex);
272             }
273             
274         }
275     }
276     
277     /**
278      * This object represents queue assignment
279      */

280     public static class AssignMessageToQueueChange implements Change {
281         // private member variables
282
private String JavaDoc messageId = null;
283         private String JavaDoc queueName = null;
284         
285         /**
286          * The constructor of the assign message to queue object.
287          */

288         public AssignMessageToQueueChange(String JavaDoc messageId,String JavaDoc queueName) {
289             this.messageId = new String JavaDoc(messageId);
290             this.queueName = new String JavaDoc(queueName);
291         }
292         
293         
294         /**
295          * The definition of the apply method.
296          */

297         public void applyChanges() throws ChangeException {
298             try {
299                 //MessageTransactionLock.getInstance().lock();
300
Session session = HibernateUtil.getInstance(
301                         MessageServiceImpl.class).getSession();
302                 List JavaDoc entries = session.createQuery(
303                         "FROM MessageQueue as queue WHERE queue.messageQueueName = ?").
304                         setString(0,queueName).list();
305                 
306                 if (entries.size() != 1) {
307                     log.error("There is no queue by the name of : " +
308                             queueName);
309                     throw new MessageServiceException(
310                             "There is no queue by the name of : " +
311                             queueName);
312                 }
313                 com.rift.coad.daemon.messageservice.db.MessageQueue messageQueue =
314                         (com.rift.coad.daemon.messageservice.db.MessageQueue)
315                         entries.get(0);
316                 com.rift.coad.daemon.messageservice.db.Message message =
317                         (com.rift.coad.daemon.messageservice.db.Message)session.
318                         get(com.rift.coad.daemon.messageservice.db.Message.class,
319                         messageId);
320                 message.setMessageQueue(messageQueue);
321             } catch (Exception JavaDoc ex) {
322                 log.error("Failed to apply the changes : " + ex.getMessage(),ex);
323                 throw new ChangeException(
324                         "Failed to apply the changes : " + ex.getMessage(),ex);
325             }
326         }
327     }
328     
329     
330     /**
331      * This object represents an update message change
332      */

333     public static class UpdateMessageChange implements Change {
334         // private member variables
335
private MessageImpl updatedMessage = null;
336         
337         /**
338          * The constructor of the update message change object.
339          *
340          * @param updatedMessage The updated message object.
341          * @exception MessageServiceException
342          */

343         public UpdateMessageChange(MessageImpl updatedMessage) throws
344                 MessageServiceException {
345             try {
346                 this.updatedMessage = (MessageImpl)updatedMessage.clone();
347             } catch (Exception JavaDoc ex) {
348                 log.error("Failed to clone the updated message : " +
349                         ex.getMessage(),ex);
350                 throw new MessageServiceException(
351                         "Failed to clone the updated message : " +
352                         ex.getMessage(),ex);
353             }
354         }
355         
356         
357         /**
358          * The definition of the apply method.
359          */

360         public void applyChanges() throws ChangeException {
361             try {
362                 //MessageTransactionLock.getInstance().lock();
363
Session session = HibernateUtil.getInstance(
364                         MessageServiceImpl.class).getSession();
365                 
366                 com.rift.coad.daemon.messageservice.db.Message message =
367                         (com.rift.coad.daemon.messageservice.db.Message)session.
368                         get(com.rift.coad.daemon.messageservice.db.Message.class,
369                         updatedMessage.getMessageId());
370                 
371                 if (!(updatedMessage instanceof MessageImpl)) {
372                     throw new MessageServiceException(
373                             "The incorrect message object has been passed " +
374                             "into update");
375                 }
376                 MessageImpl messageImpl = (MessageImpl)updatedMessage;
377                 if ((updatedMessage.getTarget() != null) &&
378                         (updatedMessage.getTarget().length() != 0 )) {
379                     message.setTarget(updatedMessage.getTarget());
380                 }
381                 if (updatedMessage.getFrom() == null) {
382                     throw new InvalidProperty("The from address must be set");
383                 }
384                 message.setFromUrl(updatedMessage.getFrom());
385                 if ((updatedMessage.getReplyTo() != null) &&
386                         (updatedMessage.getReplyTo().length() != 0)) {
387                     message.setReplyUrl(updatedMessage.getReplyTo());
388                 }
389                 if ((updatedMessage.getTargetNamedQueue() != null) &&
390                         (updatedMessage.getTargetNamedQueue().length() != 0)) {
391                     message.setTargetNamedQueue(updatedMessage.
392                             getTargetNamedQueue());
393                 }
394                 if ((updatedMessage.getReplyNamedQueue() != null) &&
395                         (updatedMessage.getReplyNamedQueue().length() != 0)) {
396                     message.setReplyNamedQueue(updatedMessage.
397                             getReplyNamedQueue());
398                 }
399                 if ((updatedMessage.getCorrelationId() != null) &&
400                         (updatedMessage.getCorrelationId().length() != 0)) {
401                     message.setCorrelationId(updatedMessage.getCorrelationId());
402                 }
403                 message.setCreated(new Timestamp JavaDoc(updatedMessage.getCreated().
404                         getTime()));
405                 message.setProcessed(new Timestamp JavaDoc(updatedMessage.getProcessedDate().
406                         getTime()));
407                 message.setNextProcess(new Timestamp JavaDoc(
408                         ((MessageImpl)updatedMessage).getNextProcessDate().getTime()));
409                 message.setMessageState(updatedMessage.getState());
410                 message.setRetries(updatedMessage.getRetries());
411                 message.setMessageRoutingType(updatedMessage.getMessageType());
412                 message.setPriority(updatedMessage.getPriority());
413                 if (messageImpl.isAcknowledged()) {
414                     message.setAcknowledged(1);
415                 } else {
416                     message.setAcknowledged(0);
417                 }
418                 
419                 
420                 // the message properties
421
session.createQuery(
422                         "DELETE FROM MessageProperty as property WHERE " +
423                         "property.message.id = ?").
424                         setString(0,updatedMessage.getMessageId()).
425                         executeUpdate();
426                 //message.getMessageProperties().clear();
427
for (Enumeration JavaDoc enumerat = updatedMessage.getPropertyNames();
428                 enumerat.hasMoreElements();) {
429                     String JavaDoc key = (String JavaDoc)enumerat.nextElement();
430                     Object JavaDoc value = updatedMessage.getPropertyValue(key);
431                     MessageProperty property = new MessageProperty();
432                     property.setMessage(message);
433                     property.setName(key);
434                     if (value instanceof Boolean JavaDoc) {
435                         if (((Boolean JavaDoc)value).booleanValue()) {
436                             property.setBoolValue(new Integer JavaDoc(1));
437                         } else {
438                             property.setBoolValue(new Integer JavaDoc(0));
439                         }
440                     } else if (value instanceof Byte JavaDoc) {
441                         property.setByteValue(new Integer JavaDoc(((Byte JavaDoc)value).intValue()));
442                     } else if (value instanceof Integer JavaDoc) {
443                         property.setIntValue((Integer JavaDoc)value);
444                     } else if (value instanceof Long JavaDoc) {
445                         property.setLongValue((Long JavaDoc)value);
446                     } else if (value instanceof Double JavaDoc) {
447                         property.setDoubleValue((Double JavaDoc)value);
448                     } else if (value instanceof Float JavaDoc) {
449                         property.setFloatValue((Float JavaDoc)value);
450                     } else if (value instanceof String JavaDoc) {
451                         property.setStringValue((String JavaDoc)value);
452                     } else if (value instanceof byte[]) {
453                         property.setObjectValue((byte[])value);
454                     }
455                     session.persist(property);
456                 }
457                 
458                 // the message properties
459
session.createQuery(
460                         "DELETE FROM MessageError as error WHERE " +
461                         "error.message.id = ?").
462                         setString(0,updatedMessage.getMessageId()).
463                         executeUpdate();
464                 List JavaDoc errors = updatedMessage.getErrors();
465                 for (Iterator JavaDoc iter = errors.iterator(); iter.hasNext();) {
466                     com.rift.coad.daemon.messageservice.MessageError messageError =
467                             (com.rift.coad.daemon.messageservice.MessageError)
468                             iter.next();
469                     com.rift.coad.daemon.messageservice.db.MessageError
470                             dbMessageError = new
471                             com.rift.coad.daemon.messageservice.db.MessageError();
472                     dbMessageError.setMessage(message);
473                     dbMessageError.setErrorDate(new java.sql.Timestamp JavaDoc(
474                             messageError.getErrorDate().getTime()));
475                     dbMessageError.setErrorLevel(messageError.getLevel());
476                     dbMessageError.setMsg(messageError.getMSG());
477                     session.persist(dbMessageError);
478                 }
479                 
480                 
481                 if (updatedMessage instanceof RPCMessage) {
482                     RPCMessage rpcMessage = (RPCMessage)updatedMessage;
483                     MessageRpcBody rpcBody = (MessageRpcBody)session.get(
484                             MessageRpcBody.class,message.getId());
485                     if (rpcMessage.generatedException()) {
486                         rpcBody.setExceptionValue(
487                                 ((RPCMessageImpl)rpcMessage).getThrowableBytes().
488                                 clone());
489                     }
490                     if (((RPCMessageImpl)rpcMessage).getResultBytes() != null) {
491                         rpcBody.setResultValue(
492                                 ((RPCMessageImpl)rpcMessage).getResultBytes().
493                                 clone());
494                     }
495                 } else if (updatedMessage instanceof TextMessage) {
496                     TextMessage textMessage = (TextMessage)updatedMessage;
497                     MessageTxtBody txtBody = (MessageTxtBody)session.get(
498                             MessageTxtBody.class,message.getId());
499                     txtBody.setBody(textMessage.getTextBody());
500                 } else {
501                     log.error("The message type [" + updatedMessage.getClass().getName()
502                     + "] is not recognised.");
503                     throw new ChangeException("The message type [" +
504                             updatedMessage.getClass().getName() +
505                             "] is not recognised.");
506                 }
507                 
508             } catch (ChangeException ex) {
509                 throw ex;
510             } catch (Exception JavaDoc ex) {
511                 log.error("Failed to update the message because : " +
512                         ex.getMessage(),ex);
513                 throw new ChangeException(
514                         "Failed to update the message because : " +
515                         ex.getMessage(),ex);
516             }
517         }
518     }
519     
520     
521     /**
522      * This object represents a remove message change
523      */

524     public static class RemoveMessageChange implements Change {
525         // private member variables
526
private String JavaDoc messageId = null;
527         
528         /**
529          * The constructor of the remove message change object.
530          *
531          * @param messageId The message id for this object.
532          */

533         public RemoveMessageChange(String JavaDoc messageId) {
534             this.messageId = new String JavaDoc(messageId);
535         }
536         
537         
538         /**
539          * The definition of the apply method.
540          */

541         public void applyChanges() throws ChangeException {
542             try {
543                 //MessageTransactionLock.getInstance().lock();
544
Session session = HibernateUtil.getInstance(
545                         MessageServiceImpl.class).getSession();
546                 session.createQuery(
547                         "DELETE FROM MessageRpcBody as body WHERE " +
548                         "body.message.id = ?").
549                         setString(0,messageId).executeUpdate();
550                 session.createQuery(
551                         "DELETE FROM MessageTxtBody as body WHERE " +
552                         "body.message.id = ?").
553                         setString(0,messageId).executeUpdate();
554                 session.createQuery(
555                         "DELETE FROM MessageService as service WHERE " +
556                         "service.message.id = ?").
557                         setString(0,messageId).executeUpdate();
558                 session.createQuery(
559                         "DELETE FROM MessageProperty as property WHERE " +
560                         "property.message.id = ?").
561                         setString(0,messageId).executeUpdate();
562                 session.createQuery(
563                         "DELETE FROM MessagePrincipal as principal WHERE " +
564                         "principal.message.id = ?").
565                         setString(0,messageId).executeUpdate();
566                 session.createQuery(
567                         "DELETE FROM MessageError as error WHERE " +
568                         "error.message.id = ?").
569                         setString(0,messageId).executeUpdate();
570                 session.createQuery("DELETE FROM Message as msg WHERE msg.id = ?").
571                         setString(0,messageId).executeUpdate();
572             } catch (Exception JavaDoc ex) {
573                 log.error("Failed to failed to remove the message from the db : " +
574                         ex.getMessage(),ex);
575                 throw new ChangeException(
576                         "Failed to failed to remove the message from the db : " +
577                         ex.getMessage(),ex);
578             }
579         }
580     }
581     
582     // class constants
583
public final static int TEXT_MESSAGE = 1;
584     public final static int RPC_MESSAGE = 2;
585     
586     // the logger reference
587
protected static Logger log =
588             Logger.getLogger(MessageManagerImpl.class.getName());
589     
590     // private member variable
591
private String JavaDoc id = null;
592     private Date JavaDoc nextProcessTime = null;
593     private String JavaDoc messageQueueName = null;
594     private MessageImpl masterMessageImpl = null;
595     
596     // transaction variables
597
private Date JavaDoc originalNextProcessTime = null;
598     private String JavaDoc originalMessageQueueName = null;
599     private MessageImpl originalMessageImpl = null;
600     
601     /**
602      * Creates a new instance of MessageManagerImpl
603      *
604      * @param id The id of the message.
605      */

606     public MessageManagerImpl(String JavaDoc id) throws MessageServiceException {
607         this.id = id;
608         originalMessageImpl = masterMessageImpl = loadMessage();
609     }
610     
611     
612     /**
613      * Creates a new instance of MessageManagerImpl
614      *
615      * @param message The new message to create.
616      */

617     public MessageManagerImpl(Message newMessage) throws MessageServiceException {
618         try {
619             this.id = newMessage.getMessageId();
620             nextProcessTime = ((MessageImpl)newMessage).
621                     getNextProcessDate();
622             originalNextProcessTime = ((MessageImpl)newMessage).
623                     getNextProcessDate();
624             originalMessageImpl = (MessageImpl)newMessage;
625             masterMessageImpl = (MessageImpl)newMessage;
626             ChangeLog.getInstance().addChange(new AddMessageChange(
627                     (MessageImpl)newMessage));
628             TransactionManager.getInstance().bindResource(this,true);
629         } catch (MessageServiceException ex) {
630             throw ex;
631         } catch (Exception JavaDoc ex) {
632             log.error("Failed to create the message " +
633                     "from the database : " + ex.getMessage(),ex);
634             throw new MessageServiceException("Failed to create the message " +
635                     "in the database : " + ex.getMessage(),ex);
636         }
637     }
638     
639     
640     /**
641      * This method returns the id of this messsage.
642      *
643      * @return The id of the message this object is managing.
644      */

645     public String JavaDoc getID() {
646         return id;
647     }
648     
649     
650     /**
651      * This method returns the message object.
652      *
653      * @return The message object.
654      * @exception MessageServiceException
655      */

656     public Message getMessage() throws MessageServiceException {
657         return masterMessageImpl;
658     }
659     
660     
661     /**
662      * This method returns the message object.
663      *
664      * @return The message object.
665      * @exception MessageServiceException
666      */

667     public void assignToQueue(String JavaDoc queueName) throws MessageServiceException {
668         try {
669             TransactionManager.getInstance().bindResource(this,true);
670             ChangeLog.getInstance().addChange(new AssignMessageToQueueChange(
671                     this.id,queueName));
672             this.messageQueueName = queueName;
673         } catch (Exception JavaDoc ex) {
674             log.error("Failed to assign this object to a queue because : " +
675                     ex.getMessage(),ex);
676             throw new MessageServiceException(
677                     "Failed to assign this object to a queue because : " +
678                     ex.getMessage(),ex);
679         }
680     }
681     
682     
683     /**
684      * This method updates the message object.
685      *
686      * @param updatedMessage The updated message object.
687      * @exception MessageServiceException
688      */

689     public void updateMessage(Message updatedMessage) throws
690             MessageServiceException {
691         try {
692             TransactionManager.getInstance().bindResource(this,true);
693             ChangeLog.getInstance().addChange(new UpdateMessageChange(
694                     (MessageImpl)updatedMessage));
695             nextProcessTime = ((MessageImpl)updatedMessage).
696                     getNextProcessDate();
697             masterMessageImpl = (MessageImpl)updatedMessage;
698         } catch (MessageServiceException ex) {
699             throw ex;
700         } catch (Exception JavaDoc ex) {
701             log.error("Failed to update the message because : " +
702                     ex.getMessage(),ex);
703             throw new MessageServiceException(
704                     "Failed to update the message because : " +
705                     ex.getMessage(),ex);
706         }
707     }
708     
709     
710     /**
711      * This method is responsible from removing this message from the db.
712      *
713      * @exception MessageServiceException
714      */

715     public void remove() throws MessageServiceException {
716         try {
717             TransactionManager.getInstance().bindResource(this,true);
718             ChangeLog.getInstance().addChange(new RemoveMessageChange(
719                     this.id));
720         } catch (Exception JavaDoc ex) {
721             log.error("Failed to failed to remove the message : " +
722                     ex.getMessage(),ex);
723             throw new MessageServiceException(
724                     "Failed to failed to remove the message : " +
725                     ex.getMessage(),ex);
726         }
727     }
728     
729     
730     /**
731      * This method returns the next process time for this message.
732      *
733      * @return The date message.
734      * @exception MessageServiceException
735      */

736     public Date JavaDoc nextProcessTime() {
737         return nextProcessTime;
738     }
739     
740     
741     /**
742      * This message returns the priority.
743      */

744     public int getPriority() {
745         return this.masterMessageImpl.getPriority();
746     }
747     
748     
749     /**
750      * This method returns the name of the messaqe queue to which this message
751      * is assigned.
752      *
753      * @return The name of the message queue that this message is assigned to.
754      */

755     public String JavaDoc getMessageQueueName() {
756         return this.messageQueueName;
757     }
758     
759     
760     /**
761      * This method is called to commit the specified transaction.
762      *
763      * @param xid The id of the transaction to commit.
764      * @param onePhase If true a one phase commit should be used.
765      * @exception XAException
766      */

767     public synchronized void commit(Xid JavaDoc xid, boolean onePhase) throws
768             XAException JavaDoc {
769         if (nextProcessTime != null) {
770             this.originalNextProcessTime = nextProcessTime;
771         }
772         if (this.messageQueueName != null) {
773             this.originalMessageQueueName = messageQueueName;
774         }
775         if (this.masterMessageImpl != null) {
776             this.originalMessageImpl = masterMessageImpl;
777         }
778     }
779     
780     
781     /**
782      * The resource manager has dissociated this object from the transaction.
783      *
784      * @param xid The id of the transaction that is getting ended.
785      * @param flags The flags associated with this operation.
786      * @exception XAException
787      */

788     public void end(Xid JavaDoc xid, int flags) throws XAException JavaDoc {
789     }
790     
791     
792     /**
793      * The transaction has been completed and must be forgotten.
794      *
795      * @param xid The id of the transaction to forget.
796      * @exception XAException
797      */

798     public void forget(Xid JavaDoc xid) throws XAException JavaDoc {
799         if (nextProcessTime != null) {
800             this.originalNextProcessTime = nextProcessTime;
801         }
802         if (this.messageQueueName != null) {
803             this.originalMessageQueueName = messageQueueName;
804         }
805         if (this.masterMessageImpl != null) {
806             this.originalMessageImpl = masterMessageImpl;
807         }
808     }
809     
810     
811     /**
812      * This method returns the transaction timeout for this object.
813      *
814      * @return The int containing the transaction timeout.
815      * @exception XAException
816      */

817     public int getTransactionTimeout() throws XAException JavaDoc {
818         return -1;
819     }
820     
821     
822     /**
823      * This method returns true if this object is the resource manager getting
824      * queried.
825      *
826      * @return TRUE if this is the resource manager, FALSE if not.
827      * @param xaResource The resource to perform the check against.
828      * @exception XAException
829      */

830     public boolean isSameRM(XAResource JavaDoc xAResource) throws XAException JavaDoc {
831         return this == xAResource;
832     }
833     
834     
835     /**
836      * This is called before a transaction is committed.
837      *
838      * @return The results of the transaction.
839      * @param xid The id of the transaction to check against.
840      * @exception XAException
841      */

842     public int prepare(Xid JavaDoc xid) throws XAException JavaDoc {
843         return XAResource.XA_OK;
844     }
845     
846     
847     /**
848      * This method returns the list of transaction branches for this resource
849      * manager.
850      *
851      * @return The list of resource branches.
852      * @param flags The flags
853      * @exception XAException
854      */

855     public Xid JavaDoc[] recover(int flags) throws XAException JavaDoc {
856         return null;
857     }
858     
859     
860     /**
861      * This method is called to roll back the specified transaction.
862      *
863      * @param xid The id of the transaction to roll back.
864      * @exception XAException
865      */

866     public void rollback(Xid JavaDoc xid) throws XAException JavaDoc {
867         nextProcessTime = originalNextProcessTime;
868         messageQueueName = originalMessageQueueName;
869         masterMessageImpl = originalMessageImpl;
870     }
871     
872     
873     /**
874      * This method sets the transaction timeout for this resource manager.
875      *
876      * @return TRUE if the transaction timeout can be set successfully.
877      * @param transactionTimeout The new transaction timeout value.
878      * @exception XAException
879      */

880     public boolean setTransactionTimeout(int transactionTimeout) throws
881             XAException JavaDoc {
882         return true;
883     }
884     
885     
886     /**
887      * This method is called to start a transaction on a resource manager.
888      *
889      * @param xid The id of the new transaction.
890      * @param flags The flags associated with the transaction.
891      * @exception XAException
892      */

893     public void start(Xid JavaDoc xid, int flags) throws XAException JavaDoc {
894         
895     }
896     
897     
898     /**
899      * The compare to interface used to order this object in the queues.
900      *
901      * @return -1,0,1 depending on the order of the object.
902      * @param o The object to perform the comparison on.
903      */

904     public int compareTo(Object JavaDoc o) {
905         MessageManagerImpl msg =(MessageManagerImpl)o;
906         if (msg.nextProcessTime().getTime() > nextProcessTime().getTime()) {
907             return -1;
908         } else if (nextProcessTime().getTime() > msg.nextProcessTime().getTime()) {
909             return 1;
910         } else if (msg.getPriority() > getPriority()) {
911             return -1;
912         } else if (getPriority() > msg.getPriority()) {
913             return 1;
914         }
915         return 0;
916     }
917     
918     
919     /**
920      * This method returns the message object.
921      *
922      * @return The message object.
923      * @exception MessageServiceException
924      */

925     private MessageImpl loadMessage() throws MessageServiceException {
926         try {
927             //MessageTransactionLock.getInstance().lock();
928
Session session = HibernateUtil.getInstance(
929                     MessageServiceImpl.class).getSession();
930             
931             com.rift.coad.daemon.messageservice.db.Message message =
932                     (com.rift.coad.daemon.messageservice.db.Message)session.
933                     get(com.rift.coad.daemon.messageservice.db.Message.class,id);
934             
935             MessageImpl result = null;
936             if (message.getMessageType() == MessageManagerImpl.RPC_MESSAGE) {
937                 RPCMessageImpl rpcMessage = new RPCMessageImpl(message.getId(),
938                         new Date JavaDoc(message.getCreated().getTime()),
939                         message.getRetries(), new Date JavaDoc(message.getProcessed().
940                         getTime()),message.getMessageCreator(),
941                         message.getSessionId(), null,message.getFromUrl(),
942                         message.getMessageRoutingType(),
943                         message.getMessageState());
944                 MessageRpcBody rpcBody = (MessageRpcBody)session.get(
945                         MessageRpcBody.class,message.getId());
946                 rpcMessage.setMethodBodyXML(rpcBody.getXml());
947                 if (rpcBody.getExceptionValue() != null) {
948                     rpcMessage.setThrowableBytes(rpcBody.
949                             getExceptionValue().clone());
950                 }
951                 if (rpcBody.getResultValue() != null) {
952                     rpcMessage.setResultBytes(
953                             rpcBody.getResultValue().clone());
954                 }
955                 result = rpcMessage;
956             } else {
957                 
958                 TextMessageImpl txtMessage = new TextMessageImpl(message.getId(),
959                         new Date JavaDoc(message.getCreated().getTime()),
960                         message.getRetries(), new Date JavaDoc(message.getProcessed().
961                         getTime()),message.getMessageCreator(),
962                         message.getSessionId(),null,message.getFromUrl(),
963                         message.getMessageRoutingType(),
964                         message.getMessageState());
965                 MessageTxtBody txtBody = (MessageTxtBody)session.get(
966                         MessageTxtBody.class,message.getId());
967                 txtMessage.setTextBody(txtBody.getBody());
968                 result = txtMessage;
969             }
970             
971             // set from
972
result.setFrom(message.getFromUrl());
973             result.setPriority(message.getPriority());
974             result.setNextProcessDate(new Date JavaDoc(
975                     message.getNextProcess().getTime()));
976             result.setProcessedDate(new Date JavaDoc(
977                     message.getProcessed().getTime()));
978             
979             // set the target
980
if (message.getTarget() != null) {
981                 result.setTarget(message.getTarget());
982             }
983             // set the reply flag
984
if (message.getReply() == 1) {
985                 result.setReply(true);
986             } else {
987                 result.setReply(false);
988             }
989             // set the reply to address
990
if (message.getReplyUrl() != null) {
991                 result.setReplyTo(message.getReplyUrl());
992             }
993             // set the named queue
994
if (message.getTargetNamedQueue() != null) {
995                 result.setTargetNamedQueue(message.getTargetNamedQueue());
996             }
997             if (message.getReplyNamedQueue() != null) {
998                 result.setReplyNamedQueue(message.getReplyNamedQueue());
999             }
1000            // the correlation id
1001
if (message.getCorrelationId() != null) {
1002                result.setCorrelationId(message.getCorrelationId());
1003            }
1004            
1005            // set the services
1006
List JavaDoc dbServices = session.createQuery(
1007                    "FROM MessageService as service WHERE " +
1008                    "service.message.id = ?").
1009                    setString(0,message.getId()).list();
1010            String JavaDoc[] services = new String JavaDoc[dbServices.size()];
1011            int index = 0;
1012            for (Iterator JavaDoc iter = dbServices.iterator();
1013            iter.hasNext(); index++) {
1014                services[index] = ((MessageService)iter.next()).getService();
1015            }
1016            result.setServices(services);
1017            
1018            // set properties
1019
List JavaDoc dbProperties = session.createQuery(
1020                    "FROM MessageProperty as property WHERE " +
1021                    "property.message.id = ?").
1022                    setString(0,message.getId()).list();
1023            for (Iterator JavaDoc iter = dbProperties.iterator();
1024            iter.hasNext();) {
1025                MessageProperty property = (MessageProperty)iter.next();
1026                if (property.getBoolValue() != null) {
1027                    result.setBooleanProperty(property.getName(),
1028                            ((Integer JavaDoc)property.getBoolValue()).intValue() == 1 ?
1029                                true:false);
1030                } else if (property.getByteValue() != null) {
1031                    result.setByteProperty(property.getName(),
1032                            ((Integer JavaDoc)property.getByteValue()).byteValue());
1033                } else if (property.getIntValue() != null) {
1034                    result.setPropertyValue(property.getName(),
1035                            property.getIntValue());
1036                } else if (property.getLongValue() != null) {
1037                    result.setPropertyValue(property.getName(),
1038                            property.getLongValue());
1039                } else if (property.getDoubleValue() != null) {
1040                    result.setPropertyValue(property.getName(),
1041                            property.getDoubleValue());
1042                } else if (property.getFloatValue() != null) {
1043                    result.setPropertyValue(property.getName(),
1044                            property.getFloatValue());
1045                } else if (property.getObjectValue() != null) {
1046                    result.setPropertyValue(property.getName(),
1047                            property.getObjectValue());
1048                } else if (property.getStringValue() != null) {
1049                    result.setPropertyValue(property.getName(),
1050                            property.getStringValue());
1051                }
1052                
1053            }
1054            
1055            List JavaDoc principals = new ArrayList JavaDoc();
1056            List JavaDoc dbPrincipals = session.createQuery(
1057                    "FROM MessagePrincipal as principal WHERE " +
1058                    "principal.message.id = ?").
1059                    setString(0,message.getId()).list();
1060            for (Iterator JavaDoc iter = dbPrincipals.iterator(); iter.hasNext();) {
1061                MessagePrincipal principal = (MessagePrincipal)iter.next();
1062                principals.add(principal.getPrincipalValue());
1063            }
1064            result.setMessagePrincipals(principals);
1065            
1066            List JavaDoc dbErrors = session.createQuery(
1067                    "FROM MessageError as error WHERE " +
1068                    "error.message.id = ?").
1069                    setString(0,message.getId()).list();
1070            for (Iterator JavaDoc iter = dbErrors.iterator(); iter.hasNext();) {
1071                com.rift.coad.daemon.messageservice.db.MessageError
1072                        dbMessageError =
1073                        (com.rift.coad.daemon.messageservice.db.MessageError)
1074                        iter.next();
1075                com.rift.coad.daemon.messageservice.MessageError messageError =
1076                        new com.rift.coad.daemon.messageservice.MessageError(
1077                        new Date JavaDoc(dbMessageError.getErrorDate().getTime()),
1078                        dbMessageError.getErrorLevel(),dbMessageError.getMsg());
1079                ((MessageImpl)result).addError(messageError);
1080            }
1081            
1082            if (message.getMessageQueue() != null) {
1083                originalMessageQueueName = messageQueueName =
1084                        message.getMessageQueue().getMessageQueueName();
1085            }
1086            if (message.getNextProcess() != null) {
1087                this.nextProcessTime = this.originalNextProcessTime =
1088                        new Date JavaDoc(message.getNextProcess().getTime());
1089            }
1090            
1091            return result;
1092        } catch (Exception JavaDoc ex) {
1093            log.error("Failed to load the message because : " +
1094                    ex.getMessage(),ex);
1095            throw new MessageServiceException(
1096                    "Failed to load the message because : " +
1097                    ex.getMessage(),ex);
1098        }
1099    }
1100    
1101}
1102
Popular Tags