package org.apache.slide.webdav.event;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.commons.httpclient.HttpConnection;
import org.apache.commons.httpclient.HttpState;
import org.apache.slide.common.Domain;
import org.apache.slide.event.ContentEvent;
import org.apache.slide.event.EventCollection;
import org.apache.slide.event.EventCollectionFilter;
import org.apache.slide.event.EventCollectionListener;
import org.apache.slide.event.ResourceEvent;
import org.apache.slide.event.VetoException;
import org.apache.slide.util.conf.Configurable;
import org.apache.slide.util.conf.Configuration;
import org.apache.slide.util.conf.ConfigurationException;
import org.apache.slide.util.logger.Logger;
import org.apache.slide.webdav.util.NotificationConstants;
import org.xml.sax.InputSource;
import org.xml.sax.helpers.AttributesImpl;

import de.zeigermann.xml.XMLEncode;
import de.zeigermann.xml.XMLOutputStreamWriter;
import de.zeigermann.xml.XMLWriter;
import de.zeigermann.xml.simpleImporter.ConversionHelpers;
import de.zeigermann.xml.simpleImporter.DefaultSimpleImportHandler;
import de.zeigermann.xml.simpleImporter.SimpleImporter;
import de.zeigermann.xml.simpleImporter.SimplePath;

/**
 * Singleton event listener that keeps a list of {@link Subscriber}s and
 * notifies their callbacks when content is changed, created or removed.
 *
 * <p>Callbacks starting with <code>http://</code> are notified with a
 * {@link NotifyMethod} request; callbacks starting with <code>httpu://</code>
 * are notified with a single UDP datagram. Subscriptions are optionally
 * persisted to an XML file whose name is read in
 * {@link #configure(Configuration)}.</p>
 *
 * <p>Subscription expiry and delayed notifications run on one shared
 * {@link Timer} thread, so every access to the subscriber list made by this
 * class is guarded by the monitor of {@link #subscribers}. The list returned
 * by {@link #getSubscribers()} is the live internal list; callers iterating
 * it should hold the same monitor.</p>
 */
public class NotificationTrigger implements NotificationConstants, EventCollectionListener, Configurable {
    protected static final String LOG_CHANNEL = NotificationTrigger.class.getName();
    private final static String A_INCLUDE_EVENTS = "include-events";
    private final static String A_FILENAME = "filename";

    private final static String TCP_PROTOCOL = "http://";
    private final static String UDP_PROTOCOL = "httpu://";

    private final static String E_SUBSCRIPTIONS = "subscriptions";
    private final static String E_SUBSCRIPTION = "subscription";
    private final static String A_ID = "id";
    private final static String E_URI = "uri";
    private final static String E_DEPTH = "depth";
    private final static String E_NOTIFICATION_DELAY = "notification-delay";
    private final static String E_NOTIFICATION_TYPE = "notification-type";
    private final static String E_CALLBACK = "callback";
    private final static String E_SUBSCRIPTION_END = "subscription-end";

    /** Shared timer running subscription-expiry and delayed-notification tasks. */
    protected static final Timer timer = new Timer();

    /** Live list of {@link Subscriber} instances; also used as the lock for itself. */
    protected List subscribers = new ArrayList();
    /** Last subscriber ID handed out (or restored from the persistence file). */
    protected int subscriberId = 0;
    /** Value of the <code>include-events</code> attribute read in {@link #configure(Configuration)}. */
    protected boolean includeEvents = false;
    /** Socket used for UDP notifications, or <code>null</code> if it could not be created. */
    protected DatagramSocket socket;
    /** Name of the XML file subscriptions are persisted to, or <code>null</code> for no persistence. */
    protected String filename = null;

    private static NotificationTrigger notificationTrigger = new NotificationTrigger();

    /**
     * Creates the singleton and tries to open the UDP socket. A failure to
     * open the socket is logged and only disables UDP notifications.
     */
    private NotificationTrigger() {
        Domain.log("Creating notification trigger", LOG_CHANNEL, Logger.INFO);
        try {
            socket = new DatagramSocket();
        } catch ( SocketException exception ) {
            Domain.log("Server socket creation failed, no UDP notifications available", LOG_CHANNEL, Logger.ERROR);
            socket = null;
        }
    }

    /**
     * Returns the single instance of this class.
     *
     * @return the notification trigger singleton
     */
    public static NotificationTrigger getInstance() {
        return notificationTrigger;
    }

