1 18 19 import javax.jms.Connection ; 20 import javax.jms.DeliveryMode ; 21 import javax.jms.Destination ; 22 import javax.jms.ExceptionListener ; 23 import javax.jms.JMSException ; 24 import javax.jms.Message ; 25 import javax.jms.MessageConsumer ; 26 import javax.jms.MessageListener ; 27 import javax.jms.MessageProducer ; 28 import javax.jms.Session ; 29 import javax.jms.TextMessage ; 30 import javax.jms.Topic ; 31 32 import org.apache.activemq.ActiveMQConnection; 33 import org.apache.activemq.ActiveMQConnectionFactory; 34 35 import java.io.IOException ; 36 import java.util.Arrays ; 37 38 43 public class ConsumerTool implements MessageListener , ExceptionListener { 44 45 private boolean running; 46 47 private Session session; 48 private Destination destination; 49 private MessageProducer replyProducer; 50 51 private boolean pauseBeforeShutdown; 52 private boolean verbose = true; 53 private int maxiumMessages = 0; 54 private String subject = "TOOL.DEFAULT"; 55 private boolean topic = false; 56 private String user = ActiveMQConnection.DEFAULT_USER; 57 private String password = ActiveMQConnection.DEFAULT_PASSWORD; 58 private String url = ActiveMQConnection.DEFAULT_BROKER_URL; 59 private boolean transacted = false; 60 private boolean durable = false; 61 private String clientId; 62 private int ackMode = Session.AUTO_ACKNOWLEDGE; 63 private String consumerName = "James"; 64 private long sleepTime = 0; 65 private long receiveTimeOut = 0; 66 67 public static void main(String [] args) { 68 ConsumerTool consumerTool = new ConsumerTool(); 69 String [] unknonwn = CommnadLineSupport.setOptions(consumerTool, args); 70 if (unknonwn.length > 0) { 71 System.out.println("Unknown options: " + Arrays.toString(unknonwn)); 72 System.exit(-1); 73 } 74 consumerTool.run(); 75 } 76 77 public void run() { 78 try { 79 running = true; 80 81 System.out.println("Connecting to URL: " + url); 82 System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject); 83 System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription"); 84 85 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 86 Connection connection = connectionFactory.createConnection(); 87 if (durable && clientId != null && clientId.length()>0 && !"null".equals(clientId) ) { 88 connection.setClientID(clientId); 89 } 90 connection.setExceptionListener(this); 91 connection.start(); 92 93 session = connection.createSession(transacted, ackMode); 94 if (topic) { 95 destination = session.createTopic(subject); 96 } else { 97 destination = session.createQueue(subject); 98 } 99 100 replyProducer = session.createProducer(null); 101 replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 102 103 MessageConsumer consumer = null; 104 if (durable && topic) { 105 consumer = session.createDurableSubscriber((Topic ) destination, consumerName); 106 } else { 107 consumer = session.createConsumer(destination); 108 } 109 110 if (maxiumMessages > 0) { 111 consumeMessagesAndClose(connection, session, consumer); 112 } else { 113 if (receiveTimeOut == 0) { 114 consumer.setMessageListener(this); 115 } else { 116 consumeMessagesAndClose(connection, session, consumer, receiveTimeOut); 117 } 118 } 119 120 } catch (Exception e) { 121 System.out.println("Caught: " + e); 122 e.printStackTrace(); 123 } 124 } 125 126 public void onMessage(Message message) { 127 try { 128 129 if (message instanceof TextMessage ) { 130 TextMessage txtMsg = (TextMessage ) message; 131 if (verbose) { 132 133 String msg = txtMsg.getText(); 134 if (msg.length() > 50) { 135 msg = msg.substring(0, 50) + "..."; 136 } 137 138 System.out.println("Received: " + msg); 139 } 140 } else { 141 if (verbose) { 142 System.out.println("Received: " + message); 143 } 144 } 145 146 if (message.getJMSReplyTo() != null) { 147 replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID())); 148 } 149 150 if (transacted) { 151 session.commit(); 152 } else if ( ackMode == Session.CLIENT_ACKNOWLEDGE ) { 153 message.acknowledge(); 154 } 155 156 } catch (JMSException e) { 157 System.out.println("Caught: " + e); 158 e.printStackTrace(); 159 } finally { 160 if (sleepTime > 0) { 161 try { 162 Thread.sleep(sleepTime); 163 } catch (InterruptedException e) { 164 } 165 } 166 } 167 } 168 169 synchronized public void onException(JMSException ex) { 170 System.out.println("JMS Exception occured. Shutting down client."); 171 running = false; 172 } 173 174 synchronized boolean isRunning() { 175 return running; 176 } 177 178 protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException , 179 IOException { 180 System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown"); 181 182 for (int i = 0; i < maxiumMessages && isRunning();) { 183 Message message = consumer.receive(1000); 184 if (message != null) { 185 i++; 186 onMessage(message); 187 } 188 } 189 System.out.println("Closing connection"); 190 consumer.close(); 191 session.close(); 192 connection.close(); 193 if (pauseBeforeShutdown) { 194 System.out.println("Press return to shut down"); 195 System.in.read(); 196 } 197 } 198 199 protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) 200 throws JMSException , IOException { 201 System.out.println("We will consume messages while they continue to be delivered within: " + timeout 202 + " ms, and then we will shutdown"); 203 204 Message message; 205 while ((message = consumer.receive(timeout)) != null) { 206 onMessage(message); 207 } 208 209 System.out.println("Closing connection"); 210 consumer.close(); 211 session.close(); 212 connection.close(); 213 if (pauseBeforeShutdown) { 214 System.out.println("Press return to shut down"); 215 System.in.read(); 216 } 217 } 218 219 public void setAckMode(String ackMode) { 220 if( "CLIENT_ACKNOWLEDGE".equals(ackMode) ) { 221 this.ackMode = Session.CLIENT_ACKNOWLEDGE; 222 } 223 if( "AUTO_ACKNOWLEDGE".equals(ackMode) ) { 224 this.ackMode = Session.AUTO_ACKNOWLEDGE; 225 } 226 if( "DUPS_OK_ACKNOWLEDGE".equals(ackMode) ) { 227 this.ackMode = Session.DUPS_OK_ACKNOWLEDGE; 228 } 229 if( "SESSION_TRANSACTED".equals(ackMode) ) { 230 this.ackMode = Session.SESSION_TRANSACTED; 231 } 232 } 233 234 public void setClientId(String clientID) { 235 this.clientId = clientID; 236 } 237 public void setConsumerName(String consumerName) { 238 this.consumerName = consumerName; 239 } 240 public void setDurable(boolean durable) { 241 this.durable = durable; 242 } 243 public void setMaxiumMessages(int maxiumMessages) { 244 this.maxiumMessages = maxiumMessages; 245 } 246 public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) { 247 this.pauseBeforeShutdown = pauseBeforeShutdown; 248 } 249 public void setPassword(String pwd) { 250 this.password = pwd; 251 } 252 public void setReceiveTimeOut(long receiveTimeOut) { 253 this.receiveTimeOut = receiveTimeOut; 254 } 255 public void setSleepTime(long sleepTime) { 256 this.sleepTime = sleepTime; 257 } 258 public void setSubject(String subject) { 259 this.subject = subject; 260 } 261 public void setTopic(boolean topic) { 262 this.topic = topic; 263 } 264 public void setQueue(boolean queue) { 265 this.topic = !queue; 266 } 267 public void setTransacted(boolean transacted) { 268 this.transacted = transacted; 269 } 270 public void setUrl(String url) { 271 this.url = url; 272 } 273 public void setUser(String user) { 274 this.user = user; 275 } 276 public void setVerbose(boolean verbose) { 277 this.verbose = verbose; 278 } 279 280 } 281 | Popular Tags |