1 46 package sample.jms.queues.ReliableTalk; 47 48 import java.io.IOException ; 49 50 51 import javax.jms.QueueConnectionFactory ; 52 import javax.jms.QueueSession ; 53 import javax.jms.Session ; 54 55 import org.mr.api.jms.MantaQueueConnectionFactory; 56 57 58 59 63 64 65 public class ReliableTalk 66 implements javax.jms.MessageListener 67 { 68 69 private static final int MESSAGE_TTL = 6000000; 70 71 private javax.jms.QueueConnection con = null; 72 private javax.jms.QueueSession sendSession = null; 73 private javax.jms.QueueSession receiveSession = null; 74 private javax.jms.QueueSender sender = null; 75 76 77 private void talker(String userName, String rQueue, String sQueue) 78 { 79 try 81 { 82 QueueConnectionFactory conFactory = (QueueConnectionFactory ) new MantaQueueConnectionFactory(); 83 84 con = conFactory.createQueueConnection(); 86 87 sendSession =(QueueSession ) con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 89 receiveSession =(QueueSession ) con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 90 91 } 92 catch (javax.jms.JMSException jmse) 93 { 94 jmse.printStackTrace(); 95 waitForAnyKey(); 96 97 try { 98 System.in.read(); 99 } catch (IOException e) { 100 e.printStackTrace(); 101 } 102 103 System.exit(1); 104 } 105 106 try 108 { 109 if (sQueue != null) 110 { 111 javax.jms.Queue sendQueue = sendSession.createQueue (sQueue); 112 sender = sendSession.createSender(sendQueue); 113 } 114 if (rQueue != null) 115 { 116 javax.jms.Queue receiveQueue = receiveSession.createQueue (rQueue); 117 javax.jms.QueueReceiver qReceiver = receiveSession.createReceiver(receiveQueue); 118 qReceiver.setMessageListener(this); 119 System.out.println ("\nStart receiving messages on queue \"" + rQueue + "\".\n"); 122 con.start(); 123 } 124 } 125 catch (javax.jms.JMSException jmse) 126 { 127 jmse.printStackTrace(); 128 exit(); 129 } 130 try 132 { 133 java.io.BufferedReader stdin = 135 new java.io.BufferedReader ( new java.io.InputStreamReader ( System.in ) ); 136 if (sQueue != null){ 137 System.out.println("Enter text to send to queue \"" + sQueue + "\"."); 138 System.out.println("Press Enter to send each message."); 139 System.out.println("Empty messages will not be sent."); 140 System.out.println("Typing 'exit' will stop the program."); 141 } 142 143 while ( true ) 144 { 145 System.out.print(userName+">"); 146 String s = stdin.readLine(); 147 148 if(s == null) 149 continue; 150 s =s.trim(); 152 153 if(s.length() ==0) 154 continue; 155 156 if ( s.equalsIgnoreCase("exit")) 157 exit(); 158 159 else if ( s.length() > 0 && sQueue != null) 160 { 161 javax.jms.TextMessage msg = sendSession.createTextMessage(); 162 msg.setText( userName + ": " + s ); 163 sender.send( msg, 166 javax.jms.DeliveryMode.PERSISTENT, 167 javax.jms.Message.DEFAULT_PRIORITY, 168 MESSAGE_TTL); 169 } 170 } 171 } 172 catch ( java.io.IOException ioe ) 173 { 174 ioe.printStackTrace(); 175 } 176 catch ( javax.jms.JMSException jmse ) 177 { 178 jmse.printStackTrace(); 179 } 180 exit(); 182 } 183 184 188 public void onMessage( javax.jms.Message aMessage) 189 { 190 try 191 { 192 javax.jms.TextMessage textMessage = (javax.jms.TextMessage ) aMessage; 194 195 try 198 { 199 String string = textMessage.getText(); 200 System.out.println( string ); 201 } 202 catch (javax.jms.JMSException jmse) 203 { 204 jmse.printStackTrace(); 205 } 206 } 207 catch (Throwable t) 208 { 209 t.printStackTrace(); 210 } 211 } 212 213 214 private void exit() 215 { 216 try 217 { 218 con.close(); 219 } 220 catch (javax.jms.JMSException jmse) 221 { 222 jmse.printStackTrace(); 223 } 224 225 System.exit(0); 226 } 227 228 233 234 public static void main(String argv[]) { 235 236 if (argv.length == 0) { 238 printHelp(); 239 waitForAnyKey(); 240 System.exit(1); 241 } 242 243 245 String username = null; 246 247 String qSender = null; 248 String qReceiver = null; 249 250 for (int i = 0; i < argv.length; i++) { 252 String arg = argv[i]; 253 254 if (!arg.startsWith("-")) { 256 System.err.println ("error: unexpected argument - "+arg); 257 printHelp(); 258 259 waitForAnyKey(); 260 System.exit(1); 261 } 262 else { 263 264 if (arg.equals("-u")) { 265 if (i == argv.length - 1 || argv[i+1].startsWith("-")) { 266 System.err.println("error: missing user name"); 267 System.exit(1); 268 } 269 username = argv[++i]; 270 continue; 271 } 272 273 if (arg.equals("-qr")) { 274 if (i == argv.length - 1 || argv[i+1].startsWith("-")) { 275 System.err.println("error: missing receive queue parameter"); 276 System.exit(1); 277 } 278 qReceiver = argv[++i]; 279 continue; 280 } 281 282 if (arg.equals("-qs")) { 283 if (i == argv.length - 1 || argv[i+1].startsWith("-")) { 284 System.err.println("error: missing send queue parameter"); 285 System.exit(1); 286 } 287 qSender = argv[++i]; 288 continue; 289 } 290 291 292 if (arg.equals("-h")) { 293 printHelp(); 294 System.exit(1); 295 } 296 } 297 } 298 299 if (username == null) { 301 System.err.println ("error: user name must be supplied"); 302 printHelp(); 303 System.exit(1); 304 } 305 306 if (qReceiver == null || qSender == null) { 307 System.err.println ("error: receive queue and send queue must be supplied"); 308 printHelp(); 309 System.exit(1); 310 } 311 312 ReliableTalk talk = new ReliableTalk(); 314 talk.talker(username ,qReceiver, qSender); 315 316 } 317 318 private static void waitForAnyKey(){ 319 System.out.print("Press any key ..."); 320 try { 321 System.in.read(); 322 } catch (IOException e) { 323 e.printStackTrace(); 324 } 325 } 326 327 328 private static void printHelp() { 329 330 StringBuffer use = new StringBuffer (); 331 use.append("help: java Talk (options) ...\n\n"); 332 use.append("options:\n"); 333 use.append(" -qr queue Specify queue for receiving messages.\n"); 334 use.append(" -qs queue Specify queue for sending messages.\n"); 335 use.append(" -u user Specify a user name.\n"); 336 use.append(" -h This help screen.\n"); 337 System.err.println (use); 338 } 339 340 } | Popular Tags |