package org.apache.wsif.util.jms;

import java.io.Serializable;
import java.util.HashMap;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.wsif.WSIFException;
import org.apache.wsif.logging.Trc;

/**
 * Wraps a JMS queue connection and session for sending requests to a write
 * queue and receiving replies from a read (reply-to) queue.
 *
 * <p>The write queue is fixed at construction time. The read queue is chosen
 * when a message is sent: either a named queue looked up through the
 * {@link WSIFJMSFinder}, or a temporary queue created on the session.
 *
 * <p>No synchronization is performed by this class.
 */
public class WSIFJMSDestination {
    protected WSIFJMSFinder finder;
    protected QueueConnection connection = null;
    protected QueueSession session = null;
    protected Queue readQ = null;
    protected Queue writeQ = null;
    protected QueueSender sender = null;

    protected boolean asyncMode = false;
    protected Queue syncTempQueue = null;

    /** Properties to apply to the next outgoing message; cleared after each send. */
    protected WSIFJMSProperties inProps;
    /** Properties read lazily from {@link #lastMessage}. */
    protected WSIFJMSProperties outProps;
    protected Message lastMessage = null;
    protected long timeout;
    protected String replyToName = null;

    /**
     * Creates a destination that uses {@code WSIFJMSConstants.WAIT_FOREVER}
     * as its receive timeout.
     *
     * @param finder used to obtain the connection factory and queues
     * @throws WSIFException if the destination cannot be set up
     */
    public WSIFJMSDestination(WSIFJMSFinder finder) throws WSIFException {
        this(finder, WSIFJMSConstants.WAIT_FOREVER);
        Trc.entry(this, finder);
        Trc.exit();
    }

    /**
     * Creates a destination with the given default receive timeout.
     *
     * @param finder used to obtain the connection factory and queues
     * @param timeout default timeout handed to {@code QueueReceiver.receive(long)}
     * @throws WSIFException if the destination cannot be set up
     */
    public WSIFJMSDestination(WSIFJMSFinder finder, long timeout)
        throws WSIFException {
        this(finder, null, timeout);
        Trc.entry(this, finder, new Long(timeout));
        Trc.exit();
    }

    /**
     * Creates a destination, opening a queue connection and a
     * non-transacted, auto-acknowledge session, and starts the connection.
     *
     * <p>Exactly one of the finder's initial destination and
     * {@code altDestName} must be available; it becomes the write queue.
     * If construction fails, any connection or session already opened is
     * closed before the exception propagates.
     *
     * @param finder used to obtain the connection factory and queues
     * @param altDestName provider-specific queue name, or {@code null} to use
     *        the finder's initial destination
     * @param timeout default timeout handed to {@code QueueReceiver.receive(long)}
     * @throws WSIFException if both or neither destination is specified, or
     *         if a JMS call fails
     */
    public WSIFJMSDestination(
        WSIFJMSFinder finder,
        String altDestName,
        long timeout)
        throws WSIFException {
        Trc.entry(this, finder, altDestName, new Long(timeout));

        inProps = new WSIFJMSProperties(WSIFJMSProperties.IN);
        outProps = new WSIFJMSProperties(WSIFJMSProperties.OUT);
        this.timeout = timeout;
        this.finder = finder;

        boolean initialised = false;
        try {
            connection = finder.getFactory().createQueueConnection();
            session =
                connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination initDest = finder.getInitialDestination();
            if (initDest != null && altDestName != null) {
                throw new WSIFException("Both jndiDestinationName and jmsproviderDestinationName cannot be specified");
            }
            if (initDest == null && altDestName == null) {
                throw new WSIFException("Either jndiDestinationName or jmsproviderDestinationName must be specified");
            }

            if (altDestName != null) {
                initDest = session.createQueue(altDestName);
            }

            writeQ = (Queue) initDest;
            readQ = null;

            connection.start();
            initialised = true;
        } catch (JMSException je) {
            Trc.exception(je);
            throw WSIFJMSConstants.ToWsifException(je);
        } finally {
            // The WSIFExceptions thrown above bypass the catch block, so the
            // cleanup has to live in finally to cover every failure path.
            if (!initialised) {
                releaseQuietly();
            }
        }
        if (Trc.ON) {
            Trc.exit(deep());
        }
    }

