1 23 24 package org.apache.webdav.lib; 25 26 import java.io.BufferedReader ; 27 import java.io.ByteArrayInputStream ; 28 import java.io.IOException ; 29 import java.io.InputStreamReader ; 30 import java.net.DatagramPacket ; 31 import java.net.DatagramSocket ; 32 import java.net.ServerSocket ; 33 import java.net.Socket ; 34 import java.util.ArrayList ; 35 import java.util.HashMap ; 36 import java.util.Iterator ; 37 import java.util.List ; 38 import java.util.Map ; 39 import java.util.StringTokenizer ; 40 import java.util.Timer ; 41 import java.util.TimerTask ; 42 import java.util.logging.Level ; 43 import java.util.logging.Logger ; 44 45 import org.apache.commons.httpclient.Credentials; 46 import org.apache.commons.httpclient.HttpConnection; 47 import org.apache.commons.httpclient.HttpState; 48 import org.apache.commons.httpclient.HttpStatus; 49 import org.apache.commons.httpclient.methods.PutMethod; 50 import org.apache.commons.httpclient.protocol.Protocol; 51 import org.apache.webdav.lib.methods.DepthSupport; 52 import org.apache.webdav.lib.methods.XMLResponseMethodBase; 53 import org.apache.webdav.lib.util.XMLPrinter; 54 import org.xml.sax.InputSource ; 55 import org.xml.sax.helpers.AttributesImpl ; 56 57 import de.zeigermann.xml.simpleImporter.DefaultSimpleImportHandler; 58 import de.zeigermann.xml.simpleImporter.SimpleImporter; 59 import de.zeigermann.xml.simpleImporter.SimplePath; 60 61 67 public class NotificationListener { 68 private static Logger logger = Logger.getLogger(NotificationListener.class.getName()); 69 70 protected static final Timer timer = new Timer (); 71 72 private final static int CONNECTION_TIMEOUT = 30000; 73 74 private String notificationHost, repositoryHost, repositoryDomain; 75 private int notificationPort, repositoryPort; 76 private Protocol protocol; 77 private Credentials credentials; 78 private boolean udp = true; 79 80 private List subscribers = new ArrayList (); 81 private String subscribersAsString; 82 83 95 public NotificationListener(String host, int port, String repositoryHost, int repositoryPort, Protocol protocol, Credentials credentials, String repositoryDomain, int pollInterval, boolean udp) { 96 this.credentials = credentials; 97 this.notificationHost = host; 98 this.notificationPort = port; 99 this.repositoryHost = repositoryHost; 100 this.repositoryPort = repositoryPort; 101 this.protocol = protocol; 102 this.repositoryDomain = repositoryDomain; 103 this.udp = udp; 104 105 if ( udp ) { 106 Thread listenerThread = new Thread (new Runnable () { 107 public void run() { 108 DatagramSocket serverSocket = null; 109 try { 110 serverSocket = new DatagramSocket (notificationPort); 111 while (true) { 112 byte[] buf = new byte[256]; 113 DatagramPacket packet = new DatagramPacket (buf, buf.length); 114 serverSocket.receive(packet); 115 BufferedReader reader = new BufferedReader (new InputStreamReader (new ByteArrayInputStream (buf))); 116 parseNotification(reader); 117 } 118 } catch (IOException e) { 119 logger.log(Level.SEVERE, "Error while listening to socket", e); 120 } 121 } 122 }); 123 listenerThread.setDaemon(true); 124 listenerThread.start(); 125 } else { 126 Thread listenerThread = new Thread (new Runnable () { 127 public void run() { 128 ServerSocket serverSocket = null; 129 try { 130 serverSocket = new ServerSocket (notificationPort); 131 while (true) { 132 new ConnectionThread(serverSocket.accept()).start(); 133 } 134 } catch (IOException e) { 135 logger.log(Level.SEVERE, "Error while listening to socket", e); 136 } 137 } 138 }); 139 listenerThread.setDaemon(true); 140 listenerThread.start(); 141 } 142 143 TimerTask poll = new TimerTask () { 144 public void run() { 145 if ( subscribersAsString != null ) { 146 poll(subscribersAsString); 147 } 148 } 149 }; 150 timer.schedule(poll, pollInterval, pollInterval); 151 } 152 153 172 public boolean subscribe(String method, String uri, int depth, int lifetime, int notificationDelay, Subscriber listener, Credentials credentials) { 173 SubscribeMethod subscribeMethod = new SubscribeMethod(repositoryDomain+uri); 174 subscribeMethod.addRequestHeader(SubscribeMethod.H_NOTIFICATION_TYPE, method); 175 if ( udp ) { 176 subscribeMethod.addRequestHeader(SubscribeMethod.H_CALL_BACK, "httpu://"+notificationHost+":"+notificationPort); 177 } else { 178 subscribeMethod.addRequestHeader(SubscribeMethod.H_CALL_BACK, "http://"+notificationHost+":"+notificationPort); 179 } 180 subscribeMethod.addRequestHeader(SubscribeMethod.H_NOTIFICATION_DELAY, String.valueOf(notificationDelay)); 181 subscribeMethod.addRequestHeader(SubscribeMethod.H_SUBSCRIPTION_LIFETIME, String.valueOf(lifetime)); 182 subscribeMethod.addRequestHeader(SubscribeMethod.H_DEPTH, ((depth == DepthSupport.DEPTH_INFINITY ) ? "infinity" : String.valueOf(depth))); 183 try { 184 subscribeMethod.setDoAuthentication(true); 185 HttpState httpState = new HttpState(); 186 httpState.setCredentials(null, repositoryHost, credentials); 187 HttpConnection httpConnection = new HttpConnection(repositoryHost, repositoryPort, protocol); 188 httpConnection.setConnectionTimeout(CONNECTION_TIMEOUT); 189 int state = subscribeMethod.execute(httpState, httpConnection); 190 if ( state == HttpStatus.SC_OK ) { 191 String subscriptionId = subscribeMethod.getResponseHeader(SubscribeMethod.H_SUBSCRIPTION_ID).getValue(); 192 logger.log(Level.INFO, "Received subscription id="+subscriptionId+", listener: "+listener); 193 int id = Integer.valueOf(subscriptionId).intValue(); 194 synchronized ( subscribers ) { 195 subscribers.add(new Subscription(id, uri, listener)); 196 } 197 if ( subscribersAsString == null ) { 198 subscribersAsString = String.valueOf(id); 199 } else { 200 subscribersAsString = subscribersAsString + ", "+String.valueOf(id); 201 } 202 return true; 203 } else { 204 logger.log(Level.SEVERE, "Subscription for uri='"+uri+"' failed. State: "+state); 205 } 206 } catch (IOException e) { 207 logger.log(Level.SEVERE, "Subscription of listener '"+listener+"' failed!", e); 208 } 209 return false; 210 } 211 212 public boolean unsubscribe(String uri, Subscriber listener, Credentials credentials) { 213 UnsubscribeMethod unsubscribeMethod = new UnsubscribeMethod(repositoryDomain+uri); 214 synchronized ( subscribers ) { 215 for ( Iterator i = subscribers.iterator(); i.hasNext(); ) { 216 Subscription subscription = (Subscription)i.next(); 217 if ( subscription.getSubscriber().equals(listener) ) { 218 String id = String.valueOf(subscription.getId()); 219 unsubscribeMethod.addRequestHeader(UnsubscribeMethod.H_SUBSCRIPTION_ID, id); 220 try { 221 unsubscribeMethod.setDoAuthentication(true); 222 HttpState httpState = new HttpState(); 223 httpState.setCredentials(null, repositoryHost, credentials); 224 HttpConnection httpConnection = new HttpConnection(repositoryHost, repositoryPort, protocol); 225 httpConnection.setConnectionTimeout(CONNECTION_TIMEOUT); 226 int state = unsubscribeMethod.execute(httpState, httpConnection); 227 if ( state == HttpStatus.SC_OK ) { 228 i.remove(); 229 return true; 230 } else { 231 logger.log(Level.SEVERE, "Unsubscription failed. State: "+state); 232 } 233 } catch (IOException e) { 234 logger.log(Level.SEVERE, "Unsubscription of listener '"+listener+"' failed!", e); 235 } 236 } 237 } 238 } 239 logger.log(Level.SEVERE, "Listener not unsubscribed!"); 240 return false; 241 } 242 243 public void fireEvent(Map information, Credentials credentials) throws IOException { 244 EventMethod eventMethod = new EventMethod(repositoryDomain); 245 eventMethod.addEvent(new Event(information)); 246 fireEvent(eventMethod, credentials); 247 } 248 249 public void fireVetoableEvent(Map information, Credentials credentials) throws IOException { 250 EventMethod eventMethod = new EventMethod(repositoryDomain); 251 eventMethod.addVetoableEvent(new Event(information)); 252 fireEvent(eventMethod, credentials); 253 } 254 255 protected void fireEvent(EventMethod eventMethod, Credentials credentials) throws IOException { 256 eventMethod.setDoAuthentication(true); 257 HttpState httpState = new HttpState(); 258 httpState.setCredentials(null, repositoryHost, credentials); 259 int state = eventMethod.execute(httpState, new HttpConnection(repositoryHost, repositoryPort, protocol)); 260 if ( state == HttpStatus.SC_OK ) { 261 } else { 262 logger.log(Level.SEVERE, "Event failed. State: "+state); 263 } 264 } 265 266 protected void fireEvent(int id, Map information) { 267 for ( Iterator i = subscribers.iterator(); i.hasNext(); ) { 268 Subscription subscriber = (Subscription)i.next(); 269 if ( subscriber.getId() == id ) { 270 subscriber.fireEvent(information); 271 break; 272 } 273 } 274 } 275 276 protected void poll(String notifiedSubscribers) { 277 StringBuffer registeredSubscribers = new StringBuffer (256); 278 StringTokenizer tokenizer = new StringTokenizer (notifiedSubscribers, ","); 279 boolean first = true; 280 while ( tokenizer.hasMoreTokens() ) { 281 String subscriber = tokenizer.nextToken().trim(); 282 if ( isRegistered(Integer.valueOf(subscriber).intValue()) ) { 283 if ( !first ) registeredSubscribers.append(','); 284 registeredSubscribers.append(subscriber); 285 first = false; 286 } 287 } 288 if ( !first ) { 289 String pollSubscribers = registeredSubscribers.toString(); 290 logger.log(Level.INFO, "Poll for subscribers: "+pollSubscribers); 291 PollMethod pollMethod = new PollMethod(repositoryDomain+"/"); 292 pollMethod.addRequestHeader(SubscribeMethod.H_SUBSCRIPTION_ID, pollSubscribers); 293 try { 294 pollMethod.setDoAuthentication(true); 295 HttpState httpState = new HttpState(); 296 httpState.setCredentials(null, repositoryHost, credentials); 297 HttpConnection httpConnection = new HttpConnection(repositoryHost, repositoryPort, protocol); 298 httpConnection.setConnectionTimeout(CONNECTION_TIMEOUT); 299 int state = pollMethod.execute(httpState, httpConnection); 300 if ( state == HttpStatus.SC_MULTI_STATUS ) { 301 List events = pollMethod.getEvents(); 302 for ( Iterator i = events.iterator(); i.hasNext(); ) { 303 Event event = (Event)i.next(); 304 fireEvent(event.getId(), event.getInformation()); 305 } 306 } else { 307 logger.log(Level.SEVERE, "Poll failed. State: "+state); 308 } 309 } catch (IOException e) { 310 logger.log(Level.SEVERE, "Poll for subscribers '"+subscribers+"' failed!"); 311 } 312 } 313 } 314 315 private boolean isRegistered(int id) { 316 for ( Iterator i = subscribers.iterator(); i.hasNext(); ) { 317 Subscription subscription = (Subscription)i.next(); 318 if ( subscription.getId() == id ) return true; 319 } 320 return false; 321 } 322 323 private void parseNotification(BufferedReader reader) throws IOException { 324 String inputLine; 325 if ( (inputLine = reader.readLine()) != null ) { 326 if ( inputLine.startsWith("NOTIFY") ) { 327 while ( (inputLine = reader.readLine()) != null ) { 328 if ( inputLine.startsWith(SubscribeMethod.H_SUBSCRIPTION_ID_RESPONSE) ) { 329 String subscribers = inputLine.substring(SubscribeMethod.H_SUBSCRIPTION_ID_RESPONSE.length()+2); 330 logger.log(Level.INFO, "Notification received for subscribers: "+subscribers); 331 poll(subscribers); 332 } 333 } 334 } 335 } 336 reader.close(); 337 } 338 339 public class Event { 340 int id; 341 Map information = new HashMap (); 342 343 public Event() { 344 } 345 346 public Event(int id) { 347 this.id = id; 348 } 349 350 public Event(Map information) { 351 this.information = information; 352 } 353 354 public void setId(int id) { 355 this.id = id; 356 } 357 358 public int getId() { 359 return id; 360 } 361 362 public void addInformation(String key, String value) { 363 information.put(key, value); 364 } 365 366 public Map getInformation() { 367 return information; 368 } 369 } 370 371 private class Subscription { 372 private int id; 373 private String uri; 374 private Subscriber subscriber; 375 376 public Subscription(int id, String uri, Subscriber subscriber) { 377 this.id = id; 378 this.uri = uri; 379 this.subscriber = subscriber; 380 } 381 382 public void fireEvent(Map information) { 383 subscriber.notify(uri, information); 384 } 385 386 public Subscriber getSubscriber() { 387 return subscriber; 388 } 389 390 public int getId() { 391 return id; 392 } 393 } 394 395 private class ConnectionThread extends Thread { 396 private Socket socket = null; 397 398 public ConnectionThread(Socket socket) { 399 super("ConnectionThread"); 400 this.socket = socket; 401 } 402 403 public void run() { 404 try { 405 BufferedReader in = new BufferedReader (new InputStreamReader (socket.getInputStream())); 406 parseNotification(in); 407 socket.close(); 408 } catch (IOException e) { 409 logger.log(Level.SEVERE, "Error while listening to connection", e); 410 } 411 } 412 } 413 414 private class PollMethod extends PutMethod { 415 public static final String NAME = "POLL"; 416 417 protected final static String E_SUBSCRIPTION_ID = "subscriptionID"; 418 protected final static String E_LISTENER = "li"; 419 protected final static String E_FIRE_EVENTS = "fire-events"; 420 protected final static String E_EVENT = "event"; 421 protected final static String E_VETOABLE_EVENT = "vetoable-event"; 422 protected final static String E_INFORMATION = "information"; 423 protected final static String E_STATUS = "status"; 424 425 public final static String A_NAME = "name"; 426 427 protected final static String SUBSCRIPTION= ":"+E_SUBSCRIPTION_ID; 428 protected final static String ID = E_LISTENER; 429 protected final static String EVENT = ":"+E_EVENT; 430 protected final static String INFORMATION = ":"+E_INFORMATION; 431 protected final static String STATUS = ":"+E_STATUS; 432 protected final static String STATUS_OK = "HTTP/1.1 200 OK"; 433 434 public PollMethod() { 435 } 436 437 public PollMethod(String uri) { 438 super(uri); 439 } 440 441 public String getName() { 442 return NAME; 443 } 444 445 public List getEvents() { 446 List events = new ArrayList (); 447 try { 448 SimpleImporter importer = new SimpleImporter(); 449 importer.setIncludeLeadingCDataIntoStartElementCallback(true); 450 ResponseHandler handler = new ResponseHandler(events); 451 importer.addSimpleImportHandler(handler); 452 importer.parse(new InputSource (getResponseBodyAsStream())); 453 return handler.getEvents(); 454 } catch (Throwable exception) { 455 logger.log(Level.SEVERE, "Exception while polling for new events: ", exception); 456 } 457 return events; 458 } 459 460 private class ResponseHandler extends DefaultSimpleImportHandler { 461 private List events; 462 private int id; 463 private Event event; 464 private boolean parseEvents; 465 466 public ResponseHandler(List listeners) { 467 this.events = listeners; 468 } 469 470 public List getEvents() { 471 return events; 472 } 473 474 public void startElement(SimplePath path, String name, AttributesImpl attributes, String leadingCDdata) { 475 if (path.matches(STATUS)) { 476 parseEvents = false; 477 if ( leadingCDdata.equals(STATUS_OK) ) parseEvents = true; 478 } 479 if ( parseEvents ) { 480 if (path.matches(SUBSCRIPTION+"/"+ID)) { 481 id = Integer.valueOf(leadingCDdata).intValue(); 482 event = new Event(id); 483 events.add(event); 484 } else if (path.matches(INFORMATION)) { 485 String key = attributes.getValue(PollMethod.A_NAME); 486 String value = leadingCDdata; 487 event.addInformation(key, value); 488 } 489 } 490 } 491 } 492 } 493 494 private class SubscribeMethod extends PutMethod { 495 public static final String NAME = "SUBSCRIBE"; 496 497 public final static String H_NOTIFICATION_TYPE = "Notification-type"; 498 public final static String H_NOTIFICATION_DELAY = "Notification-delay"; 499 public final static String H_SUBSCRIPTION_LIFETIME = "Subscription-lifetime"; 500 public final static String H_SUBSCRIPTION_ID = "Subscription-ID"; 501 public final static String H_SUBSCRIPTION_ID_RESPONSE = "Subscription-id"; 502 public final static String H_CALL_BACK = "Call-back"; 503 public final static String H_DEPTH = "Depth"; 504 505 public SubscribeMethod(String uri) { 506 super(uri); 507 } 508 509 public String getName() { 510 return NAME; 511 } 512 } 513 514 private class UnsubscribeMethod extends PutMethod { 515 public static final String NAME = "UNSUBSCRIBE"; 516 517 public final static String H_SUBSCRIPTION_ID = "Subscription-id"; 518 519 public UnsubscribeMethod(String uri) { 520 super(uri); 521 } 522 523 public String getName() { 524 return NAME; 525 } 526 } 527 528 private class EventMethod extends XMLResponseMethodBase { 529 protected final static String E_FIRE_EVENTS = "fire-events"; 530 protected final static String E_EVENT = "event"; 531 protected final static String E_VETOABLE_EVENT = "vetoable-event"; 532 protected final static String E_INFORMATION = "information"; 533 protected final static String E_STATUS = "status"; 534 535 protected final static String A_INFORMATION_KEY = "name"; 536 537 public static final String NAME = "EVENT"; 538 539 private List vetoableEvents = new ArrayList (); 540 private List events = new ArrayList (); 541 542 public EventMethod(String uri) { 543 super(uri); 544 } 545 546 public void addEvent(Event event) { 547 events.add(event); 548 } 549 550 public void addVetoableEvent(Event event) { 551 vetoableEvents.add(event); 552 } 553 554 public String getName() { 555 return NAME; 556 } 557 558 564 protected String generateRequestBody() { 565 XMLPrinter printer = new XMLPrinter(); 566 printer.writeXMLHeader(); 567 printer.writeElement("D", "DAV:", E_FIRE_EVENTS, XMLPrinter.OPENING); 568 for ( Iterator i = events.iterator(); i.hasNext(); ) { 569 Event event = (Event)i.next(); 570 printer.writeElement("D", E_EVENT, XMLPrinter.OPENING); 571 Map information = event.getInformation(); 572 for ( Iterator j = information.entrySet().iterator(); j.hasNext(); ) { 573 Map.Entry entry = (Map.Entry )j.next(); 574 String name = (String )entry.getKey(); 575 String value = (String )entry.getValue(); 576 printer.writeElement("D", E_INFORMATION+" "+A_INFORMATION_KEY+"=\""+name+"\"", XMLPrinter.OPENING); 577 printer.writeText(value); 578 printer.writeElement("D", E_INFORMATION, XMLPrinter.CLOSING); 579 } 580 printer.writeElement("D", E_EVENT, XMLPrinter.CLOSING); 581 } 582 for ( Iterator i = vetoableEvents.iterator(); i.hasNext(); ) { 583 Event event = (Event)i.next(); 584 printer.writeElement("D", E_VETOABLE_EVENT, XMLPrinter.OPENING); 585 Map information = event.getInformation(); 586 for ( Iterator j = information.entrySet().iterator(); j.hasNext(); ) { 587 Map.Entry entry = (Map.Entry )j.next(); 588 String name = (String )entry.getKey(); 589 String value = (String )entry.getValue(); 590 printer.writeElement("D", E_INFORMATION+" "+A_INFORMATION_KEY+"=\""+name+"\"", XMLPrinter.OPENING); 591 printer.writeText(value); 592 printer.writeElement("D", E_INFORMATION, XMLPrinter.CLOSING); 593 } 594 printer.writeElement("D", E_VETOABLE_EVENT, XMLPrinter.CLOSING); 595 } 596 printer.writeElement("D", E_FIRE_EVENTS, XMLPrinter.CLOSING); 597 return printer.toString(); 598 } 599 } 600 } | Popular Tags |