package nl.justobjects.pushlet.core;

import nl.justobjects.pushlet.util.PushletException;
import nl.justobjects.pushlet.util.Rand;
import nl.justobjects.pushlet.util.Sys;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Per-session event subscriber.
 *
 * <p>Holds the subscriptions of one {@link Session}, buffers events handed to
 * {@link #onEvent(Event)} in an {@link EventQueue}, and drains that queue to the
 * client through a {@link ClientAdapter} in {@link #fetchEvents(Command)}.
 */
public class Subscriber implements Protocol, ConfigDefs {
    private Session session;

    /** Buffer between {@link #onEvent(Event)} (writer) and {@link #fetchEvents(Command)} (reader). */
    private EventQueue eventQueue = new EventQueue(Config.getIntProperty(QUEUE_SIZE));

    /** URL sent to the client in refresh events; rebuilt on every {@link #fetchEvents(Command)} call. */
    private String refreshURL;
    private long queueReadTimeoutMillis = Config.getLongProperty(QUEUE_READ_TIMEOUT_MILLIS);
    private long queueWriteTimeoutMillis = Config.getLongProperty(QUEUE_WRITE_TIMEOUT_MILLIS);
    private long refreshTimeoutMillis = Config.getLongProperty(PULL_REFRESH_TIMEOUT_MILLIS);

    /** Timestamp of the last fetch-loop iteration; compared against the refresh timeout in {@link #onEvent(Event)}. */
    volatile long lastAlive = Sys.now();

    /** Subscriptions keyed by subscription id; wrapped in a synchronized map. */
    private Map subscriptions = Collections.synchronizedMap(new HashMap(13));

    /** While false, {@link #onEvent(Event)} drops events and the fetch loop terminates. */
    private volatile boolean active;

    /** Transfer mode; compared against MODE_POLL and MODE_PULL. Must be set before fetching. */
    private String mode;

    /** Sequence number stamped on the next event pushed to the client. */
    private long eventSeqNr = 1;

    /**
     * Creates a subscriber bound to the given session.
     *
     * @param theSession the owning session
     */
    public Subscriber(Session theSession) {
        session = theSession;
    }

    /** Marks this subscriber active so that events are accepted and fetched. */
    public void activate() {
        active = true;
    }

    /** Marks this subscriber inactive. */
    public void passivate() {
        active = false;
    }

    /** Gives up on this subscriber: deactivates it, drops all subscriptions and stops the session. */
    public void bailout() {
        passivate();
        removeSubscriptions();
        session.stop();
    }

    /** @return whether this subscriber is currently active */
    public boolean isActive() {
        return active;
    }

    /** @return the session this subscriber belongs to */
    public Session getSession() {
        return session;
    }

    /** @return the id of the owning session */
    public String getId() {
        return session.getId();
    }

    /** @return a snapshot array of the current subscriptions (empty when there are none) */
    public Subscription[] getSubscriptions() {
        return (Subscription[]) subscriptions.values().toArray(new Subscription[0]);
    }

    /**
     * Creates a subscription and registers it under its id.
     *
     * @param aSubject the subject to subscribe to
     * @param aLabel   the label passed on to the new subscription
     * @return the newly created subscription
     */
    public Subscription addSubscription(String aSubject, String aLabel) {
        Subscription subscription = new Subscription(aSubject, aLabel);
        subscriptions.put(subscription.getId(), subscription);
        info("Subscription added subject=" + aSubject + " sid=" + subscription.getId() + " label=" + aLabel);
        return subscription;
    }

    /**
     * Removes the subscription with the given id.
     *
     * @param aSubscriptionId id of the subscription to remove
     * @return the removed subscription, or {@code null} (after logging a warning) when the id is unknown
     */
    public Subscription removeSubscription(String aSubscriptionId) {
        Subscription subscription = (Subscription) subscriptions.remove(aSubscriptionId);
        if (subscription == null) {
            warn("No subscription found sid=" + aSubscriptionId);
            return null;
        }
        info("Subscription removed subject=" + subscription.getSubject() + " sid=" + subscription.getId() + " label=" + subscription.getLabel());
        return subscription;
    }

    /** Removes all subscriptions. */
    public void removeSubscriptions() {
        subscriptions.clear();
    }

    /** @return the transfer mode, or {@code null} when none has been set */
    public String getMode() {
        return mode;
    }

    /**
     * Sets the transfer mode.
     *
     * @param aMode the mode, compared against MODE_POLL / MODE_PULL elsewhere in this class
     */
    public void setMode(String aMode) {
        mode = aMode;
    }

    /**
     * Picks the wait time announced to the client in a refresh event.
     *
     * <p>The value comes from {@code Rand.randomLong} over the configured min/max wait
     * properties: the poll properties in poll mode, the pull properties otherwise.
     * The mode must have been set.
     *
     * @return the refresh wait time in milliseconds
     */
    public long getRefreshTimeMillis() {
        String minWaitProperty = PULL_REFRESH_WAIT_MIN_MILLIS;
        String maxWaitProperty = PULL_REFRESH_WAIT_MAX_MILLIS;
        if (mode.equals(MODE_POLL)) {
            minWaitProperty = POLL_REFRESH_WAIT_MIN_MILLIS;
            maxWaitProperty = POLL_REFRESH_WAIT_MAX_MILLIS;
        }
        return Rand.randomLong(Config.getLongProperty(minWaitProperty),
                Config.getLongProperty(maxWaitProperty));
    }

    /**
     * Drains queued events to the client for as long as this subscriber is active.
     *
     * <p>Sends the response headers and the command's response event first, then
     * repeatedly dequeues events and pushes them with increasing sequence numbers.
     * When the dequeue call returns {@code null} a single heartbeat event is pushed
     * instead. In pull and poll mode only one batch is sent, followed by a refresh
     * event and a stop of the client adapter. Any failure while pushing, and any
     * interruption while waiting on the queue, ends in {@link #bailout()}.
     * The mode must have been set.
     *
     * @param aCommand the command carrying the HTTP request/response and client adapter
     * @throws PushletException declared for callers; presumably raised while preparing the response
     */
    public void fetchEvents(Command aCommand) throws PushletException {
        refreshURL = aCommand.httpReq.getRequestURI() + "?" + P_ID + "=" + session.getId() + "&" + P_EVENT + "=" + E_REFRESH;

        if (mode.equals(MODE_POLL)) {
            // Poll mode: read timeout of zero (presumably "do not block") and the poll-specific refresh timeout.
            queueReadTimeoutMillis = 0;
            refreshTimeoutMillis = Config.getLongProperty(POLL_REFRESH_TIMEOUT_MILLIS);
        }

        aCommand.httpRsp.setBufferSize(128);
        aCommand.sendResponseHeaders();

        ClientAdapter clientAdapter = aCommand.getClientAdapter();
        try {
            clientAdapter.start();
            clientAdapter.push(aCommand.getResponseEvent());
        } catch (Throwable t) {
            bailout();
            return;
        }

        while (isActive()) {
            // Record liveness so onEvent() does not consider this subscriber stale.
            lastAlive = Sys.now();
            session.kick();

            // Scoped to the iteration so a failed dequeue can never re-send the previous batch.
            Event[] events;
            try {
                events = eventQueue.deQueueAll(queueReadTimeoutMillis);
            } catch (InterruptedException ie) {
                warn("interrupted");
                bailout();
                // Keep the interrupt visible to the caller and stop like the other failure paths.
                Thread.currentThread().interrupt();
                return;
            }

            if (events == null) {
                // Nothing dequeued: send a heartbeat instead.
                events = new Event[] {new Event(E_HEARTBEAT)};
            }

            for (int i = 0; i < events.length; i++) {
                // An abort event deactivates this subscriber but is still pushed to the client below.
                if (events[i].getEventType().equals(E_ABORT)) {
                    warn("Aborting Subscriber");
                    bailout();
                }

                try {
                    events[i].setField(P_SEQ, eventSeqNr++);
                    clientAdapter.push(events[i]);
                } catch (Throwable t) {
                    bailout();
                    return;
                }
            }

            if (mode.equals(MODE_PULL) || mode.equals(MODE_POLL)) {
                // Pull/poll: one batch per request; tell the client when and where to come back.
                Event refreshEvent = new Event(E_REFRESH);
                refreshEvent.setField(P_WAIT, "" + getRefreshTimeMillis());
                refreshEvent.setField(P_URL, refreshURL);

                try {
                    clientAdapter.push(refreshEvent);
                    clientAdapter.stop();
                } catch (Throwable t) {
                    bailout();
                }
                // Plain break (not inside finally) so a failure in bailout() is not silently discarded.
                break;
            }
        }
    }

    /**
     * Finds the first subscription that matches the given event.
     *
     * @param event the event to test
     * @return the first matching subscription, or {@code null} when none matches
     */
    public Subscription match(Event event) {
        Subscription[] current = getSubscriptions();
        for (int i = 0; i < current.length; i++) {
            if (current[i].match(event)) {
                return current[i];
            }
        }
        return null;
    }

    /**
     * Accepts an event for later delivery by {@link #fetchEvents(Command)}.
     *
     * <p>Ignored while inactive. Bails out when the fetch loop has not run for longer
     * than the refresh timeout, when the queue rejects the event, or when the thread
     * is interrupted while enqueueing.
     *
     * @param theEvent the event to enqueue
     */
    public void onEvent(Event theEvent) {
        if (!isActive()) {
            return;
        }

        long now = Sys.now();
        if (now - lastAlive > refreshTimeoutMillis) {
            warn("not alive for at least: " + refreshTimeoutMillis + "ms, leaving...");
            bailout();
            return;
        }

        try {
            if (!eventQueue.enQueue(theEvent, queueWriteTimeoutMillis)) {
                warn("queue full, bailing out...");
                bailout();
            }
        } catch (InterruptedException ie) {
            bailout();
            // Restore the interrupt status so the calling thread can react to it.
            Thread.currentThread().interrupt();
        }
    }

    /** Logs an info message through the session, prefixed with "[Subscriber] ". */
    protected void info(String s) {
        session.info("[Subscriber] " + s);
    }

    /** Logs a warning through the session, prefixed with "[Subscriber] ". */
    protected void warn(String s) {
        session.warn("[Subscriber] " + s);
    }

    /** Logs a debug message through the session, prefixed with "[Subscriber] ". */
    protected void debug(String s) {
        session.debug("[Subscriber] " + s);
    }

    /** @return the string form of the owning session */
    public String toString() {
        return session.toString();
    }
}