    /**
     * Closes the sender, session and connection, and marks this destination
     * as closed. Calling it again is a no-op.
     *
     * <p>All three resources are attempted even if an earlier close fails;
     * the first failure is then reported.
     *
     * @throws WSIFException wrapping the first JMS failure encountered
     */
    public void close() throws WSIFException {
        Trc.entry(this);

        QueueSender sndr = sender;
        QueueSession sssn = session;
        QueueConnection cnnctn = connection;

        // Null the fields first so the object reads as closed even if a
        // close() call below throws.
        sender = null;
        session = null;
        connection = null;

        JMSException failure = null;
        try {
            if (sndr != null) {
                sndr.close();
            }
        } catch (JMSException je) {
            Trc.exception(je);
            failure = je;
        }
        try {
            if (sssn != null) {
                sssn.close();
            }
        } catch (JMSException je) {
            Trc.exception(je);
            if (failure == null) {
                failure = je;
            }
        }
        try {
            if (cnnctn != null) {
                cnnctn.close();
            }
        } catch (JMSException je) {
            Trc.exception(je);
            if (failure == null) {
                failure = je;
            }
        }

        if (failure != null) {
            throw WSIFJMSConstants.ToWsifException(failure);
        }
        Trc.exit();
    }

    /**
     * Closes this destination when the object is finalized.
     *
     * @throws WSIFException if closing fails
     */
    public void finalize() throws WSIFException {
        Trc.entry(this);
        close();
        Trc.exit();
    }

    /**
     * Sends a text message with no correlation id.
     *
     * @param data the message text
     * @return the JMS message id of the sent message
     * @throws WSIFException if the destination is closed or a JMS call fails
     */
    public String send(String data) throws WSIFException {
        Trc.entry(this, data);
        String s = send(data, null);
        Trc.exit(s);
        return s;
    }

    /**
     * Sends a text message, setting a reply-to queue on it.
     *
     * @param data the message text
     * @param id correlation id to set on the message, or {@code null} for none
     * @return the JMS message id of the sent message
     * @throws WSIFException if the destination is closed or a JMS call fails
     */
    public String send(String data, String id) throws WSIFException {
        Trc.entry(this, data, id);
        areWeClosed();
        try {
            TextMessage msg = session.createTextMessage();
            msg.setText(data);
            String s = send(msg, id, true);
            Trc.exit(s);
            return s;
        } catch (JMSException je) {
            Trc.exception(je);
            throw WSIFJMSConstants.ToWsifException(je);
        }
    }

    /**
     * Sends an object message with no correlation id.
     *
     * @param data the message payload
     * @return the JMS message id of the sent message
     * @throws WSIFException if the destination is closed or a JMS call fails
     */
    public String send(Serializable data) throws WSIFException {
        Trc.entry(this, data);
        String s = send(data, null);
        Trc.exit(s);
        return s;
    }

    /**
     * Sends an object message, setting a reply-to queue on it.
     *
     * @param data the message payload
     * @param id correlation id to set on the message, or {@code null} for none
     * @return the JMS message id of the sent message
     * @throws WSIFException if the destination is closed or a JMS call fails
     */
    public String send(Serializable data, String id) throws WSIFException {
        Trc.entry(this, data, id);
        areWeClosed();

        try {
            ObjectMessage msg = session.createObjectMessage();
            msg.setObject(data);
            String s = send(msg, id, true);
            Trc.exit(s);
            return s;
        } catch (JMSException je) {
            Trc.exception(je);
            throw WSIFJMSConstants.ToWsifException(je);
        }
    }

    /**
     * Sends a prepared message to the write queue.
     *
     * <p>If a {@code WSIFJMSConstants.REPLY_TO} property has been set on this
     * destination, it names the reply queue and is consumed here. Otherwise,
     * when {@code setReplyTo} is true, the temporary reply queue is used.
     * The pending properties are then applied via {@code inProps.set} and
     * cleared, whether or not the send succeeds.
     *
     * <p>The sender is closed and discarded after every call, so each send
     * starts from a fresh sender.
     * NOTE(review): presumably this is because {@code inProps.set} may alter
     * sender settings - confirm against WSIFJMSProperties.
     *
     * @param msg the message to send
     * @param id correlation id to set on the message, or {@code null} for none
     * @param setReplyTo whether to set the temporary reply queue when no
     *        explicit reply-to property is present
     * @return the JMS message id of the sent message
     * @throws WSIFException if the destination is closed or a JMS call fails
     */
    public String send(Message msg, String id, boolean setReplyTo)
        throws WSIFException {

        Trc.entry(this, msg, id);
        areWeClosed();

        String msgId = null;

        try {
            if (sender == null) {
                sender = session.createSender(writeQ);
            }

            if (inProps.containsKey(WSIFJMSConstants.REPLY_TO)) {
                String rto = (String) inProps.get(WSIFJMSConstants.REPLY_TO);
                setReplyToQueue(rto);
                inProps.remove(WSIFJMSConstants.REPLY_TO);
                msg.setJMSReplyTo(readQ);
            } else if (setReplyTo) {
                setReplyToQueue();
                msg.setJMSReplyTo(readQ);
            }

            if (id != null) {
                msg.setJMSCorrelationID(id);
            }
            inProps.set(sender, msg);

            sender.send(msg);
            msgId = msg.getJMSMessageID();

        } catch (JMSException je) {
            Trc.exception(je);
            throw WSIFJMSConstants.ToWsifException(je);
        } finally {
            // Close the sender before dropping the reference; merely nulling
            // the field would leak a provider resource on every send.
            QueueSender used = sender;
            sender = null;
            if (used != null) {
                try {
                    used.close();
                } catch (Exception ignored) {
                    // Best effort: a failed close must not mask the send outcome.
                    Trc.ignoredException(ignored);
                }
            }
            inProps.clear();
        }

        Trc.exit(msgId);
        return msgId;
    }

