1 46 47 package sample.simple.talk; 48 49 import java.io.IOException ; 50 51 import org.mr.MantaAgentConstants; 52 import org.mr.api.simple.Consumer; 53 import org.mr.api.simple.Message; 54 import org.mr.api.simple.Producer; 55 import org.mr.api.simple.SimpleAPI; 56 import org.mr.api.simple.SimpleException; 57 58 59 63 64 65 74 public class Talk implements Runnable { 75 private Producer producer; 76 private Consumer consumer; 77 78 public Talk(Producer producer, Consumer consumer) { 79 this.producer = producer; 80 this.consumer = consumer; 81 } 82 83 public void go() { 84 Thread t = new Thread (this); 86 t.start(); 87 88 try { 90 while (true) { 91 StringBuffer text = new StringBuffer (); 92 int c; 93 boolean shouldExit = false; 94 95 System.out.print("talk> "); 97 while ((c = System.in.read()) != (int) '\n') { 98 if (c == -1) { System.out.println(); 100 System.out.println("Goodbye."); 101 shouldExit = true; 102 } else { 103 text.append((char) c); 104 } 105 } 106 107 if (shouldExit) break; 109 Message message = new Message(text.toString().getBytes()); 110 this.producer.send(message); 111 } 112 } catch (SimpleException e) { 113 System.err.println("Exception thrown in send loop: " + e); 114 System.err.println("Exiting..."); 115 } catch (IOException e) { 116 System.err.println("I/O Error while reading from stdin: " + e); 117 System.err.println("Exiting..."); 118 } 119 } 120 121 public void run() { 122 try { 124 while (true) { 125 Message message = this.consumer.receive(); 127 String text = new String (message.getPayload()); 128 System.out.println(); 129 System.out.println("> " + text); 130 System.out.print("talk> "); 131 } 132 } catch (SimpleException e) { 133 System.err.println("Exception thrown in receive loop: " + e); 134 System.err.println("Receive thread exiting... " + 135 "No more messages will be received."); 136 } 137 } 138 139 140 public static String sendQueueName; 141 public static String receiveQueueName; 142 public static String mantaConf; 143 144 public static void main(String args[]) { 145 parseArgs(args); 146 147 try { 148 SimpleAPI api = new SimpleAPI(mantaConf); 150 Producer producer = api.openProducer(sendQueueName); 151 Consumer consumer = api.openConsumer(receiveQueueName); 152 Talk talk = new Talk(producer, consumer); 153 154 talk.go(); 155 } catch (SimpleException e) { 156 System.err.println("Exception thrown while initializing: " + e); 157 System.err.println("Exiting..."); 158 } 159 System.exit(0); 160 } 161 162 private static void parseArgs(String [] args) { 163 for (int i = 0; i < args.length; i++) { 164 String arg = args[i]; 165 if (!arg.startsWith("-")) { 166 System.err.println("error: unexpected argument -- " + arg); 167 printUsage(); 168 System.exit(1); 169 } else { 170 if (arg.equals("-c")) { 171 if (i == args.length - 1 || args[i+1].startsWith("-")) { 172 System.err.println("error: missing config-dir " + 173 "argument"); 174 printUsage(); 175 System.exit(1); 176 } 177 mantaConf = args[++i]; 178 continue; 179 } else if (arg.equals("-qr")) { 180 if (i == args.length - 1 || args[i+1].startsWith("-")) { 181 System.err.println("error: missing receive queue " + 182 "argument"); 183 printUsage(); 184 System.exit(1); 185 } 186 receiveQueueName = args[++i]; 187 continue; 188 } else if (arg.equals("-qs")) { 189 if (i == args.length - 1 || args[i+1].startsWith("-")) { 190 System.err.println("error: missing send queue " + 191 "argument"); 192 printUsage(); 193 System.exit(1); 194 } 195 sendQueueName = args[++i]; 196 continue; 197 } else if (arg.equals("-h")) { 198 printUsage(); 199 System.exit(1); 200 } else { 201 System.err.println("unrecognized option -- " + arg); 202 } 203 } 204 } 205 206 if(mantaConf == null){ 207 mantaConf = System.getProperty(MantaAgentConstants.MANTA_CONFIG); 208 } 209 if (mantaConf == null) { 210 mantaConf = "./default_config.xml"; 211 } 212 if (sendQueueName == null) { 213 System.err.println("error: you did not specify a send queue"); 214 System.exit(1); 215 } 216 if (receiveQueueName == null) { 217 System.err.println("error: you did not specify a receive queue"); 218 System.exit(1); 219 } 220 } 221 222 private static void printUsage() { 223 System.out.println("Usage: java Talk -c <config-dir> -qs <queue> " + 224 "-qr <queue>"); 225 System.out.println("\t-c config-dir\tmanta installation dir, should " + 226 "contain manta.jar"); 227 System.out.println("\t-qs queue\tqueue name used for sending " + 228 "messages"); 229 System.out.println("\t-qr queue\tqueue name used for receiving " + 230 "messages"); 231 } 232 233 } | Popular Tags |