1 18 import java.util.Arrays ; 19 20 import javax.jms.*; 21 22 import org.apache.activemq.ActiveMQConnectionFactory; 23 24 27 public class TopicPublisher implements MessageListener 28 { 29 private final Object mutex = new Object (); 30 private Connection connection; 31 private Session session; 32 private MessageProducer publisher; 33 private Topic topic; 34 private Topic control; 35 36 private String url="tcp://localhost:61616"; 38 private int size=256; 39 private int subscribers=1; 40 private int remaining; 41 private int messages=10000; 42 private long delay; 43 private int batch=40; 44 45 private byte[] payload; 46 private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); 47 48 public static void main(String [] argv) throws Exception 49 { 50 TopicPublisher p = new TopicPublisher(); 51 String [] unknonwn = CommnadLineSupport.setOptions(p, argv); 52 if (unknonwn.length > 0) { 53 System.out.println("Unknown options: " + Arrays.toString(unknonwn)); 54 System.exit(-1); 55 } 56 p.run(); 57 } 58 59 private void run() throws Exception 60 { 61 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); 62 connection = factory.createConnection(); 63 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 64 topic = session.createTopic("topictest.messages"); 65 control = session.createTopic("topictest.control"); 66 67 publisher = session.createProducer(topic); 68 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 69 70 payload = new byte[size]; 71 for(int i = 0; i < size; i++) 72 { 73 payload[i] = (byte) DATA[i % DATA.length]; 74 } 75 76 session.createConsumer(control).setMessageListener(this); 77 connection.start(); 78 79 long[] times = new long[batch]; 80 for(int i = 0; i < batch; i++) 81 { 82 if(i > 0) Thread.sleep(delay*1000); 83 times[i] = batch(messages); 84 System.out.println("Batch " + (i+1) + " of " + batch + " completed in " + times[i] + " ms."); 85 } 86 87 long min = min(times); 88 long max = max(times); 89 System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max)); 90 91 publisher.send(session.createTextMessage("SHUTDOWN")); 93 94 connection.stop(); 95 connection.close(); 96 } 97 98 private long batch(int msgCount) throws Exception 99 { 100 long start = System.currentTimeMillis(); 101 remaining=subscribers; 102 publish(); 103 waitForCompletion(); 104 return System.currentTimeMillis() - start; 105 } 106 107 private void publish() throws Exception 108 { 109 110 BytesMessage msg = session.createBytesMessage(); 112 msg.writeBytes(payload); 113 for (int i = 0; i < messages; i++) 114 { 115 publisher.send(msg); 116 if ((i + 1) % 1000 == 0) 117 { 118 System.out.println("Sent " + (i + 1) + " messages"); 119 } 120 } 121 122 publisher.send(session.createTextMessage("REPORT")); 124 } 125 126 private void waitForCompletion() throws Exception 127 { 128 System.out.println("Waiting for completion..."); 129 synchronized (mutex) 130 { 131 while (remaining > 0) 132 { 133 mutex.wait(); 134 } 135 } 136 } 137 138 139 public void onMessage(Message message) 140 { 141 synchronized (mutex) 142 { 143 System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining"); 144 if (remaining == 0) 145 { 146 mutex.notify(); 147 } 148 } 149 } 150 151 Object getReport(Message m) 152 { 153 try 154 { 155 return ((TextMessage) m).getText(); 156 } 157 catch (JMSException e) 158 { 159 e.printStackTrace(System.out); 160 return e.toString(); 161 } 162 } 163 164 static long min(long[] times) 165 { 166 long min = times.length > 0 ? times[0] : 0; 167 for(int i = 0; i < times.length; i++) 168 { 169 min = Math.min(min, times[i]); 170 } 171 return min; 172 } 173 174 static long max(long[] times) 175 { 176 long max = times.length > 0 ? times[0] : 0; 177 for(int i = 0; i < times.length; i++) 178 { 179 max = Math.max(max, times[i]); 180 } 181 return max; 182 } 183 184 static long avg(long[] times, long min, long max) 185 { 186 long sum = 0; 187 for(int i = 0; i < times.length; i++) 188 { 189 sum += times[i]; 190 } 191 sum -= min; 192 sum -= max; 193 return (sum / times.length - 2); 194 } 195 196 public void setBatch(int batch) { 197 this.batch = batch; 198 } 199 public void setDelay(long delay) { 200 this.delay = delay; 201 } 202 public void setMessages(int messages) { 203 this.messages = messages; 204 } 205 public void setSize(int size) { 206 this.size = size; 207 } 208 public void setSubscribers(int subscribers) { 209 this.subscribers = subscribers; 210 } 211 public void setUrl(String url) { 212 this.url = url; 213 } 214 } 215 | Popular Tags |