1 21 22 package com.rift.coad.daemon.messageservice.message; 24 25 import java.sql.Timestamp ; 27 import java.util.ArrayList ; 28 import java.util.Date ; 29 import java.util.Enumeration ; 30 import java.util.HashSet ; 31 import java.util.Iterator ; 32 import java.util.List ; 33 import java.util.Set ; 34 import javax.transaction.xa.XAException ; 35 import javax.transaction.xa.XAResource ; 36 import javax.transaction.xa.Xid ; 37 38 import org.hibernate.*; 40 import org.hibernate.cfg.*; 41 42 import org.apache.log4j.Logger; 44 45 import com.rift.coad.daemon.messageservice.Message; 47 import com.rift.coad.daemon.messageservice.MessageServiceImpl; 48 import com.rift.coad.daemon.messageservice.RPCMessage; 49 import com.rift.coad.daemon.messageservice.TextMessage; 50 import com.rift.coad.daemon.messageservice.MessageManager; 51 import com.rift.coad.daemon.messageservice.MessageServiceException; 52 import com.rift.coad.daemon.messageservice.InvalidProperty; 53 import com.rift.coad.daemon.messageservice.db.*; 54 import com.rift.coad.daemon.messageservice.named.NamedQueueManagerImpl; 55 import com.rift.coad.hibernate.util.HibernateUtil; 56 import com.rift.coad.lib.common.ObjectSerializer; 57 import com.rift.coad.util.transaction.TransactionManager; 58 import com.rift.coad.util.change.Change; 59 import com.rift.coad.util.change.ChangeException; 60 import com.rift.coad.util.change.ChangeLog; 61 62 63 68 public class MessageManagerImpl implements MessageManager { 69 70 71 74 public static class AddMessageChange implements Change { 75 private MessageImpl newMessage = null; 77 78 84 public AddMessageChange(MessageImpl newMessage) throws 85 MessageServiceException { 86 try { 87 this.newMessage = (MessageImpl)newMessage.clone(); 88 } catch (Exception ex) { 89 log.error("Failed to create a new add message change : " + 90 ex.getMessage(),ex); 91 throw new MessageServiceException( 92 "Failed to create a new add message change : " + 93 ex.getMessage(),ex); 94 } 95 } 96 97 100 public void applyChanges() throws ChangeException { 101 try { 102 Session session = HibernateUtil.getInstance( 104 MessageServiceImpl.class).getSession(); 105 106 107 int reply = 0; 109 if (newMessage.getReply()) { 110 reply = 1; 111 } 112 int acknowledged = 0; 113 if (newMessage.isAcknowledged()) { 114 acknowledged = 1; 115 } 116 117 int messageType = TEXT_MESSAGE; 119 if (newMessage instanceof RPCMessage) { 120 messageType = RPC_MESSAGE; 121 } 122 123 com.rift.coad.daemon.messageservice.db.Message message = 125 new com.rift.coad.daemon.messageservice.db.Message( 126 newMessage.getMessageId(),newMessage.getMessageCreater(), 127 newMessage.getSessionId(),messageType, 128 newMessage.getMessageType(),newMessage.getPriority(), 129 reply, newMessage.getFrom(), acknowledged, 0, 130 newMessage.getRetries()); 131 if ((newMessage.getTarget() != null) && 132 (newMessage.getTarget().length() != 0 )) { 133 message.setTarget(newMessage.getTarget()); 134 } 135 if (newMessage.getFrom() == null) { 136 throw new InvalidProperty("The from address must be set"); 137 } 138 message.setFromUrl(newMessage.getFrom()); 139 if ((newMessage.getReplyTo() != null) && 140 (newMessage.getReplyTo().length() != 0)) { 141 message.setReplyUrl(newMessage.getReplyTo()); 142 } 143 if ((newMessage.getTargetNamedQueue() != null) && 144 (newMessage.getTargetNamedQueue().length() != 0)) { 145 message.setTargetNamedQueue(newMessage.getTargetNamedQueue()); 146 } 147 if ((newMessage.getReplyNamedQueue() != null) && 148 (newMessage.getReplyNamedQueue().length() != 0)) { 149 message.setReplyNamedQueue(newMessage.getReplyNamedQueue()); 150 } 151 if ((newMessage.getCorrelationId() != null) && 152 (newMessage.getCorrelationId().length() != 0)) { 153 message.setCorrelationId(newMessage.getCorrelationId()); 154 } 155 message.setCreated(new Timestamp (newMessage.getCreated(). 156 getTime())); 157 message.setProcessed(new Timestamp (newMessage.getProcessedDate(). 158 getTime())); 159 message.setNextProcess(new Timestamp ( 160 ((MessageImpl)newMessage).getNextProcessDate().getTime())); 161 message.setMessageState(newMessage.getState()); 162 session.persist(message); 163 164 if (newMessage instanceof RPCMessage) { 165 RPCMessage rpcMessage = (RPCMessage)newMessage; 166 MessageRpcBody rpcBody = new MessageRpcBody(); 167 rpcBody.setMessage(message); 168 rpcBody.setMessageId(message.getId()); 169 rpcBody.setXml(rpcMessage.getMethodBodyXML()); 170 if (rpcMessage.generatedException()) { 171 rpcBody.setExceptionValue( 172 ((RPCMessageImpl)rpcMessage).getThrowableBytes(). 173 clone()); 174 } 175 if (rpcMessage.getResult() != null) { 176 rpcBody.setResultValue( 177 ((RPCMessageImpl)rpcMessage).getResultBytes(). 178 clone()); 179 } 180 session.persist(rpcBody); 181 } else if (newMessage instanceof TextMessage) { 182 TextMessage textMessage = (TextMessage)newMessage; 183 MessageTxtBody txtBody = new MessageTxtBody(); 184 txtBody.setMessage(message); 185 txtBody.setMessageId(message.getId()); 186 txtBody.setBody(textMessage.getTextBody()); 187 session.persist(txtBody); 188 } else { 189 log.error("The message type [" + newMessage.getClass().getName() 190 + "] is not recognised."); 191 throw new MessageServiceException("The message type [" + 192 newMessage.getClass().getName() + 193 "] is not recognised."); 194 } 195 196 if (newMessage.getServices() != null) { 198 String [] services = newMessage.getServices(); 199 for (int index = 0; index < services.length; index++) { 200 MessageService messageServices = new MessageService(); 201 messageServices.setMessage(message); 202 messageServices.setService(services[index]); 203 session.persist(messageServices); 204 } 205 } 206 207 for (Enumeration enumerat = newMessage.getPropertyNames(); 209 enumerat.hasMoreElements();) { 210 String key = (String )enumerat.nextElement(); 211 Object value = newMessage.getPropertyValue(key); 212 MessageProperty property = new MessageProperty(); 213 property.setMessage(message); 214 property.setName(key); 215 if (value instanceof Boolean ) { 216 if (((Boolean )value).booleanValue()) { 217 property.setBoolValue(new Integer (1)); 218 } else { 219 property.setBoolValue(new Integer (0)); 220 } 221 } else if (value instanceof Byte ) { 222 property.setByteValue(new Integer (((Byte )value).intValue())); 223 } else if (value instanceof Integer ) { 224 property.setIntValue((Integer )value); 225 } else if (value instanceof Long ) { 226 property.setLongValue((Long )value); 227 } else if (value instanceof Double ) { 228 property.setDoubleValue((Double )value); 229 } else if (value instanceof Float ) { 230 property.setFloatValue((Float )value); 231 } else if (value instanceof String ) { 232 property.setStringValue((String )value); 233 } else if (value instanceof byte[]) { 234 property.setObjectValue((byte[])value); 235 } 236 session.persist(property); 237 } 238 239 if (newMessage.getMessagePrincipals() == null) { 241 throw new InvalidProperty("Must supply principals"); 242 } 243 List principals = newMessage.getMessagePrincipals(); 244 for (Iterator iter = principals.iterator(); iter.hasNext();) { 245 MessagePrincipal principal = new MessagePrincipal(); 246 principal.setMessage(message); 247 String principalStr = (String )iter.next(); 248 principal.setPrincipalValue(principalStr); 250 session.persist(principal); 251 } 252 List errors = newMessage.getErrors(); 253 for (Iterator iter = errors.iterator(); iter.hasNext();) { 254 com.rift.coad.daemon.messageservice.MessageError messageError = 255 (com.rift.coad.daemon.messageservice.MessageError) 256 iter.next(); 257 com.rift.coad.daemon.messageservice.db.MessageError 258 dbMessageError = new 259 com.rift.coad.daemon.messageservice.db.MessageError(); 260 dbMessageError.setMessage(message); 261 dbMessageError.setErrorDate(new java.sql.Timestamp ( 262 messageError.getErrorDate().getTime())); 263 dbMessageError.setErrorLevel(messageError.getLevel()); 264 dbMessageError.setMsg(messageError.getMSG()); 265 session.persist(dbMessageError); 266 } 267 268 } catch (Exception ex) { 269 log.error("Failed to apply the changes : " + ex.getMessage(),ex); 270 throw new ChangeException( 271 "Failed to apply the changes : " + ex.getMessage(),ex); 272 } 273 274 } 275 } 276 277 280 public static class AssignMessageToQueueChange implements Change { 281 private String messageId = null; 283 private String queueName = null; 284 285 288 public AssignMessageToQueueChange(String messageId,String queueName) { 289 this.messageId = new String (messageId); 290 this.queueName = new String (queueName); 291 } 292 293 294 297 public void applyChanges() throws ChangeException { 298 try { 299 Session session = HibernateUtil.getInstance( 301 MessageServiceImpl.class).getSession(); 302 List entries = session.createQuery( 303 "FROM MessageQueue as queue WHERE queue.messageQueueName = ?"). 304 setString(0,queueName).list(); 305 306 if (entries.size() != 1) { 307 log.error("There is no queue by the name of : " + 308 queueName); 309 throw new MessageServiceException( 310 "There is no queue by the name of : " + 311 queueName); 312 } 313 com.rift.coad.daemon.messageservice.db.MessageQueue messageQueue = 314 (com.rift.coad.daemon.messageservice.db.MessageQueue) 315 entries.get(0); 316 com.rift.coad.daemon.messageservice.db.Message message = 317 (com.rift.coad.daemon.messageservice.db.Message)session. 318 get(com.rift.coad.daemon.messageservice.db.Message.class, 319 messageId); 320 message.setMessageQueue(messageQueue); 321 } catch (Exception ex) { 322 log.error("Failed to apply the changes : " + ex.getMessage(),ex); 323 throw new ChangeException( 324 "Failed to apply the changes : " + ex.getMessage(),ex); 325 } 326 } 327 } 328 329 330 333 public static class UpdateMessageChange implements Change { 334 private MessageImpl updatedMessage = null; 336 337 343 public UpdateMessageChange(MessageImpl updatedMessage) throws 344 MessageServiceException { 345 try { 346 this.updatedMessage = (MessageImpl)updatedMessage.clone(); 347 } catch (Exception ex) { 348 log.error("Failed to clone the updated message : " + 349 ex.getMessage(),ex); 350 throw new MessageServiceException( 351 "Failed to clone the updated message : " + 352 ex.getMessage(),ex); 353 } 354 } 355 356 357 360 public void applyChanges() throws ChangeException { 361 try { 362 Session session = HibernateUtil.getInstance( 364 MessageServiceImpl.class).getSession(); 365 366 com.rift.coad.daemon.messageservice.db.Message message = 367 (com.rift.coad.daemon.messageservice.db.Message)session. 368 get(com.rift.coad.daemon.messageservice.db.Message.class, 369 updatedMessage.getMessageId()); 370 371 if (!(updatedMessage instanceof MessageImpl)) { 372 throw new MessageServiceException( 373 "The incorrect message object has been passed " + 374 "into update"); 375 } 376 MessageImpl messageImpl = (MessageImpl)updatedMessage; 377 if ((updatedMessage.getTarget() != null) && 378 (updatedMessage.getTarget().length() != 0 )) { 379 message.setTarget(updatedMessage.getTarget()); 380 } 381 if (updatedMessage.getFrom() == null) { 382 throw new InvalidProperty("The from address must be set"); 383 } 384 message.setFromUrl(updatedMessage.getFrom()); 385 if ((updatedMessage.getReplyTo() != null) && 386 (updatedMessage.getReplyTo().length() != 0)) { 387 message.setReplyUrl(updatedMessage.getReplyTo()); 388 } 389 if ((updatedMessage.getTargetNamedQueue() != null) && 390 (updatedMessage.getTargetNamedQueue().length() != 0)) { 391 message.setTargetNamedQueue(updatedMessage. 392 getTargetNamedQueue()); 393 } 394 if ((updatedMessage.getReplyNamedQueue() != null) && 395 (updatedMessage.getReplyNamedQueue().length() != 0)) { 396 message.setReplyNamedQueue(updatedMessage. 397 getReplyNamedQueue()); 398 } 399 if ((updatedMessage.getCorrelationId() != null) && 400 (updatedMessage.getCorrelationId().length() != 0)) { 401 message.setCorrelationId(updatedMessage.getCorrelationId()); 402 } 403 message.setCreated(new Timestamp (updatedMessage.getCreated(). 404 getTime())); 405 message.setProcessed(new Timestamp (updatedMessage.getProcessedDate(). 406 getTime())); 407 message.setNextProcess(new Timestamp ( 408 ((MessageImpl)updatedMessage).getNextProcessDate().getTime())); 409 message.setMessageState(updatedMessage.getState()); 410 message.setRetries(updatedMessage.getRetries()); 411 message.setMessageRoutingType(updatedMessage.getMessageType()); 412 message.setPriority(updatedMessage.getPriority()); 413 if (messageImpl.isAcknowledged()) { 414 message.setAcknowledged(1); 415 } else { 416 message.setAcknowledged(0); 417 } 418 419 420 session.createQuery( 422 "DELETE FROM MessageProperty as property WHERE " + 423 "property.message.id = ?"). 424 setString(0,updatedMessage.getMessageId()). 425 executeUpdate(); 426 for (Enumeration enumerat = updatedMessage.getPropertyNames(); 428 enumerat.hasMoreElements();) { 429 String key = (String )enumerat.nextElement(); 430 Object value = updatedMessage.getPropertyValue(key); 431 MessageProperty property = new MessageProperty(); 432 property.setMessage(message); 433 property.setName(key); 434 if (value instanceof Boolean ) { 435 if (((Boolean )value).booleanValue()) { 436 property.setBoolValue(new Integer (1)); 437 } else { 438 property.setBoolValue(new Integer (0)); 439 } 440 } else if (value instanceof Byte ) { 441 property.setByteValue(new Integer (((Byte )value).intValue())); 442 } else if (value instanceof Integer ) { 443 property.setIntValue((Integer )value); 444 } else if (value instanceof Long ) { 445 property.setLongValue((Long )value); 446 } else if (value instanceof Double ) { 447 property.setDoubleValue((Double )value); 448 } else if (value instanceof Float ) { 449 property.setFloatValue((Float )value); 450 } else if (value instanceof String ) { 451 property.setStringValue((String )value); 452 } else if (value instanceof byte[]) { 453 property.setObjectValue((byte[])value); 454 } 455 session.persist(property); 456 } 457 458 session.createQuery( 460 "DELETE FROM MessageError as error WHERE " + 461 "error.message.id = ?"). 462 setString(0,updatedMessage.getMessageId()). 463 executeUpdate(); 464 List errors = updatedMessage.getErrors(); 465 for (Iterator iter = errors.iterator(); iter.hasNext();) { 466 com.rift.coad.daemon.messageservice.MessageError messageError = 467 (com.rift.coad.daemon.messageservice.MessageError) 468 iter.next(); 469 com.rift.coad.daemon.messageservice.db.MessageError 470 dbMessageError = new 471 com.rift.coad.daemon.messageservice.db.MessageError(); 472 dbMessageError.setMessage(message); 473 dbMessageError.setErrorDate(new java.sql.Timestamp ( 474 messageError.getErrorDate().getTime())); 475 dbMessageError.setErrorLevel(messageError.getLevel()); 476 dbMessageError.setMsg(messageError.getMSG()); 477 session.persist(dbMessageError); 478 } 479 480 481 if (updatedMessage instanceof RPCMessage) { 482 RPCMessage rpcMessage = (RPCMessage)updatedMessage; 483 MessageRpcBody rpcBody = (MessageRpcBody)session.get( 484 MessageRpcBody.class,message.getId()); 485 if (rpcMessage.generatedException()) { 486 rpcBody.setExceptionValue( 487 ((RPCMessageImpl)rpcMessage).getThrowableBytes(). 488 clone()); 489 } 490 if (((RPCMessageImpl)rpcMessage).getResultBytes() != null) { 491 rpcBody.setResultValue( 492 ((RPCMessageImpl)rpcMessage).getResultBytes(). 493 clone()); 494 } 495 } else if (updatedMessage instanceof TextMessage) { 496 TextMessage textMessage = (TextMessage)updatedMessage; 497 MessageTxtBody txtBody = (MessageTxtBody)session.get( 498 MessageTxtBody.class,message.getId()); 499 txtBody.setBody(textMessage.getTextBody()); 500 } else { 501 log.error("The message type [" + updatedMessage.getClass().getName() 502 + "] is not recognised."); 503 throw new ChangeException("The message type [" + 504 updatedMessage.getClass().getName() + 505 "] is not recognised."); 506 } 507 508 } catch (ChangeException ex) { 509 throw ex; 510 } catch (Exception ex) { 511 log.error("Failed to update the message because : " + 512 ex.getMessage(),ex); 513 throw new ChangeException( 514 "Failed to update the message because : " + 515 ex.getMessage(),ex); 516 } 517 } 518 } 519 520 521 524 public static class RemoveMessageChange implements Change { 525 private String messageId = null; 527 528 533 public RemoveMessageChange(String messageId) { 534 this.messageId = new String (messageId); 535 } 536 537 538 541 public void applyChanges() throws ChangeException { 542 try { 543 Session session = HibernateUtil.getInstance( 545 MessageServiceImpl.class).getSession(); 546 session.createQuery( 547 "DELETE FROM MessageRpcBody as body WHERE " + 548 "body.message.id = ?"). 549 setString(0,messageId).executeUpdate(); 550 session.createQuery( 551 "DELETE FROM MessageTxtBody as body WHERE " + 552 "body.message.id = ?"). 553 setString(0,messageId).executeUpdate(); 554 session.createQuery( 555 "DELETE FROM MessageService as service WHERE " + 556 "service.message.id = ?"). 557 setString(0,messageId).executeUpdate(); 558 session.createQuery( 559 "DELETE FROM MessageProperty as property WHERE " + 560 "property.message.id = ?"). 561 setString(0,messageId).executeUpdate(); 562 session.createQuery( 563 "DELETE FROM MessagePrincipal as principal WHERE " + 564 "principal.message.id = ?"). 565 setString(0,messageId).executeUpdate(); 566 session.createQuery( 567 "DELETE FROM MessageError as error WHERE " + 568 "error.message.id = ?"). 569 setString(0,messageId).executeUpdate(); 570 session.createQuery("DELETE FROM Message as msg WHERE msg.id = ?"). 571 setString(0,messageId).executeUpdate(); 572 } catch (Exception ex) { 573 log.error("Failed to failed to remove the message from the db : " + 574 ex.getMessage(),ex); 575 throw new ChangeException( 576 "Failed to failed to remove the message from the db : " + 577 ex.getMessage(),ex); 578 } 579 } 580 } 581 582 public final static int TEXT_MESSAGE = 1; 584 public final static int RPC_MESSAGE = 2; 585 586 protected static Logger log = 588 Logger.getLogger(MessageManagerImpl.class.getName()); 589 590 private String id = null; 592 private Date nextProcessTime = null; 593 private String messageQueueName = null; 594 private MessageImpl masterMessageImpl = null; 595 596 private Date originalNextProcessTime = null; 598 private String originalMessageQueueName = null; 599 private MessageImpl originalMessageImpl = null; 600 601 606 public MessageManagerImpl(String id) throws MessageServiceException { 607 this.id = id; 608 originalMessageImpl = masterMessageImpl = loadMessage(); 609 } 610 611 612 617 public MessageManagerImpl(Message newMessage) throws MessageServiceException { 618 try { 619 this.id = newMessage.getMessageId(); 620 nextProcessTime = ((MessageImpl)newMessage). 621 getNextProcessDate(); 622 originalNextProcessTime = ((MessageImpl)newMessage). 623 getNextProcessDate(); 624 originalMessageImpl = (MessageImpl)newMessage; 625 masterMessageImpl = (MessageImpl)newMessage; 626 ChangeLog.getInstance().addChange(new AddMessageChange( 627 (MessageImpl)newMessage)); 628 TransactionManager.getInstance().bindResource(this,true); 629 } catch (MessageServiceException ex) { 630 throw ex; 631 } catch (Exception ex) { 632 log.error("Failed to create the message " + 633 "from the database : " + ex.getMessage(),ex); 634 throw new MessageServiceException("Failed to create the message " + 635 "in the database : " + ex.getMessage(),ex); 636 } 637 } 638 639 640 645 public String getID() { 646 return id; 647 } 648 649 650 656 public Message getMessage() throws MessageServiceException { 657 return masterMessageImpl; 658 } 659 660 661 667 public void assignToQueue(String queueName) throws MessageServiceException { 668 try { 669 TransactionManager.getInstance().bindResource(this,true); 670 ChangeLog.getInstance().addChange(new AssignMessageToQueueChange( 671 this.id,queueName)); 672 this.messageQueueName = queueName; 673 } catch (Exception ex) { 674 log.error("Failed to assign this object to a queue because : " + 675 ex.getMessage(),ex); 676 throw new MessageServiceException( 677 "Failed to assign this object to a queue because : " + 678 ex.getMessage(),ex); 679 } 680 } 681 682 683 689 public void updateMessage(Message updatedMessage) throws 690 MessageServiceException { 691 try { 692 TransactionManager.getInstance().bindResource(this,true); 693 ChangeLog.getInstance().addChange(new UpdateMessageChange( 694 (MessageImpl)updatedMessage)); 695 nextProcessTime = ((MessageImpl)updatedMessage). 696 getNextProcessDate(); 697 masterMessageImpl = (MessageImpl)updatedMessage; 698 } catch (MessageServiceException ex) { 699 throw ex; 700 } catch (Exception ex) { 701 log.error("Failed to update the message because : " + 702 ex.getMessage(),ex); 703 throw new MessageServiceException( 704 "Failed to update the message because : " + 705 ex.getMessage(),ex); 706 } 707 } 708 709 710 715 public void remove() throws MessageServiceException { 716 try { 717 TransactionManager.getInstance().bindResource(this,true); 718 ChangeLog.getInstance().addChange(new RemoveMessageChange( 719 this.id)); 720 } catch (Exception ex) { 721 log.error("Failed to failed to remove the message : " + 722 ex.getMessage(),ex); 723 throw new MessageServiceException( 724 "Failed to failed to remove the message : " + 725 ex.getMessage(),ex); 726 } 727 } 728 729 730 736 public Date nextProcessTime() { 737 return nextProcessTime; 738 } 739 740 741 744 public int getPriority() { 745 return this.masterMessageImpl.getPriority(); 746 } 747 748 749 755 public String getMessageQueueName() { 756 return this.messageQueueName; 757 } 758 759 760 767 public synchronized void commit(Xid xid, boolean onePhase) throws 768 XAException { 769 if (nextProcessTime != null) { 770 this.originalNextProcessTime = nextProcessTime; 771 } 772 if (this.messageQueueName != null) { 773 this.originalMessageQueueName = messageQueueName; 774 } 775 if (this.masterMessageImpl != null) { 776 this.originalMessageImpl = masterMessageImpl; 777 } 778 } 779 780 781 788 public void end(Xid xid, int flags) throws XAException { 789 } 790 791 792 798 public void forget(Xid xid) throws XAException { 799 if (nextProcessTime != null) { 800 this.originalNextProcessTime = nextProcessTime; 801 } 802 if (this.messageQueueName != null) { 803 this.originalMessageQueueName = messageQueueName; 804 } 805 if (this.masterMessageImpl != null) { 806 this.originalMessageImpl = masterMessageImpl; 807 } 808 } 809 810 811 817 public int getTransactionTimeout() throws XAException { 818 return -1; 819 } 820 821 822 830 public boolean isSameRM(XAResource xAResource) throws XAException { 831 return this == xAResource; 832 } 833 834 835 842 public int prepare(Xid xid) throws XAException { 843 return XAResource.XA_OK; 844 } 845 846 847 855 public Xid [] recover(int flags) throws XAException { 856 return null; 857 } 858 859 860 866 public void rollback(Xid xid) throws XAException { 867 nextProcessTime = originalNextProcessTime; 868 messageQueueName = originalMessageQueueName; 869 masterMessageImpl = originalMessageImpl; 870 } 871 872 873 880 public boolean setTransactionTimeout(int transactionTimeout) throws 881 XAException { 882 return true; 883 } 884 885 886 893 public void start(Xid xid, int flags) throws XAException { 894 895 } 896 897 898 904 public int compareTo(Object o) { 905 MessageManagerImpl msg =(MessageManagerImpl)o; 906 if (msg.nextProcessTime().getTime() > nextProcessTime().getTime()) { 907 return -1; 908 } else if (nextProcessTime().getTime() > msg.nextProcessTime().getTime()) { 909 return 1; 910 } else if (msg.getPriority() > getPriority()) { 911 return -1; 912 } else if (getPriority() > msg.getPriority()) { 913 return 1; 914 } 915 return 0; 916 } 917 918 919 925 private MessageImpl loadMessage() throws MessageServiceException { 926 try { 927 Session session = HibernateUtil.getInstance( 929 MessageServiceImpl.class).getSession(); 930 931 com.rift.coad.daemon.messageservice.db.Message message = 932 (com.rift.coad.daemon.messageservice.db.Message)session. 933 get(com.rift.coad.daemon.messageservice.db.Message.class,id); 934 935 MessageImpl result = null; 936 if (message.getMessageType() == MessageManagerImpl.RPC_MESSAGE) { 937 RPCMessageImpl rpcMessage = new RPCMessageImpl(message.getId(), 938 new Date (message.getCreated().getTime()), 939 message.getRetries(), new Date (message.getProcessed(). 940 getTime()),message.getMessageCreator(), 941 message.getSessionId(), null,message.getFromUrl(), 942 message.getMessageRoutingType(), 943 message.getMessageState()); 944 MessageRpcBody rpcBody = (MessageRpcBody)session.get( 945 MessageRpcBody.class,message.getId()); 946 rpcMessage.setMethodBodyXML(rpcBody.getXml()); 947 if (rpcBody.getExceptionValue() != null) { 948 rpcMessage.setThrowableBytes(rpcBody. 949 getExceptionValue().clone()); 950 } 951 if (rpcBody.getResultValue() != null) { 952 rpcMessage.setResultBytes( 953 rpcBody.getResultValue().clone()); 954 } 955 result = rpcMessage; 956 } else { 957 958 TextMessageImpl txtMessage = new TextMessageImpl(message.getId(), 959 new Date (message.getCreated().getTime()), 960 message.getRetries(), new Date (message.getProcessed(). 961 getTime()),message.getMessageCreator(), 962 message.getSessionId(),null,message.getFromUrl(), 963 message.getMessageRoutingType(), 964 message.getMessageState()); 965 MessageTxtBody txtBody = (MessageTxtBody)session.get( 966 MessageTxtBody.class,message.getId()); 967 txtMessage.setTextBody(txtBody.getBody()); 968 result = txtMessage; 969 } 970 971 result.setFrom(message.getFromUrl()); 973 result.setPriority(message.getPriority()); 974 result.setNextProcessDate(new Date ( 975 message.getNextProcess().getTime())); 976 result.setProcessedDate(new Date ( 977 message.getProcessed().getTime())); 978 979 if (message.getTarget() != null) { 981 result.setTarget(message.getTarget()); 982 } 983 if (message.getReply() == 1) { 985 result.setReply(true); 986 } else { 987 result.setReply(false); 988 } 989 if (message.getReplyUrl() != null) { 991 result.setReplyTo(message.getReplyUrl()); 992 } 993 if (message.getTargetNamedQueue() != null) { 995 result.setTargetNamedQueue(message.getTargetNamedQueue()); 996 } 997 if (message.getReplyNamedQueue() != null) { 998 result.setReplyNamedQueue(message.getReplyNamedQueue()); 999 } 1000 if (message.getCorrelationId() != null) { 1002 result.setCorrelationId(message.getCorrelationId()); 1003 } 1004 1005 List dbServices = session.createQuery( 1007 "FROM MessageService as service WHERE " + 1008 "service.message.id = ?"). 1009 setString(0,message.getId()).list(); 1010 String [] services = new String [dbServices.size()]; 1011 int index = 0; 1012 for (Iterator iter = dbServices.iterator(); 1013 iter.hasNext(); index++) { 1014 services[index] = ((MessageService)iter.next()).getService(); 1015 } 1016 result.setServices(services); 1017 1018 List dbProperties = session.createQuery( 1020 "FROM MessageProperty as property WHERE " + 1021 "property.message.id = ?"). 1022 setString(0,message.getId()).list(); 1023 for (Iterator iter = dbProperties.iterator(); 1024 iter.hasNext();) { 1025 MessageProperty property = (MessageProperty)iter.next(); 1026 if (property.getBoolValue() != null) { 1027 result.setBooleanProperty(property.getName(), 1028 ((Integer )property.getBoolValue()).intValue() == 1 ? 1029 true:false); 1030 } else if (property.getByteValue() != null) { 1031 result.setByteProperty(property.getName(), 1032 ((Integer )property.getByteValue()).byteValue()); 1033 } else if (property.getIntValue() != null) { 1034 result.setPropertyValue(property.getName(), 1035 property.getIntValue()); 1036 } else if (property.getLongValue() != null) { 1037 result.setPropertyValue(property.getName(), 1038 property.getLongValue()); 1039 } else if (property.getDoubleValue() != null) { 1040 result.setPropertyValue(property.getName(), 1041 property.getDoubleValue()); 1042 } else if (property.getFloatValue() != null) { 1043 result.setPropertyValue(property.getName(), 1044 property.getFloatValue()); 1045 } else if (property.getObjectValue() != null) { 1046 result.setPropertyValue(property.getName(), 1047 property.getObjectValue()); 1048 } else if (property.getStringValue() != null) { 1049 result.setPropertyValue(property.getName(), 1050 property.getStringValue()); 1051 } 1052 1053 } 1054 1055 List principals = new ArrayList (); 1056 List dbPrincipals = session.createQuery( 1057 "FROM MessagePrincipal as principal WHERE " + 1058 "principal.message.id = ?"). 1059 setString(0,message.getId()).list(); 1060 for (Iterator iter = dbPrincipals.iterator(); iter.hasNext();) { 1061 MessagePrincipal principal = (MessagePrincipal)iter.next(); 1062 principals.add(principal.getPrincipalValue()); 1063 } 1064 result.setMessagePrincipals(principals); 1065 1066 List dbErrors = session.createQuery( 1067 "FROM MessageError as error WHERE " + 1068 "error.message.id = ?"). 1069 setString(0,message.getId()).list(); 1070 for (Iterator iter = dbErrors.iterator(); iter.hasNext();) { 1071 com.rift.coad.daemon.messageservice.db.MessageError 1072 dbMessageError = 1073 (com.rift.coad.daemon.messageservice.db.MessageError) 1074 iter.next(); 1075 com.rift.coad.daemon.messageservice.MessageError messageError = 1076 new com.rift.coad.daemon.messageservice.MessageError( 1077 new Date (dbMessageError.getErrorDate().getTime()), 1078 dbMessageError.getErrorLevel(),dbMessageError.getMsg()); 1079 ((MessageImpl)result).addError(messageError); 1080 } 1081 1082 if (message.getMessageQueue() != null) { 1083 originalMessageQueueName = messageQueueName = 1084 message.getMessageQueue().getMessageQueueName(); 1085 } 1086 if (message.getNextProcess() != null) { 1087 this.nextProcessTime = this.originalNextProcessTime = 1088 new Date (message.getNextProcess().getTime()); 1089 } 1090 1091 return result; 1092 } catch (Exception ex) { 1093 log.error("Failed to load the message because : " + 1094 ex.getMessage(),ex); 1095 throw new MessageServiceException( 1096 "Failed to load the message because : " + 1097 ex.getMessage(),ex); 1098 } 1099 } 1100 1101} 1102 | Popular Tags |