1 4 package nl.justobjects.pushlet.core; 5 6 import java.io.IOException ; 7 8 14 public class Controller implements Protocol, ConfigDefs { 15 16 private Session session; 17 18 public Controller(Session theSession) { 19 session = theSession; 20 } 21 22 23 public void doCommand(Command aCommand) { 24 try { 25 session.kick(); 27 28 session.setAddress(aCommand.httpReq.getRemoteAddr()); 30 31 debug("doCommand() event=" + aCommand.reqEvent); 32 33 String eventType = aCommand.reqEvent.getEventType(); 35 36 if (eventType.equals(Protocol.E_REFRESH)) { 38 doRefresh(aCommand); 40 } else if (eventType.equals(Protocol.E_SUBSCRIBE)) { 41 doSubscribe(aCommand); 43 } else if (eventType.equals(Protocol.E_UNSUBSCRIBE)) { 44 doUnsubscribe(aCommand); 46 } else if (eventType.equals(Protocol.E_JOIN)) { 47 doJoin(aCommand); 49 } else if (eventType.equals(Protocol.E_JOIN_LISTEN)) { 50 doJoinListen(aCommand); 52 } else if (eventType.equals(Protocol.E_LEAVE)) { 53 doLeave(aCommand); 55 } else if (eventType.equals(Protocol.E_HEARTBEAT)) { 56 doHeartbeat(aCommand); 58 } else if (eventType.equals(Protocol.E_PUBLISH)) { 59 doPublish(aCommand); 61 } else if (eventType.equals(Protocol.E_LISTEN)) { 62 doListen(aCommand); 64 } 65 66 if (eventType.endsWith(Protocol.E_LISTEN) || 68 eventType.equals(Protocol.E_REFRESH)) { 69 getSubscriber().fetchEvents(aCommand); 72 73 } else { 74 sendControlResponse(aCommand); 76 } 77 78 } catch (Throwable t) { 79 warn("Exception in doCommand(): " + t); 80 t.printStackTrace(); 81 } 82 } 83 84 public String toString() { 85 return session.toString(); 86 } 87 88 89 protected void doHeartbeat(Command aCommand) { 90 91 aCommand.setResponseEvent(new Event(E_HEARTBEAT_ACK)); 93 } 94 95 96 protected void doJoin(Command aCommand) throws IOException { 97 98 Event responseEvent = null; 99 100 try { 101 102 session.start(); 103 104 String format = aCommand.reqEvent.getField(P_FORMAT, FORMAT_JAVASCRIPT); 107 108 session.setFormat(format); 109 responseEvent = new Event(E_JOIN_ACK); 110 111 responseEvent.setField(P_ID, session.getId()); 113 responseEvent.setField(P_FORMAT, format); 114 info("joined"); 115 } catch (Throwable t) { 116 session.stop(); 117 responseEvent = new Event(E_NACK); 118 responseEvent.setField(P_ID, session.getId()); 119 responseEvent.setField(P_REASON, "unexpected error: " + t); 120 warn("doJoin() error: " + t); 121 t.printStackTrace(); 122 } finally { 123 aCommand.setResponseEvent(responseEvent); 125 } 126 127 } 128 129 130 protected void doJoinListen(Command aCommand) throws IOException { 131 132 137 doJoin(aCommand); 139 if (!aCommand.getResponseEvent().getEventType().equals(E_NACK)) { 140 doListen(aCommand); 142 if (!aCommand.getResponseEvent().getEventType().equals(E_NACK)) { 143 aCommand.getResponseEvent().setField(P_EVENT, E_JOIN_LISTEN_ACK); 145 } 146 } 147 } 148 149 150 protected void doLeave(Command aCommand) throws IOException { 151 152 Event responseEvent = null; 153 154 try { 155 getSubscriber().bailout(); 157 158 responseEvent = new Event(E_LEAVE_ACK); 160 161 responseEvent.setField(P_ID, session.getId()); 163 info("left"); 164 } catch (Throwable t) { 165 responseEvent = new Event(E_NACK); 166 responseEvent.setField(P_ID, session.getId()); 167 responseEvent.setField(P_REASON, "unexpected error: " + t); 168 warn("doLeave() error: " + t); 169 t.printStackTrace(); 170 } finally { 171 aCommand.setResponseEvent(responseEvent); 173 } 174 175 } 176 177 178 protected void doListen(Command aCommand) throws IOException { 179 180 181 String mode = MODE_STREAM; 182 if (Config.getBoolProperty(LISTEN_FORCE_PULL_ALL)) { 184 mode = MODE_PULL; 185 } else { 186 189 mode = aCommand.reqEvent.getField(P_MODE, MODE_STREAM); 193 194 String userAgent = aCommand.httpReq.getHeader("User-Agent"); 195 if (userAgent != null) { 196 userAgent = userAgent.toLowerCase(); 197 for (int i = 0; i < session.FORCED_PULL_AGENTS.length; i++) { 198 if ((userAgent.indexOf(session.FORCED_PULL_AGENTS[i]) != -1)) { 199 info("Forcing pull mode for agent=" + userAgent); 200 mode = MODE_PULL; 201 break; 202 } 203 } 204 } else { 205 userAgent = "unknown"; 206 } 207 } 208 209 getSubscriber().setMode(mode); 210 211 Event listenAckEvent = new Event(E_LISTEN_ACK); 213 214 String subject = aCommand.reqEvent.getField(P_SUBJECT); 216 if (subject != null) { 217 String label = aCommand.reqEvent.getField(Protocol.P_SUBSCRIPTION_LABEL); 219 220 Subscription subscription = getSubscriber().addSubscription(subject, label); 222 223 listenAckEvent.setField(P_SUBSCRIPTION_ID, subscription.getId()); 225 if (label != null) { 226 listenAckEvent.setField(P_SUBSCRIPTION_LABEL, label); 227 } 228 } 229 230 listenAckEvent.setField(P_ID, session.getId()); 232 listenAckEvent.setField(P_MODE, mode); 233 listenAckEvent.setField(P_FORMAT, session.getFormat()); 234 235 getSubscriber().activate(); 237 238 aCommand.setResponseEvent(listenAckEvent); 240 241 info("Listening mode=" + mode + " userAgent=" + session.getUserAgent()); 242 243 } 244 245 246 protected void doPublish(Command aCommand) { 247 Event responseEvent = null; 248 249 try { 250 String subject = aCommand.reqEvent.getField(Protocol.P_SUBJECT); 251 if (subject == null) { 252 responseEvent = new Event(E_NACK); 254 responseEvent.setField(P_ID, session.getId()); 255 responseEvent.setField(P_REASON, "no subject provided"); 256 } else { 257 aCommand.reqEvent.setField(P_FROM, session.getId()); 258 aCommand.reqEvent.setField(P_EVENT, E_DATA); 259 260 String to = aCommand.reqEvent.getField(P_TO); 262 if (to != null) { 263 Dispatcher.getInstance().unicast(aCommand.reqEvent, to); 264 } else { 265 debug("doPublish() event=" + aCommand.reqEvent); 267 Dispatcher.getInstance().multicast(aCommand.reqEvent); 268 } 269 270 responseEvent = new Event(E_PUBLISH_ACK); 272 } 273 274 } catch (Throwable t) { 275 responseEvent = new Event(E_NACK); 276 responseEvent.setField(P_ID, session.getId()); 277 responseEvent.setField(P_REASON, "unexpected error: " + t); 278 warn("doPublish() error: " + t); 279 t.printStackTrace(); 280 } finally { 281 aCommand.setResponseEvent(responseEvent); 283 } 284 } 285 286 287 protected void doRefresh(Command aCommand) { 288 aCommand.setResponseEvent(new Event(E_REFRESH_ACK)); 290 } 291 292 293 protected void doSubscribe(Command aCommand) throws IOException { 294 295 Event responseEvent = null; 296 try { 297 String subject = aCommand.reqEvent.getField(Protocol.P_SUBJECT); 298 Subscription subscription = null; 299 if (subject == null) { 300 responseEvent = new Event(E_NACK); 302 responseEvent.setField(P_ID, session.getId()); 303 responseEvent.setField(P_REASON, "no subject provided"); 304 } else { 305 306 String label = aCommand.reqEvent.getField(Protocol.P_SUBSCRIPTION_LABEL); 307 subscription = getSubscriber().addSubscription(subject, label); 308 309 responseEvent = new Event(E_SUBSCRIBE_ACK); 311 responseEvent.setField(P_ID, session.getId()); 312 responseEvent.setField(P_SUBJECT, subject); 313 responseEvent.setField(P_SUBSCRIPTION_ID, subscription.getId()); 314 if (label != null) { 315 responseEvent.setField(P_SUBSCRIPTION_LABEL, label); 316 } 317 info("subscribed to " + subject + " sid=" + subscription.getId()); 318 } 319 320 } catch (Throwable t) { 321 responseEvent = new Event(E_NACK); 322 responseEvent.setField(P_ID, session.getId()); 323 responseEvent.setField(P_REASON, "unexpected error: " + t); 324 warn("doSubscribe() error: " + t); 325 t.printStackTrace(); 326 } finally { 327 aCommand.setResponseEvent(responseEvent); 329 } 330 } 331 332 333 protected void doUnsubscribe(Command aCommand) throws IOException { 334 335 336 Event responseEvent = null; 337 try { 338 String subscriptionId = aCommand.reqEvent.getField(Protocol.P_SUBSCRIPTION_ID); 339 if (subscriptionId == null) { 340 getSubscriber().removeSubscriptions(); 342 responseEvent = new Event(E_UNSUBSCRIBE_ACK); 343 responseEvent.setField(P_ID, session.getId()); 344 info("unsubscribed all"); 345 } else { 346 Subscription subscription = getSubscriber().removeSubscription(subscriptionId); 348 if (subscription == null) { 349 responseEvent = new Event(E_NACK); 351 responseEvent.setField(P_ID, session.getId()); 352 responseEvent.setField(P_REASON, "no subscription for sid=" + subscriptionId); 353 warn("unsubscribe: no subscription for sid=" + subscriptionId); 354 } else { 355 responseEvent = new Event(E_UNSUBSCRIBE_ACK); 357 responseEvent.setField(P_ID, session.getId()); 358 responseEvent.setField(P_SUBSCRIPTION_ID, subscription.getId()); 359 responseEvent.setField(P_SUBJECT, subscription.getSubject()); 360 if (subscription.getLabel() != null) { 361 responseEvent.setField(P_SUBSCRIPTION_LABEL, subscription.getLabel()); 362 } 363 info("unsubscribed sid= " + subscriptionId); 364 } 365 } 366 } catch (Throwable t) { 367 responseEvent = new Event(E_NACK); 368 responseEvent.setField(P_ID, session.getId()); 369 responseEvent.setField(P_REASON, "unexpected error: " + t); 370 warn("doUnsubscribe() error: " + t); 371 t.printStackTrace(); 372 } finally { 373 aCommand.setResponseEvent(responseEvent); 375 } 376 } 377 378 public Subscriber getSubscriber() { 379 return session.getSubscriber(); 380 } 381 382 383 protected void sendControlResponse(Command aCommand) { 384 try { 385 386 aCommand.sendResponseHeaders(); 388 389 aCommand.getClientAdapter().start(); 391 392 aCommand.getClientAdapter().push(aCommand.getResponseEvent()); 394 395 aCommand.getClientAdapter().stop(); 397 } catch (Throwable t) { 398 getSubscriber().bailout(); 399 return; 400 } 401 } 402 403 404 405 protected void info(String s) { 406 session.info("[Controller] " + s); 407 } 408 409 410 protected void warn(String s) { 411 session.warn("[Controller] " + s); 412 } 413 414 415 protected void debug(String s) { 416 session.debug("[Controller] " + s); 417 } 418 419 420 } 421 422 451 | Popular Tags |