    /**
     * Registers a subscriber, assigns it the next free ID, starts its
     * lifetime timer and persists the subscriber list.
     *
     * @param subscriber the subscriber to add
     * @return the ID assigned to the subscriber
     */
    public int addSubscriber(Subscriber subscriber) {
        Domain.log("Adding subscriber", LOG_CHANNEL, Logger.INFO);
        int assignedId;
        synchronized ( subscribers ) {
            subscriberId++;
            assignedId = subscriberId;
            subscriber.setId(assignedId);
            subscribers.add(subscriber);
        }
        refreshSubscriber(subscriber, true);
        return assignedId;
    }

    /**
     * Unregisters a subscriber, cancels its lifetime timer (if any) and
     * persists the updated subscriber list.
     *
     * @param subscriber the subscriber to remove
     * @return <code>true</code> if the subscriber was registered
     */
    public boolean removeSubscriber(Subscriber subscriber) {
        Domain.log("Removing subscriber with ID: "+subscriber.getId(), LOG_CHANNEL, Logger.INFO);
        // The lifetime task is only set for subscribers with a positive lifetime.
        TimerTask lifetimeTask = subscriber.getLifetime();
        if ( lifetimeTask != null ) {
            lifetimeTask.cancel();
        }
        boolean removed;
        synchronized ( subscribers ) {
            removed = subscribers.remove(subscriber);
        }
        // Persist after the removal so the file no longer lists the subscriber.
        saveSubscribers();
        return removed;
    }

    /**
     * (Re)starts the lifetime timer of a subscriber. Any previously scheduled
     * lifetime task is cancelled. If the subscriber's lifetime (in seconds) is
     * positive, a new task is scheduled that removes the subscriber once the
     * lifetime has elapsed; otherwise no expiry is scheduled.
     *
     * @param subscriber the subscriber whose lifetime is refreshed
     * @param persist    whether to write the subscriber list to disk afterwards
     */
    public void refreshSubscriber(final Subscriber subscriber, boolean persist) {
        TimerTask previousTask = subscriber.getLifetime();
        if ( previousTask != null ) {
            previousTask.cancel();
        }
        if ( subscriber.getSubscriptionLifetime() > 0 ) {
            Domain.log("Refreshing subscriber with ID: "+subscriber.getId(), LOG_CHANNEL, Logger.INFO);
            TimerTask lifetime = new TimerTask() {
                public void run() {
                    try {
                        // Expired: drop the subscription (removeSubscriber logs the removal).
                        removeSubscriber(subscriber);
                    } catch ( RuntimeException e ) {
                        // An uncaught exception would terminate the shared timer thread.
                        Domain.log(e);
                    }
                }
            };
            subscriber.setLifetime(lifetime);
            // 1000L forces long arithmetic; int math overflows for long lifetimes.
            timer.schedule(lifetime, subscriber.getSubscriptionLifetime()*1000L);
        }
        if ( persist ) {
            saveSubscribers();
        }
    }

    /**
     * Returns the live internal list of subscribers.
     *
     * @return list of {@link Subscriber} instances
     */
    public List getSubscribers() {
        return subscribers;
    }

    /**
     * Looks up a subscriber by its ID.
     *
     * @param id the subscriber ID
     * @return the matching subscriber, or <code>null</code> if there is none
     */
    public Subscriber getSubscriber(int id) {
        synchronized ( subscribers ) {
            for ( Iterator i = subscribers.iterator(); i.hasNext(); ) {
                Subscriber subscriber = (Subscriber)i.next();
                if ( subscriber.getId() == id ) {
                    return subscriber;
                }
            }
        }
        return null;
    }

    /**
     * No-op; this listener never vetoes an event collection.
     */
    public void vetoableCollected(EventCollection collection) throws VetoException {
    }

    /**
     * Notifies all subscribers matching the events in the given collection.
     *
     * @param collection the collected events
     */
    public void collected(EventCollection collection) {
        notifySubscribers(collection);
    }

