KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > petals > jbi > transport > JoramConnection


/**
 * PETALS: PETALS Services Platform
 * Copyright (C) 2005 EBM WebSourcing
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): EBM WebSourcing
 * --------------------------------------------------------------------------
 * $Id: JoramConnection.java,v 0.3 2005/07/22 10:24:27 alouis Exp $
 * --------------------------------------------------------------------------
 */

25
26 package org.objectweb.petals.jbi.transport;
27
28 import java.net.ConnectException JavaDoc;
29 import java.net.UnknownHostException JavaDoc;
30 import java.util.concurrent.ArrayBlockingQueue JavaDoc;
31
32 import javax.jbi.JBIException;
33 import javax.jms.Connection JavaDoc;
34 import javax.jms.DeliveryMode JavaDoc;
35 import javax.jms.JMSException JavaDoc;
36 import javax.jms.Message JavaDoc;
37 import javax.jms.MessageConsumer JavaDoc;
38 import javax.jms.MessageListener JavaDoc;
39 import javax.jms.MessageProducer JavaDoc;
40 import javax.jms.Session JavaDoc;
41 import javax.naming.InitialContext JavaDoc;
42 import javax.naming.NamingException JavaDoc;
43
44 import org.objectweb.joram.client.jms.ConnectionFactory;
45 import org.objectweb.joram.client.jms.Topic;
46 import org.objectweb.joram.client.jms.admin.AdminException;
47 import org.objectweb.joram.client.jms.admin.AdminModule;
48 import org.objectweb.joram.client.jms.local.LocalConnectionFactory;
49
50 import org.objectweb.petals.jbi.messaging.MessageExchangeImpl;
51 import org.objectweb.petals.util.JNDIUtil;
52
53 /**
54  * @author Adrien LOUIS - EBM WebSourcing
55  *
56  */

