1 16 package org.apache.cocoon.components.jms; 17 18 import java.util.Hashtable ; 19 import java.util.Iterator ; 20 import java.util.LinkedList ; 21 import java.util.List ; 22 23 import javax.jms.JMSException ; 24 import javax.jms.MessageListener ; 25 import javax.jms.Session ; 26 import javax.jms.Topic ; 27 import javax.jms.TopicConnection ; 28 import javax.jms.TopicConnectionFactory ; 29 import javax.jms.TopicPublisher ; 30 import javax.jms.TopicSession ; 31 import javax.jms.TopicSubscriber ; 32 import javax.naming.Context ; 33 import javax.naming.InitialContext ; 34 import javax.naming.NamingException ; 35 36 import org.apache.avalon.framework.CascadingException; 37 import org.apache.avalon.framework.activity.Disposable; 38 import org.apache.avalon.framework.activity.Initializable; 39 import org.apache.avalon.framework.configuration.Configurable; 40 import org.apache.avalon.framework.configuration.Configuration; 41 import org.apache.avalon.framework.configuration.ConfigurationException; 42 import org.apache.avalon.framework.logger.AbstractLogEnabled; 43 import org.apache.avalon.framework.parameters.Parameters; 44 import org.apache.avalon.framework.thread.ThreadSafe; 45 46 68 public class JMSConnectionImpl extends AbstractLogEnabled 69 implements Configurable, 70 Disposable, 71 ThreadSafe, 72 Initializable, 73 JMSConnection { 74 75 private boolean available = false; 76 protected String topicFactoryName; 77 protected String topicName; 78 protected String ackModeName = "dups"; 79 protected String durableSubscriptionID; 80 81 protected TopicConnection connection = null; 82 protected TopicSession session = null; 83 protected List subscribers = null; 84 protected Topic topic = null; 85 protected int ackMode = Session.DUPS_OK_ACKNOWLEDGE; 86 protected Context context = null; 87 protected TopicConnectionFactory topicConnectionFactory; 88 89 private Parameters jndiParams; 90 91 92 public void configure(Configuration conf) throws ConfigurationException { 93 Parameters parameters = Parameters.fromConfiguration(conf); 94 this.jndiParams = Parameters.fromConfiguration(conf.getChild("jndi-info")); 95 this.topicFactoryName = 96 parameters.getParameter("topic-factory", null); 97 this.topicName = parameters.getParameter("topic", null); 98 this.durableSubscriptionID = 99 parameters.getParameter( 100 "durable-subscription-id",null); 101 102 this.ackModeName = 103 parameters.getParameter("ack-mode", this.ackModeName).toLowerCase(); 104 this.ackMode = Session.CLIENT_ACKNOWLEDGE; 107 if (this.ackModeName.equals("auto")) { 108 this.ackMode = Session.AUTO_ACKNOWLEDGE; 109 } else if (this.ackModeName.equals("dups")) { 110 this.ackMode = Session.DUPS_OK_ACKNOWLEDGE; 111 } else if (!this.ackModeName.equals("client")) { 112 this.ackMode = -1; 114 } 115 } 116 117 120 public void initialize() throws Exception { 121 try { 122 this.context = setupContext(); 123 this.setupConnection(); 124 this.setupSession(); 125 this.available = true; 126 } catch (NamingException e) { 127 if (getLogger().isWarnEnabled()) { 128 Throwable rootCause = e.getRootCause(); 129 if (rootCause != null) { 130 String message = e.getRootCause().getMessage(); 131 if (rootCause instanceof ClassNotFoundException ) { 132 String info = "WARN! *** JMS block is installed but jms client library not found. ***\n" + 133 "- For the jms block to work you must install and start a JMS server and " + 134 "place the client jar in WEB-INF/lib."; 135 if (message.indexOf("exolab") > 0 ) { 136 info += "\n- The default server, OpenJMS is configured in cocoon.xconf but is not bundled with Cocoon."; 137 } 138 System.err.println(info); 139 getLogger().warn(info,e); 140 } else { 141 System.out.println(message); 142 getLogger().warn("Cannot get Initial Context. Is the JNDI server reachable?",e); 143 } 144 } 145 else { 146 getLogger().warn("Failed to initialize JMS.",e); 147 } 148 } 149 } catch (JMSException e) { 150 if (getLogger().isWarnEnabled()) { 151 getLogger().warn("Failed to initialize JMS.",e); 152 } 153 } 154 } 155 156 159 public void dispose() { 160 try { 161 this.disconnect(); 162 } catch (JMSException e) { 163 } catch (NamingException e) { 164 } 165 } 166 167 173 public synchronized void registerListener( 174 MessageListener listener, 175 String selector) 176 throws CascadingException, JMSException , NamingException { 177 178 if (!this.available) { 179 throw new CascadingException("Attempt to register Listener on unavailable JMS Connection"); 181 } 182 183 TopicSubscriber subscriber = null; 184 if (this.durableSubscriptionID != null) { 185 subscriber = 186 this.getSession().createDurableSubscriber( 187 this.topic, 188 this.durableSubscriptionID, 189 selector, 190 false); 191 } else { 192 subscriber = this.getSession().createSubscriber(this.topic, selector, false); 193 } 194 if (this.subscribers == null) { 195 this.subscribers = new LinkedList (); 196 } 197 this.subscribers.add(subscriber); 198 199 subscriber.setMessageListener(listener); 200 } 201 202 209 public TopicPublisher getPublisher() throws JMSException , NamingException { 210 TopicSession session = this.getSession(); 211 if (session != null) { 212 return session.createPublisher(this.topic); 213 } else { 214 return null; 215 } 216 } 217 218 226 public TopicSession getSession() throws NamingException , JMSException { 227 return this.session; 228 } 229 230 236 protected Context setupContext() throws NamingException { 237 238 String [] jndiKeys = jndiParams.getNames(); 239 InitialContext ctx; 240 if (jndiKeys.length > 0) { 241 Hashtable properties = null; 243 properties = new Hashtable (); 244 for (int i = 0 ; i < jndiKeys.length ; i++) { 245 properties.put(jndiKeys[i],jndiParams.getParameter(jndiKeys[i],"")); 246 } 247 ctx = new InitialContext (properties); 248 } else { 249 ctx = new InitialContext (); 251 } 252 return ctx; 253 } 254 255 256 262 private void setupConnection() throws NamingException , JMSException { 263 if (this.context != null) { 266 this.topicConnectionFactory = 267 (TopicConnectionFactory ) this.context.lookup(this.topicFactoryName); 268 this.connection = this.topicConnectionFactory.createTopicConnection(); 269 this.connection.start(); 270 } 271 } 272 273 278 private void setupSession() throws JMSException { 279 if (this.connection != null) { 280 this.session = connection.createTopicSession(false, this.ackMode); 281 this.topic = session.createTopic(this.topicName); 282 } 283 } 284 285 291 private void disconnect() throws JMSException , NamingException { 292 if (this.subscribers != null) { 293 for (Iterator i = this.subscribers.iterator(); i.hasNext();) { 294 ((TopicSubscriber ) i.next()).close(); 295 } 296 this.subscribers.clear(); 297 } 298 if ( this.session != null ) { 299 this.session.close(); 300 } 301 if ( this.connection != null ) { 302 this.connection.close(); 303 } 304 } 305 306 } 307 | Popular Tags |