    /**
     * Collects the subscribers matching the changed, created and removed
     * contents of the collection and notifies them. Subscribers without a
     * notification delay are grouped by callback so that each callback gets
     * one notification listing all affected subscription IDs. Subscribers
     * with a delay get a timer task, unless one is already pending.
     */
    private void notifySubscribers(EventCollection collection) {
        Map subscriberEnumerations = new HashMap();
        List matchingSubscribers = new ArrayList();

        ContentEvent[] update = EventCollectionFilter.getChangedContents(collection);
        for ( int i = 0; i < update.length; i++ ) {
            matchingSubscribers.addAll(getSubscribers(Subscriber.UPDATE, update[i]));
        }
        ContentEvent[] create = EventCollectionFilter.getCreatedContents(collection);
        for ( int i = 0; i < create.length; i++ ) {
            matchingSubscribers.addAll(getSubscribers(Subscriber.NEW_MEMBER, create[i]));
        }
        ContentEvent[] delete = EventCollectionFilter.getRemovedContents(collection);
        for ( int i = 0; i < delete.length; i++ ) {
            matchingSubscribers.addAll(getSubscribers(Subscriber.DELETE, delete[i]));
        }

        for ( Iterator i = matchingSubscribers.iterator(); i.hasNext(); ) {
            final Subscriber subscriber = (Subscriber)i.next();

            // Subscribers without a callback cannot be notified.
            if ( !subscriber.hasCallback() ) continue;

            if ( subscriber.getNotificationDelay() == 0 ) {
                // Immediate notification: collect distinct IDs per callback.
                List idList = (List)subscriberEnumerations.get(subscriber.getCallback());
                if ( idList == null ) {
                    idList = new ArrayList();
                    subscriberEnumerations.put(subscriber.getCallback(), idList);
                }
                Integer id = new Integer(subscriber.getId());
                if ( !idList.contains(id) ) {
                    idList.add(id);
                }
            } else {
                // Delayed notification: schedule once, later events ride along.
                TimerTask notifyTask = subscriber.getNotify();
                if ( notifyTask == null ) {
                    Domain.log("Starting notification delay: "+subscriber.getNotificationDelay(),
                               LOG_CHANNEL, Logger.INFO);
                    notifyTask = new TimerTask() {
                        public void run() {
                            try {
                                notifySubscriber(subscriber.getCallback(),
                                                 String.valueOf(subscriber.getId()));
                            } catch ( RuntimeException e ) {
                                // An uncaught exception would terminate the shared timer thread.
                                Domain.log(e);
                            } finally {
                                // Always clear the marker so later events can schedule again.
                                subscriber.setNotify(null);
                            }
                        }
                    };
                    subscriber.setNotify(notifyTask);
                    // 1000L forces long arithmetic; int math overflows for long delays.
                    timer.schedule(notifyTask, subscriber.getNotificationDelay()*1000L);
                }
            }
        }

        for ( Iterator i = subscriberEnumerations.entrySet().iterator(); i.hasNext(); ) {
            Map.Entry entry = (Map.Entry)i.next();
            String callBack = (String)entry.getKey();
            List idList = (List)entry.getValue();
            StringBuffer subscriberBuffer = new StringBuffer(128);
            boolean firstSubscriber = true;
            for ( Iterator j = idList.iterator(); j.hasNext(); ) {
                Integer id = (Integer)j.next();
                if ( !firstSubscriber ) {
                    subscriberBuffer.append(", ");
                }
                firstSubscriber = false;
                subscriberBuffer.append(id);
            }
            if ( !firstSubscriber ) {
                notifySubscriber(callBack, subscriberBuffer.toString());
            }
        }
    }

    /**
     * Sends one notification to a callback address. Failures are logged and
     * otherwise ignored. Callbacks with any other prefix than
     * <code>http://</code> or <code>httpu://</code> are silently skipped, as
     * are <code>httpu://</code> callbacks when no UDP socket is available.
     *
     * @param callback    callback URL of the subscriber(s)
     * @param subscribers subscription ID, or comma separated list of IDs,
     *                    sent as the subscription ID header
     */
    protected void notifySubscriber(String callback, String subscribers) {
        if ( callback.startsWith(TCP_PROTOCOL) ) {
            Domain.log("Notify subscribers with adress='"+callback+"' via TCP with id's "+subscribers, LOG_CHANNEL, Logger.INFO);
            NotifyMethod notifyMethod = new NotifyMethod(callback);
            notifyMethod.addRequestHeader(H_SUBSCRIPTION_ID_RESPONSE, subscribers);
            HttpConnection connection = null;
            try {
                URL url = new URL(callback);
                connection = new HttpConnection(url.getHost(),
                                                url.getPort() != -1 ? url.getPort() : 80);
                notifyMethod.execute(new HttpState(), connection);
            } catch ( IOException e ) {
                Domain.log("Notification of subscriber '"+callback+"' failed!", LOG_CHANNEL, Logger.ERROR);
            } finally {
                // Release the socket regardless of the outcome.
                if ( connection != null ) {
                    connection.close();
                }
            }
        } else if ( callback.startsWith(UDP_PROTOCOL) && socket != null ) {
            Domain.log("Notify subscribers with adress='"+callback+"' via UDP with id's "+subscribers+"\n", LOG_CHANNEL, Logger.INFO);
            try {
                // java.net.URL does not know "httpu", so parse host/port via "http".
                URL url = new URL(TCP_PROTOCOL+callback.substring(UDP_PROTOCOL.length()));
                String notification = "NOTIFY "+callback+" HTTP/1.1\nSubscription-id: "+subscribers;
                byte[] buf = notification.getBytes();
                InetAddress address = InetAddress.getByName(url.getHost());
                DatagramPacket packet = new DatagramPacket(
                    buf, buf.length, address, url.getPort() != -1 ? url.getPort() : 80);
                socket.send(packet);
            } catch ( IOException e ) {
                Domain.log("Notification of subscriber '"+callback+"' failed!", LOG_CHANNEL, Logger.ERROR);
            }
        }
    }

