1 4 package nl.justobjects.pushlet.test; 5 6 import nl.justobjects.pushlet.core.Dispatcher; 7 import nl.justobjects.pushlet.core.Event; 8 import nl.justobjects.pushlet.core.EventSource; 9 10 import java.io.BufferedReader ; 11 import java.io.IOException ; 12 import java.io.InputStream ; 13 import java.io.InputStreamReader ; 14 import java.net.URL ; 15 import java.util.Random ; 16 import java.util.StringTokenizer ; 17 import java.util.Vector ; 18 19 25 public class TestEventPushSources { 26 27 static public class AEXStocksEventPushSourceABN { 28 String pageURL = "http://ri2.rois.com/E36msPtnZC0e15CVb4KT97JAGfGSfCcrvv6*FcyZIoNyh/CTIB/RI2APISNAP?RIC=0%23.AEX&FORMAT=XML"; 29 } 32 33 34 static public class AEXStocksEventPushSource implements EventSource, Runnable { 35 36 String pageURL = "http://www.debeurs.nl/koersen/aex.asp"; 37 Thread thread = null; 38 volatile boolean active = false; 39 40 public final static int NR_OF_STOCKS = 24; 42 43 public final static String EMPTY = "wait..."; 44 private int restarts = 1; 45 46 class Stock { 47 public String name = EMPTY; 48 public String rate = EMPTY; 49 volatile public boolean modified = false; 50 } 51 52 Vector stocksCache = new Vector (NR_OF_STOCKS); 53 54 public AEXStocksEventPushSource() { 55 for (int i = 0; i < NR_OF_STOCKS; i++) { 56 stocksCache.addElement(new Stock()); 57 } 58 } 60 61 62 synchronized public void activate() { 63 e("activating..."); 64 stopThread(); 66 67 thread = new Thread (this, "AEXStocksPublisher-" + (restarts++)); 69 active = true; 70 thread.start(); 71 e("activated"); 72 } 73 74 75 synchronized public void passivate() { 76 e("passivating..."); 77 active = false; 78 stopThread(); 79 80 for (int i = 0; i < NR_OF_STOCKS; i++) { 83 ((Stock) stocksCache.elementAt(i)).modified = true; 84 } 85 86 e("passivated"); 87 } 88 89 90 91 synchronized public void stop() { 92 } 93 94 public void run() { 95 publishStocks(); 97 98 int count = 5; while (active) { 100 101 if (count == 5) { 104 updateCache(); 105 count = 0; 106 } 107 count++; 108 109 sendUpdates(); 111 112 if (thread == null || thread.isInterrupted()) { 114 break; 115 } 116 117 try { 119 thread.sleep(2000); 120 } catch (InterruptedException ie) { 121 break; 122 } 123 } 124 125 thread = null; 127 active = false; 128 } 129 130 private String getStocksLine() { 131 BufferedReader br = null; 132 InputStream is = null; 133 String nextLine = ""; 134 135 try { 137 is = new URL (pageURL).openStream(); 138 br = new BufferedReader (new InputStreamReader (is)); 139 boolean foundLine = false; 140 while (!foundLine) { 141 nextLine = br.readLine(); 142 if (nextLine == null) { 143 return ""; 144 } 145 foundLine = (nextLine.indexOf("details.asp?iid=14053&parent=aex") != -1); 146 } 147 } catch (Exception e) { 148 e("could not open or read URL pageURL=" + pageURL + " ex=" + e); 149 return ""; 150 } finally { 151 try { 152 if (is != null) is.close(); 153 } catch (IOException ignore) { 154 } 155 } 156 return nextLine; 157 } 158 159 private void publishStocks() { 160 for (int i = 0; i < NR_OF_STOCKS; i++) { 162 Stock nextStock = (Stock) stocksCache.elementAt(i); 163 164 if (nextStock.modified) { 166 publishStock(i, nextStock.name, nextStock.rate); 167 nextStock.modified = false; 168 try { 169 Thread.sleep(400); 170 } catch (InterruptedException ie) { 171 return; 172 } 173 } 174 } 175 } 176 177 private void publishStock(int index, String name, String rate) { 178 Event event = Event.createDataEvent("/stocks/aex"); 179 event.setField("number", index + ""); 180 event.setField("name", name); 181 event.setField("rate", rate); 182 p("publish: nr=" + index + " name=" + name + " rate=" + rate); 183 Dispatcher.getInstance().multicast(event); 184 } 185 186 private void sendUpdates() { 187 p("sending updates"); 188 int randomIndex = randomInt(0, NR_OF_STOCKS - 1); 191 Stock randomStock = (Stock) stocksCache.elementAt(randomIndex); 192 randomStock.modified = true; 193 194 publishStocks(); 195 } 196 197 private void stopThread() { 198 if (thread != null) { 199 thread.interrupt(); 200 thread = null; 201 } 202 } 203 204 private void updateCache() { 205 p("updating Cache"); 206 207 String stocksLine = getStocksLine(); 209 if ("".equals(stocksLine)) { 210 e("updateCache: stocksLine == null"); 211 return; 212 } 213 214 String delim = "<>"; 219 StringTokenizer st = new StringTokenizer (stocksLine, delim); 220 String nextToken = ""; 221 int count = 0; 222 String nextStock = ""; 223 String nextQuote = ""; 224 String currentQuote = null; 225 int index = -1; 226 while (st.hasMoreTokens()) { 227 nextToken = st.nextToken(); 228 count++; 229 if ((count - 5) % 57 == 0) { 231 p("c=" + count + " s=" + nextToken); 232 nextStock = nextToken; 233 } 234 235 if ((count - 10) % 57 == 0) { 237 nextQuote = nextToken; 238 index++; 239 p("c=" + count + " val=" + nextQuote); 240 Stock currentStock = (Stock) stocksCache.elementAt(index); 241 242 if (EMPTY.equals(currentStock.rate) || !currentStock.rate.equals(nextQuote)) { 244 p("modified: " + nextStock); 245 currentStock.name = nextStock; 246 currentStock.rate = nextQuote; 247 currentStock.modified = true; 248 } 249 } 250 } 251 } 252 } 253 254 private static Random random = new Random (); 255 256 257 public static int randomInt(int min, int max) { 258 int nextRandom = random.nextInt(); 259 return min + (nextRandom < 0 ? -nextRandom : nextRandom) % (max - min + 1); 260 } 261 262 263 264 public static void e(String s) { 265 System.out.println("AEXStocksEventPushSource: " + s); 266 } 267 268 269 public static void p(String s) { 270 } 272 273 274 public static void main(String [] args) { 275 } 277 } 278 279 328 | Popular Tags |