    /**
     * Receives the next text message from the read queue using the default
     * timeout and no correlation id.
     *
     * @return the text of the received message
     * @throws WSIFException on timeout, a non-text reply, or JMS failure
     */
    public String receive() throws WSIFException {
        Trc.entry(this);
        String s = receiveString(null);
        Trc.exit(s);
        return s;
    }

    /**
     * Receives a text message using the default timeout.
     *
     * @param id correlation id to select on, or {@code null} for any message
     * @return the text of the received message
     * @throws WSIFException on timeout, a non-text reply, or JMS failure
     */
    public String receiveString(String id) throws WSIFException {
        Trc.entry(this, id);
        String s = receiveString(id, timeout);
        Trc.exit(s);
        return s;
    }

    /**
     * Receives a text message.
     *
     * @param id correlation id to select on, or {@code null} for any message
     * @param timeout timeout handed to {@code QueueReceiver.receive(long)}
     * @return the text of the received message
     * @throws WSIFException on timeout, if the reply is not a
     *         {@code TextMessage}, or on JMS failure
     */
    public String receiveString(String id, long timeout) throws WSIFException {
        Trc.entry(this, id);
        Message msg = receive(id, timeout);
        String s = null;
        try {
            if (msg instanceof TextMessage) {
                s = ((TextMessage) msg).getText();
            } else {
                throw new WSIFException(
                    "Reply message was not a TextMessage:msg="
                        + (msg == null ? "null" : msg.toString()));
            }
        } catch (JMSException e) {
            Trc.exception(e);
            throw WSIFJMSConstants.ToWsifException(e);
        }
        Trc.exit(s);
        return s;
    }

    /**
     * Receives a message using the default timeout.
     *
     * @param id correlation id to select on, or {@code null} for any message
     * @return the received message
     * @throws WSIFException on timeout or JMS failure
     */
    public Message receive(String id) throws WSIFException {
        Trc.entry(this, id);
        Message msg = receive(id, timeout);
        Trc.exit(msg);
        return msg;
    }

    /**
     * Receives a message from the read queue and records it as the last
     * message (see {@link #setLastMessage(Message)}).
     *
     * <p>A temporary receiver is created for the call and closed afterwards.
     *
     * @param id correlation id to select on, or {@code null} for any message
     * @param timeout timeout handed to {@code QueueReceiver.receive(long)}
     * @return the received message, never {@code null}
     * @throws WSIFException if the destination is closed, no reply queue has
     *         been set, the receive times out, or a JMS call fails
     */
    public Message receive(String id, long timeout) throws WSIFException {
        Trc.entry(this, id);
        areWeClosed();
        if (readQ == null) {
            // readQ is only assigned by the setReplyToQueue methods.
            throw new WSIFException(
                "Cannot receive: no reply queue has been set on this destination");
        }

        QueueReceiver rec = null;
        Message msg = null;

        try {
            if (id != null) {
                rec =
                    session.createReceiver(
                        readQ,
                        WSIFJMSConstants.JMS_CORRELATION_ID
                            + "='"
                            + escapeSelectorLiteral(id)
                            + "'");
            } else {
                rec = session.createReceiver(readQ);
            }

            msg = rec.receive(timeout);
            setLastMessage(msg);

            if (msg == null) {
                throw new WSIFException(
                    "Receive timed out on JMS queue "
                        + readQ.getQueueName()
                        + ", timeout "
                        + timeout);
            }
        } catch (JMSException e) {
            Trc.exception(e);
            throw WSIFJMSConstants.ToWsifException(e);
        } finally {
            try {
                if (rec != null) {
                    rec.close();
                }
            } catch (Exception ignored) {
                Trc.ignoredException(ignored);
            }
        }

        Trc.exit(msg);
        return msg;
    }

