1 16 package org.apache.cocoon.components.slide.impl; 17 18 import java.util.ArrayList ; 19 import java.util.Collections ; 20 import java.util.Hashtable ; 21 import java.util.Iterator ; 22 import java.util.List ; 23 24 import javax.jms.DeliveryMode ; 25 import javax.jms.JMSException ; 26 import javax.jms.Session ; 27 import javax.jms.Topic ; 28 import javax.jms.TopicConnection ; 29 import javax.jms.TopicConnectionFactory ; 30 import javax.jms.TopicPublisher ; 31 import javax.jms.TopicSession ; 32 import javax.naming.Context ; 33 import javax.naming.InitialContext ; 34 import javax.naming.NamingException ; 35 36 import org.apache.slide.common.NamespaceAccessToken; 37 import org.apache.slide.common.ServiceAccessException; 38 import org.apache.slide.common.SlideToken; 39 import org.apache.slide.content.AbstractContentInterceptor; 40 import org.apache.slide.content.NodeRevisionContent; 41 import org.apache.slide.content.NodeRevisionDescriptor; 42 import org.apache.slide.content.NodeRevisionDescriptors; 43 import org.apache.slide.lock.ObjectLockedException; 44 import org.apache.slide.security.AccessDeniedException; 45 import org.apache.slide.structure.LinkedObjectNotFoundException; 46 import org.apache.slide.structure.ObjectNotFoundException; 47 import org.apache.slide.util.logger.Logger; 48 49 53 public class JMSContentInterceptor extends AbstractContentInterceptor { 54 55 56 58 private static final String LOG_CHANNEL = "JMSContentInterceptor"; 59 60 private static final String PARAM_TOPIC_FACTORY = "topic-factory"; 61 private static final String PARAM_TOPIC = "topic"; 62 private static final String PARAM_PERSISTENT = "persistent-delivery"; 63 private static final String PARAM_PRIORITY = "priority"; 64 private static final String PARAM_TIME_TO_LIVE = "time-to-live"; 65 private static final String PARAM_INITIAL_CONTEXT_FACTORY = Context.INITIAL_CONTEXT_FACTORY; 66 private static final String PARAM_PROVIDER_URL = Context.PROVIDER_URL; 67 68 private static final String DEFAULT_TOPIC_FACTORY = "JmsTopicConnectionFactory"; 69 private static final String DEFAULT_TOPIC = "topic1"; 70 private static final String DEFAULT_PERSISTENT = "false"; 71 private static final String DEFAULT_PRIORITY = "4"; 72 private static final String DEFAULT_TIME_TO_LIVE = "1000"; 73 private static final String DEFAULT_INITIAL_CONTEXT_FACTORY = "org.exolab.jms.jndi.InitialContextFactory"; 74 private static final String DEFAULT_PROVIDER_URL = "rmi://localhost:1099/"; 75 76 77 79 private TopicConnection m_connection; 81 private TopicSession m_session; 82 private Topic m_topic; 83 private TopicPublisher m_publisher; 84 85 private String m_topicFactoryName; 87 private String m_topicName; 88 private int m_deliveryMode; 89 private int m_priority; 90 private long m_timeToLive; 91 private Hashtable m_jndiProps; 92 93 private List m_queue = Collections.synchronizedList(new ArrayList ()); 95 96 private boolean m_started = false; 98 99 100 102 public JMSContentInterceptor() { 103 } 104 105 136 public void setParameters(Hashtable params) { 137 super.setParameters(params); 138 139 m_topicFactoryName = getParameter(PARAM_TOPIC_FACTORY,DEFAULT_TOPIC_FACTORY); 141 m_topicName = getParameter(PARAM_TOPIC,DEFAULT_TOPIC); 142 boolean persistent = Boolean.valueOf(getParameter(PARAM_PERSISTENT,DEFAULT_PERSISTENT)).booleanValue(); 143 144 m_deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; 145 m_priority = Integer.valueOf(getParameter(PARAM_PRIORITY,DEFAULT_PRIORITY)).intValue(); 146 m_timeToLive = Long.valueOf(getParameter(PARAM_TIME_TO_LIVE,DEFAULT_TIME_TO_LIVE)).longValue(); 147 148 m_jndiProps = new Hashtable (); 150 m_jndiProps.put(Context.INITIAL_CONTEXT_FACTORY, 151 getParameter(PARAM_INITIAL_CONTEXT_FACTORY, 152 DEFAULT_INITIAL_CONTEXT_FACTORY) 153 ); 154 m_jndiProps.put(Context.PROVIDER_URL, 155 getParameter(PARAM_PROVIDER_URL, 156 DEFAULT_PROVIDER_URL) 157 ); 158 } 159 160 163 public void setNamespace(NamespaceAccessToken nat) { 164 super.setNamespace(nat); 165 166 try { 168 Context context = new InitialContext (m_jndiProps); 169 TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory ) 170 context.lookup(m_topicFactoryName); 171 m_connection = topicConnectionFactory.createTopicConnection(); 172 m_connection.start(); 173 m_session = m_connection.createTopicSession(false,Session.DUPS_OK_ACKNOWLEDGE); 174 m_topic = m_session.createTopic(m_topicName); 175 m_publisher = m_session.createPublisher(m_topic); 176 177 184 Thread t = new Thread (new Runnable () { 185 public void run() { 186 m_started = true; 187 while (m_started) { 188 try { 189 Thread.sleep(1000); 190 } catch (InterruptedException e) { 191 } 193 if (m_queue.size() == 0) continue; 194 List list = m_queue; 195 m_queue = Collections.synchronizedList(new ArrayList ()); 196 Iterator iter = list.iterator(); 197 while (iter.hasNext()) { 198 String msg = (String ) iter.next(); 199 if (getLogger().isEnabled(Logger.INFO)) { 200 getLogger().log("Sending message: " + msg,Logger.INFO); 201 } 202 try { 203 m_publisher.publish(m_session.createTextMessage(msg), 204 m_deliveryMode,m_priority,m_timeToLive); 205 } 206 catch (JMSException e) { 207 getLogger().log("Failure sending JMS message.",e,LOG_CHANNEL,Logger.ERROR); 208 } 209 } 210 } 211 } 212 }); 213 t.setPriority(Thread.NORM_PRIORITY); 214 t.start(); 215 } catch (NamingException e) { 216 getLogger().log("Failure while connecting to JMS server.",e,LOG_CHANNEL,Logger.ERROR); 217 } catch (JMSException e) { 218 getLogger().log("Failure while connecting to JMS server.",e,LOG_CHANNEL,Logger.ERROR); 219 } 220 221 } 222 223 private String getParameter(String name, String defaultValue) { 224 String value = super.getParameter(name); 225 if (value == null) { 226 value = defaultValue; 227 } 228 return value; 229 } 230 231 232 234 public void postRemoveContent( 235 SlideToken slideToken, 236 NodeRevisionDescriptors revisions, 237 NodeRevisionDescriptor revision) 238 throws 239 AccessDeniedException, 240 ObjectNotFoundException, 241 LinkedObjectNotFoundException, 242 ObjectLockedException, 243 ServiceAccessException { 244 245 if (!m_started) { 246 return; 247 } 248 if (revisions == null) { 249 return; 250 } 251 queueMessage(revisions.getUri(),"remove"); 252 253 } 254 255 public void postStoreContent( 256 SlideToken slideToken, 257 NodeRevisionDescriptors revisions, 258 NodeRevisionDescriptor revision, 259 NodeRevisionContent content) 260 throws 261 AccessDeniedException, 262 ObjectNotFoundException, 263 LinkedObjectNotFoundException, 264 ObjectLockedException, 265 ServiceAccessException { 266 267 if (!m_started) { 268 return; 269 } 270 if (revisions == null) { 271 return; 272 } 273 queueMessage(revisions.getUri(),"store"); 274 } 275 276 private void queueMessage(String uri, String type) { 277 String msg = "slide-interceptor:" + type + "|" + getNamespace().getName() + uri; 278 m_queue.add(msg); 279 } 280 281 private Logger getLogger() { 282 return getNamespace().getLogger(); 283 } 284 } 285 | Popular Tags |