57 public class JoramConnection {
58
59     private int POOLSIZE = 8;
60
61     protected static final String JavaDoc TOPIC_NAME = "JoramConnection-";
62
63     protected String JavaDoc host;
64
65     protected InitialContext JavaDoc ictx;
66
67     protected int id;
68
69     protected String JavaDoc pwd;
70
71     protected Connection JavaDoc connection;
72
73     protected ArrayBlockingQueue JavaDoc<Session JavaDoc> sessionsPool;
74
75     protected Session JavaDoc session;
76
77     protected Topic topic;
78
79     protected int tcp;
80
81     protected String JavaDoc user;
82
83     protected boolean connectionStarted = false;
84
85     protected Serializer serializer = new ObjectSerializer();
86
87     public JoramConnection(int id, int tcp, String JavaDoc user, String JavaDoc pwd,
88         String JavaDoc host, InitialContext JavaDoc ictx) {
89         this.id = id;
90         this.tcp = tcp;
91         this.user = user;
92         this.pwd = pwd;
93         this.host = host;
94         this.ictx = ictx;
95     }
96
97     public static String JavaDoc containerNameToTopicName(String JavaDoc containerName) {
98         return TOPIC_NAME + containerName;
99     }
100
101     /**
102      * Create and bind the Topic, if necessary, and Subscribeon the topic of
103      * this server. The specified MessageListener is called when a message
104      * incomes
105      *
106      * @param aML
107      * @throws JMSException
108      * @throws JBIException
109      */

110     public void start(MessageListener JavaDoc aML) throws JMSException JavaDoc,
111         ConnectException JavaDoc, JBIException {
112
113         ConnectionFactory connectionFactory = null;
114         try {
115             connectAdminModule();
116             topic = bindTopic();
117             connectionFactory = (ConnectionFactory) LocalConnectionFactory
118                 .create();
119             disconnectAdminModule();
120         } catch (Exception JavaDoc e) {
121             throw new JBIException(e);
122         }
123
124         // FIXME optim multithread
125
//connectionFactory.getParameters().multiThreadSync = true;
126
//connectionFactory.getParameters().multiThreadSyncDelay = 1;
127

128         connection = connectionFactory.createConnection(user, pwd);
129         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
130         MessageConsumer JavaDoc consumer = session.createDurableSubscriber(topic, ""
131             + id);
132         consumer.setMessageListener(aML);
133
134         // create the sessions pool to send messages
135
sessionsPool = new ArrayBlockingQueue JavaDoc<Session JavaDoc>(POOLSIZE);
136         for (int i = 0; i < POOLSIZE; i++) {
137             sessionsPool.add(connection.createSession(false,
138                 Session.CLIENT_ACKNOWLEDGE));
139         }
140
141         connection.start();
142         connectionStarted = true;
143     }
144
145     /**
146      * Close jms and jndi connections. Clean sessions pool The Topic
147      * subscribtion is not removed.
148      *
149      * @throws JMSException
150      * can not close the jms connection or session
151      * @throws NamingException
152      * can not close the jndi context access
153      */

154     public void stop() throws JMSException JavaDoc, NamingException JavaDoc {
155
156         for (Session JavaDoc session : sessionsPool) {
157             session.close();
158         }
159         if (connectionStarted) {
160             connection.stop();
161             connectionStarted = false;
162         }
163     }
164
165     /**
166      * Create and bind the topic for this server in the global JNDI (create or
167      * retrieve it). Connection to the Joram AdminModule must be done.
168      *
169      * @return
170      * @throws NamingException
171      * @throws ConnectException
172      * @throws AdminException
173      */

174     protected Topic bindTopic() throws NamingException JavaDoc, ConnectException JavaDoc,
175         AdminException {
176         Topic t = null;
177
178         String JavaDoc topicName = TOPIC_NAME + id;
179
180         boolean alreadyBound = JNDIUtil.isBound(ictx, "/", topicName);
181
182         if (alreadyBound) {
183             // retreive the existing topic
184
t = (Topic) ictx.lookup(topicName);
185         } else {
186             // create and bind a new topic
187
t = Topic.create(topicName);
188             t.setFreeReading();
189             t.setFreeWriting();
190             ictx.bind(topicName, t);
191         }
192
193         return t;
194     }
195
196     protected void unbindTopic() throws NamingException JavaDoc {
197
198         String JavaDoc topicName = TOPIC_NAME + id;
199
200         boolean alreadyBound = JNDIUtil.isBound(ictx, "/", topicName);
201
202         if (alreadyBound) {
203             // retreive the existing topic
204
ictx.unbind(topicName);
205         }
206     }
207
208     /**
209      * send the MessageExchange to the remoteTopic. the exchange is tranformed
210      * into a jms Message
211      *
212      * @param remoteTopic
213      * @param messageExchange
214      * @param timeToLive
215      * @param persistence
216      * @throws TransportException
217      */

218     public void sendTo(String JavaDoc containerName,
219         MessageExchangeImpl messageExchange, long timeToLive, boolean persistence)
220         throws TransportException {
221
222         Session JavaDoc distantSession = null;
223         MessageProducer JavaDoc producer;
224
225         try {
226             // find the topic destination in jndi
227
Topic remoteTopic = findTopicForContainer(containerName);
228
229             // retrieve a session - block until a session is available
230
distantSession = sessionsPool.take();
231
232             // create a producer
233
producer = distantSession.createProducer(remoteTopic);
234
235             // set the TextMessage object with MessageExchange object
236
Message JavaDoc msg = serializer.jbi2jms(messageExchange, distantSession);
237
238             // set persistence to true or false
239
int persistenceMode = (persistence) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
240             
241             producer.send(msg, persistenceMode, 0, timeToLive);
242
243             producer.close();
244
245             sessionsPool.offer(distantSession);
246             
247             distantSession =null;
248
249         } catch (InterruptedException JavaDoc e) {
250             throw new TransportException(
251                 "Problem accessing the JMS sessions pool.", e);
252         } catch (JMSException JavaDoc e) {
253             throw new TransportException(
254                 "Can not send the message to the JMS destination.", e);
255         } finally {
256             // release the session
257
if (distantSession != null) {
258                 sessionsPool.offer(distantSession);
259             }
260         }
261     }
262
263     /**
264      * Unsubscribe from the Topic. The topic is removed from the Joram
265      * configuration and from the Jndi server.
266      *
267      * @throws JMSException
268      * @throws NamingException
269      * @throws AdminException
270      * @throws UnknownHostException
271      * @throws ConnectException
272      */

273     public void unsubscribe() throws JMSException JavaDoc, NamingException JavaDoc,
274         ConnectException JavaDoc, UnknownHostException JavaDoc, AdminException {
275
276         if (connectionStarted)
277             connection.stop();
278
279         session.close();
280         connection.start();
281
282         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
283         session.unsubscribe("" + id);
284         session.close();
285
286         connection.close();
287         connectionStarted = false;
288
289         unbindTopic();
290     }
291
292     /**
293      * Find in the JNDI context the registered Topic associated to the specified
294      * containerName
295      *
296      * @param containerName
297      * @return the topic related to the containerName
298      * @throws TransportException
299      * topic not found, or jndi access exception
300      */

301     protected Topic findTopicForContainer(String JavaDoc containerName)
302         throws TransportException {
303         Topic remoteTopic = null;
304         try {
305             remoteTopic = (Topic) ictx.lookup(JoramConnection
306                 .containerNameToTopicName(containerName));
307         } catch (NamingException JavaDoc e) {
308             throw new TransportException(
309                 "Can not find the Topic associate to the destination container '"
310                     + containerName + "'", e);
311         }
312         return remoteTopic;
313     }
314
315     /**
316      * @see AdminModule#connect(String, int, String, String, int, String)
317      */

318     protected void connectAdminModule() throws ConnectException JavaDoc,
319         UnknownHostException JavaDoc, AdminException {
320         AdminModule.connect(host, tcp, user, pwd, 3);
321     }
322
323     /**
324      * @see AdminModule#disconnect()
325      */

326     protected void disconnectAdminModule() {
327         AdminModule.disconnect();
328     }
329
330 }
331
Popular Tags