1 25 26 package org.objectweb.petals.jbi.messaging; 27 28 import java.io.IOException ; 29 import java.io.ObjectInputStream ; 30 import java.io.ObjectOutputStream ; 31 import java.io.Serializable ; 32 import java.net.URI ; 33 import java.util.HashMap ; 34 import java.util.Map ; 35 import java.util.Set ; 36 37 import javax.jbi.messaging.ExchangeStatus; 38 import javax.jbi.messaging.Fault; 39 import javax.jbi.messaging.MessageExchange; 40 import javax.jbi.messaging.MessagingException; 41 import javax.jbi.messaging.NormalizedMessage; 42 import javax.jbi.servicedesc.ServiceEndpoint; 43 import javax.xml.namespace.QName ; 44 45 import org.objectweb.petals.jbi.registry.ConsumerEndpoint; 46 47 52 public class MessageExchangeImpl implements MessageExchange, Cloneable , Serializable { 53 54 public static final String IN_MSG = "in"; 55 56 public static final URI IN_ONLY_PATTERN = URI 57 .create("http://www.w3.org/2004/08/wsdl/in-only"); 58 59 public static final URI IN_OPTIONAL_OUT_PATTERN = URI 60 .create("http://www.w3.org/2004/08/wsdl/in-opt-out"); 61 62 public static final URI IN_OUT_PATTERN = URI 63 .create("http://www.w3.org/2004/08/wsdl/in-out"); 64 65 public static final String OUT_MSG = "out"; 66 67 public static final URI ROBUST_IN_ONLY_PATTERN = URI 68 .create("http://www.w3.org/2004/08/wsdl/robust-in-only"); 69 70 private static final short SERIALIZE_ROLE_CONSUMER = 0; 71 72 private static final short SERIALIZE_ROLE_PROVIDER = 1; 73 74 private static final short SERIALIZE_STATUS_ACTIVE = 0; 75 76 private static final short SERIALIZE_STATUS_DONE = 1; 77 78 private static final short SERIALIZE_STATUS_ERROR = 2; 79 80 83 private static final long serialVersionUID = 1L; 84 85 protected ConsumerEndpoint consumerEndpoint; 86 87 protected ServiceEndpoint endpoint; 88 89 protected Exception error; 90 91 protected String exchangeId; 92 93 protected Fault fault; 94 95 101 protected QName interfaceName; 102 103 protected Map <String , NormalizedMessage> messages = new HashMap <String , NormalizedMessage>(); 104 105 protected QName operation; 106 107 protected URI pattern; 108 109 protected Map <String , Object > properties = new HashMap <String , Object >(); 110 111 protected transient Role role; 112 113 protected QName service; 114 115 protected transient ExchangeStatus status; 116 117 protected boolean terminated; 118 119 protected boolean transacted; 120 121 127 public MessageExchangeImpl(ConsumerEndpoint consumerEndpoint) { 128 129 status = ExchangeStatus.ACTIVE; 130 131 role = Role.CONSUMER; 132 133 this.consumerEndpoint = consumerEndpoint; 134 } 135 136 139 public Fault createFault() throws MessagingException { 140 return new FaultImpl(); 141 } 142 143 146 public NormalizedMessage createMessage() throws MessagingException { 147 return new NormalizedMessageImpl(); 148 } 149 150 public ConsumerEndpoint getConsumerEndpoint() { 151 return consumerEndpoint; 152 } 153 154 157 public ServiceEndpoint getEndpoint() { 158 return endpoint; 159 } 160 161 164 public Exception getError() { 165 return error; 166 } 167 168 171 public String getExchangeId() { 172 return exchangeId; 173 } 174 175 178 public Fault getFault() { 179 return fault; 180 } 181 182 public QName getInterfaceName() { 183 return interfaceName; 184 } 185 186 192 public NormalizedMessage getMessage(String name) { 193 if (name != null) { 194 return (NormalizedMessage) messages.get(name.toLowerCase()); 195 } else { 196 return null; 197 } 198 } 199 200 public Map <String , NormalizedMessage> getMessages() { 201 return messages; 202 } 203 204 207 public QName getOperation() { 208 return operation; 209 } 210 211 214 public URI getPattern() { 215 return pattern; 216 } 217 218 221 public Object getProperty(String name) { 222 return properties.get(name); 223 } 224 225 228 public Set getPropertyNames() { 229 return properties.keySet(); 230 } 231 232 237 public Role getRole() { 238 return role; 239 } 240 241 244 public QName getService() { 245 return service; 246 } 247 248 251 public javax.jbi.messaging.ExchangeStatus getStatus() { 252 return status; 253 } 254 255 public boolean isTerminated() { 256 return terminated; 257 } 258 259 262 public boolean isTransacted() { 263 return transacted; 264 } 265 266 269 public void setEndpoint(ServiceEndpoint endpoint) { 270 this.endpoint = endpoint; 271 } 272 273 276 public void setError(Exception error) { 277 this.error = error; 278 279 this.status = ExchangeStatus.ERROR; 280 } 281 282 286 public void setExchangeId(String exchangeId) { 287 this.exchangeId = exchangeId; 288 } 289 290 294 299 public void setFault(Fault fault) throws MessagingException { 300 checkNotTerminated(); 301 302 if (role == null) { 303 throw new MessagingException("The Role is not defined."); 304 } 305 306 if (this.fault != null) { 308 throw new MessagingException("A fault has already been set."); 309 } 310 311 if (ExchangeStatus.DONE.equals(status) 313 || ExchangeStatus.ERROR.equals(status)) { 314 throw new MessagingException( 315 "The MessageExchange state and the current Role do not allow this operation."); 316 } 317 318 322 if (MessageExchange.Role.CONSUMER.equals(role) 323 && !messages.containsKey(OUT_MSG)) { 324 throw new MessagingException( 325 "The MessageExchange state and the current Role do not allow this operation."); 326 } 327 328 this.fault = fault; 329 } 330 331 public void setInterfaceName(QName interfaceName) { 332 this.interfaceName = interfaceName; 333 } 334 335 346 public void setMessage(NormalizedMessage msg, String name) 347 throws MessagingException { 348 checkNotTerminated(); 349 350 if (msg == null) { 351 throw new MessagingException("NormalizedMessage must be non null."); 352 } 353 if (name == null || name.trim().length() == 0) { 354 throw new MessagingException( 355 "The message reference must be non null and non empty."); 356 } 357 358 checkPatternMatching(name); 360 361 checkRoleMatching(name); 363 364 if (messages.containsKey(name.toLowerCase())) { 366 throw new MessagingException( 367 "A message has already been set with the '" + name 368 + "' reference."); 369 } 370 371 messages.put(name.toLowerCase(), msg); 373 } 374 375 378 public void setOperation(QName name) { 379 this.operation = name; 380 } 381 382 386 public void setPattern(URI pattern) { 387 this.pattern = pattern; 388 } 389 390 394 public void setProperty(String name, Object obj) { 395 properties.put(name, obj); 396 } 397 398 public void setRole(Role role) { 399 this.role = role; 400 } 401 402 406 public void setService(QName service) { 407 this.service = service; 408 } 409 410 413 public void setStatus(ExchangeStatus status) throws MessagingException { 414 checkNotTerminated(); 415 416 if (ExchangeStatus.DONE.equals(status)) { 418 if (Role.CONSUMER.equals(role)) { 419 if (IN_ONLY_PATTERN.equals(pattern)) { 420 throw new MessagingException( 421 "The MessageExchange state does not allow this operation."); 422 } else if (IN_OUT_PATTERN.equals(pattern) && fault == null 423 && getMessage(OUT_MSG) == null) { 424 throw new MessagingException( 425 "The MessageExchange state does not allow this operation."); 426 } else if (IN_OPTIONAL_OUT_PATTERN.equals(pattern) 427 && fault == null && getMessage(OUT_MSG) == null) { 428 throw new MessagingException( 429 "The MessageExchange state does not allow this operation."); 430 } 431 } else { 432 if (ROBUST_IN_ONLY_PATTERN.equals(pattern) && fault != null) { 433 throw new MessagingException( 434 "The MessageExchange state does not allow this operation."); 435 } else if (IN_OUT_PATTERN.equals(pattern)) { 436 throw new MessagingException( 437 "The MessageExchange state does not allow this operation."); 438 } else if (IN_OPTIONAL_OUT_PATTERN.equals(pattern)) { 439 if (fault != null && getMessage(OUT_MSG) == null) { 440 throw new MessagingException( 441 "The MessageExchange state does not allow this operation."); 442 } else if (fault == null && getMessage(OUT_MSG) != null) { 443 throw new MessagingException( 444 "The MessageExchange state does not allow this operation."); 445 } 446 } 447 } 448 } 449 this.status = status; 450 } 451 452 public void setTerminated(boolean terminated) { 453 this.terminated = terminated; 454 } 455 456 460 public void setTransacted(boolean transacted) { 461 this.transacted = transacted; 462 } 463 464 public String toString() { 465 return "MessageExchange@" + getExchangeId(); 466 } 467 468 473 protected void checkNotTerminated() throws MessagingException { 474 if (isTerminated()) { 475 throw new MessagingException("The Exchange is terminated."); 476 } 477 } 478 479 488 protected void checkPatternMatching(String name) throws MessagingException { 489 if (pattern == null) { 490 throw new MessagingException("The MEP is not defined."); 491 } 492 493 if (name == null) { 494 throw new MessagingException("The reference name is not defined."); 495 } 496 497 if (IN_MSG.equalsIgnoreCase(name)) { 498 if (IN_ONLY_PATTERN.equals(pattern) 499 || ROBUST_IN_ONLY_PATTERN.equals(pattern) 500 || IN_OUT_PATTERN.equals(pattern) 501 || IN_OPTIONAL_OUT_PATTERN.equals(pattern)) { 502 return; 503 } 504 } else if (OUT_MSG.equalsIgnoreCase(name) 505 && ((IN_OUT_PATTERN.equals(pattern) || IN_OPTIONAL_OUT_PATTERN 506 .equals(pattern)))) { 507 return; 508 } 509 throw new MessagingException( 510 "the MessageExchange state does not allow this operation."); 511 } 512 513 522 protected void checkRoleMatching(String name) throws MessagingException { 523 if (role == null) { 524 throw new MessagingException("the Role is not defined."); 525 } 526 527 if (name == null) { 528 throw new MessagingException("The reference name is not defined."); 529 } 530 531 if (MessageExchange.Role.CONSUMER.equals(role)) { 532 if (IN_MSG.equalsIgnoreCase(name)) { 533 return; 534 } 535 } else if (MessageExchange.Role.PROVIDER.equals(role) 536 && OUT_MSG.equalsIgnoreCase(name)) { 537 return; 538 } 539 throw new MessagingException("The Role does not allow this operation."); 540 } 541 542 550 public void cleanMessages(){ 551 if(! ExchangeStatus.ACTIVE.equals(status)){ 554 messages.clear(); 555 fault=null; 556 } 557 if(fault != null){ 559 messages.clear(); 560 } 561 if(messages.containsKey(OUT_MSG) && messages.containsKey(IN_MSG)){ 563 messages.remove(IN_MSG); 564 } 565 } 566 567 570 protected void readObjectDelegate(ObjectInputStream s) throws IOException { 571 switch (s.readShort()) { 573 case 0: 574 role = MessageExchange.Role.CONSUMER; 575 break; 576 case 1: 577 role = MessageExchange.Role.PROVIDER; 578 break; 579 default: 580 break; 581 } 582 583 switch (s.readShort()) { 585 case 0: 586 status = ExchangeStatus.ACTIVE; 587 break; 588 case 1: 589 status = ExchangeStatus.DONE; 590 break; 591 case 2: 592 status = ExchangeStatus.ERROR; 593 break; 594 default: 595 break; 596 } 597 598 try { 599 s.defaultReadObject(); 600 } catch (ClassNotFoundException e) { 601 throw new IOException (e.getClass() + ":" + e.getMessage()); 602 } 603 } 604 605 608 protected void writeObjectDelegate(ObjectOutputStream s) throws IOException { 609 if (MessageExchange.Role.CONSUMER.equals(role)) { 611 s.writeShort(SERIALIZE_ROLE_CONSUMER); 612 } else { 613 s.writeShort(SERIALIZE_ROLE_PROVIDER); 614 } 615 616 if (ExchangeStatus.ACTIVE.equals(status)) { 618 s.writeShort(SERIALIZE_STATUS_ACTIVE); 619 } else if (ExchangeStatus.DONE.equals(status)) { 620 s.writeShort(SERIALIZE_STATUS_DONE); 621 } else { 622 s.writeShort(SERIALIZE_STATUS_ERROR); 623 } 624 s.defaultWriteObject(); 625 } 626 627 633 private void readObject(ObjectInputStream s) throws IOException { 634 readObjectDelegate(s); 635 } 636 637 643 private void writeObject(ObjectOutputStream s) throws IOException { 644 writeObjectDelegate(s); 645 } 646 647 653 public synchronized MessageExchangeImpl clone(){ 654 MessageExchangeImpl msg = null; 655 try{ 656 msg = (MessageExchangeImpl) super.clone(); 657 msg.consumerEndpoint = (ConsumerEndpoint)consumerEndpoint.clone(); 658 } catch (CloneNotSupportedException e) { 659 msg = null; 660 } 661 return msg; 662 } 663 } 664 | Popular Tags |