1 2 3 package Jt.jms; 4 5 import Jt.*; 6 import java.io.*; 7 import Jt.jndi.*; 8 import javax.jms.*; 9 import javax.naming.*; 10 11 14 15 public class JtJMSQueueAdapter extends JtAdapter implements MessageListener { 16 17 private String queue; 18 private String connectionFactory; 19 private long timeout = 1L; private Object subject = null; 21 private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; 22 private int priority = Message.DEFAULT_PRIORITY; 23 private long timeToLive = Message.DEFAULT_TIME_TO_LIVE; 25 26 private transient JtJNDIAdapter jndiAdapter = null; 27 private transient boolean initted = false; 28 29 private transient Queue jmsQueue; 30 private transient QueueConnectionFactory qcFactory; 31 private transient QueueConnection queueConnection; 32 private transient QueueSession queueSession; 33 private transient QueueSender queueSender; 34 private transient QueueReceiver queueReceiver; 35 36 38 private void initial () { 39 JtMessage msg = new JtMessage ("JtLOOKUP"); 40 41 42 jndiAdapter = new JtJNDIAdapter (); 43 44 if (connectionFactory == null) { 45 handleError ("Attribute connectionFactory needs to be set."); 46 return; 47 } 48 49 msg.setMsgContent (connectionFactory); 50 51 qcFactory = (QueueConnectionFactory) sendMessage (jndiAdapter, msg); 52 53 if (qcFactory == null) 54 return; 55 56 if (queue == null) { 57 handleError ("Attribute queue needs to be set."); 58 return; 59 } 60 msg.setMsgContent (queue); 61 62 jmsQueue = (Queue) sendMessage (jndiAdapter, msg); 63 64 65 if (jmsQueue == null) 66 return; 67 68 try { 69 queueConnection = qcFactory.createQueueConnection (); 70 queueSession = queueConnection.createQueueSession (false, 71 Session.AUTO_ACKNOWLEDGE); 72 73 } catch (Exception e) { 74 handleException (e); 75 } 76 77 } 78 79 84 85 public void onMessage (Message message) { 86 JtMessage msg; 87 ObjectMessage omessage; 88 89 if (message == null) 90 return; 91 92 93 try { 94 95 omessage = (ObjectMessage) message; 96 msg = (JtMessage) omessage.getObject (); 97 98 if (subject == null) { 99 handleWarning ("JtJMSQueueAdapter.onMessage: attribute 'subject' needs to be set."); 100 return; 101 } 102 103 105 sendMessage (subject, msg); 106 107 } catch (Exception ex) { 108 handleException (ex); 109 } 110 } 111 112 121 122 public Object processMessage (Object message) { 123 String content; 124 String query; 125 JtMessage e = (JtMessage) message; 126 Object reply; 127 JtMessage msg; 128 129 130 if (e == null || (e.getMsgId() == null)) 131 return (null); 132 133 134 if (e.getMsgId().equals ("JtREMOVE")) { 135 return (null); 136 } 137 138 if (!initted) { 139 initial (); 140 initted = true; 141 } 142 143 144 if (e.getMsgId().equals("JtSEND")) { 145 msg = (JtMessage) e.getMsgContent (); 146 reply = sendJMSMessage (msg); 147 return (reply); 148 } 149 150 151 if (e.getMsgId().equals("JtSTART_LISTENING")) { 152 startListening (); 153 return (null); 154 } 155 156 if (e.getMsgId().equals("JtRECEIVE")) { 157 reply = receiveJMSMessage (); 158 return (reply); 159 } 160 161 163 if (e.getMsgId().equals("JtTEST_SENDER")) { 164 reply = testSender (); 165 return (reply); 166 } 167 168 170 if (e.getMsgId().equals("JtTEST_RECEIVER")) { 171 reply = testReceiver (); 172 return (reply); 173 } 174 175 176 handleError 177 ("processMessage: invalid message id:"+ 178 e.getMsgId()); 179 return (null); 180 } 181 182 183 184 188 189 public void setQueue (String queue) { 190 this.queue = queue; 191 } 192 193 194 197 198 public String getQueue () { 199 return (queue); 200 } 201 202 206 207 public void setTimeout (long timeout) { 208 this.timeout = timeout; 209 } 210 211 212 215 216 public long getTimeout () { 217 return (timeout); 218 } 219 220 221 226 227 public void setDeliveryMode (int deliveryMode) { 228 this.deliveryMode = deliveryMode; 229 } 230 231 232 235 236 public long getDeliveryMode () { 237 return (deliveryMode); 238 } 239 240 241 242 243 248 249 public void setPriority (int priority) { 250 this.priority = priority; 251 } 252 253 254 257 258 public long getPriority () { 259 return (priority); 260 } 261 262 263 268 269 public void setTimeToLive (long timeToLive) { 270 this.timeToLive = timeToLive; 271 } 272 273 274 277 278 public long getTimeToLive () { 279 return (timeToLive); 280 } 281 282 283 284 289 290 public void setSubject (Object subject) { 291 this.subject = subject; 292 } 293 294 295 299 300 public Object getSubject () { 301 return (subject); 302 } 303 304 305 309 310 public void setConnectionFactory (String connectionFactory) { 311 this.connectionFactory = connectionFactory; 312 } 313 314 315 318 319 public String getConnectionFactory () { 320 return (connectionFactory); 321 } 322 323 324 325 326 private Object testReceiver () { 327 String reply = "PASS"; 328 ObjectMessage message; 329 JtMessage msg; 330 331 332 333 335 msg = (JtMessage) sendMessage (this, new JtMessage ("JtRECEIVE")); 336 337 if (msg == null) { 338 System.out.println ("no more messages"); 339 return (reply); 340 } 341 342 System.out.println ("msgId:" + msg.getMsgId ()); 343 344 346 return (reply); 347 } 348 349 350 352 private Object sendJMSMessage (JtMessage msg) { 353 354 ObjectMessage omsg; 355 String reply = "PASS"; 356 357 358 if (msg == null) { 359 reply = "FAIL"; 360 return (reply); 361 } 362 363 try { 364 365 if (queueSender == null) 366 queueSender = queueSession.createSender (jmsQueue); 367 368 omsg = queueSession.createObjectMessage (); 369 omsg.setObject (msg); 370 371 queueSender.send (omsg, deliveryMode, priority, timeToLive); 374 } catch (Exception e) { 375 handleException (e); 376 reply = "FAIL"; 377 } 378 return (reply); 379 } 380 381 private void startListening () { 382 383 384 try { 385 if (queueReceiver == null) 386 queueReceiver = queueSession.createReceiver (jmsQueue); 387 388 389 if (queueConnection == null) { 390 handleError ("receiveJMSMessage:queueConnection is null"); 391 return; 392 } 393 394 396 queueReceiver.setMessageListener (this); 397 398 queueConnection.start (); 399 } catch (Exception ex) { 400 401 handleException (ex); 402 } 403 404 } 405 406 private JtMessage receiveJMSMessage () { 407 408 JtMessage msg = null; 409 ObjectMessage message; 410 411 412 try { 413 414 if (queueReceiver == null) 415 queueReceiver = queueSession.createReceiver (jmsQueue); 416 417 418 if (queueConnection == null) { 419 handleError ("receiveJMSMessage:queueConnection is null"); 420 return (null); 421 } 422 queueConnection.start (); 423 424 message = (ObjectMessage) queueReceiver.receive (timeout); 425 if (message != null) { 426 msg = (JtMessage) message.getObject (); 427 } 428 429 } catch (Exception e) { 430 handleException (e); 431 } 432 433 return (msg); 434 435 } 436 437 438 439 private Object testSender () { 440 String reply = "PASS"; 441 TextMessage message; 442 ObjectMessage omsg; 443 JtMessage msg = new JtMessage ("JtHELLO"); 444 JtMessage wrapper = new JtMessage ("JtSEND"); 445 446 wrapper.setMsgContent (msg); 449 450 return (sendMessage (this, wrapper)); 452 } 453 454 457 458 public static void main (String [] args) { 459 JtFactory main; 460 JtJMSQueueAdapter jmsAdapter; 461 462 main = new JtFactory (); 463 464 465 jmsAdapter = (JtJMSQueueAdapter) main.createObject ("Jt.jms.JtJMSQueueAdapter", "jmsAdapter"); 466 467 if (args.length < 1) { 468 System.err.println ("Usage: java Jt.jms.JtJMSQueueAdapter -s or java Jt.jms.JtJMSQueueAdapter -r"); 469 System.exit (1); 470 } 471 472 if (args[0].equals ("-s")) { 473 main.sendMessage (jmsAdapter, new JtMessage ("JtTEST_SENDER")); 474 System.exit (0); 475 } 476 477 if (args[0].equals ("-r")) { 478 main.sendMessage (jmsAdapter, new JtMessage ("JtTEST_RECEIVER")); 479 System.exit (0); 480 } 481 482 System.err.println ("Usage: java Jt.jms.JtJMSQueueAdapter -s or java Jt.jms.JtJMSQueueAdapter -r"); 483 484 main.removeObject ("jtAdapter"); 485 486 } 487 } | Popular Tags |