1 17 package org.apache.servicemix.jbi.messaging; 18 19 import org.apache.commons.logging.Log; 20 import org.apache.commons.logging.LogFactory; 21 import org.apache.servicemix.JbiConstants; 22 import org.apache.servicemix.jbi.container.ActivationSpec; 23 import org.apache.servicemix.jbi.framework.ComponentContextImpl; 24 import org.apache.servicemix.jbi.framework.ComponentNameSpace; 25 import org.apache.servicemix.jbi.jaxp.SourceTransformer; 26 import org.w3c.dom.Node ; 27 28 import javax.jbi.messaging.ExchangeStatus; 29 import javax.jbi.messaging.Fault; 30 import javax.jbi.messaging.MessageExchange; 31 import javax.jbi.messaging.MessagingException; 32 import javax.jbi.messaging.NormalizedMessage; 33 import javax.jbi.servicedesc.ServiceEndpoint; 34 import javax.transaction.Transaction ; 35 import javax.xml.namespace.QName ; 36 import javax.xml.transform.dom.DOMSource ; 37 38 import java.io.Externalizable ; 39 import java.io.IOException ; 40 import java.io.ObjectInput ; 41 import java.io.ObjectOutput ; 42 import java.net.URI ; 43 import java.util.Set ; 44 45 51 public abstract class MessageExchangeImpl implements MessageExchange, Externalizable { 52 53 public static final int SYNC_STATE_ASYNC = 0; 54 public static final int SYNC_STATE_SYNC_SENT = 1; 55 public static final int SYNC_STATE_SYNC_RECEIVED = 2; 56 57 60 public static final int TX_STATE_NONE = 0; 61 66 public static final int TX_STATE_ENLISTED = 1; 67 72 public static final int TX_STATE_CONVEYED = 2; 73 74 protected static final int CAN_SET_IN_MSG = 0x00000001; 75 protected static final int CAN_SET_OUT_MSG = 0x00000002; 76 protected static final int CAN_SET_FAULT_MSG = 0x00000004; 77 protected static final int CAN_PROVIDER = 0x00000008; 78 protected static final int CAN_CONSUMER = 0x00000000; 79 protected static final int CAN_SEND = 0x00000010; 80 protected static final int CAN_STATUS_ACTIVE = 0x00000040; 81 protected static final int CAN_STATUS_DONE = 0x00000080; 82 protected static final int CAN_STATUS_ERROR = 0x00000100; 83 protected static final int CAN_OWNER = 0x00000200; 84 85 protected static final int STATES_CANS = 0; 86 protected static final int STATES_NEXT_OUT = 1; 87 protected static final int STATES_NEXT_FAULT = 2; 88 protected static final int STATES_NEXT_ERROR = 3; 89 protected static final int STATES_NEXT_DONE = 4; 90 91 public static final String FAULT = "fault"; 92 public static final String IN = "in"; 93 public static final String OUT = "out"; 94 95 private static final long serialVersionUID = -3639175136897005605L; 96 97 private static final Log log = LogFactory.getLog(MessageExchangeImpl.class); 98 99 protected ComponentContextImpl sourceContext; 100 protected ExchangePacket packet; 101 protected PojoMarshaler marshaler; 102 protected int state; 103 protected int syncState = SYNC_STATE_ASYNC; 104 protected int txState = TX_STATE_NONE; 105 protected int[][] states; 106 protected MessageExchangeImpl mirror; 107 protected transient boolean pushDeliver; 108 protected transient Object txLock; 109 110 115 public MessageExchangeImpl(String exchangeId, URI pattern, int[][] states) { 116 this.states = states; 117 this.packet = new ExchangePacket(); 118 this.packet.setExchangeId(exchangeId); 119 this.packet.setPattern(pattern); 120 } 121 122 protected MessageExchangeImpl(ExchangePacket packet, int[][] states) { 123 this.states = states; 124 this.packet = packet; 125 } 126 127 protected MessageExchangeImpl() { 128 } 129 130 protected void copyFrom(MessageExchangeImpl me) { 131 if (this != me) { 132 this.packet = me.packet; 133 this.state = me.state; 134 this.mirror.packet = me.packet; 135 this.mirror.state = me.mirror.state; 136 } 137 } 138 139 protected boolean can(int c) { 140 return (this.states[state][STATES_CANS] & c) == c; 141 } 142 143 147 public ActivationSpec getActivationSpec() { 148 if (sourceContext != null) { 149 return sourceContext.getActivationSpec(); 150 } 151 return null; 152 } 153 154 158 public ComponentContextImpl getSourceContext() { 159 return sourceContext; 160 } 161 162 166 public void setSourceContext(ComponentContextImpl sourceContext) { 167 this.sourceContext = sourceContext; 168 this.mirror.sourceContext = sourceContext; 169 } 170 171 174 public ExchangePacket getPacket(){ 175 return packet; 176 } 177 178 181 public URI getPattern() { 182 return packet.getPattern(); 183 } 184 185 188 public String getExchangeId() { 189 return packet.getExchangeId(); 190 } 191 192 195 public ExchangeStatus getStatus() { 196 if (this.packet.isAborted()) { 197 return ExchangeStatus.ERROR; 198 } 199 return this.packet.getStatus(); 200 } 201 202 208 public void setStatus(ExchangeStatus exchangeStatus) throws MessagingException { 209 if (!can(CAN_OWNER)) { 210 throw new IllegalStateException ("component is not owner"); 211 } 212 this.packet.setStatus(exchangeStatus); 213 214 } 215 216 221 public void setError(Exception exception) { 222 if (!can(CAN_OWNER)) { 223 throw new IllegalStateException ("component is not owner"); 224 } 225 this.packet.setError(exception); 226 } 227 228 231 public Exception getError() { 232 return packet.getError(); 233 } 234 235 238 public Fault getFault() { 239 return packet.getFault(); 240 } 241 242 248 public void setFault(Fault fault) throws MessagingException { 249 setMessage(fault, FAULT); 250 } 251 252 256 public NormalizedMessage createMessage() throws MessagingException { 257 return new NormalizedMessageImpl(this); 258 } 259 260 266 public Fault createFault() throws MessagingException { 267 return new FaultImpl(); 268 } 269 270 276 public NormalizedMessage getMessage(String name) { 277 if (IN.equals(name)) { 278 return packet.getIn(); 279 } else if (OUT.equals(name)) { 280 return packet.getOut(); 281 } else if (FAULT.equals(name)) { 282 return packet.getFault(); 283 } else { 284 return null; 285 } 286 } 287 288 295 public void setMessage(NormalizedMessage message, String name) throws MessagingException { 296 if (!can(CAN_OWNER)) { 297 throw new IllegalStateException ("component is not owner"); 298 } 299 if (message == null) { 300 throw new IllegalArgumentException ("message should not be null"); 301 } 302 if (name == null) { 303 throw new IllegalArgumentException ("name should not be null"); 304 } 305 name = name.toLowerCase(); 306 if (IN.equals(name)) { 307 if (!can(CAN_SET_IN_MSG)) { 308 throw new MessagingException("In not supported"); 309 } 310 if (packet.getIn() != null) { 311 throw new MessagingException("In message is already set"); 312 } 313 ((NormalizedMessageImpl) message).exchange = this; 314 packet.setIn((NormalizedMessageImpl) message); 315 } else if (OUT.equals(name)) { 316 if (!can(CAN_SET_OUT_MSG)) { 317 throw new MessagingException("Out not supported"); 318 } 319 if (packet.getOut() != null) { 320 throw new MessagingException("Out message is already set"); 321 } 322 ((NormalizedMessageImpl) message).exchange = this; 323 packet.setOut((NormalizedMessageImpl) message); 324 } else if (FAULT.equals(name)) { 325 if (!can(CAN_SET_FAULT_MSG)) { 326 throw new MessagingException("Fault not supported"); 327 } 328 if (!(message instanceof Fault)) { 329 throw new MessagingException("Setting fault, but message is not a fault"); 330 } 331 if (packet.getFault() != null) { 332 throw new MessagingException("Fault message is already set"); 333 } 334 ((NormalizedMessageImpl) message).exchange = this; 335 packet.setFault((FaultImpl) message); 336 } else { 337 throw new MessagingException("Message name must be in, out or fault"); 338 } 339 } 340 341 345 public Object getProperty(String name) { 346 if (JTA_TRANSACTION_PROPERTY_NAME.equals(name)) { 347 return packet.getTransactionContext(); 348 } else if (JbiConstants.PERSISTENT_PROPERTY_NAME.equals(name)) { 349 return packet.getPersistent(); 350 } else { 351 return packet.getProperty(name); 352 } 353 } 354 355 361 public void setProperty(String name, Object value) { 362 if (!can(CAN_OWNER)) { 363 throw new IllegalStateException ("component is not owner"); 364 } 365 if (name == null) { 366 throw new IllegalArgumentException ("name should not be null"); 367 } 368 if (JTA_TRANSACTION_PROPERTY_NAME.equals(name)) { 369 packet.setTransactionContext((Transaction ) value); 370 } else if (JbiConstants.PERSISTENT_PROPERTY_NAME.equals(name)) { 371 packet.setPersistent((Boolean ) value); 372 } else { 373 packet.setProperty(name, value); 374 } 375 } 376 377 380 public Set getPropertyNames(){ 381 return packet.getPropertyNames(); 382 } 383 384 389 public void setEndpoint(ServiceEndpoint endpoint) { 390 packet.setEndpoint(endpoint); 391 } 392 393 398 public void setService(QName name) { 399 packet.setServiceName(name); 400 } 401 402 407 public void setOperation(QName name) { 408 packet.setOperationName(name); 409 } 410 411 416 public void setInterfaceName(QName name) { 417 packet.setInterfaceName(name); 418 } 419 420 423 public ServiceEndpoint getEndpoint() { 424 return packet.getEndpoint(); 425 } 426 427 430 public QName getService() { 431 return packet.getServiceName(); 432 } 433 434 437 public QName getInterfaceName() { 438 return packet.getInterfaceName(); 439 } 440 441 444 public QName getOperation() { 445 return packet.getOperationName(); 446 } 447 448 451 public Transaction getTransactionContext() { 452 return packet.getTransactionContext(); 453 } 454 455 461 public void setTransactionContext(Transaction transaction) throws MessagingException { 462 packet.setTransactionContext(transaction); 463 } 464 465 468 public boolean isTransacted() { 469 return this.packet.getTransactionContext() != null; 470 } 471 472 475 public Role getRole() { 476 return can(CAN_PROVIDER) ? Role.PROVIDER : Role.CONSUMER; 477 } 478 479 482 public NormalizedMessage getInMessage() { 483 return this.packet.getIn(); 484 } 485 486 492 public void setInMessage(NormalizedMessage message) throws MessagingException { 493 setMessage(message, IN); 494 } 495 496 499 public NormalizedMessage getOutMessage() { 500 return getMessage(OUT); 501 } 502 503 509 public void setOutMessage(NormalizedMessage message) throws MessagingException { 510 setMessage(message, OUT); 511 } 512 513 516 public ComponentNameSpace getSourceId() { 517 return packet.getSourceId(); 518 } 519 522 public void setSourceId(ComponentNameSpace sourceId) { 523 packet.setSourceId(sourceId); 524 } 525 526 529 public ComponentNameSpace getDestinationId() { 530 return packet.getDestinationId(); 531 } 532 535 public void setDestinationId(ComponentNameSpace destinationId) { 536 packet.setDestinationId(destinationId); 537 } 538 539 public Boolean getPersistent() { 540 return packet.getPersistent(); 541 } 542 543 public void setPersistent(Boolean persistent) { 544 packet.setPersistent(persistent); 545 } 546 547 548 public PojoMarshaler getMarshaler() { 549 if (marshaler == null) { 550 marshaler = new DefaultMarshaler(); 551 } 552 return marshaler; 553 } 554 555 public void setMarshaler(PojoMarshaler marshaler) { 556 this.marshaler = marshaler; 557 } 558 559 public abstract void readExternal(ObjectInput in) throws IOException , ClassNotFoundException ; 560 561 public void writeExternal(ObjectOutput out) throws IOException { 562 packet.writeExternal(out); 563 out.write(state); 564 out.write(mirror.state); 565 out.writeBoolean(can(CAN_PROVIDER)); 566 } 567 568 public void handleSend(boolean sync) throws MessagingException { 569 if (!can(CAN_SEND)) { 571 throw new MessagingException("illegal call to send / sendSync"); 572 } 573 if (sync && getStatus() != ExchangeStatus.ACTIVE) { 574 throw new MessagingException("illegal call to sendSync"); 575 } 576 this.syncState = sync ? SYNC_STATE_SYNC_SENT : SYNC_STATE_ASYNC; 577 ExchangeStatus status = getStatus(); 579 if (status == ExchangeStatus.ACTIVE && !can(CAN_STATUS_ACTIVE)) { 580 throw new MessagingException("illegal exchange status: active"); 581 } 582 if (status == ExchangeStatus.DONE && !can(CAN_STATUS_DONE)) { 583 throw new MessagingException("illegal exchange status: done"); 584 } 585 if (status == ExchangeStatus.ERROR && !can(CAN_STATUS_ERROR)) { 586 throw new MessagingException("illegal exchange status: error"); 587 } 588 if (status == ExchangeStatus.ACTIVE && packet.getFault() == null) { 591 this.state = this.states[this.state][STATES_NEXT_OUT]; 592 } else if (status == ExchangeStatus.ACTIVE && packet.getFault() != null) { 593 this.state = this.states[this.state][STATES_NEXT_FAULT]; 594 } else if (status == ExchangeStatus.ERROR) { 595 this.state = this.states[this.state][STATES_NEXT_ERROR]; 596 } else if (status == ExchangeStatus.DONE) { 597 this.state = this.states[this.state][STATES_NEXT_DONE]; 598 } else { 599 throw new IllegalStateException ("unknown status"); 600 } 601 if (this.state < 0 || this.state >= this.states.length) { 602 throw new IllegalStateException ("next state is illegal"); 603 } 604 } 605 606 public void handleAccept() throws MessagingException { 607 ExchangeStatus status = getStatus(); 609 int nextState; 610 if (status == ExchangeStatus.ACTIVE && packet.getFault() == null) { 611 nextState = this.states[this.state][STATES_NEXT_OUT]; 612 } else if (status == ExchangeStatus.ACTIVE && packet.getFault() != null) { 613 nextState = this.states[this.state][STATES_NEXT_FAULT]; 614 } else if (status == ExchangeStatus.ERROR) { 615 nextState = this.states[this.state][STATES_NEXT_ERROR]; 616 } else if (status == ExchangeStatus.DONE) { 617 nextState = this.states[this.state][STATES_NEXT_DONE]; 618 } else { 619 throw new IllegalStateException ("unknown status"); 620 } 621 if (nextState < 0 || nextState >= this.states.length) { 622 throw new IllegalStateException ("next state is illegal"); 623 } 624 this.state = nextState; 625 } 626 627 public MessageExchangeImpl getMirror() { 628 return mirror; 629 } 630 631 public int getSyncState() { 632 return syncState; 633 } 634 635 public void setSyncState(int syncState) { 636 this.syncState = syncState; 637 } 638 639 642 public int getTxState() { 643 return txState; 644 } 645 646 649 public void setTxState(int txState) { 650 this.txState = txState; 651 } 652 653 public boolean isPushDelivery() { 654 return this.pushDeliver; 655 } 656 657 public void setPushDeliver(boolean b) { 658 this.pushDeliver = true; 659 } 660 661 664 public Object getTxLock() { 665 return txLock; 666 } 667 668 671 public void setTxLock(Object txLock) { 672 this.txLock = txLock; 673 } 674 675 public String toString() { 676 try { 677 StringBuffer sb = new StringBuffer (); 678 String name = getClass().getName(); 679 name = name.substring(name.lastIndexOf('.') + 1, name.length() - 4); 680 sb.append(name); 681 sb.append("[\n"); 682 sb.append(" id: ").append(getExchangeId()).append('\n'); 683 sb.append(" status: ").append(getStatus()).append('\n'); 684 sb.append(" role: ").append(getRole() == Role.CONSUMER ? "consumer" : "provider").append('\n'); 685 if (getInterfaceName() != null) { 686 sb.append(" interface: ").append(getInterfaceName()).append('\n'); 687 } 688 if (getService() != null) { 689 sb.append(" service: ").append(getService()).append('\n'); 690 } 691 if (getEndpoint() != null) { 692 sb.append(" endpoint: ").append(getEndpoint().getEndpointName()).append('\n'); 693 } 694 if (getOperation() != null) { 695 sb.append(" operation: ").append(getOperation()).append('\n'); 696 } 697 SourceTransformer st = new SourceTransformer(); 698 display("in", sb, st); 699 display("out", sb, st); 700 display("fault", sb, st); 701 if (getError() != null) { 702 sb.append(" error: "); 703 sb.append(getError()); 704 sb.append('\n'); 705 } 706 sb.append("]"); 707 return sb.toString(); 708 } catch (Exception e) { 709 log.trace("Error caught in toString", e); 710 return super.toString(); 711 } 712 } 713 714 public static final int maxMsgDisplaySize = 1500; 715 716 public static final boolean preserveContent = Boolean.getBoolean("org.apache.servicemix.preserveContent"); 717 718 private void display(String msg, StringBuffer sb, SourceTransformer st) { 719 if (getMessage(msg) != null) { 720 sb.append(" ").append(msg).append(": "); 721 try { 722 if (getMessage(msg).getContent() != null) { 723 if (preserveContent) { 724 sb.append(getMessage(msg).getContent().getClass()); 725 } else { 726 Node node = st.toDOMNode(getMessage(msg).getContent()); 727 getMessage(msg).setContent(new DOMSource (node)); 728 String str = st.toString(node); 729 if (str.length() > maxMsgDisplaySize) { 730 sb.append(str.substring(0, maxMsgDisplaySize)).append("..."); 731 } else { 732 sb.append(str); 733 } 734 } 735 } else { 736 sb.append("null"); 737 } 738 } catch (Exception e) { 739 sb.append("Unable to display: ").append(e); 740 } 741 sb.append('\n'); 742 } 743 } 744 745 } | Popular Tags |