    /**
     * Returns the subscribers that match the given notification type and
     * event, and records the event on each of them.
     *
     * @param type  notification type, e.g. {@link Subscriber#UPDATE}
     * @param event the event to match
     * @return list of matching {@link Subscriber} instances, possibly empty
     */
    private List getSubscribers(String type, ResourceEvent event) {
        List matchingSubscribers = new ArrayList();
        synchronized ( subscribers ) {
            for ( Iterator i = subscribers.iterator(); i.hasNext(); ) {
                Subscriber subscriber = (Subscriber)i.next();
                if ( subscriber.matches(type, event) ) {
                    matchingSubscribers.add(subscriber);
                    subscriber.addEvent(event);
                }
            }
        }
        return matchingSubscribers;
    }

    /**
     * Reads the <code>include-events</code> attribute of the
     * <code>notification</code> element and the <code>filename</code>
     * attribute of the <code>persist-subscriptions</code> element, then
     * restores persisted subscriptions from that file.
     *
     * @param configuration the configuration to read from
     * @throws ConfigurationException if the configuration cannot be read
     */
    public void configure(Configuration configuration) throws ConfigurationException {
        Configuration notification = configuration.getConfiguration("notification");
        if ( notification != null ) {
            includeEvents = notification.getAttributeAsBoolean(A_INCLUDE_EVENTS, false);
        }
        Configuration persistSubscriptions = configuration.getConfiguration("persist-subscriptions");
        if ( persistSubscriptions != null ) {
            filename = persistSubscriptions.getAttribute(A_FILENAME);
        }
        loadSubscribers();
    }

    /**
     * Restores subscribers from the persistence file, if a file name is
     * configured and the file exists. Restored subscribers keep their
     * persisted ID, and the ID counter is advanced past the highest restored
     * ID. Any failure is logged and the remaining subscriptions are skipped.
     */
    private void loadSubscribers() {
        if ( filename == null ) {
            return;
        }
        synchronized ( subscribers ) {
            File file = new File(filename);
            if ( !file.exists() ) {
                return;
            }
            FileInputStream inputStream = null;
            try {
                inputStream = new FileInputStream(filename);
                SimpleImporter importer = new SimpleImporter();
                importer.addSimpleImportHandler(new DefaultSimpleImportHandler() {
                    String callback, notificationType, uri;
                    int depth, notificationDelay, subscriptionLifetime, id;

                    public void startElement(SimplePath path, String name, AttributesImpl attributes, String leadingCDdata) {
                        if ( path.matches(E_SUBSCRIPTION) ) {
                            // Reset so values of a previous subscription cannot leak into this one.
                            callback = null;
                            notificationType = null;
                            uri = null;
                            depth = 0;
                            notificationDelay = 0;
                            subscriptionLifetime = 0;
                            id = ConversionHelpers.getInt(attributes.getValue(A_ID));
                        } else if ( path.matches(E_URI) ) {
                            uri = leadingCDdata;
                        } else if ( path.matches(E_DEPTH) ) {
                            depth = Integer.valueOf(leadingCDdata).intValue();
                        } else if ( path.matches(E_CALLBACK) ) {
                            callback = leadingCDdata;
                        } else if ( path.matches(E_NOTIFICATION_DELAY) ) {
                            notificationDelay = Integer.valueOf(leadingCDdata).intValue();
                        } else if ( path.matches(E_NOTIFICATION_TYPE) ) {
                            notificationType = leadingCDdata;
                        } else if ( path.matches(E_SUBSCRIPTION_END) ) {
                            // The persisted end is compared against currentTimeMillis(), so the
                            // difference is in milliseconds; the lifetime is handled in seconds
                            // (see refreshSubscriber), hence the division.
                            // NOTE(review): an already expired subscription yields a value <= 0,
                            // which refreshSubscriber treats as "no expiry" - confirm intended.
                            long remainingMillis = Long.valueOf(leadingCDdata).longValue() - System.currentTimeMillis();
                            subscriptionLifetime = (int)(remainingMillis / 1000L);
                        }
                    }

                    public void endElement(SimplePath path, String name) {
                        if ( path.matches(E_SUBSCRIPTION) ) {
                            Subscriber subscriber = new Subscriber(uri, callback, notificationType, notificationDelay, subscriptionLifetime, depth);
                            // Keep the persisted ID and make sure new IDs do not collide with it.
                            subscriber.setId(id);
                            subscriberId = Math.max(subscriberId, id);
                            subscribers.add(subscriber);
                            refreshSubscriber(subscriber, false);
                        }
                    }
                });
                importer.parse(new InputSource(inputStream));
            } catch ( Exception e ) {
                Domain.log("Exception while restoring subscriptions. Skipping...", LOG_CHANNEL, Logger.ERROR);
                Domain.log(e);
            } finally {
                if ( inputStream != null ) {
                    try {
                        inputStream.close();
                    } catch ( IOException e ) {
                        Domain.log(e);
                    }
                }
            }
        }
    }

