1 46 package sample.jms.queues.Talk; 47 48 import java.io.IOException ; 49 50 import javax.jms.QueueConnectionFactory ; 51 import javax.jms.QueueSession ; 52 import javax.jms.Session ; 53 54 import org.mr.api.jms.MantaQueueConnectionFactory; 55 56 60 61 public class Talk 62 implements javax.jms.MessageListener 63 { 64 65 private static final int MESSAGE_TTL = 6000000; 66 67 private javax.jms.QueueConnection con = null; 68 private javax.jms.QueueSession sendSession = null; 69 private javax.jms.QueueSession receiveSession = null; 70 private javax.jms.QueueSender sender = null; 71 72 73 private void talker(String userName, String rQueue, String sQueue) 74 { 75 try 77 { 78 QueueConnectionFactory conFactory = (QueueConnectionFactory ) new MantaQueueConnectionFactory(); 79 80 con = conFactory.createQueueConnection(); 82 83 sendSession =(QueueSession ) con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 85 receiveSession =(QueueSession ) con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 86 87 } 88 catch (javax.jms.JMSException jmse) 89 { 90 jmse.printStackTrace(); 91 waitForAnyKey(); 92 93 try { 94 System.in.read(); 95 } catch (IOException e) { 96 e.printStackTrace(); 97 } 98 99 System.exit(1); 100 } 101 102 try 104 { 105 javax.jms.Queue sendQueue = sendSession.createQueue (sQueue); 106 sender = sendSession.createSender(sendQueue); 107 javax.jms.Queue receiveQueue = receiveSession.createQueue (rQueue); 108 javax.jms.QueueReceiver qReceiver = receiveSession.createReceiver(receiveQueue); 109 qReceiver.setMessageListener(this); 110 System.out.println ("\nStart receiving messages on queue \"" + rQueue + "\".\n"); 113 con.start(); 114 } 115 catch (javax.jms.JMSException jmse) 116 { 117 jmse.printStackTrace(); 118 exit(); 119 } 120 try 121 { 122 java.io.BufferedReader stdin = 124 new java.io.BufferedReader ( new java.io.InputStreamReader ( System.in ) ); 125 if (sQueue != null){ 126 System.out.println("Enter text to send to queue \"" + sQueue + "\"."); 127 System.out.println("Press Enter to send each message."); 128 System.out.println("Empty messages will not be sent."); 129 System.out.println("Typing 'exit' will stop the program."); 130 } 131 132 while ( true ) 133 { 134 System.out.print(userName+">"); 135 String s = stdin.readLine(); 136 137 if(s == null) 138 continue; 139 s =s.trim(); 141 142 if(s.length() ==0) 143 continue; 144 145 if ( s.equalsIgnoreCase("exit")) 146 exit(); 147 148 else if ( s.length() > 0 && sQueue != null) 149 { 150 javax.jms.TextMessage msg = sendSession.createTextMessage(); 151 msg.setText( userName + ": " + s ); 152 sender.send( msg, 155 javax.jms.DeliveryMode.NON_PERSISTENT, 156 javax.jms.Message.DEFAULT_PRIORITY, 157 MESSAGE_TTL); 158 } 159 } 160 } 161 catch ( java.io.IOException ioe ) 162 { 163 ioe.printStackTrace(); 164 } 165 catch ( javax.jms.JMSException jmse ) 166 { 167 jmse.printStackTrace(); 168 } 169 exit(); 171 } 172 173 177 public void onMessage( javax.jms.Message aMessage) 178 { 179 180 try 181 { 182 javax.jms.TextMessage textMessage = (javax.jms.TextMessage ) aMessage; 184 185 try 188 { 189 String string = textMessage.getText(); 190 System.out.println( string ); 191 } 192 catch (javax.jms.JMSException jmse) 193 { 194 jmse.printStackTrace(); 195 } 196 } 197 catch (Throwable t) 198 { 199 t.printStackTrace(); 200 } 201 } 202 203 204 private void exit() 205 { 206 try 207 { 208 con.close(); 209 } 210 catch (javax.jms.JMSException jmse) 211 { 212 jmse.printStackTrace(); 213 } 214 215 System.exit(0); 216 } 217 218 223 224 public static void main(String argv[]) { 225 226 if (argv.length == 0) { 228 printHelp(); 229 waitForAnyKey(); 230 System.exit(1); 231 } 232 233 235 String username = null; 236 237 String qSender = null; 238 String qReceiver = null; 239 240 for (int i = 0; i < argv.length; i++) { 242 String arg = argv[i]; 243 244 if (!arg.startsWith("-")) { 246 System.err.println ("error: unexpected argument - "+arg); 247 printHelp(); 248 249 waitForAnyKey(); 250 System.exit(1); 251 } 252 else { 253 254 if (arg.equals("-u")) { 255 if (i == argv.length - 1 || argv[i+1].startsWith("-")) { 256 System.err.println("error: missing user name"); 257 System.exit(1); 258 } 259 username = argv[++i]; 260 continue; 261 } 262 263 if (arg.equals("-qr")) { 264 if (i == argv.length - 1 || argv[i+1].startsWith("-")) { 265 System.err.println("error: missing receive queue parameter"); 266 System.exit(1); 267 } 268 qReceiver = argv[++i]; 269 continue; 270 } 271 272 if (arg.equals("-qs")) { 273 if (i == argv.length - 1 || argv[i+1].startsWith("-")) { 274 System.err.println("error: missing send queue parameter"); 275 System.exit(1); 276 } 277 qSender = argv[++i]; 278 continue; 279 } 280 281 282 if (arg.equals("-h")) { 283 printHelp(); 284 System.exit(1); 285 } 286 } 287 } 288 289 if (username == null) { 291 System.err.println ("error: user name must be supplied"); 292 printHelp(); 293 System.exit(1); 294 } 295 296 if (qReceiver == null || qSender == null) { 297 System.err.println ("error: receive queue and send queue must be supplied"); 298 printHelp(); 299 System.exit(1); 300 } 301 302 Talk talk = new Talk(); 304 talk.talker(username ,qReceiver, qSender); 305 306 } 307 308 private static void waitForAnyKey(){ 309 System.out.print("Press any key ..."); 310 try { 311 System.in.read(); 312 } catch (IOException e) { 313 e.printStackTrace(); 314 } 315 } 316 317 318 private static void printHelp() { 319 320 StringBuffer use = new StringBuffer (); 321 use.append("help: java Talk (options) ...\n\n"); 322 use.append("options:\n"); 323 use.append(" -qr queue Specify queue for receiving messages.\n"); 324 use.append(" -qs queue Specify queue for sending messages.\n"); 325 use.append(" -u user Specify a user name.\n"); 326 use.append(" -h This help screen.\n"); 327 System.err.println (use); 328 } 329 330 } | Popular Tags |