1 46 package sample.jms.queues.SelectorTalk; 47 48 import java.io.IOException ; 49 50 51 import javax.jms.QueueConnectionFactory ; 52 import javax.jms.QueueSession ; 53 import javax.jms.Session ; 54 55 import org.mr.api.jms.MantaQueueConnectionFactory; 56 57 58 62 63 64 public class SelectorTalk 65 implements javax.jms.MessageListener 66 { 67 68 private static final int MESSAGE_TTL = 6000000; 69 private static final String PROPERTY_NAME = "Ticket"; 70 71 private javax.jms.QueueConnection con = null; 72 private javax.jms.QueueSession sendSession = null; 73 private javax.jms.QueueSession receiveSession = null; 74 private javax.jms.QueueSender sender = null; 75 76 77 private void talker(String userName, String rQueue, String sQueue, String selection) 78 { 79 try 81 { 82 QueueConnectionFactory conFactory = (QueueConnectionFactory ) new MantaQueueConnectionFactory(); 83 84 con = conFactory.createQueueConnection(); 86 87 sendSession =(QueueSession ) con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 89 receiveSession =(QueueSession ) con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 90 91 } 92 catch (javax.jms.JMSException jmse) 93 { 94 jmse.printStackTrace(); 95 waitForAnyKey(); 96 97 try { 98 System.in.read(); 99 } catch (IOException e) { 100 e.printStackTrace(); 101 } 102 103 System.exit(1); 104 } 105 106 try 108 { 109 if (sQueue != null) 110 { 111 javax.jms.Queue sendQueue = sendSession.createQueue (sQueue); 112 sender = sendSession.createSender(sendQueue); 113 } 114 if (rQueue != null) 115 { 116 javax.jms.Queue receiveQueue = receiveSession.createQueue(rQueue); 117 javax.jms.QueueReceiver qReceiver = receiveSession.createReceiver(receiveQueue, PROPERTY_NAME + " = \'" + selection + "\'"); 118 qReceiver.setMessageListener(this); 119 System.out.println("\nStart receiving messages on queue \"" + rQueue + "\"\n" + 122 "with a message selector set to: \"" +PROPERTY_NAME + " = \'" + selection + "\'\"\n"); 123 con.start(); 124 } 125 } 126 catch (javax.jms.JMSException jmse) 127 { 128 jmse.printStackTrace(); 129 exit(); 130 } 131 try 133 { 134 java.io.BufferedReader stdin = 136 new java.io.BufferedReader ( new java.io.InputStreamReader ( System.in ) ); 137 if (sQueue != null){ 138 System.out.println("Enter text to send to queue \"" + sQueue + "\"."); 139 System.out.println("Press Enter to send each message."); 140 System.out.println("Empty messages will not be sent."); 141 System.out.println("Typing 'exit' will stop the program."); 142 } 143 144 while ( true ) 145 { 146 System.out.print(userName+">"); 147 String s = stdin.readLine(); 148 149 if(s == null) 150 continue; 151 s =s.trim(); 153 154 if(s.length() ==0) 155 continue; 156 157 if ( s.equalsIgnoreCase("exit")) 158 exit(); 159 160 else if ( s.length() > 0 && sQueue != null) 161 { 162 javax.jms.TextMessage msg = sendSession.createTextMessage(); 163 msg.setStringProperty(PROPERTY_NAME, selection); 165 msg.setText( userName + ": " + s ); 166 sender.send( msg, 169 javax.jms.DeliveryMode.NON_PERSISTENT, 170 javax.jms.Message.DEFAULT_PRIORITY, 171 MESSAGE_TTL); 172 } 173 } 174 } 175 catch ( java.io.IOException ioe ) 176 { 177 ioe.printStackTrace(); 178 } 179 catch ( javax.jms.JMSException jmse ) 180 { 181 jmse.printStackTrace(); 182 } 183 exit(); 185 } 186 187 191 public void onMessage( javax.jms.Message aMessage) 192 { 193 194 try 195 { 196 javax.jms.TextMessage textMessage = (javax.jms.TextMessage ) aMessage; 198 199 try 202 { 203 String string = textMessage.getText(); 204 System.out.println( string ); 205 } 206 catch (javax.jms.JMSException jmse) 207 { 208 jmse.printStackTrace(); 209 } 210 } 211 catch (Throwable t) 212 { 213 t.printStackTrace(); 214 } 215 } 216 217 218 private void exit() 219 { 220 try 221 { 222 con.close(); 223 } 224 catch (javax.jms.JMSException jmse) 225 { 226 jmse.printStackTrace(); 227 } 228 229 System.exit(0); 230 } 231 232 237 238 public static void main(String argv[]) { 239 240 if (argv.length == 0) { 242 printHelp(); 243 waitForAnyKey(); 244 System.exit(1); 245 } 246 247 249 String username = null; 250 String qSender = null; 251 String qReceiver = null; 252 String selection = null; 253 254 for (int i = 0; i < argv.length; i++) { 256 String arg = argv[i]; 257 258 if (!arg.startsWith("-")) { 260 System.err.println ("error: unexpected argument - "+arg); 261 printHelp(); 262 263 waitForAnyKey(); 264 System.exit(1); 265 } 266 else { 267 268 if (arg.equals("-u")) { 269 if (i == argv.length - 1 || argv[i+1].startsWith("-")) { 270 System.err.println("error: missing user name"); 271 System.exit(1); 272 } 273 username = argv[++i]; 274 continue; 275 } 276 277 if (arg.equals("-qr")) { 278 if (i == argv.length - 1 || argv[i+1].startsWith("-")) { 279 System.err.println("error: missing receive queue parameter"); 280 System.exit(1); 281 } 282 qReceiver = argv[++i]; 283 continue; 284 } 285 286 if (arg.equals("-qs")) { 287 if (i == argv.length - 1 || argv[i+1].startsWith("-")) { 288 System.err.println("error: missing send queue parameter"); 289 System.exit(1); 290 } 291 qSender = argv[++i]; 292 continue; 293 } 294 295 if (arg.equals("-s")) { 296 if (i == argv.length - 1 || argv[i+1].startsWith("-")) { 297 System.err.println("error: missing selectiion"); 298 System.exit(1); 299 } 300 selection = argv[++i]; 301 continue; 302 } 303 if (arg.equals("-h")) { 304 printHelp(); 305 System.exit(1); 306 } 307 } 308 } 309 310 if (username == null) { 312 System.err.println ("error: user name must be supplied"); 313 printHelp(); 314 System.exit(1); 315 } 316 317 if (qReceiver == null || qSender == null) { 318 System.err.println ("error: receive queue and send queue must be supplied"); 319 printHelp(); 320 System.exit(1); 321 } 322 323 if (selection == null) { 324 System.err.println ("error: selection must be supplied"); 325 printHelp(); 326 System.exit(1); 327 } 328 329 SelectorTalk talk = new SelectorTalk(); 331 talk.talker(username, qReceiver, qSender, selection); 332 } 333 334 private static void waitForAnyKey(){ 335 System.out.print("press any key ..."); 336 try { 337 System.in.read(); 338 } catch (IOException e) { 339 e.printStackTrace(); 340 } 341 } 342 343 344 private static void printHelp() { 345 346 StringBuffer use = new StringBuffer (); 347 use.append("help: java Talk (options) ...\n\n"); 348 use.append("options:\n"); 349 use.append(" -qr queue Specify queue for receiving messages.\n"); 350 use.append(" -qs queue Specify queue for sending messages.\n"); 351 use.append(" -s selection Required, selection used to receive messages.\n"); 352 use.append(" -h This help screen.\n"); 353 System.err.println (use); 354 } 355 356 } 357 358 359 360 361 362 363 | Popular Tags |