    /**
     * Writes all subscribers to the persistence file, if a file name is
     * configured. Failures are logged and otherwise ignored.
     */
    private void saveSubscribers() {
        if ( filename == null ) {
            return;
        }
        synchronized ( subscribers ) {
            FileOutputStream outputStream = null;
            try {
                outputStream = new FileOutputStream(filename);
                XMLOutputStreamWriter writer = new XMLOutputStreamWriter(outputStream);
                writer.writeXMLDeclaration();
                writer.writeStartTag(XMLWriter.createStartTag(E_SUBSCRIPTIONS));
                for ( Iterator i = subscribers.iterator(); i.hasNext(); ) {
                    Subscriber subscriber = (Subscriber)i.next();
                    writer.writeStartTag(XMLWriter.createStartTag(E_SUBSCRIPTION, new String[][] {
                        { A_ID, String.valueOf(subscriber.getId()) } }));
                    writer.writeElementWithPCData(XMLWriter.createStartTag(E_URI), XMLEncode.xmlEncodeText(subscriber.getUri()), XMLWriter.createEndTag(E_URI));
                    writer.writeElementWithPCData(XMLWriter.createStartTag(E_DEPTH), String.valueOf(subscriber.getDepth()), XMLWriter.createEndTag(E_DEPTH));
                    writer.writeElementWithPCData(XMLWriter.createStartTag(E_CALLBACK), XMLEncode.xmlEncodeText(subscriber.getCallback()), XMLWriter.createEndTag(E_CALLBACK));
                    writer.writeElementWithPCData(XMLWriter.createStartTag(E_NOTIFICATION_TYPE), XMLEncode.xmlEncodeText(subscriber.getNotificationType()), XMLWriter.createEndTag(E_NOTIFICATION_TYPE));
                    writer.writeElementWithPCData(XMLWriter.createStartTag(E_NOTIFICATION_DELAY), String.valueOf(subscriber.getNotificationDelay()), XMLWriter.createEndTag(E_NOTIFICATION_DELAY));
                    writer.writeElementWithPCData(XMLWriter.createStartTag(E_SUBSCRIPTION_END), String.valueOf(subscriber.getSubscriptionEnd()), XMLWriter.createEndTag(E_SUBSCRIPTION_END));
                    writer.writeEndTag(XMLWriter.createEndTag(E_SUBSCRIPTION));
                }
                writer.writeEndTag(XMLWriter.createEndTag(E_SUBSCRIPTIONS));
                // Push any characters still buffered in the writer to the stream before it is closed.
                writer.flush();
            } catch ( Exception e ) {
                Domain.log(e);
            } finally {
                if ( outputStream != null ) {
                    try {
                        outputStream.close();
                    } catch ( IOException e ) {
                        Domain.log(e);
                    }
                }
            }
        }
    }
}