1 18 package org.apache.activemq.tool; 19 20 import javax.jms.Connection ; 21 import javax.jms.JMSException ; 22 import javax.jms.Message ; 23 import javax.jms.MessageConsumer ; 24 import javax.jms.MessageListener ; 25 import javax.jms.Session ; 26 import javax.jms.TextMessage ; 27 import javax.jms.Topic ; 28 import java.io.IOException ; 29 30 35 public class ConsumerTool extends ToolSupport implements MessageListener { 36 37 protected int count = 0; 38 protected int dumpCount = 10; 39 protected boolean verbose = true; 40 protected int maxiumMessages = 0; 41 private boolean pauseBeforeShutdown; 42 43 44 public static void main(String [] args) { 45 ConsumerTool tool = new ConsumerTool(); 46 if (args.length > 0) { 47 tool.url = args[0]; 48 } 49 if (args.length > 1) { 50 tool.topic = args[1].equalsIgnoreCase("true"); 51 } 52 if (args.length > 2) { 53 tool.subject = args[2]; 54 } 55 if (args.length > 3) { 56 tool.durable = args[3].equalsIgnoreCase("true"); 57 } 58 if (args.length > 4) { 59 tool.maxiumMessages = Integer.parseInt(args[4]); 60 } 61 tool.run(); 62 } 63 64 public void run() { 65 try { 66 System.out.println("Connecting to URL: " + url); 67 System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject); 68 System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription"); 69 70 Connection connection = createConnection(); 71 Session session = createSession(connection); 72 MessageConsumer consumer = null; 73 if (durable && topic) { 74 consumer = session.createDurableSubscriber((Topic ) destination, consumerName); 75 } 76 else { 77 consumer = session.createConsumer(destination); 78 } 79 if (maxiumMessages <= 0) { 80 consumer.setMessageListener(this); 81 } 82 connection.start(); 83 84 if (maxiumMessages > 0) { 85 consumeMessagesAndClose(connection, session, consumer); 86 } 87 } 88 catch (Exception e) { 89 System.out.println("Caught: " + e); 90 e.printStackTrace(); 91 } 92 } 93 94 public void onMessage(Message message) { 95 try { 96 if (message instanceof TextMessage ) { 97 TextMessage txtMsg = (TextMessage ) message; 98 if (verbose) { 99 100 String msg = txtMsg.getText(); 101 if( msg.length() > 50 ) 102 msg = msg.substring(0, 50)+"..."; 103 104 System.out.println("Received: " + msg); 105 } 106 } 107 else { 108 if (verbose) { 109 System.out.println("Received: " + message); 110 } 111 } 112 117 } 118 catch (JMSException e) { 119 System.out.println("Caught: " + e); 120 e.printStackTrace(); 121 } 122 } 123 124 125 protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException , IOException { 126 System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown"); 127 128 for (int i = 0; i < maxiumMessages; i++) { 129 Message message = consumer.receive(); 130 onMessage(message); 131 } 132 System.out.println("Closing connection"); 133 consumer.close(); 134 session.close(); 135 connection.close(); 136 if (pauseBeforeShutdown) { 137 System.out.println("Press return to shut down"); 138 System.in.read(); 139 } 140 } 141 } 142 | Popular Tags |