KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > petals > jbi > transport > JoramTransporter


1 /**
2  * PETALS: PETALS Services Platform
3  * Copyright (C) 2005 EBM WebSourcing
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): EBM WebSourcing
21  * --------------------------------------------------------------------------
22  * $Id: JoramTransporter.java,v 0.3 2005/07/22 10:24:27 alouis Exp $
23  * --------------------------------------------------------------------------
24  */

25
26 package org.objectweb.petals.jbi.transport;
27
28 import javax.jbi.JBIException;
29 import javax.jms.Message JavaDoc;
30 import javax.jms.MessageListener JavaDoc;
31 import javax.jms.ObjectMessage JavaDoc;
32 import javax.naming.InitialContext JavaDoc;
33
34 import org.objectweb.fractal.fraclet.annotation.FractalComponent;
35 import org.objectweb.fractal.fraclet.annotation.Interface;
36 import org.objectweb.fractal.fraclet.annotation.LifeCycle;
37 import org.objectweb.fractal.fraclet.annotation.LifeCycleType;
38 import org.objectweb.fractal.fraclet.annotation.Provides;
39 import org.objectweb.fractal.fraclet.annotation.Requires;
40 import org.objectweb.petals.jbi.messaging.MessageExchangeImpl;
41 import org.objectweb.petals.jbi.routing.Router;
42 import org.objectweb.petals.jbi.routing.RoutingException;
43 import org.objectweb.petals.util.LoggingUtil;
44 import org.objectweb.petals.util.PropertyUtil;
45 import org.objectweb.petals.util.SystemUtil;
46
47 /**
48  * Implementation of the <code>Transporter</code> with JORAM. <br>
49  * This implementation uses the fractal JoramAgent. <br>
50  * Each <code>JoramTransporter</code> registrers a <code>Topic</code> in the
51  * Joram distributed JNDI with the following name :
52  * <em>JoramConnection-[containerId]</em> <br>
53  * The name of a container is the <em>ID</em> specified in the
54  * <em>conf/server.properties</em> file. <br>
55  * This <code>JoramTransporter</code> uses an <code>ObjectSerializer</code>
56  * to create <em>jms.ObjectMessage</em> to send via JMS. So, the
57  * <code>MessageExchange</code> to send is serialized.
58  *
59  * @version $Rev: 477 $ $Date: 2006-05-29 17:18:07 +0200 (lun., 29 mai 2006) $
60  * @since Petals 1.0
61  * @author alouis
62  */

