1 4 package nl.justobjects.pushlet.client; 5 6 import nl.justobjects.pushlet.core.Event; 7 import nl.justobjects.pushlet.core.EventParser; 8 import nl.justobjects.pushlet.core.Protocol; 9 import nl.justobjects.pushlet.util.PushletException; 10 11 import java.io.IOException ; 12 import java.io.InputStreamReader ; 13 import java.io.Reader ; 14 import java.io.OutputStream ; 15 import java.net.URL ; 16 import java.net.URLConnection ; 17 import java.net.Authenticator ; 18 import java.net.PasswordAuthentication ; 19 import java.util.Map ; 20 21 35 public class PushletClient implements Protocol { 36 37 private String pushletURL; 38 39 40 private boolean debug; 41 42 43 private String id; 44 45 46 private DataEventListener dataEventListener; 47 48 49 public PushletClient(String aPushletURL) { 50 pushletURL = aPushletURL; 51 } 52 53 54 public PushletClient(String aHost, int aPort) { 55 this("http://" + aHost + ":" + aPort + DEFAULT_SERVLET_URI); 56 } 57 58 71 public void setProxyOptions(String aProxyHost, 72 String aProxyPort, String theNonProxyHosts, 73 String aUserName, String aPassword, String anNTLMDomain) { 74 75 System.setProperty("http.proxySet", "true"); 77 System.setProperty("http.proxyHost", aProxyHost); 78 System.setProperty("http.proxyPort", aProxyPort); 79 80 if (theNonProxyHosts != null) { 82 System.setProperty("http.nonProxyHosts", theNonProxyHosts); 83 } 84 85 if (aUserName != null) { 87 System.setProperty("http.proxyUser", aUserName); 88 System.setProperty("http.proxyPassword", aPassword); 89 90 Authenticator.setDefault(new HTTPAuthenticateProxy(aUserName, aPassword)); 92 93 if (anNTLMDomain != null) { 95 System.setProperty("http.auth.ntlm.domain", anNTLMDomain); 96 } 97 } 98 } 99 100 101 public void join() throws PushletException { 102 Event event = new Event(E_JOIN); 103 event.setField(P_FORMAT, FORMAT_XML); 104 Event response = doControl(event); 105 throwOnNack(response); 106 107 id = response.getField(P_ID); 109 } 110 111 112 public void leave() throws PushletException { 113 stopListen(); 114 throwOnInvalidSession(); 115 Event event = new Event(E_LEAVE); 116 event.setField(P_ID, id); 117 Event response = doControl(event); 118 119 throwOnNack(response); 120 id = null; 121 } 122 123 124 public void listen(PushletClientListener aListener) throws PushletException { 125 listen(aListener, MODE_STREAM); 126 } 127 128 129 public void listen(PushletClientListener aListener, String aMode) throws PushletException { 130 listen(aListener, aMode, null); 131 } 132 133 134 public void listen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException { 135 throwOnInvalidSession(); 136 stopListen(); 137 138 String listenURL = pushletURL 139 + "?" + P_EVENT + "=" + E_LISTEN 140 + "&" + P_ID + "=" + id 141 + "&" + P_MODE + "=" + aMode 142 ; 143 if (aSubject != null) { 144 listenURL = listenURL + "&" + P_SUBJECT + "=" + aSubject; 145 } 146 147 dataEventListener = new DataEventListener(aListener, listenURL); 148 dataEventListener.start(); 149 } 150 151 152 public void joinListen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException { 153 stopListen(); 154 155 String listenURL = pushletURL 156 + "?" + P_EVENT + "=" + E_JOIN_LISTEN 157 + "&" + P_FORMAT + "=" + FORMAT_XML 158 + "&" + P_MODE + "=" + aMode 159 + "&" + P_SUBJECT + "=" + aSubject 160 ; 161 162 dataEventListener = new DataEventListener(aListener, listenURL); 163 dataEventListener.start(); 164 } 165 166 public void publish(String aSubject, Map theAttributes) throws PushletException { 167 throwOnInvalidSession(); 168 Event event = new Event(E_PUBLISH, theAttributes); 169 event.setField(P_SUBJECT, aSubject); 170 event.setField(P_ID, id); 171 Event response = doControl(event); 172 throwOnNack(response); 173 } 174 175 176 public String subscribe(String aSubject, String aLabel) throws PushletException { 177 throwOnInvalidSession(); 178 Event event = new Event(E_SUBSCRIBE); 179 event.setField(P_ID, id); 180 event.setField(P_SUBJECT, aSubject); 181 182 if (aLabel != null) { 184 event.setField(P_SUBSCRIPTION_LABEL, aLabel); 185 } 186 187 Event response = doControl(event); 189 throwOnNack(response); 190 191 return response.getField(P_SUBSCRIPTION_ID); 192 } 193 194 195 public String subscribe(String aSubject) throws PushletException { 196 return subscribe(aSubject, null); 197 } 198 199 200 public void unsubscribe(String aSubscriptionId) throws PushletException { 201 throwOnInvalidSession(); 202 Event event = new Event(E_UNSUBSCRIBE); 203 event.setField(P_ID, id); 204 205 if (aSubscriptionId != null) { 207 event.setField(P_SUBSCRIPTION_ID, aSubscriptionId); 208 } 209 210 Event response = doControl(event); 211 throwOnNack(response); 212 } 213 214 215 public void unsubscribe() throws PushletException { 216 unsubscribe(null); 217 } 218 219 220 public void stopListen() throws PushletException { 221 if (dataEventListener != null) { 222 unsubscribe(); 223 dataEventListener.stop(); 224 dataEventListener = null; 225 } 226 } 227 228 public void setDebug(boolean b) { 229 debug = b; 230 } 231 232 private void throwOnNack(Event anEvent) throws PushletException { 233 if (anEvent.getEventType().equals(E_NACK)) { 234 throw new PushletException("Negative response: reason=" + anEvent.getField(P_REASON)); 235 } 236 } 237 238 private void throwOnInvalidSession() throws PushletException { 239 if (id == null) { 240 throw new PushletException("Invalid pushlet session"); 241 } 242 } 243 244 private Reader openURL(String aURL) throws PushletException { 245 try { 247 p("Connecting to " + aURL); 248 URL url = new URL (aURL); 249 URLConnection urlConnection = url.openConnection(); 250 251 urlConnection.setUseCaches(false); 253 urlConnection.setDefaultUseCaches(false); 254 255 259 265 return new InputStreamReader (urlConnection.getInputStream()); 271 272 } catch (Throwable t) { 273 warn("openURL() could not open " + aURL, t); 274 throw new PushletException(" could not open " + aURL, t); 275 } 276 } 277 278 279 private Event doControl(Event aControlEvent) throws PushletException { 280 String controlURL = pushletURL + "?" + aControlEvent.toQueryString(); 281 282 p("doControl to " + controlURL); 283 284 Reader reader = openURL(controlURL); 286 287 Event event = null; 289 try { 290 p("Getting event..."); 291 event = EventParser.parse(reader); 293 p("Event received " + event); 294 return event; 295 } catch (Throwable t) { 296 warn("doControl() exception", t); 298 throw new PushletException(" error parsing response from" + controlURL, t); 299 } 300 } 301 302 303 private void p(String s) { 304 if (debug) { 305 System.out.println("[PushletClient] " + s); 306 } 307 } 308 309 310 private void warn(String s) { 311 warn(s, null); 312 } 313 314 315 private void warn(String s, Throwable t) { 316 System.err.println("[PushletClient] - WARN - " + s + " ex=" + t); 317 318 if (t != null) { 319 t.printStackTrace(); 320 } 321 } 322 323 324 private class DataEventListener implements Runnable { 325 326 private PushletClientListener listener; 327 328 329 private Thread receiveThread = null; 330 private Reader reader; 331 private String refreshURL; 332 private String listenURL; 333 334 public DataEventListener(PushletClientListener aListener, String aListenURL) { 335 listener = aListener; 336 listenURL = aListenURL; 337 } 338 339 public void start() { 340 receiveThread = new Thread (this); 342 receiveThread.start(); 343 344 } 345 346 347 public void stop() { 348 p("In stop()"); 349 bailout(); 350 } 351 352 353 public void run() { 354 p("Start run()"); 355 try { 356 while (receiveThread != null && receiveThread.isAlive()) { 357 reader = openURL(listenURL); 359 360 while (receiveThread != null && receiveThread.isAlive()) { 362 Event event = null; 363 try { 364 event = EventParser.parse(reader); 367 p("Event received " + event); 368 } catch (Throwable t) { 369 370 if (listener != null) { 373 listener.onError("exception during receive: " + t); 374 } 375 376 bailout(); 377 return; 378 } 379 380 if (event != null && listener != null) { 382 String eventType = event.getEventType(); 384 if (eventType.equals(E_HEARTBEAT)) { 385 listener.onHeartbeat(event); 386 } else if (eventType.equals(E_DATA)) { 387 listener.onData(event); 388 } else if (eventType.equals(E_JOIN_LISTEN_ACK)) { 389 id = event.getField(P_ID); 390 } else if (eventType.equals(E_LISTEN_ACK)) { 391 p("Listen ack ok"); 392 } else if (eventType.equals(E_REFRESH_ACK)) { 393 } else if (eventType.equals(E_ABORT)) { 395 listener.onAbort(event); 396 } else if (eventType.equals(E_REFRESH)) { 397 refresh(event); 398 } else { 399 warn("unsupported event type received: " + eventType); 400 } 401 } 402 } 403 } 404 } catch (Throwable t) { 405 warn("Exception in run() ", t); 406 bailout(); 407 } 408 } 409 410 private void disconnect() { 411 p("start disconnect()"); 412 if (reader != null) { 413 try { 414 p("Closed reader ok"); 417 } catch (Exception ignore) { 418 } finally { 419 reader = null; 420 } 421 } 422 p("end disconnect()"); 423 } 424 425 426 public void stopThread() { 427 p("In stopThread()"); 428 429 Thread targetThread = receiveThread; 431 432 receiveThread = null; 433 434 if ((targetThread != null) && targetThread.isAlive()) { 438 439 targetThread.interrupt(); 440 441 try { 442 443 targetThread.join(500); 445 } catch (InterruptedException ignore) { 446 } 447 448 if (targetThread.isAlive()) { 451 452 targetThread.stop(); 455 456 try { 458 targetThread.join(500); 459 } catch (Throwable ignore) { 460 } 461 } 462 463 p("Stopped receiveThread alive=" + targetThread.isAlive()); 464 465 } 466 } 467 468 469 public void bailout() { 470 p("In bailout()"); 471 stopThread(); 472 disconnect(); 473 } 474 475 476 private void refresh(Event aRefreshEvent) throws PushletException { 477 try { 478 Thread.sleep(Long.parseLong(aRefreshEvent.getField(P_WAIT))); 480 } catch (Throwable t) { 481 warn("abort while refresing"); 482 refreshURL = null; 483 return; 484 } 485 486 if (receiveThread == null) { 488 return; 489 } 490 491 refreshURL = pushletURL 493 + "?" + P_ID + "=" + id 494 + "&" + P_EVENT + "=" + E_REFRESH 495 ; 496 497 if (reader != null) { 498 try { 499 reader.close(); 500 501 } catch (IOException ignore) { 502 503 } 504 reader = null; 505 } 506 507 reader = openURL(refreshURL); 508 } 509 } 510 511 512 private static class HTTPAuthenticateProxy extends Authenticator { 513 514 518 519 private String thePassword = ""; 520 private String theUser = ""; 521 522 public HTTPAuthenticateProxy(String username, String password) { 523 524 thePassword = password; 525 theUser = username; 526 } 527 528 protected PasswordAuthentication getPasswordAuthentication() { 529 532 return new PasswordAuthentication (theUser, thePassword.toCharArray()); 533 } 534 535 } 536 537 } 538 539 540 | Popular Tags |