1 25 26 package org.objectweb.petals.jbi.transport; 27 28 import java.net.ConnectException ; 29 import java.net.UnknownHostException ; 30 import java.util.concurrent.ArrayBlockingQueue ; 31 32 import javax.jbi.JBIException; 33 import javax.jms.Connection ; 34 import javax.jms.DeliveryMode ; 35 import javax.jms.JMSException ; 36 import javax.jms.Message ; 37 import javax.jms.MessageConsumer ; 38 import javax.jms.MessageListener ; 39 import javax.jms.MessageProducer ; 40 import javax.jms.Session ; 41 import javax.naming.InitialContext ; 42 import javax.naming.NamingException ; 43 44 import org.objectweb.joram.client.jms.ConnectionFactory; 45 import org.objectweb.joram.client.jms.Topic; 46 import org.objectweb.joram.client.jms.admin.AdminException; 47 import org.objectweb.joram.client.jms.admin.AdminModule; 48 import org.objectweb.joram.client.jms.local.LocalConnectionFactory; 49 50 import org.objectweb.petals.jbi.messaging.MessageExchangeImpl; 51 import org.objectweb.petals.util.JNDIUtil; 52 53 57 public class JoramConnection { 58 59 private int POOLSIZE = 8; 60 61 protected static final String TOPIC_NAME = "JoramConnection-"; 62 63 protected String host; 64 65 protected InitialContext ictx; 66 67 protected int id; 68 69 protected String pwd; 70 71 protected Connection connection; 72 73 protected ArrayBlockingQueue <Session > sessionsPool; 74 75 protected Session session; 76 77 protected Topic topic; 78 79 protected int tcp; 80 81 protected String user; 82 83 protected boolean connectionStarted = false; 84 85 protected Serializer serializer = new ObjectSerializer(); 86 87 public JoramConnection(int id, int tcp, String user, String pwd, 88 String host, InitialContext ictx) { 89 this.id = id; 90 this.tcp = tcp; 91 this.user = user; 92 this.pwd = pwd; 93 this.host = host; 94 this.ictx = ictx; 95 } 96 97 public static String containerNameToTopicName(String containerName) { 98 return TOPIC_NAME + containerName; 99 } 100 101 110 public void start(MessageListener aML) throws JMSException , 111 ConnectException , JBIException { 112 113 ConnectionFactory connectionFactory = null; 114 try { 115 connectAdminModule(); 116 topic = bindTopic(); 117 connectionFactory = (ConnectionFactory) LocalConnectionFactory 118 .create(); 119 disconnectAdminModule(); 120 } catch (Exception e) { 121 throw new JBIException(e); 122 } 123 124 128 connection = connectionFactory.createConnection(user, pwd); 129 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 130 MessageConsumer consumer = session.createDurableSubscriber(topic, "" 131 + id); 132 consumer.setMessageListener(aML); 133 134 sessionsPool = new ArrayBlockingQueue <Session >(POOLSIZE); 136 for (int i = 0; i < POOLSIZE; i++) { 137 sessionsPool.add(connection.createSession(false, 138 Session.CLIENT_ACKNOWLEDGE)); 139 } 140 141 connection.start(); 142 connectionStarted = true; 143 } 144 145 154 public void stop() throws JMSException , NamingException { 155 156 for (Session session : sessionsPool) { 157 session.close(); 158 } 159 if (connectionStarted) { 160 connection.stop(); 161 connectionStarted = false; 162 } 163 } 164 165 174 protected Topic bindTopic() throws NamingException , ConnectException , 175 AdminException { 176 Topic t = null; 177 178 String topicName = TOPIC_NAME + id; 179 180 boolean alreadyBound = JNDIUtil.isBound(ictx, "/", topicName); 181 182 if (alreadyBound) { 183 t = (Topic) ictx.lookup(topicName); 185 } else { 186 t = Topic.create(topicName); 188 t.setFreeReading(); 189 t.setFreeWriting(); 190 ictx.bind(topicName, t); 191 } 192 193 return t; 194 } 195 196 protected void unbindTopic() throws NamingException { 197 198 String topicName = TOPIC_NAME + id; 199 200 boolean alreadyBound = JNDIUtil.isBound(ictx, "/", topicName); 201 202 if (alreadyBound) { 203 ictx.unbind(topicName); 205 } 206 } 207 208 218 public void sendTo(String containerName, 219 MessageExchangeImpl messageExchange, long timeToLive, boolean persistence) 220 throws TransportException { 221 222 Session distantSession = null; 223 MessageProducer producer; 224 225 try { 226 Topic remoteTopic = findTopicForContainer(containerName); 228 229 distantSession = sessionsPool.take(); 231 232 producer = distantSession.createProducer(remoteTopic); 234 235 Message msg = serializer.jbi2jms(messageExchange, distantSession); 237 238 int persistenceMode = (persistence) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; 240 241 producer.send(msg, persistenceMode, 0, timeToLive); 242 243 producer.close(); 244 245 sessionsPool.offer(distantSession); 246 247 distantSession =null; 248 249 } catch (InterruptedException e) { 250 throw new TransportException( 251 "Problem accessing the JMS sessions pool.", e); 252 } catch (JMSException e) { 253 throw new TransportException( 254 "Can not send the message to the JMS destination.", e); 255 } finally { 256 if (distantSession != null) { 258 sessionsPool.offer(distantSession); 259 } 260 } 261 } 262 263 273 public void unsubscribe() throws JMSException , NamingException , 274 ConnectException , UnknownHostException , AdminException { 275 276 if (connectionStarted) 277 connection.stop(); 278 279 session.close(); 280 connection.start(); 281 282 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 283 session.unsubscribe("" + id); 284 session.close(); 285 286 connection.close(); 287 connectionStarted = false; 288 289 unbindTopic(); 290 } 291 292 301 protected Topic findTopicForContainer(String containerName) 302 throws TransportException { 303 Topic remoteTopic = null; 304 try { 305 remoteTopic = (Topic) ictx.lookup(JoramConnection 306 .containerNameToTopicName(containerName)); 307 } catch (NamingException e) { 308 throw new TransportException( 309 "Can not find the Topic associate to the destination container '" 310 + containerName + "'", e); 311 } 312 return remoteTopic; 313 } 314 315 318 protected void connectAdminModule() throws ConnectException , 319 UnknownHostException , AdminException { 320 AdminModule.connect(host, tcp, user, pwd, 3); 321 } 322 323 326 protected void disconnectAdminModule() { 327 AdminModule.disconnect(); 328 } 329 330 } 331 | Popular Tags |