package org.apache.slide.cluster;

import java.util.Enumeration;
import java.util.EventListener;
import java.util.Iterator;
import java.util.Map;

import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.protocol.Protocol;
import org.apache.slide.authenticate.CredentialsToken;
import org.apache.slide.authenticate.SecurityToken;
import org.apache.slide.common.Domain;
import org.apache.slide.common.NamespaceAccessToken;
import org.apache.slide.common.SlideTokenImpl;
import org.apache.slide.common.Uri;
import org.apache.slide.store.ExtendedStore;
import org.apache.slide.store.Store;
import org.apache.slide.util.conf.Configurable;
import org.apache.slide.util.conf.Configuration;
import org.apache.slide.util.conf.ConfigurationException;
import org.apache.slide.util.logger.Logger;
import org.apache.webdav.lib.NotificationListener;
import org.apache.webdav.lib.Subscriber;
import org.apache.webdav.lib.methods.DepthSupport;

/**
 * Subscribes to change notifications of other repositories and evicts the
 * affected objects from the local store cache when a notification arrives.
 *
 * <p>For every <code>node</code> element of its configuration, a
 * {@link NotificationListener} is created on a daemon thread and subscribed
 * to the <code>Update</code>, <code>Update/newmember</code>,
 * <code>Delete</code> and <code>Move</code> notification types.</p>
 */
public class ClusterCacheRefresher implements EventListener, Configurable {

    protected static final String LOG_CHANNEL = ClusterCacheRefresher.class.getName();

    /**
     * Listener created by the most recently started subscription thread.
     * NOTE(review): this single field is reassigned once per configured node,
     * so with several nodes only the last listener stays referenced here.
     */
    protected NotificationListener listener;

    /**
     * Creates the refresher and logs its creation.
     */
    public ClusterCacheRefresher() {
        Domain.log("Creating ClusterCacheRefresher", LOG_CHANNEL, Logger.INFO);
    }

