1 21 22 package com.rift.coad.daemon.messageservice; 24 25 import com.rift.coad.daemon.messageservice.named.NamedQueueManagerImpl; 27 import java.lang.reflect.Method ; 28 import java.util.ArrayList ; 29 import java.util.Date ; 30 import java.util.HashSet ; 31 import java.util.List ; 32 import java.util.Set ; 33 import java.util.StringTokenizer ; 34 import javax.naming.Context ; 35 import javax.naming.InitialContext ; 36 37 import org.apache.log4j.Logger; 39 40 import com.rift.coad.daemon.messageservice.message.MessageImpl; 42 import com.rift.coad.daemon.messageservice.message.RPCMessageImpl; 43 import com.rift.coad.daemon.messageservice.message.MessageManagerImpl; 44 import com.rift.coad.daemon.messageservice.message.MessageManagerFactory; 45 import com.rift.coad.daemon.messageservice.named.NamedMemoryQueue; 46 import com.rift.coad.daemon.servicebroker.ServiceBroker; 47 import com.rift.coad.lib.common.RandomGuid; 48 import com.rift.coad.lib.configuration.Configuration; 49 import com.rift.coad.lib.configuration.ConfigurationFactory; 50 import com.rift.coad.lib.deployment.DeploymentMonitor; 51 import com.rift.coad.lib.naming.NamingDirector; 52 import com.rift.coad.lib.naming.NamingConstants; 53 import com.rift.coad.lib.interceptor.InterceptorWrapper; 54 import com.rift.coad.lib.interceptor.credentials.Session; 55 import com.rift.coad.lib.thread.pool.Task; 56 import com.rift.coad.lib.thread.pool.ThreadPoolManager; 57 import com.rift.coad.lib.thread.pool.PoolException; 58 import com.rift.coad.lib.deployment.BeanInfo; 59 import com.rift.coad.lib.deployment.bean.BeanConnector; 60 import com.rift.coad.lib.deployment.bean.BeanManager; 61 import com.rift.coad.lib.deployment.jmxbean.JMXBeanConnector; 62 import com.rift.coad.lib.deployment.jmxbean.JMXBeanManager; 63 import com.rift.coad.util.connection.ConnectionManager; 64 import com.rift.coad.util.transaction.UserTransactionWrapper; 65 66 67 73 public class MessageProcessor extends InterceptorWrapper implements Task { 74 75 private final static long BACK_OFF_PERIOD = 1000; 77 private final static String RETRY_DELAY = "retry_delay"; 78 private final static long DEFAULT_RETRY_DELAY = 60000; 79 private final static String PARENT_INSTANCE = "../"; 80 81 protected static Logger log = 83 Logger.getLogger(MessageProcessor.class.getName()); 84 85 private Context context = null; 87 private UserTransactionWrapper utw = null; 88 private MessageProcessInfo messageProcessInfo = null; 89 private NamingDirector namingDirector = null; 90 private long delay = 0; 91 92 97 public MessageProcessor() throws Exception { 98 try { 99 context = new InitialContext (); 100 utw = new UserTransactionWrapper(); 101 namingDirector = NamingDirector.getInstance(); 102 Configuration config = ConfigurationFactory.getInstance(). 103 getConfig(MessageProcessor.class); 104 delay = config.getLong(RETRY_DELAY,DEFAULT_RETRY_DELAY); 105 } catch (Exception ex) { 106 log.error("Failed init the message processor : " + ex.getMessage(), 107 ex); 108 throw new Exception ("Failed init the message processor : " + 109 ex.getMessage(),ex); 110 } 111 } 112 113 114 121 public void process(ThreadPoolManager poolManager) throws Exception { 122 DeploymentMonitor.getInstance().waitUntilInitDeployComplete(); 123 if (DeploymentMonitor.getInstance().isTerminated()) { 124 return; 125 } 126 boolean foundMessage = getMessageManager(); 127 poolManager.releaseThread(); 128 if (foundMessage) { 129 processMessage(); 130 } 131 } 132 133 134 140 private boolean getMessageManager() { 141 try { 142 Date currentTime = new Date (); 143 Date delayTime = new Date (); 144 messageProcessInfo = MessageQueueManager. 145 getInstance().getNextMessage(currentTime); 146 if (messageProcessInfo != null) { 147 return true; 148 } 149 long difference = delayTime.getTime() - currentTime.getTime(); 150 if ((delayTime.getTime() == currentTime.getTime()) || 151 (difference > BACK_OFF_PERIOD) || (difference < 0)) { 152 ProcessMonitor.getInstance().monitor(BACK_OFF_PERIOD); 153 } else { 154 ProcessMonitor.getInstance().monitor(difference); 155 } 156 } catch (Exception ex) { 157 log.error("Failed to retrieve a message : " + ex.getMessage(),ex); 158 } 159 return false; 160 } 161 162 163 166 private void processMessage() { 167 Message message = null; 168 try { 169 message = getMessage(); 170 if (message.getState() == Message.UNDELIVERED) { 171 processUndelivered(message); 172 } else if (message.getState() == Message.DELIVERED) { 173 processDelivered(message); 174 } else if (message.getState() == Message.UNDELIVERABLE) { 175 processUndeliverable(message); 176 } 177 } catch (Exception ex) { 178 log.error("Failed to process the message : " + ex.getMessage(),ex); 179 if (message == null) { 180 pushMessage(messageProcessInfo); 181 } else { 182 pushMessage(message,messageProcessInfo); 183 } 184 } 185 } 186 187 188 194 private Message getMessage() throws MessageServiceException { 195 Message message = null; 196 try { 197 utw.begin(); 198 MessageManager messageManager = messageProcessInfo. 199 getMessageManager(); 200 message = messageManager.getMessage(); 201 utw.commit(); 202 } catch (Exception ex) { 203 log.error("Failed to retrieve the message : " + ex.getMessage(),ex); 204 throw new MessageServiceException( 205 "Failed to retrieve the message : " + ex.getMessage(),ex); 206 } finally { 207 utw.release(); 208 } 209 210 return message; 211 } 212 213 214 220 private void processUndelivered(Message message) throws 221 MessageServiceException { 222 try { 223 if (message.getMessageType() == Message.POINT_TO_POINT) { 224 if (checkIfTargetLocal(message) && 225 checkIfMessageInQueue(message)) { 226 deliverMessage(message); 227 } 228 } else if (message.getMessageType() == Message.POINT_TO_SERVICE) { 229 if (checkIfServiceLocal(message) && 230 checkIfMessageInQueue(message)) { 231 deliverMessage(message); 232 } 233 } else if (message.getMessageType() == 234 Message.POINT_TO_MULTI_SERVICE) { 235 if (!namingDirector.isPrimary()) { 236 deliverToParent(message); 237 } else { 238 cloneMessageForServices(message); 239 } 240 } 241 } catch (Exception ex) { 242 log.error("Failed to process the undelivered message : " + 243 ex.getMessage(),ex); 244 throw new MessageServiceException( 245 "Failed to process the undelivered message : " + 246 ex.getMessage(),ex); 247 } 248 } 249 250 251 257 private void processDelivered(Message message) throws 258 MessageServiceException { 259 try { 260 if (checkIfReplyLocal(message) && 261 checkIfReplyMessageInQueue(message)) { 262 deliverReplyMessage(message); 263 } 264 } catch (Exception ex) { 265 log.error("Failed to process the undelivered message : " + 266 ex.getMessage(),ex); 267 throw new MessageServiceException( 268 "Failed to process the undelivered message : " + 269 ex.getMessage(),ex); 270 } 271 } 272 273 274 280 private void processUndeliverable(Message message) throws 281 MessageServiceException { 282 try { 283 if (checkIfReplyLocal(message)) { 284 utw.begin(); 285 if (NamedQueueManagerImpl.getInstance().checkForNamedQueue( 286 MessageQueueManager.DEAD_LETTER,true)) { 287 log.info("Assign message to dead letter queue."); 288 messageProcessInfo.getMessageQueue().removeMessage( 289 message.getMessageId()); 290 ((MessageManagerImpl)messageProcessInfo.getMessageManager()). 291 assignToQueue(MessageQueueManager.DEAD_LETTER); 292 NamedMemoryQueue.getInstance( 293 MessageQueueManager.DEAD_LETTER). 294 addMessage(messageProcessInfo.getMessageManager()); 295 log.info("Added the value to the dead letter queue"); 296 } else { 297 log.error("Failed to add to the dead letter queue."); 298 } 299 utw.commit(); 300 } 301 } catch (Exception ex) { 302 log.error("Failed to process the Undeliverable message : " + 303 ex.getMessage(),ex); 304 throw new MessageServiceException( 305 "Failed to process the Undeliverable message : " + 306 ex.getMessage(),ex); 307 } finally { 308 utw.release(); 309 } 310 } 311 312 313 319 private void pushMessage(MessageProcessInfo messageProcessInfo) { 320 try { 321 messageProcessInfo.getMessageQueue().pushBackMessage( 322 messageProcessInfo.getMessageManager()); 323 ProcessMonitor.getInstance().notifyProcessor(); 324 } catch (Exception ex) { 325 log.error("Failed to push the message manager onto a queue : " + 326 ex.getMessage(),ex); 327 } 328 } 329 330 331 338 private void pushMessage(Message message, 339 MessageProcessInfo messageProcessInfo) { 340 try { 341 try { 342 utw.begin(); 343 Date nextDate = new Date (); 344 nextDate.setTime(nextDate.getTime() + delay); 345 ((MessageImpl)message).setNextProcessDate(nextDate); 346 messageProcessInfo.getMessageManager().updateMessage(message); 347 utw.commit(); 348 } catch (Exception ex) { 349 log.error("Failed to process the message : " + 350 ex.getMessage(),ex); 351 } finally { 352 utw.release(); 353 } 354 messageProcessInfo.getMessageQueue().pushBackMessage( 355 messageProcessInfo.getMessageManager()); 356 ProcessMonitor.getInstance().notifyProcessor(); 357 } catch (Exception ex) { 358 log.error("Failed to push the message manager onto a queue : " + 359 ex.getMessage(),ex); 360 } 361 } 362 363 364 371 private boolean checkIfTargetLocal(Message message) throws 372 MessageServiceException { 373 try { 374 String target = message.getTarget(); 375 if (target == null) { 376 message.addError(Message.ERROR, 377 "There is no target for this message"); 378 initUndeliverableProcess(message); 379 return false; 380 } 381 382 String jndiBase = NamingDirector.getInstance().getJNDIBase() + "/"; 384 String parentUrl = NamingDirector.getInstance().getPrimaryJNDIUrl(); 385 String instanceURL = NamingDirector.getInstance(). 386 getInstanceId() + "/"; 387 int pos = target.indexOf(jndiBase); 388 int instancePos = target.indexOf(instanceURL); 389 if ((((target.indexOf(parentUrl)) != -1) && 390 (target.indexOf(jndiBase) == -1)) || 391 (target.indexOf(PARENT_INSTANCE) == 0) || 392 ((target.indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0) && 393 !NamingDirector.getInstance().isPrimary())) { 394 deliverToParent(message); 395 return false; 396 } else if (((instancePos != -1) && ( 397 target.indexOf(NamingConstants.SUBCONTEXT,(instancePos + 398 instanceURL.length())) != -1)) || 399 (target.indexOf(NamingConstants.SUBCONTEXT) == 0)) { 400 deliverToChild(message.getTarget(),message); 401 return false; 402 } else if (instancePos != -1) { 403 utw.begin(); 404 target = target.substring(instancePos + instanceURL.length()); 405 message.setTarget(target); 406 messageProcessInfo.getMessageManager().updateMessage(message); 407 utw.commit(); 408 messageProcessInfo.getMessageQueue().pushBackMessage( 409 messageProcessInfo.getMessageManager()); 410 ProcessMonitor.getInstance().notifyProcessor(); 411 return false; 412 } 413 return true; 414 } catch (MessageServiceException ex) { 415 throw ex; 416 } catch (Exception ex) { 417 log.error("Failed to check the message is local : " + 418 ex.getMessage(),ex); 419 throw new MessageServiceException( 420 "Failed to check the message is local : " + 421 ex.getMessage(),ex); 422 } finally { 423 utw.release(); 424 } 425 } 426 427 428 435 private boolean checkIfReplyLocal(Message message) throws 436 MessageServiceException { 437 try { 438 String reply = message.getReplyTo(); 439 if (reply == null) { 440 reply = message.getFrom(); 441 if (reply == null) { 442 message.addError(Message.ERROR, 443 "There is no reply for this message"); 444 initUndeliverableProcess(message); 445 return false; 446 } 447 } 448 449 String jndiBase = NamingDirector.getInstance().getJNDIBase() + "/"; 451 String parentUrl = NamingDirector.getInstance().getPrimaryJNDIUrl(); 452 String instanceURL = NamingDirector.getInstance(). 453 getInstanceId() + "/"; 454 int pos = reply.indexOf(jndiBase); 455 int instancePos = reply.indexOf(instanceURL); 456 if ((((reply.indexOf(parentUrl)) != -1) && 457 (reply.indexOf(jndiBase) == -1)) || 458 (reply.indexOf(PARENT_INSTANCE) == 0) || 459 ((reply.indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0) && 460 !NamingDirector.getInstance().isPrimary())) { 461 deliverToParent(message); 462 return false; 463 } else if (((instancePos != -1) && ( 464 reply.indexOf(NamingConstants.SUBCONTEXT,(instancePos + 465 instanceURL.length())) != -1)) || 466 (reply.indexOf(NamingConstants.SUBCONTEXT) == 0)) { 467 deliverToChild(reply,message); 468 return false; 469 } else if (instancePos != -1) { 470 utw.begin(); 471 reply = reply.substring(instancePos + instanceURL.length()); 472 if (message.getReplyTo() != null) { 473 message.setReplyTo(reply); 474 } else { 475 message.setFrom(reply); 476 } 477 messageProcessInfo.getMessageManager().updateMessage(message); 478 utw.commit(); 479 messageProcessInfo.getMessageQueue().pushBackMessage( 480 messageProcessInfo.getMessageManager()); 481 ProcessMonitor.getInstance().notifyProcessor(); 482 return false; 483 } 484 return true; 485 } catch (MessageServiceException ex) { 486 throw ex; 487 } catch (Exception ex) { 488 log.error("Failed to check the message is local : " + 489 ex.getMessage(),ex); 490 throw new MessageServiceException( 491 "Failed to check the message is local : " + 492 ex.getMessage(),ex); 493 } 494 } 495 496 497 504 private boolean checkIfServiceLocal(Message message) throws 505 MessageServiceException { 506 try { 507 String target = message.getTarget(); 508 if (target != null) { 509 return checkIfTargetLocal(message); 510 } 511 512 String [] services = message.getServices(); 513 if (services == null) { 514 message.addError(Message.ERROR, 515 "There are no services for this message"); 516 initUndeliverableProcess(message); 517 return false; 518 } 519 List serviceList = new ArrayList (); 520 for (int index = 0; index < services.length; index++) { 521 serviceList.add(services[index]); 522 } 523 ServiceBroker serviceBroker = (ServiceBroker)ConnectionManager. 524 getInstance().getConnection(ServiceBroker.class, 525 ServiceBroker.JNDI_URL); 526 String service = serviceBroker.getServiceProvider(serviceList); 527 if (service.length() != 0) { 528 message.setTarget(service); 529 utw.begin(); 530 messageProcessInfo.getMessageManager().updateMessage(message); 531 utw.commit(); 532 messageProcessInfo.getMessageQueue().pushBackMessage( 533 messageProcessInfo.getMessageManager()); 534 } else { 535 deliverToParent(message); 536 } 537 } catch (MessageServiceException ex) { 538 throw ex; 539 } catch (Exception ex) { 540 log.error("Failed to check if the services are local : " + 541 ex.getMessage(),ex); 542 throw new MessageServiceException( 543 "Failed to check if the services are local : " + 544 ex.getMessage(),ex); 545 } finally { 546 utw.release(); 547 } 548 return false; 549 } 550 551 552 559 private boolean checkIfMessageInQueue(Message message) throws 560 MessageServiceException { 561 try { 562 if (messageProcessInfo.getMessageQueue().getName().equals( 563 message.getTarget())) { 564 return true; 565 } 566 utw.begin(); 567 if (message.getTarget().equals(MessageService.JNDI_URL) && 568 (message.getTargetNamedQueue() != null)) { 569 if (false == NamedQueueManagerImpl.getInstance(). 570 checkForNamedQueue(message.getTargetNamedQueue(),false)){ 571 utw.release(); 572 message.addError(Message.ERROR,"The named queue [" + 573 message.getTargetNamedQueue() + "] does not exist."); 574 initUndeliverableProcess(message); 575 return false; 576 } 577 messageProcessInfo.getMessageQueue().removeMessage( 578 message.getMessageId()); 579 ((MessageManagerImpl)messageProcessInfo.getMessageManager()). 580 assignToQueue(message.getTargetNamedQueue()); 581 NamedMemoryQueue.getInstance( 582 message.getTargetNamedQueue()). 583 addMessage(messageProcessInfo.getMessageManager()); 584 } else { 585 MessageQueue messageQueue = MessageQueueManager.getInstance(). 586 getQueue(message.getTarget()); 587 messageProcessInfo.getMessageQueue().removeMessage( 588 message.getMessageId()); 589 ((MessageManagerImpl)messageProcessInfo.getMessageManager()). 590 assignToQueue(message.getTarget()); 591 messageQueue.addMessage(messageProcessInfo.getMessageManager()); 592 } 593 utw.commit(); 594 return false; 595 } catch (Exception ex) { 596 log.error("Failed to check the target : " + ex.getMessage(),ex); 597 throw new MessageServiceException( 598 "Failed to check the target : " + ex.getMessage(),ex); 599 } finally { 600 utw.release(); 601 } 602 } 603 604 605 612 private boolean checkIfReplyMessageInQueue(Message message) throws 613 MessageServiceException { 614 try { 615 String reply = message.getReplyTo(); 616 if (reply == null) { 617 reply = message.getFrom(); 618 if (reply == null) { 619 message.addError(Message.ERROR, 620 "There is no reply for this message"); 621 initUndeliverableProcess(message); 622 return false; 623 } 624 } 625 626 if (messageProcessInfo.getMessageQueue().getName().equals( 627 reply)) { 628 return true; 629 } 630 utw.begin(); 631 if (reply.equals(MessageService.JNDI_URL) && 632 (message.getTargetNamedQueue() != null)) { 633 if (false == NamedQueueManagerImpl.getInstance(). 634 checkForNamedQueue(message.getReplyNamedQueue(),false)){ 635 utw.release(); 636 message.addError(Message.ERROR,"The named queue [" + 637 message.getReplyNamedQueue() + "] does not exist."); 638 initUndeliverableProcess(message); 639 return false; 640 } 641 messageProcessInfo.getMessageQueue().removeMessage( 642 message.getMessageId()); 643 ((MessageManagerImpl)messageProcessInfo.getMessageManager()). 644 assignToQueue(message.getReplyNamedQueue()); 645 NamedMemoryQueue.getInstance( 646 message.getTargetNamedQueue()). 647 addMessage(messageProcessInfo.getMessageManager()); 648 } else { 649 MessageQueue messageQueue = MessageQueueManager.getInstance(). 650 getQueue(reply); 651 messageProcessInfo.getMessageQueue().removeMessage( 652 message.getMessageId()); 653 ((MessageManagerImpl)messageProcessInfo.getMessageManager()). 654 assignToQueue(reply); 655 messageQueue.addMessage(messageProcessInfo.getMessageManager()); 656 } 657 utw.commit(); 658 return false; 659 } catch (Exception ex) { 660 log.error("Failed to check the reply queue : " + ex.getMessage(),ex); 661 throw new MessageServiceException( 662 "Failed to check the reply queue : " + ex.getMessage(),ex); 663 } finally { 664 utw.release(); 665 } 666 } 667 668 669 676 private void cloneMessageForServices(Message message) throws 677 MessageServiceException { 678 try { 679 String [] services = message.getServices(); 680 List serviceList = new ArrayList (); 681 for (int index = 0; index < services.length; index++) { 682 serviceList.add(services[index]); 683 } 684 ServiceBroker serviceBroker = (ServiceBroker)ConnectionManager. 685 getInstance().getConnection(ServiceBroker.class, 686 ServiceBroker.JNDI_URL); 687 List daemonList = serviceBroker.getServiceProviders(serviceList); 688 if (daemonList.size() == 0) { 689 message.addError(Message.ERROR, 690 "There are no daemon providing these services."); 691 initUndeliverableProcess(message); 692 return; 693 } 694 695 utw.begin(); 696 for (int index = 0; index < daemonList.size(); index++) { 697 MessageImpl newMessage = 698 (MessageImpl)((MessageImpl)message).clone(); 699 newMessage.setMessageId(RandomGuid.getInstance().getGuid()); 700 newMessage.setTarget((String )daemonList.get(index)); 701 newMessage.setMessageType(Message.POINT_TO_POINT); 702 newMessage.setNextProcessDate(new Date ()); 703 MessageManager messageManager = MessageManagerFactory.getInstance(). 704 getMessageManager(newMessage); 705 MessageQueue messageQueue = MessageQueueManager.getInstance(). 706 getQueue(MessageQueueManager.UNSORTED); 707 ((MessageManagerImpl)messageManager).assignToQueue( 708 MessageQueueManager.UNSORTED); 709 messageQueue.addMessage(messageManager); 710 } 711 messageProcessInfo.getMessageManager().remove(); 712 messageProcessInfo.getMessageQueue().removeMessage( 713 message.getMessageId()); 714 utw.commit(); 715 } catch (Exception ex) { 716 log.error("Failed to clone the messages : " + ex.getMessage(),ex); 717 throw new MessageServiceException( 718 "Failed to clone the messages : " + ex.getMessage(),ex); 719 } finally { 720 utw.release(); 721 } 722 } 723 724 725 731 private void deliverToParent(Message message) throws 732 MessageServiceException { 733 try { 734 if (namingDirector.isPrimary()) { 735 message.addError(Message.ERROR, 736 "The primary has no parent cannot go further."); 737 initUndeliverableProcess(message); 738 return; 739 } 740 Message messageCopy = (Message)((MessageImpl)message).clone(); 741 if (message.getTarget() != null) { 742 messageCopy.setFrom(downJNDIUrl(message.getTarget())); 743 } 744 if (message.getReplyTo() != null) { 745 messageCopy.setReplyTo(downJNDIUrl(message.getReplyTo())); 746 } 747 if (message.getFrom() != null) { 748 messageCopy.setFrom(downJNDIUrl(message.getFrom())); 749 } 750 751 log.debug("Deliver message to parent : " + message.getMessageId()); 752 utw.begin(); 753 MessageStore messageStore = (MessageStore)ConnectionManager. 754 getInstance().getConnection(MessageStore.class, 755 namingDirector.getPrimaryJNDIUrl() + "/" + 756 MessageStore.JNDI_URL); 757 messageProcessInfo.getMessageManager().remove(); 758 messageProcessInfo.getMessageQueue().removeMessage( 759 message.getMessageId()); 760 log.debug("The message has been deliverd to parent committing : " + 761 message.getMessageId()); 762 IDLock.getInstance().lock(message.getMessageId()); 763 messageStore.addMessage(messageCopy); 764 utw.commit(); 765 log.debug("Delivered message to parent : " + message.getMessageId()); 766 } catch (Exception ex) { 767 log.error("Failed to deliver to a the parent : " + 768 ex.getMessage(),ex); 769 throw new MessageServiceException( 770 "Failed to deliver to a the parent : " + 771 ex.getMessage(),ex); 772 } finally { 773 utw.release(); 774 } 775 } 776 777 778 784 private void deliverToChild(String target, Message message) throws 785 MessageServiceException { 786 try { 787 Message messageCopy = (Message)((MessageImpl)message).clone(); 788 if (message.getTarget() != null) { 789 messageCopy.setTarget(upJNDIUrl(target,message.getTarget())); 790 } 791 if (message.getReplyTo() != null) { 792 messageCopy.setReplyTo(upJNDIUrl(target,message.getReplyTo())); 793 } 794 if (message.getFrom() != null) { 795 messageCopy.setFrom(upJNDIUrl(target,message.getFrom())); 796 } 797 798 String subContextUrl = NamingConstants.SUBCONTEXT + "/"; 799 if (target.contains(namingDirector.getInstanceId())) { 800 subContextUrl = namingDirector.getInstanceId() + "/" + 801 NamingConstants.SUBCONTEXT + "/"; 802 } 803 int pos = target.indexOf(subContextUrl); 804 if (pos == -1) { 805 message.addError(Message.ERROR, 806 "Cannot find the sub reference information : " + target); 807 initUndeliverableProcess(message); 808 return; 809 } 810 String subContext = target.substring(pos + subContextUrl.length()); 811 subContext = NamingConstants.SUBCONTEXT + "/" + 812 subContext.substring(0,subContext.indexOf('/')) + "/" + 813 MessageStore.JNDI_URL; 814 log.debug("Deliver message to child : " + message.getMessageId()); 815 utw.begin(); 816 MessageStore messageStore = (MessageStore)ConnectionManager. 817 getInstance().getConnection(MessageStore.class, 818 subContext); 819 messageProcessInfo.getMessageManager().remove(); 820 messageProcessInfo.getMessageQueue().removeMessage( 821 message.getMessageId()); 822 IDLock.getInstance().lock(message.getMessageId()); 823 messageStore.addMessage(messageCopy); 824 log.debug("The message has been deliverd to child committing : " + 825 message.getMessageId()); 826 utw.commit(); 827 log.debug("Delivered message to child : " + message.getMessageId()); 828 } catch (Exception ex) { 829 log.error("Failed to deliver to a the child : " + 830 ex.getMessage(),ex); 831 throw new MessageServiceException( 832 "Failed to deliver to a the child : " + 833 ex.getMessage(),ex); 834 } finally { 835 utw.release(); 836 } 837 } 838 839 840 846 private void deliverMessage(Message message) throws 847 MessageServiceException { 848 initUserSession(message); 849 try { 850 if (message instanceof RPCMessage) { 851 deliverRPCMessage(message.getTarget(),message); 852 } else if (message instanceof TextMessage) { 853 deliverTextMessage(message.getTarget(),message); 854 } 855 } finally { 856 releaseUserSession(); 857 } 858 } 859 860 861 867 private void deliverReplyMessage(Message message) throws 868 MessageServiceException { 869 initUserSession(message); 870 try { 871 String reply = message.getReplyTo(); 872 if (reply == null) { 873 reply = message.getFrom(); 874 if (reply == null) { 875 message.addError(Message.ERROR, 876 "There is no reply for this message"); 877 initUndeliverableProcess(message); 878 return; 879 } 880 } 881 if (message instanceof RPCMessage) { 882 deliverReplyRPCMessage(reply,message); 883 } else if (message instanceof TextMessage) { 884 deliverReplyTextMessage(reply,message); 885 } 886 } finally { 887 releaseUserSession(); 888 } 889 } 890 891 892 898 private void deliverRPCMessage(String target, Message message) throws 899 MessageServiceException { 900 ClassLoader original = null; 901 try { 902 Object ref = null; 903 if (((ref = BeanConnector.getInstance().getBean(target)) == null) && 904 ((ref = JMXBeanConnector.getInstance().getJMXBean(target)) 905 == null)) { 906 message.addError(Message.ERROR,"The target [" + target 907 + "] does not exist."); 908 initUndeliverableProcess(message); 909 return; 910 } 911 original = Thread.currentThread().getContextClassLoader(); 912 Thread.currentThread().setContextClassLoader( 913 ref.getClass().getClassLoader()); 914 915 916 RPCMessageImpl rpcMessageImpl = 917 (RPCMessageImpl)((RPCMessageImpl)message).clone(); 918 Method method = null; 919 try { 920 method = ref.getClass().getMethod(rpcMessageImpl.getMethodName(), 921 rpcMessageImpl.getArgumentTypes()); 922 } catch (Exception ex) { 923 log.error("Failed to find the method on the target : " 924 + ex.getMessage(),ex); 925 if (original != null) { 926 Thread.currentThread().setContextClassLoader(original); 927 original = null; 928 } 929 message.addError(Message.ERROR, 930 "Failed to find the method on the target : " 931 + ex.getMessage()); 932 initUndeliverableProcess(message); 933 return; 934 } 935 utw.begin(); 936 937 try { 938 Object result = method.invoke(ref,rpcMessageImpl.getArguments()); 939 ((RPCMessage)message).setResult(result); 940 } catch (Throwable ex) { 941 log.error("Caught an exception : " 942 + ex.getMessage(),ex); 943 ((RPCMessage)message).setThrowable(ex); 944 } 945 if (original != null) { 946 Thread.currentThread().setContextClassLoader(original); 947 original = null; 948 } 949 if (message.getReply()) { 950 log.info("Init the process to deliver to the sender : " + 951 message.getMessageId()); 952 ((RPCMessageImpl)message).setState(Message.DELIVERED); 953 messageProcessInfo.getMessageManager().updateMessage(message); 954 messageProcessInfo.getMessageQueue().removeMessage( 955 message.getMessageId()); 956 MessageQueue messageQueue = MessageQueueManager.getInstance(). 957 getQueue(MessageQueueManager.UNSORTED); 958 ((MessageManagerImpl)messageProcessInfo.getMessageManager()). 959 assignToQueue(MessageQueueManager.UNSORTED); 960 messageQueue.addMessage(messageProcessInfo.getMessageManager()); 961 } else { 962 log.info("Removing the completed rpc message : " + 963 message.getMessageId()); 964 messageProcessInfo.getMessageManager().remove(); 965 messageProcessInfo.getMessageQueue().removeMessage( 966 message.getMessageId()); 967 } 968 utw.commit(); 969 } catch (Exception ex) { 970 log.error("Failed to deliver the RPC Message : " 971 + ex.getMessage(),ex); 972 throw new MessageServiceException( 973 "Failed to deliver the RPC Message : " 974 + ex.getMessage(),ex); 975 } finally { 976 utw.release(); 977 if (original != null) { 978 Thread.currentThread().setContextClassLoader(original); 979 } 980 } 981 } 982 983 984 991 private void deliverReplyRPCMessage(String reply, Message message) throws 992 MessageServiceException { 993 ClassLoader original = null; 994 try { 995 Object ref = null; 996 if (((ref = BeanConnector.getInstance().getBean(reply)) == null) && 997 ((ref = JMXBeanConnector.getInstance().getJMXBean(reply)) 998 == null)) { 999 message.addError(Message.ERROR,"The reply [" + reply 1000 + "] does not exist."); 1001 initUndeliverableProcess(message); 1002 return; 1003 } 1004 original = Thread.currentThread().getContextClassLoader(); 1005 Thread.currentThread().setContextClassLoader( 1006 ref.getClass().getClassLoader()); 1007 1008 utw.begin(); 1009 RPCMessage rpcMessage = (RPCMessage)message; 1010 try { 1011 if (rpcMessage.generatedException()) { 1012 Method method = ref.getClass().getMethod("onFailure", 1013 new Class [] {String .class,String .class, 1014 Throwable .class}); 1015 Throwable ex = rpcMessage.getThrowable(); 1016 if (ex instanceof 1017 java.lang.reflect.InvocationTargetException ) { 1018 ex = ((java.lang.reflect.InvocationTargetException )ex). 1019 getCause(); 1020 } 1021 method.invoke(ref,new Object [] {rpcMessage.getMessageId(), 1022 rpcMessage.getCorrelationId(),ex}); 1023 } else { 1024 Method method = ref.getClass().getMethod("onSuccess", 1025 new Class [] {String .class,String .class, 1026 Object .class}); 1027 method.invoke(ref,new Object [] {rpcMessage.getMessageId(), 1028 rpcMessage.getCorrelationId(), 1029 rpcMessage.getResult()}); 1030 } 1031 } catch (Exception ex) { 1032 log.error("Failed to deliver the message [" 1033 + reply + "] to the AsyncCallbackHandler method : " + 1034 ex.getMessage(),ex); 1035 utw.release(); 1036 message.addError(Message.ERROR,"Failed to deliver the message [" 1037 + reply + "] to the AsyncCallbackHandler method : " + 1038 ex.getMessage()); 1039 initUndeliverableProcess(message); 1040 return; 1041 } 1042 if (original != null) { 1043 Thread.currentThread().setContextClassLoader(original); 1044 original = null; 1045 } 1046 log.info("Removing the completed rpc message : " + 1047 rpcMessage.getMessageId()); 1048 messageProcessInfo.getMessageManager().remove(); 1049 messageProcessInfo.getMessageQueue().removeMessage( 1050 rpcMessage.getMessageId()); 1051 1052 utw.commit(); 1053 } catch (Exception ex) { 1054 log.error("Failed to deliver the reply RPC Message : " 1055 + ex.getMessage(),ex); 1056 throw new MessageServiceException( 1057 "Failed to deliver the reply RPC Message : " 1058 + ex.getMessage(),ex); 1059 } finally { 1060 utw.release(); 1061 if (original != null) { 1062 Thread.currentThread().setContextClassLoader(original); 1063 } 1064 } 1065 } 1066 1067 1068 1075 private void deliverTextMessage(String target, Message message) throws 1076 MessageServiceException { 1077 try { 1078 MessageHandler messageHandler = (MessageHandler)ConnectionManager. 1079 getInstance().getConnection(MessageHandler.class, 1080 target); 1081 utw.begin(); 1082 Message result = messageHandler.processMessage(message); 1083 result.incrementRetries(); 1084 if (result.isAcknowledged() && result.getReply() && 1085 (message.getState() == Message.UNDELIVERED)) { 1086 log.info("Init the process to deliver to the sender : " + 1087 message.getMessageId()); 1088 ((MessageImpl)result).setState(Message.DELIVERED); 1089 messageProcessInfo.getMessageManager().updateMessage(result); 1090 messageProcessInfo.getMessageQueue().removeMessage( 1091 message.getMessageId()); 1092 MessageQueue messageQueue = MessageQueueManager.getInstance(). 1093 getQueue(MessageQueueManager.UNSORTED); 1094 ((MessageManagerImpl)messageProcessInfo.getMessageManager()). 1095 assignToQueue(MessageQueueManager.UNSORTED); 1096 messageQueue.addMessage(messageProcessInfo.getMessageManager()); 1097 utw.commit(); 1098 } else if ((result.isAcknowledged() && !result.getReply()) || 1099 (result.isAcknowledged() && 1100 (message.getState() == Message.DELIVERED))){ 1101 log.info("Removing the completed text message : " + 1102 message.getMessageId()); 1103 messageProcessInfo.getMessageManager().remove(); 1104 messageProcessInfo.getMessageQueue().removeMessage( 1105 message.getMessageId()); 1106 utw.commit(); 1107 } else { 1108 Date nextDate = new Date (); 1109 nextDate.setTime(nextDate.getTime() + delay); 1110 ((MessageImpl)result).setNextProcessDate(nextDate); 1111 messageProcessInfo.getMessageManager().updateMessage(result); 1112 utw.commit(); 1113 messageProcessInfo.getMessageQueue().pushBackMessage( 1114 messageProcessInfo.getMessageManager()); 1115 } 1116 } catch (java.lang.ClassCastException ex) { 1117 log.error("Failed to deliver the text message ["+ 1118 message.getMessageId()+ "], " + 1119 "init the undeliverable process, as the target cannot be " + 1120 "spoken to correctly : " + ex.getMessage(),ex); 1121 message.addError(Message.ERROR,"Failed to deliver the text message : " 1122 + ex.getMessage()); 1123 initUndeliverableProcess(message); 1124 } catch (com.rift.coad.util.connection.NameNotFound ex) { 1125 log.error("Failed to deliver the text message ["+ 1126 message.getMessageId()+ "], " + 1127 "init the undeliverable process, " + 1128 "as the target name cannot be found : " 1129 + ex.getMessage(),ex); 1130 message.addError(Message.ERROR,"Failed to deliver the text message : " 1131 + ex.getMessage()); 1132 initUndeliverableProcess(message); 1133 } catch (Exception ex) { 1134 log.error("Failed to deliver the text message ["+ 1135 message.getMessageId()+ "] : " + ex.getMessage(),ex); 1136 throw new MessageServiceException( 1137 "Failed to deliver the text message : " 1138 + ex.getMessage(),ex); 1139 } finally { 1140 utw.release(); 1141 } 1142 } 1143 1144 1145 1152 private void deliverReplyTextMessage(String reply, Message message) throws 1153 MessageServiceException { 1154 try { 1155 MessageHandler messageHandler = (MessageHandler)ConnectionManager. 1156 getInstance().getConnection(MessageHandler.class, 1157 reply); 1158 utw.begin(); 1159 Message result = messageHandler.processMessage(message); 1160 result.incrementRetries(); 1161 if (result.isAcknowledged()){ 1162 log.info("Removing the completed text message : " + 1163 message.getMessageId()); 1164 messageProcessInfo.getMessageManager().remove(); 1165 messageProcessInfo.getMessageQueue().removeMessage( 1166 message.getMessageId()); 1167 utw.commit(); 1168 } else { 1169 Date nextDate = new Date (); 1170 nextDate.setTime(nextDate.getTime() + delay); 1171 ((MessageImpl)result).setNextProcessDate(nextDate); 1172 messageProcessInfo.getMessageManager().updateMessage(result); 1173 utw.commit(); 1174 messageProcessInfo.getMessageQueue().pushBackMessage( 1175 messageProcessInfo.getMessageManager()); 1176 } 1177 } catch (java.lang.ClassCastException ex) { 1178 log.error("Failed to deliver the text message : " 1179 + ex.getMessage(),ex); 1180 message.addError(Message.ERROR,"Failed to deliver the text message : " 1181 + ex.getMessage()); 1182 initUndeliverableProcess(message); 1183 } catch (com.rift.coad.util.connection.NameNotFound ex) { 1184 log.error("Failed to deliver the text message : " 1185 + ex.getMessage(),ex); 1186 message.addError(Message.ERROR,"Failed to deliver the text message : " 1187 + ex.getMessage()); 1188 initUndeliverableProcess(message); 1189 } catch (Exception ex) { 1190 log.error("Failed to deliver the text message : " 1191 + ex.getMessage(),ex); 1192 throw new MessageServiceException( 1193 "Failed to deliver the text message : " 1194 + ex.getMessage(),ex); 1195 } finally { 1196 utw.release(); 1197 } 1198 } 1199 1200 1201 1207 private String downJNDIUrl(String url) throws 1208 MessageServiceException { 1209 try { 1210 String instanceBase = NamingConstants.SUBCONTEXT + "/" 1211 + namingDirector.getInstanceId() + "/"; 1212 if (url.indexOf(PARENT_INSTANCE) == 0) { 1213 return url.substring(PARENT_INSTANCE.length()); 1214 } else if (url.contains(namingDirector.getPrimaryJNDIUrl()) || 1215 url.contains(namingDirector.getJNDIBase()) || 1216 url.contains(instanceBase) || 1217 (url.indexOf(NamingConstants.JNDI_NETWORK_PREFIX) == 0)) { 1218 return url; 1219 } else if (!url.contains(namingDirector.getInstanceId())) { 1220 return instanceBase + url; 1221 } else { 1222 return NamingConstants.SUBCONTEXT + "/" + url; 1223 } 1224 } catch (Exception ex) { 1225 log.error("Failed to move down the url : " 1226 + ex.getMessage(),ex); 1227 throw new MessageServiceException("Failed to move down the url : " 1228 + ex.getMessage(),ex); 1229 } 1230 } 1231 1232 1233 1239 private String upJNDIUrl(String target,String url) throws 1240 MessageServiceException { 1241 try { 1242 String updatedURL = url; 1243 String instanceBase = namingDirector.getInstanceId() + "/" + 1244 NamingConstants.SUBCONTEXT + "/"; 1245 String subContextUrl = NamingConstants.SUBCONTEXT + "/"; 1246 int pos = url.indexOf(subContextUrl); 1247 if (url.contains(instanceBase)) { 1248 updatedURL = url.substring(url.indexOf(instanceBase) + 1249 instanceBase.length()); 1250 pos = updatedURL.indexOf(subContextUrl); 1251 if (url.equals(target) && (pos == 0)) { 1252 updatedURL = updatedURL.substring(updatedURL.indexOf("/", 1253 pos + subContextUrl.length()) + 1); 1254 } 1255 } else if (url.equals(target) && (pos == 0)) { 1256 updatedURL = url.substring(url.indexOf("/", 1257 pos + subContextUrl.length()) + 1); 1258 } else { 1259 updatedURL = PARENT_INSTANCE + url; 1260 } 1261 return updatedURL; 1262 } catch (Exception ex) { 1263 log.error("Failed to modify the url to set it relative to the next " + 1264 "coadunation intance : " + ex.getMessage(),ex); 1265 throw new MessageServiceException("Failed to modify the url to " + 1266 "set it relative to the next coadunation intance : " + 1267 ex.getMessage(),ex); 1268 } 1269 } 1270 1271 1272 1278 private void initUndeliverableProcess(Message message) throws 1279 MessageServiceException { 1280 try { 1281 int currentState = message.getState(); 1282 utw.begin(); 1283 ((MessageImpl)message).setState(Message.UNDELIVERABLE); 1284 messageProcessInfo.getMessageManager().updateMessage(message); 1285 utw.commit(); 1286 utw.release(); 1287 if (currentState == Message.DELIVERED) { 1288 processUndeliverable(message); 1289 } else if (messageProcessInfo.getMessageQueue().getName().equals( 1290 MessageQueueManager.UNSORTED)) { 1291 messageProcessInfo.getMessageQueue().pushBackMessage( 1292 messageProcessInfo.getMessageManager()); 1293 } else { 1294 utw.begin(); 1295 messageProcessInfo.getMessageQueue().removeMessage( 1296 message.getMessageId()); 1297 MessageQueue messageQueue = MessageQueueManager.getInstance(). 1298 getQueue(MessageQueueManager.UNSORTED); 1299 ((MessageManagerImpl)messageProcessInfo.getMessageManager()). 1300 assignToQueue(MessageQueueManager.UNSORTED); 1301 messageQueue.addMessage(messageProcessInfo.getMessageManager()); 1302 utw.commit(); 1303 } 1304 1305 } catch (Exception ex) { 1306 log.error("Failed to init the undeliverable process :" + 1307 ex.getMessage(),ex); 1308 throw new MessageServiceException("Failed to init the " + 1309 "undeliverable process :" + ex.getMessage(),ex); 1310 } finally { 1311 utw.release(); 1312 } 1313 } 1314 1315 1316 1322 private void initUserSession(Message message) throws MessageServiceException { 1323 try { 1324 Session session = new Session(message.getMessageCreater(), 1325 message.getSessionId(), 1326 new HashSet (message.getMessagePrincipals())); 1327 getServerInterceptor().createSession(session); 1328 } catch (Exception ex) { 1329 log.error("Failed to setup the user session : " + 1330 ex.getMessage(),ex); 1331 throw new MessageServiceException( 1332 "Failed to setup the user session :" + ex.getMessage(),ex); 1333 } 1334 } 1335 1336 1337 1340 private void releaseUserSession() { 1341 try { 1342 getServerInterceptor().release(); 1343 } catch (Exception ex) { 1344 log.error("Failed to release the user session : " + 1345 ex.getMessage(),ex); 1346 } 1347 } 1348} 1349 | Popular Tags |