| 1 2 3 package Jt.jms; 4 5 import Jt.*; 6 import java.io.*; 7 import Jt.jndi.*; 8 import javax.jms.*; 9 import javax.naming.*; 10 11 14 15 public class JtJMSTopicAdapter extends JtAdapter implements MessageListener { 16 17 private String topic; 18 private String connectionFactory; 19 private long timeout = 1L; private Object subject = null; 21 private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; 22 private int priority = Message.DEFAULT_PRIORITY; 23 private long timeToLive = Message.DEFAULT_TIME_TO_LIVE; 25 private transient JtJNDIAdapter jndiAdapter = null; 26 private transient boolean initted = false; 27 28 private transient Topic jmsTopic; 29 private transient TopicConnectionFactory tcFactory; 30 private transient TopicConnection topicConnection; 31 private transient TopicSession topicSession; 32 private transient TopicPublisher topicPublisher; 33 private transient TopicSubscriber topicSubscriber; 34 35 37 private void initial () { 38 JtMessage msg = new JtMessage ("JtLOOKUP"); 39 40 41 jndiAdapter = new JtJNDIAdapter (); 42 43 if (connectionFactory == null) { 44 handleError ("Attribute value needs to be set (connectionFactory)"); 45 return; 46 } 47 48 msg.setMsgContent ("TestJMSConnectionFactory"); 49 50 tcFactory = (TopicConnectionFactory) sendMessage (jndiAdapter, msg); 51 52 if (tcFactory == null) 53 return; 54 55 if (topic == null) { 56 handleError ("Attribute value needs to be set (topic)"); 57 return; 58 } 59 msg.setMsgContent (topic); 60 61 jmsTopic = (Topic) sendMessage (jndiAdapter, msg); 62 63 64 if (jmsTopic == null) 65 return; 66 67 try { 68 topicConnection = tcFactory.createTopicConnection (); 69 topicSession = topicConnection.createTopicSession (false, 70 Session.AUTO_ACKNOWLEDGE); 71 72 } catch (Exception e) { 73 handleException (e); 74 } 75 76 } 77 78 79 84 85 public void onMessage (Message message) { 86 JtMessage msg; 87 ObjectMessage omessage; 88 89 if (message == null) 90 return; 91 92 93 try { 94 95 omessage = (ObjectMessage) message; 96 msg = (JtMessage) omessage.getObject (); 97 98 if (subject == null) { 99 handleWarning ("JtJMSAdapter.onMessage: the subject attribute needs to be set"); 100 return; 101 } 102 103 sendMessage (subject, msg); 104 105 } catch (Exception ex) { 106 handleException (ex); 107 } 108 } 109 110 119 120 public Object processMessage (Object message) { 121 String content; 122 String query; 123 JtMessage e = (JtMessage) message; 124 Object reply; 125 JtMessage msg; 126 127 128 if (e == null || (e.getMsgId() == null)) 129 return (null); 130 131 132 if (e.getMsgId().equals ("JtREMOVE")) { 133 return (null); 134 } 135 136 if (!initted) { 137 initial (); 138 initted = true; 139 } 140 141 142 if (e.getMsgId().equals("JtPUBLISH")) { 143 msg = (JtMessage) e.getMsgContent (); 144 reply = publishJMSMessage (msg); 145 return (reply); 146 } 147 148 149 if (e.getMsgId().equals("JtSTART_LISTENING")) { 150 startListening (); 151 return (null); 152 } 153 154 if (e.getMsgId().equals("JtRECEIVE")) { 155 reply = receiveJMSMessage (); 156 return (reply); 157 } 158 159 161 if (e.getMsgId().equals("JtTEST_PUBLISHER")) { 162 reply = testPublisher (); 163 return (reply); 164 } 165 166 168 if (e.getMsgId().equals("JtTEST_SUBSCRIBER")) { 169 reply = testSubscriber (); 170 return (reply); 171 } 172 173 174 handleError 175 ("processMessage: invalid message id:"+ 176 e.getMsgId()); 177 return (null); 178 } 179 180 181 182 186 187 public void setTopic (String topic) { 188 this.topic = topic; 189 } 190 191 192 195 196 public String getTopic () { 197 return (topic); 198 } 199 200 204 205 public void setTimeout (long timeout) { 206 this.timeout = timeout; 207 } 208 209 210 213 214 public long getTimeout () { 215 return (timeout); 216 } 217 218 219 224 225 public void setDeliveryMode (int deliveryMode) { 226 this.deliveryMode = deliveryMode; 227 } 228 229 230 233 234 public long getDeliveryMode () { 235 return (deliveryMode); 236 } 237 238 239 240 241 246 247 public void setPriority (int priority) { 248 this.priority = priority; 249 } 250 251 252 255 256 public long getPriority () { 257 return (priority); 258 } 259 260 261 266 267 public void setTimeToLive (long timeToLive) { 268 this.timeToLive = timeToLive; 269 } 270 271 272 275 276 public long getTimeToLive () { 277 return (timeToLive); 278 } 279 280 285 286 287 public void setSubject (Object subject) { 288 this.subject = subject; 289 } 290 291 292 296 297 public Object getSubject () { 298 return (subject); 299 } 300 301 302 306 307 public void setConnectionFactory (String connectionFactory) { 308 this.connectionFactory = connectionFactory; 309 } 310 311 312 315 316 public String getConnectionFactory () { 317 return (connectionFactory); 318 } 319 320 321 322 323 private Object testSubscriber () { 324 String reply = "PASS"; 325 ObjectMessage message; 327 JtMessage msg; 328 329 330 331 for (;;) { 332 333 msg = (JtMessage) sendMessage (this, new JtMessage ("JtRECEIVE")); 334 335 if (msg == null) { 336 System.out.println ("no more messages"); 337 break; 338 } 339 340 System.out.println ("msgId:" + msg.getMsgId ()); 341 342 } 343 344 return (reply); 345 } 346 347 348 350 private Object publishJMSMessage (JtMessage msg) { 351 352 ObjectMessage omsg; 353 String reply = "PASS"; 354 355 356 if (msg == null) { 357 reply = "FAIL"; 358 return (reply); 359 } 360 361 try { 362 363 if (topicPublisher == null) 364 topicPublisher = topicSession.createPublisher (jmsTopic); 365 366 omsg = topicSession.createObjectMessage (); 367 omsg.setObject (msg); 368 369 370 topicPublisher.publish (omsg, deliveryMode, priority, timeToLive); 371 } catch (Exception e) { 372 handleException (e); 373 reply = "FAIL"; 374 } 375 return (reply); 376 } 377 378 private void startListening () { 379 380 381 try { 382 if (topicSubscriber == null) 383 topicSubscriber = topicSession.createSubscriber (jmsTopic); 384 385 386 if (topicConnection == null) { 387 handleError ("receiveJMSMessage:topicConnection is null"); 388 return; 389 } 390 391 393 topicSubscriber.setMessageListener (this); 394 395 topicConnection.start (); 396 } catch (Exception ex) { 397 398 handleException (ex); 399 } 400 401 } 402 403 private JtMessage receiveJMSMessage () { 404 405 JtMessage msg = null; 406 ObjectMessage message; 407 408 409 try { 410 411 if (topicSubscriber == null) 412 topicSubscriber = topicSession.createSubscriber (jmsTopic); 413 414 415 if (topicConnection == null) { 416 handleError ("receiveJMSMessage:topicConnection is null"); 417 return (null); 418 } 419 topicConnection.start (); 420 421 message = (ObjectMessage) topicSubscriber.receive (timeout); 422 if (message != null) { 423 msg = (JtMessage) message.getObject (); 424 } 425 426 } catch (Exception e) { 427 handleException (e); 428 } 429 430 return (msg); 431 432 } 433 434 435 436 private Object testPublisher () { 437 String reply = "PASS"; 438 TextMessage message; 439 ObjectMessage omsg; 440 JtMessage msg = new JtMessage ("JtHELLO"); 441 JtMessage wrapper = new JtMessage ("JtPUBLISH"); 442 443 wrapper.setMsgContent (msg); 444 445 446 return (sendMessage (this, wrapper)); 447 } 448 449 452 453 public static void main (String [] args) { 454 JtFactory main; 455 JtJMSTopicAdapter jmsAdapter; 456 457 main = new JtFactory (); 458 459 460 jmsAdapter = (JtJMSTopicAdapter) main.createObject ("Jt.jms.JtJMSTopicAdapter", "jmsAdapter"); 461 462 if (args.length < 1) { 463 System.err.println ("Usage: java Jt.jms.JtJMSTopicAdapter -p or java Jt.jms.JtJMSTopicAdapter -s"); 464 System.exit (1); 465 } 466 467 if (args[0].equals ("-p")) { 468 main.sendMessage (jmsAdapter, new JtMessage ("JtTEST_PUBLISHER")); 469 System.exit (0); 470 } else if (args[0].equals ("-s")) { 471 main.sendMessage (jmsAdapter, new JtMessage ("JtTEST_SUBSCRIBER")); 472 System.exit (0); 473 } else 474 System.err.println ("Usage: java Jt.jms.JtJMSTopicAdapter -p or java Jt.jms.JtJMSTopicAdapter -s"); 475 476 main.removeObject (jmsAdapter); 477 478 } 479 } | Popular Tags |