63 @FractalComponent
64 @Provides(interfaces = @Interface(name = "service", signature = org.objectweb.petals.jbi.transport.Transporter.class))
65 public class JoramTransporter implements Transporter {
66
67     protected String JavaDoc host;
68
69     protected int id;
70
71     protected JoramConnection joramConnection;
72
73     protected JoramAgent joramAgent;
74
75     protected Serializer serializer = new ObjectSerializer();
76
77     protected LoggingUtil log;
78
79     protected String JavaDoc pwd;
80
81     @Requires(name = "router", signature = org.objectweb.petals.jbi.routing.Router.class)
82     protected Router router;
83
84     protected int tcp;
85
86     protected String JavaDoc user;
87
88     /**
89      * Send the message to the specified container. The send can be local or
90      * distant. If the "org.objectweb.petals.transport.nonpersistent" exchange
91      * property is set to true, the message is not persisted by the Transport
92      * layer, but it can be lost in case of problem.
93      */

94     public void send(MessageExchangeImpl messageExchange, String JavaDoc containerName,
95             long timeToLive) throws TransportException {
96
97         if (id == Integer.parseInt(containerName)) {
98             sendLocal(messageExchange);
99
100         } else {
101             // check if the message has to be persisted
102
Object JavaDoc noPersist = messageExchange
103                     .getProperty(PROPERTY_NOPERSISTANCE);
104             boolean persist = (noPersist == null || !noPersist.toString()
105                     .toLowerCase().equals("true"));
106             sendDistant(messageExchange, containerName, timeToLive, persist);
107         }
108     }
109
110     /**
111      * init variables with joram.properties file and create/bind the topic for
112      * this container. A listener is created and started to listen incoming
113      * messages
114      *
115      * @throws Exception
116      */

117     protected void startJMS() throws Exception JavaDoc {
118         log.start();
119
120         // create the Joram Server
121
joramAgent = new JoramAgent(log);
122         joramAgent.startServer();
123
124         // id = Integer.parseInt(PropertyUtil.getProperty(
125
// PropertyUtil.SERVER_PROPS, "joram.id"));
126
id = Integer.parseInt(SystemUtil.getContainerName());
127
128         tcp = Integer.parseInt(SystemUtil.getJoramTCPPort());
129         user = SystemUtil.getJoramUser();
130         pwd = SystemUtil.getJoramPassword();
131         host = SystemUtil.getHost();
132
133         // create initialContext
134
InitialContext JavaDoc context = new javax.naming.InitialContext JavaDoc(PropertyUtil
135                 .retrieveJNDIProperties());
136
137         joramConnection = new JoramConnection(id, tcp, user, pwd, host, context);
138
139         log.end();
140     }
141
142     public void startListening() throws TransportException {
143         log.start();
144
145         MessageListener JavaDoc aML = new MessageListener JavaDoc() {
146             public void onMessage(Message JavaDoc aMessage) {
147                 processIncomingMessage(aMessage);
148             }
149         };
150         try {
151             joramConnection.start(aML);
152         } catch (Exception JavaDoc e) {
153             throw new TransportException(e);
154         }
155
156         log.end();
157     }
158
159     /**
160      * TODO manage exceptions This method is called asynchronously when a
161      * message is received from Joram, via the topic listener.
162      *
163      * The recevied message is transform into a <code>MessageExchangeImpl</code>
164      * by using the <code>Serializer</code>, and is sent to the
165      * <code>Router</code> by calling <code>receive()</code>.
166      *
167      * @param msg
168      * the jms message TODO processIncomingMessage :manage the
169      * reception of unknown message type. PB between the calculation
170      * of the next recipient and notion of service provider/consumer
171      */

172     protected void processIncomingMessage(Message JavaDoc msg) {
173         log.start();
174         if (msg instanceof ObjectMessage JavaDoc) {
175             try {
176                 MessageExchangeImpl jbiMsg = serializer.jms2jbi(msg);
177
178                 // transfer the message to the router
179
router.receive(jbiMsg);
180                 
181                 // ack the message
182
msg.acknowledge();
183             } catch (Exception JavaDoc e) {
184                 log.error(e.getMessage(), e);
185             }
186         } else {
187             log.error("the received JMS message is not an objectmessage");
188         }
189         log.end();
190     }
191
192     /**
193      * Send the message to the specified distant container, by serializing the
194      * exhange with a <code>Serializer</code>. We retrieve the related
195      * distant <code>Topic</code> with the namle of the container, transform
196      * the <code>MessageExchangeImpl</code> into a <code>jms.Message</code>,
197      * and send this message onto the distant topic.
198      *
199      * @param messageExchange
200      * must be non null
201      * @param containerName
202      * must be non null
203      * @param timeToLive
204      * @param persist
205      * @throws TransportException
206      * container not found, invalid message.
207      */

208     protected void sendDistant(MessageExchangeImpl messageExchange,
209             String JavaDoc containerName, long timeToLive, boolean persist)
210             throws TransportException {
211         log.start();
212         try {
213             joramConnection.sendTo(containerName, messageExchange, timeToLive,
214                     persist);
215         } catch (TransportException e) {
216             log.error(e.getMessage(), e);
217             throw e;
218         }
219         log.end();
220     }
221
222     /**
223      * init variables with joram.properties file and create/bind the topic for
224      * this container. A listener is created and started to listen incoming
225      * messages
226      *
227      * @throws Exception
228      */

229     @LifeCycle(on = LifeCycleType.START)
230     protected void start() throws Exception JavaDoc {
231         log = new LoggingUtil(null);
232         log.start();
233
234         // create the Joram Server
235
joramAgent = new JoramAgent(log);
236         joramAgent.startServer();
237
238         // id = Integer.parseInt(PropertyUtil.getProperty(
239
// PropertyUtil.SERVER_PROPS, "joram.id"));
240
id = Integer.parseInt(SystemUtil.getContainerName());
241
242         tcp = Integer.parseInt(SystemUtil.getJoramTCPPort());
243         user = SystemUtil.getJoramUser();
244         pwd = SystemUtil.getJoramPassword();
245         host = SystemUtil.getHost();
246
247         // create initialContext
248
InitialContext JavaDoc context = new javax.naming.InitialContext JavaDoc(PropertyUtil
249                 .retrieveJNDIProperties());
250
251         joramConnection = new JoramConnection(id, tcp, user, pwd, host, context);
252
253         log.end();
254     }
255
256     /**
257      * Send the message locally. No serialization of the message
258      *
259      * @param messageExchange
260      * a messageExchange, can be null
261      * @throws TransportException
262      * problem with the <code>Router</code>
263      */

264     protected void sendLocal(MessageExchangeImpl messageExchange)
265             throws TransportException {
266         log.start();
267         try {
268             router.receive(messageExchange);
269         } catch (RoutingException e) {
270             String JavaDoc msg = "Can not send locally the MessageExchange to the local Router.";
271             log.error(msg, e);
272             throw new TransportException(msg, e);
273         }
274         log.end();
275     }
276
277     /**
278      * close the JMS connection (but does not close the joram server)
279      *
280      * @throws JBIException
281      */

282     @LifeCycle(on = LifeCycleType.STOP)
283     public void stop() throws TransportException {
284         log.start();
285         try {
286             joramConnection.stop();
287             joramAgent.stopServer();
288         } catch (Exception JavaDoc e) {
289             String JavaDoc msg = "Can not stop the JMS connection.";
290             log.error(msg, e);
291             throw new TransportException(msg, e);
292         }
293         log.end();
294     }
295
296     /**
297      * Shutdown the Transporter. The related Topic is removed from Joram and
298      * from the distributed Jndi server
299      */

300     public void shutdown() throws TransportException {
301         try {
302             joramConnection.unsubscribe();
303         } catch (Exception JavaDoc e) {
304             String JavaDoc msg = "Can not unreference the Transporter from the Registry.";
305             log.error(msg, e);
306             throw new TransportException(msg, e);
307         }
308         try {
309             joramAgent.shutdown();
310         } catch (Exception JavaDoc e) {
311             String JavaDoc msg = "Can not shutdown correctly the Joram server.";
312             log.error(msg, e);
313             throw new TransportException(msg, e);
314         }
315
316     }
317 }
Popular Tags