1 18 import java.util.Arrays ; 19 20 import javax.jms.Connection ; 21 import javax.jms.JMSException ; 22 import javax.jms.Message ; 23 import javax.jms.MessageConsumer ; 24 import javax.jms.MessageListener ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.Session ; 27 import javax.jms.TextMessage ; 28 import javax.jms.Topic ; 29 30 import org.apache.activemq.ActiveMQConnectionFactory; 31 32 35 public class TopicListener implements MessageListener { 36 37 private Connection connection; 38 private MessageProducer producer; 39 private Session session; 40 private int count; 41 private long start; 42 private Topic topic; 43 private Topic control; 44 45 private String url="tcp://localhost:61616"; 47 48 public static void main(String [] argv) throws Exception { 49 TopicListener l = new TopicListener(); 50 String [] unknonwn = CommnadLineSupport.setOptions(l, argv); 51 if (unknonwn.length > 0) { 52 System.out.println("Unknown options: " + Arrays.toString(unknonwn)); 53 System.exit(-1); 54 } 55 l.run(); 56 } 57 58 public void run() throws JMSException { 59 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); 60 connection = factory.createConnection(); 61 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 62 topic = session.createTopic("topictest.messages"); 63 control = session.createTopic("topictest.control"); 64 65 MessageConsumer consumer = session.createConsumer(topic); 66 consumer.setMessageListener(this); 67 68 connection.start(); 69 70 producer = session.createProducer(control); 71 System.out.println("Waiting for messages..."); 72 } 73 74 private static boolean checkText(Message m, String s) 75 { 76 try 77 { 78 return m instanceof TextMessage && ((TextMessage ) m).getText().equals(s); 79 } 80 catch (JMSException e) 81 { 82 e.printStackTrace(System.out); 83 return false; 84 } 85 } 86 87 88 public void onMessage(Message message) { 89 if ( checkText(message, "SHUTDOWN") ) { 90 91 try { 92 connection.close(); 93 } catch (Exception e) { 94 e.printStackTrace(System.out); 95 } 96 97 } else if (checkText(message, "REPORT")) { 98 try { 100 long time = (System.currentTimeMillis() - start); 101 String msg = "Received " + count + " in " + time + "ms"; 102 producer.send(session.createTextMessage(msg)); 103 } catch (Exception e) { 104 e.printStackTrace(System.out); 105 } 106 count = 0; 107 108 } else { 109 110 if (count==0) { 111 start = System.currentTimeMillis(); 112 } 113 114 if (++count % 1000 == 0) 115 System.out.println("Received " + count + " messages."); 116 } 117 } 118 119 public void setUrl(String url) { 120 this.url = url; 121 } 122 123 } 124 | Popular Tags |