    /**
     * Reads every <code>node</code> child of the given configuration and starts
     * one daemon thread per node that registers the cache-refresh subscriptions.
     *
     * <p>Attributes read from each node: <code>local-host</code>,
     * <code>local-port</code>, <code>repository-host</code>,
     * <code>repository-port</code> (all without default),
     * <code>repository-protocol</code> (default <code>http</code>),
     * <code>username</code> and <code>password</code> (default empty),
     * <code>repository-domain</code> (default <code>/slide</code>),
     * <code>poll-interval</code> (default 60000), <code>udp</code> (default true),
     * <code>base-uri</code> (default <code>/</code>),
     * <code>subscription-lifetime</code> (default 3600) and
     * <code>notification-delay</code> (default 0).</p>
     *
     * @param configuration the configuration holding the <code>node</code> elements
     * @throws ConfigurationException if a node names an unknown repository protocol
     *         or a required attribute cannot be read
     */
    public void configure(Configuration configuration) throws ConfigurationException {
        Domain.log("Configuring ClusterCacheRefresher", LOG_CHANNEL, Logger.INFO);

        Enumeration nodes = configuration.getConfigurations("node");
        while (nodes.hasMoreElements()) {
            Configuration node = (Configuration) nodes.nextElement();
            final String host = node.getAttribute("local-host");
            final int port = node.getAttributeAsInt("local-port");
            final String repositoryHost = node.getAttribute("repository-host");
            final int repositoryPort = node.getAttributeAsInt("repository-port");
            String repositoryProtocolString = node.getAttribute("repository-protocol", "http");
            final Protocol protocol;
            try {
                protocol = Protocol.getProtocol(repositoryProtocolString);
            } catch (IllegalStateException exception) {
                throw new ConfigurationException("Unknown repository-protocol: " + repositoryProtocolString
                        + ". Must be \"http\" or \"https\".", configuration);
            }
            String username = node.getAttribute("username", "");
            String password = node.getAttribute("password", "");
            final Credentials credentials = new UsernamePasswordCredentials(username, password);
            final String repositoryDomain = node.getAttribute("repository-domain", "/slide");
            final int pollInterval = node.getAttributeAsInt("poll-interval", 60000);
            final boolean udp = node.getAttributeAsBoolean("udp", true);
            final String uri = node.getAttribute("base-uri", "/");
            final int depth = DepthSupport.DEPTH_INFINITY;
            final int lifetime = node.getAttributeAsInt("subscription-lifetime", 3600);
            final int notificationDelay = node.getAttributeAsInt("notification-delay", 0);

            // Content changes invalidate the cache entry of the changed resource itself.
            final Subscriber contentSubscriber = new Subscriber() {
                public void notify(String uri, Map information) {
                    resetCache(this, information, false);
                }
            };

            // Structural changes (new member, delete, move) invalidate the parent's entry.
            final Subscriber structureSubscriber = new Subscriber() {
                public void notify(String uri, Map information) {
                    resetCache(this, information, true);
                }
            };

            // Subscribing happens on its own daemon thread so configure() does not
            // block on the remote repository.
            Thread t = new Thread(new Runnable() {
                public void run() {
                    listener = new NotificationListener(host, port, repositoryHost, repositoryPort, protocol,
                            credentials, repositoryDomain, pollInterval, udp);

                    // Accumulate the results: a failure of any subscription counts as failure.
                    // '&=' does not short-circuit, so all four subscriptions are still attempted.
                    boolean success = true;
                    success &= listener.subscribe("Update", uri, depth, lifetime, notificationDelay,
                            contentSubscriber, credentials);
                    success &= listener.subscribe("Update/newmember", uri, depth, lifetime, notificationDelay,
                            structureSubscriber, credentials);
                    success &= listener.subscribe("Delete", uri, depth, lifetime, notificationDelay,
                            structureSubscriber, credentials);
                    success &= listener.subscribe("Move", uri, depth, lifetime, notificationDelay,
                            structureSubscriber, credentials);

                    // long arithmetic avoids int overflow for large lifetimes; the clamp keeps
                    // Thread.sleep from rejecting a negative value for very small lifetimes.
                    long pauseMillis = success ? Math.max(0L, lifetime * 1000L - 60L) : 10000L;
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status instead of silently discarding it.
                        Thread.currentThread().interrupt();
                    }
                }
            });
            t.setDaemon(true);
            t.start();
        }
    }

    /**
     * Evicts from the store cache the object named by every <code>uri</code>
     * entry of a notification, or that object's parent.
     *
     * <p>The work runs inside a transaction on the default namespace. If it
     * fails after the transaction was begun, the transaction is rolled back so
     * it does not stay open on the notifying thread. Failures are logged and
     * never propagated to the caller.</p>
     *
     * @param owner       the subscriber used to create the security token
     * @param information the notification payload; entries whose key reads
     *                    <code>uri</code> are processed
     * @param parent      <code>true</code> to evict the parent of the notified
     *                    URI, <code>false</code> to evict the URI itself
     */
    private void resetCache(Subscriber owner, Map information, boolean parent) {
        NamespaceAccessToken nat = Domain.accessNamespace(new SecurityToken(owner), Domain.getDefaultNamespace());
        boolean inTransaction = false;
        try {
            nat.begin();
            inTransaction = true;
            Iterator entries = information.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry entry = (Map.Entry) entries.next();
                if (!"uri".equals(entry.getKey().toString())) {
                    continue;
                }
                Uri theUri = nat.getUri(new SlideTokenImpl(new CredentialsToken("")),
                        stripUri(entry.getValue().toString()));
                Uri target = parent ? theUri.getParentUri() : theUri;
                Store store = target.getStore();
                if (store instanceof ExtendedStore) {
                    Domain.log("Resetting cache for " + target, LOG_CHANNEL, Logger.INFO);
                    ((ExtendedStore) store).removeObjectFromCache(target);
                }
            }
            nat.commit();
            inTransaction = false;
        } catch (Exception e) {
            if (Domain.isEnabled(LOG_CHANNEL, Logger.ERROR)) {
                Domain.log("Error clearing cache: " + e + ". See stderr for stacktrace.", LOG_CHANNEL, Logger.ERROR);
                e.printStackTrace();
            }
            if (inTransaction) {
                // Also reached when commit() itself failed; the rollback is best effort.
                try {
                    nat.rollback();
                } catch (Exception rollbackFailure) {
                    Domain.log("Could not roll back cache refresh transaction: " + rollbackFailure,
                            LOG_CHANNEL, Logger.ERROR);
                }
            }
        }
    }

    /**
     * Removes the first path segment from a URI.
     *
     * <p>A single leading slash is dropped, then everything before the next
     * slash is removed; for example <code>/slide/files/a</code> becomes
     * <code>/files/a</code>. If no further slash exists, the remainder is
     * returned unchanged (<code>/slide</code> becomes <code>slide</code>).</p>
     *
     * @param uri the URI to strip; must not be <code>null</code>
     * @return the URI without its first path segment
     */
    private String stripUri(String uri) {
        String remainder = uri.startsWith("/") ? uri.substring(1) : uri;
        int nextSlash = remainder.indexOf('/');
        return nextSlash > -1 ? remainder.substring(nextSlash) : remainder;
    }
}