1 18 import java.util.Arrays ; 19 import java.util.Date ; 20 21 import javax.jms.Connection ; 22 import javax.jms.DeliveryMode ; 23 import javax.jms.Destination ; 24 import javax.jms.MessageProducer ; 25 import javax.jms.Session ; 26 import javax.jms.TextMessage ; 27 28 import org.apache.activemq.ActiveMQConnection; 29 import org.apache.activemq.ActiveMQConnectionFactory; 30 import org.apache.activemq.util.IndentPrinter; 31 32 37 public class ProducerTool { 38 39 private Destination destination; 40 private int messageCount = 10; 41 private long sleepTime = 0L; 42 private boolean verbose = true; 43 private int messageSize = 255; 44 private long timeToLive; 45 private String user = ActiveMQConnection.DEFAULT_USER; 46 private String password = ActiveMQConnection.DEFAULT_PASSWORD; 47 private String url = ActiveMQConnection.DEFAULT_BROKER_URL; 48 private String subject = "TOOL.DEFAULT"; 49 private boolean topic = false; 50 private boolean transacted = false; 51 private boolean persistent = false; 52 53 public static void main(String [] args) { 54 ProducerTool producerTool = new ProducerTool(); 55 String [] unknonwn = CommnadLineSupport.setOptions(producerTool, args); 56 if( unknonwn.length > 0 ) { 57 System.out.println("Unknown options: "+Arrays.toString(unknonwn)); 58 System.exit(-1); 59 } 60 producerTool.run(); 61 } 62 63 public void run() { 64 Connection connection=null; 65 try { 66 System.out.println("Connecting to URL: " + url); 67 System.out.println("Publishing a Message with size " + messageSize+ " to " + (topic ? "topic" : "queue") + ": " + subject); 68 System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages"); 69 System.out.println("Sleeping between publish " + sleepTime + " ms"); 70 if (timeToLive != 0) { 71 System.out.println("Messages time to live " + timeToLive + " ms"); 72 } 73 74 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 76 connection = connectionFactory.createConnection(); 77 connection.start(); 78 79 Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); 81 if (topic) { 82 destination = session.createTopic(subject); 83 } else { 84 destination = session.createQueue(subject); 85 } 86 87 MessageProducer producer = session.createProducer(destination); 89 if (persistent) { 90 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 91 } else { 92 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 93 } 94 if (timeToLive != 0) 95 producer.setTimeToLive(timeToLive); 96 97 sendLoop(session, producer); 99 100 System.out.println("Done."); 101 102 ActiveMQConnection c = (ActiveMQConnection) connection; 104 c.getConnectionStats().dump(new IndentPrinter()); 105 106 } catch (Exception e) { 107 System.out.println("Caught: " + e); 108 e.printStackTrace(); 109 } finally { 110 try { 111 connection.close(); 112 } catch (Throwable ignore) { 113 } 114 } 115 } 116 117 protected void sendLoop(Session session, MessageProducer producer) 118 throws Exception { 119 120 for (int i = 0; i < messageCount || messageCount == 0; i++) { 121 122 TextMessage message = session 123 .createTextMessage(createMessageText(i)); 124 125 if (verbose) { 126 String msg = message.getText(); 127 if (msg.length() > 50) { 128 msg = msg.substring(0, 50) + "..."; 129 } 130 System.out.println("Sending message: " + msg); 131 } 132 133 producer.send(message); 134 if (transacted) { 135 session.commit(); 136 } 137 138 Thread.sleep(sleepTime); 139 140 } 141 142 } 143 144 private String createMessageText(int index) { 145 StringBuffer buffer = new StringBuffer (messageSize); 146 buffer.append("Message: " + index + " sent at: " + new Date ()); 147 if (buffer.length() > messageSize) { 148 return buffer.substring(0, messageSize); 149 } 150 for (int i = buffer.length(); i < messageSize; i++) { 151 buffer.append(' '); 152 } 153 return buffer.toString(); 154 } 155 156 157 public void setPersistent(boolean durable) { 158 this.persistent = durable; 159 } 160 public void setMessageCount(int messageCount) { 161 this.messageCount = messageCount; 162 } 163 public void setMessageSize(int messageSize) { 164 this.messageSize = messageSize; 165 } 166 public void setPassword(String pwd) { 167 this.password = pwd; 168 } 169 public void setSleepTime(long sleepTime) { 170 this.sleepTime = sleepTime; 171 } 172 public void setSubject(String subject) { 173 this.subject = subject; 174 } 175 public void setTimeToLive(long timeToLive) { 176 this.timeToLive = timeToLive; 177 } 178 public void setTopic(boolean topic) { 179 this.topic = topic; 180 } 181 public void setQueue(boolean queue) { 182 this.topic = !queue; 183 } 184 public void setTransacted(boolean transacted) { 185 this.transacted = transacted; 186 } 187 public void setUrl(String url) { 188 this.url = url; 189 } 190 public void setUser(String user) { 191 this.user = user; 192 } 193 public void setVerbose(boolean verbose) { 194 this.verbose = verbose; 195 } 196 } 197 | Popular Tags |