1 4 package nl.justobjects.pushlet.test; 5 6 import nl.justobjects.pushlet.client.PushletClient; 7 import nl.justobjects.pushlet.client.PushletClientListener; 8 import nl.justobjects.pushlet.core.Event; 9 import nl.justobjects.pushlet.core.Protocol; 10 import nl.justobjects.pushlet.util.PushletException; 11 import nl.justobjects.pushlet.util.Rand; 12 13 import java.util.HashMap ; 14 import java.util.Map ; 15 16 26 public class StressTester implements Protocol { 27 static private String host = "localhost"; 28 static private int port = 8080; 29 static private int TESTER_COUNT = 10; 30 private static final String SUBJECT = "/test/ping"; 31 private static final long MIN_PUBLISH_INTERVAL_MILLIS = 200; 32 private static final long MAX_PUBLISH_INTERVAL_MILLIS = 1000; 33 private static final long MIN_SUBSCRIBER_INTERVAL_MILLIS = 500; 34 private static final long MAX_SUBSCRIBER_INTERVAL_MILLIS = 1000; 35 36 public StressTester() { 37 } 38 39 public void run() { 40 new EventPublisher().start(); 41 new EventSubscriber().start(); 42 } 43 44 45 public void err(String s) { 46 System.out.println("[StressTester] ERROR" + s); 47 } 48 49 50 public void p(String s) { 51 System.out.println("[StressTester] " + s); 52 } 53 54 private class EventSubscriber extends Thread implements PushletClientListener { 55 private PushletClient pushletClient; 56 57 public void run() { 58 while (true) { 59 try { 62 pushletClient = new PushletClient(host, port); 63 pushletClient.join(); 65 pushletClient.listen(this, Protocol.MODE_STREAM); 66 String subscriptionId = pushletClient.subscribe(SUBJECT); 69 pushletClient.unsubscribe(subscriptionId); 70 71 subscriptionId = pushletClient.subscribe(SUBJECT); 73 sleepRandom(); 75 pushletClient.unsubscribe(subscriptionId); 77 pushletClient.leave(); 78 79 } catch (Throwable t) { 80 err("Error in EventSubscriber t=" + t); 81 return; 82 } 83 } 84 } 85 86 87 public void onError(String message) { 88 } 90 91 92 public void onAbort(Event theEvent) { 93 } 95 96 97 public void onData(Event theEvent) { 98 long then = Long.parseLong(theEvent.getField("time")); 100 long delay = System.currentTimeMillis() - then; 101 } 103 104 105 public void onHeartbeat(Event theEvent) { 106 } 108 109 private void sleepRandom() throws InterruptedException { 110 Thread.sleep(Rand.randomLong(MIN_SUBSCRIBER_INTERVAL_MILLIS, MAX_SUBSCRIBER_INTERVAL_MILLIS)); 111 } 112 } 113 114 private class EventPublisher extends Thread { 115 private PushletClient pushletClient; 116 117 public void run() { 118 try { 121 pushletClient = new PushletClient(host, port); 122 pushletClient.join(); 123 124 } catch (PushletException pe) { 126 err("Error in EventPublisher pe=" + pe); 127 return; 128 } 129 130 Map eventData = new HashMap (2); 132 int seqNr = 1; 133 while (true) { 134 try { 135 eventData.put("seqNr", "" + seqNr++); 137 eventData.put("time", "" + System.currentTimeMillis()); 138 139 pushletClient.publish(SUBJECT, eventData); 141 142 Thread.sleep(Rand.randomLong(MIN_PUBLISH_INTERVAL_MILLIS, MAX_PUBLISH_INTERVAL_MILLIS)); 143 } catch (Exception e) { 144 p("EventPublisher exception: " + e); 145 return; 146 } 147 } 148 } 149 150 } 151 152 153 public static void main(String args[]) { 154 if (args.length > 0) { 155 TESTER_COUNT = Integer.parseInt(args[0]); 156 } 157 if (args.length == 3) { 158 host = args[1]; 159 port = Integer.parseInt(args[2]); 160 } 161 162 for (int i = 0; i < TESTER_COUNT; i++) { 163 new StressTester().run(); 164 } 165 166 } 167 } 168 169 170 177 | Popular Tags |