    /**
     * Makes the temporary queue the read (reply-to) queue, creating it on
     * first use and reusing it afterwards.
     *
     * @throws WSIFException if the destination is closed or the temporary
     *         queue cannot be created
     */
    public void setReplyToQueue() throws WSIFException {
        Trc.entry(this);
        areWeClosed();

        try {
            if (syncTempQueue == null) {
                syncTempQueue = session.createTemporaryQueue();
            }
        } catch (JMSException je) {
            Trc.exception(je);
            throw WSIFJMSConstants.ToWsifException(je);
        }

        readQ = syncTempQueue;
        replyToName = null;
        Trc.exit();
    }

    /**
     * Makes the named queue the read (reply-to) queue.
     *
     * <p>A {@code null} or empty name selects the temporary queue instead.
     * If the name matches the one currently in effect, nothing is looked up.
     *
     * @param replyTo name of the reply queue, resolved through the finder
     * @throws WSIFException if the destination is closed or the queue
     *         cannot be resolved
     */
    public void setReplyToQueue(String replyTo) throws WSIFException {
        Trc.entry(this, replyTo);
        areWeClosed();

        if (replyTo == null || replyTo.length() == 0) {
            setReplyToQueue();
            Trc.exit();
            return;
        }

        if (replyTo.equals(replyToName)) {
            // Same queue as last time: keep the already-resolved readQ.
            Trc.exit();
            return;
        }

        readQ = finder.findQueue(replyTo);

        replyToName = replyTo;
        Trc.exit();
    }

    /**
     * Records the asynchronous-mode flag. Within this class the flag is only
     * stored and reported by {@link #deep()}.
     *
     * @param b the new value of the flag
     * @throws WSIFException if the destination is closed
     */
    public void setAsyncMode(boolean b) throws WSIFException {
        Trc.entry(this, b);
        areWeClosed();
        asyncMode = b;
        Trc.exit();
    }

    /**
     * Sets a property to be applied to the next message sent. Calls with a
     * {@code null} name or value are ignored.
     *
     * @param name property name
     * @param value property value
     * @throws WSIFException declared for callers; not thrown by this
     *         implementation
     */
    public void setProperty(String name, Object value) throws WSIFException {
        Trc.entry(this, name, value);
        if (name != null && value != null) {
            inProps.put(name, value);
        }
        Trc.exit();
    }

    /**
     * Adds all entries of the map to the properties applied to the next
     * message sent. A {@code null} or empty map is ignored.
     *
     * @param propMap properties to add
     */
    public void setProperties(HashMap propMap) {
        Trc.entry(this, propMap);
        if (propMap != null && !propMap.isEmpty()) {
            inProps.putAll(propMap);
        }
        Trc.exit();
    }

    /**
     * Returns a property of the last message received.
     *
     * @param name property name
     * @return the property value, or {@code null} if there is no last
     *         message, {@code name} is {@code null}, or the property is absent
     * @throws WSIFException if the properties cannot be read from the message
     */
    public Object getProperty(String name) throws WSIFException {
        Trc.entry(this, name);
        if (lastMessage == null) {
            Trc.exit(null);
            return null;
        }

        // Properties are loaded from the message on first access only.
        if (outProps.isEmpty()) {
            outProps.getPropertiesFromMessage(lastMessage);
        }

        Object prop = null;
        if (name != null) {
            prop = outProps.get(name);
        }

        Trc.exit(prop);
        return prop;
    }

    /**
     * Returns the properties of the last message received.
     *
     * @return the properties, or {@code null} if there is no last message or
     *         it yielded no properties
     * @throws WSIFException if the properties cannot be read from the message
     */
    public HashMap getProperties() throws WSIFException {
        Trc.entry(this);
        if (lastMessage == null) {
            Trc.exit(null);
            return null;
        }

        if (outProps.isEmpty()) {
            outProps.getPropertiesFromMessage(lastMessage);
        }
        if (!outProps.isEmpty()) {
            Trc.exit(outProps);
            return outProps;
        }
        Trc.exit(null);
        return null;
    }

    /**
     * Guards operations that need a live session.
     *
     * @throws WSIFException if this destination has been closed
     */
    protected void areWeClosed() throws WSIFException {
        if (session == null) {
            throw new WSIFException("Cannot use a closed destination");
        }
    }

    /**
     * Creates an empty message of the requested type on the given session.
     *
     * @param session the session used to create the message
     * @param msgType {@code JMSConstants.MESSAGE_TYPE_OBJECTMESSAGE} or
     *        {@code JMSConstants.MESSAGE_TYPE_TEXTMESSAGE}
     * @return the new message
     * @throws WSIFException if the type is not supported or a JMS call fails
     */
    public static Message createMessage(Session session, int msgType)
        throws WSIFException {
        Trc.entry(null, session, new Integer(msgType));
        Message jmsMsg = null;

        try {
            if (msgType
                == org.apache.wsif.wsdl.extensions.jms.JMSConstants.MESSAGE_TYPE_OBJECTMESSAGE) {
                jmsMsg = session.createObjectMessage();
            } else if (
                msgType
                    == org.apache.wsif.wsdl.extensions.jms.JMSConstants.MESSAGE_TYPE_TEXTMESSAGE) {
                jmsMsg = session.createTextMessage();
            } else {
                throw new WSIFException("Unable to support message type");
            }
        } catch (JMSException je) {
            Trc.exception(je);
            throw WSIFJMSConstants.ToWsifException(je);
        }
        Trc.exit(jmsMsg);
        return jmsMsg;
    }

    /**
     * Creates an empty message of the requested type on this destination's
     * session.
     *
     * @param msgType see {@link #createMessage(Session, int)}
     * @return the new message
     * @throws WSIFException if the destination is closed, the type is not
     *         supported, or a JMS call fails
     */
    public Message createMessage(int msgType) throws WSIFException {
        Trc.entry(this, msgType);
        // Same guard as the other session-using methods, so a closed
        // destination is reported as a WSIFException rather than an NPE.
        areWeClosed();
        Message m = createMessage(session, msgType);
        Trc.exit(m);
        return m;
    }

    /**
     * Records the message whose properties {@link #getProperty(String)} and
     * {@link #getProperties()} report.
     *
     * <p>The cached properties are reset so they are re-read from the new
     * message. A fresh map is installed rather than clearing the old one, so
     * a map previously returned by {@link #getProperties()} is left intact.
     *
     * @param msg the message to record; may be {@code null}
     */
    public void setLastMessage(Message msg) {
        Trc.entry(this, msg);
        lastMessage = msg;
        outProps = new WSIFJMSProperties(WSIFJMSProperties.OUT);
        Trc.exit();
    }

    /**
     * Builds a trace string listing this object's fields.
     *
     * @return the field dump; if rendering a field throws, whatever had been
     *         built up to that point
     */
    public String deep() {
        String buff = "";
        try {
            buff = this.toString() + "\n";
            buff += "finder: " + finder;
            buff += " connection: " + connection;
            buff += " session: " + session;
            buff += " readQ: " + readQ;
            buff += " writeQ: " + writeQ;
            buff += " sender: " + sender;
            buff += " asyncMode: " + asyncMode;
            buff += " syncTempQueue: " + syncTempQueue;
            buff += " inProps: " + inProps;
            buff += " outProps: " + outProps;
            buff += " lastMessage: " + lastMessage;
            buff += " timeout: " + timeout;
            buff += " replyToName: " + replyToName;
        } catch (Exception e) {
            Trc.exceptionInTrace(e);
        }
        return buff;
    }

    /**
     * Closes the session and connection after a failed construction,
     * ignoring any errors, and leaves this object in the closed state.
     */
    private void releaseQuietly() {
        QueueSession sssn = session;
        QueueConnection cnnctn = connection;
        session = null;
        connection = null;
        try {
            if (sssn != null) {
                sssn.close();
            }
        } catch (Exception ignored) {
            Trc.ignoredException(ignored);
        }
        try {
            if (cnnctn != null) {
                cnnctn.close();
            }
        } catch (Exception ignored) {
            Trc.ignoredException(ignored);
        }
    }

    /**
     * Escapes a value for use inside a single-quoted message selector
     * string literal by doubling every single quote.
     */
    private static String escapeSelectorLiteral(String value) {
        if (value.indexOf('\'') < 0) {
            return value;
        }
        StringBuffer escaped = new StringBuffer(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '\'') {
                escaped.append('\'');
            }
            escaped.append(c);
        }
        return escaped.toString();
    }
}