KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > BridgeTopicImpl


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - ScalAgent Distributed Technologies
4  * Copyright (C) 2003 - Bull SA
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (Bull SA)
22  * Contributor(s): ScalAgent Distributed Technologies
23  */

24 package org.objectweb.joram.mom.dest;
25
26 import fr.dyade.aaa.agent.AgentId;
27 import fr.dyade.aaa.agent.Channel;
28 import fr.dyade.aaa.agent.DeleteNot;
29 import fr.dyade.aaa.agent.Notification;
30 import fr.dyade.aaa.agent.UnknownNotificationException;
31 import org.objectweb.joram.mom.MomTracing;
32 import org.objectweb.joram.mom.notifications.*;
33 import org.objectweb.joram.mom.util.*;
34 import org.objectweb.joram.shared.excepts.*;
35 import org.objectweb.joram.shared.messages.Message;
36
37 import java.util.Enumeration JavaDoc;
38 import java.util.Hashtable JavaDoc;
39 import java.util.Properties JavaDoc;
40 import java.util.Vector JavaDoc;
41
42 import org.objectweb.util.monolog.api.BasicLevel;
43
44
45 /**
46  * The <code>BridgeTopicImpl</code> class implements a specific topic which
47  * forwards the messages it receives to a foreign JMS destination, and
48  * gets the messages it is requested to deliver from the same foreign
49  * destination.
50  * <p>
51  * This topic is in fact a bridge linking JORAM and a foreign JMS server,
52  * and which is accessible through the Pub/Sub communication mode.
53  */

54 public class BridgeTopicImpl extends TopicImpl {
55   /** The JMS module for accessing the foreign JMS destination. */
56   private BridgeUnifiedModule jmsModule;
57
58   /** Counter for keeping the original delivery order. */
59   private long arrivalsCounter = 0;
60   /**
61    * Table persisting the outgoing messages until acknowledgement by the
62    * bridge module.
63    * <p>
64    * <b>Key:</b> message identifier<br>
65    * <b>Value:</b> message
66    */

67   private Hashtable JavaDoc outTable;
68
69   /**
70    * Constructs a <code>BridgeTopicImpl</code> instance.
71    *
72    * @param destId Identifier of the agent hosting the topic.
73    * @param adminId Identifier of the administrator of the topic.
74    */

75   public BridgeTopicImpl(AgentId destId, AgentId adminId)
76   {
77     super(destId, adminId);
78     outTable = new Hashtable JavaDoc();
79   }
80
81
82   public String JavaDoc toString()
83   {
84     return "BridgeTopicImpl:" + destId.toString();
85   }
86
87
88   /**
89    * Initiales the topic's JMS module.
90    *
91    * @exception IllegalStateException If the provided JMS properties are
92    * invalid.
93    */

94   public void init(Properties JavaDoc prop) {
95     String JavaDoc jmsMode = (String JavaDoc) prop.get("jmsMode");
96
97     if (jmsMode == null)
98       throw new IllegalArgumentException JavaDoc("Missing jmsMode property");
99
100     if (jmsMode.equalsIgnoreCase("PTP"))
101       jmsModule = new BridgePtpModule();
102     else if (jmsMode.equalsIgnoreCase("PubSub"))
103       jmsModule = new BridgePubSubModule();
104     else if (jmsMode.equalsIgnoreCase("Unified"))
105       jmsModule = new BridgeUnifiedModule();
106     else
107       throw new IllegalArgumentException JavaDoc("Invalid jmsMode value: " + jmsMode);
108
109     // Initializing the JMS module.
110
jmsModule.init(destId, prop);
111   }
112
113   /**
114    * Specializes this <code>TopicImpl</code> method for processing the
115    * specific bridge notifications.
116    */

117   public void react(AgentId from, Notification not)
118               throws UnknownNotificationException
119   {
120     if (not instanceof BridgeDeliveryNot)
121       doReact((BridgeDeliveryNot) not);
122     else if (not instanceof BridgeAckNot)
123       doReact((BridgeAckNot) not);
124     else
125       super.react(from, not);
126   }
127
128   /**
129    * Reacts to <code>BridgeDeliveryNot</code> notifications holding a message
130    * received from the foreign JMS server.
131    */

132   protected void doReact(BridgeDeliveryNot not)
133   {
134     ClientMessages clientMessages = new ClientMessages();
135     clientMessages.addMessage(not.getMessage());
136     super.doProcess(clientMessages);
137   }
138
139   /**
140    * Reacts to <code>BridgeAckNot</code> notifications holding the identifier
141    * of a message successfuly delivered to the foreign JMS server.
142    */

143   protected void doReact(BridgeAckNot not)
144   {
145     outTable.remove(not.getIdentifier());
146   }
147   
148   /**
149    * Method specializing the reaction to a <code>SubscribeRequest</code>
150    * instance.
151    * <p>
152    * This method sets, if needed, a JMS listener on the foreign JMS consumer.
153    *
154    * @exception AccessException If the sender is not a READER.
155    */

156   protected void doReact(AgentId from, SubscribeRequest not)
157                  throws AccessException
158   {
159     super.doReact(from, not);
160
161     // First subscription: setting a listener on the foreign JMS consumer.
162
try {
163       if (subscribers.size() == 1)
164         jmsModule.setMessageListener();
165     }
166     catch (Exception JavaDoc exc) {
167       if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
168         MomTracing.dbgDestination.log(BasicLevel.ERROR,
169                                       "Failing subscribe request on remote "
170                                       + "destination: " + exc);
171     }
172   }
173
174   /**
175    * Method specializing the reaction to an <code>UnsubscribeRequest</code>
176    * instance.
177    * <p>
178    * This method unsets, if needed, the JMS listener on the foreign
179    * JMS consumer.
180    *
181    */

182   protected void doReact(AgentId from, UnsubscribeRequest not)
183   {
184     // Last subscription: removing the JMS listener.
185
if (subscribers.isEmpty())
186       jmsModule.unsetMessageListener();
187
188     super.doReact(from, not);
189   }
190
191   /**
192    * Method specializing the reaction to a <code>TopicForwardNot</code>
193    * instance, carrying messages forwarded by a cluster fellow or a
194    * hierarchical son.
195    * <p>
196    * This method forwards the messages, if needed, to the hierarchical father,
197    * and to the foreign JMS destination.
198    */

199   protected void doReact(AgentId from, TopicForwardNot not)
200   {
201     // If the forward comes from a son, forwarding it to the father, if any.
202
if (not.toFather && fatherId != null)
203       Channel.sendTo(fatherId, not);
204     
205     // Sending the received messages to the foreign JMS destination:
206
Message msg;
207     for (Enumeration JavaDoc msgs = not.messages.getMessages().elements();
208          msgs.hasMoreElements();) {
209
210       if (arrivalsCounter == Long.MAX_VALUE)
211         arrivalsCounter = 0;
212
213       msg = (Message) msgs.nextElement();
214       msg.order = arrivalsCounter++;
215
216       outTable.put(msg.getIdentifier(), msg);
217
218       try {
219         jmsModule.send(msg);
220       }
221       catch (Exception JavaDoc exc) {
222         outTable.remove(msg.getIdentifier());
223         ClientMessages deadM = new ClientMessages();
224         deadM.addMessage(msg);
225         sendToDMQ(deadM, null);
226       }
227     }
228   }
229
230   /**
231    * Method specializing the reaction to a <code>ClientMessages</code>
232    * instance.
233    * <p>
234    * This method may forward the messages to the topic father if any, or
235    * to the cluster fellows if any, and to the foreign JMS destination.
236    */

237   protected void doProcess(ClientMessages not)
238   {
239     // Forwarding the messages to the father or the cluster fellows, if any:
240
forwardMessages(not);
241
242     // Sending the received messages to the foreign JMS destination:
243
Message msg;
244     for (Enumeration JavaDoc msgs = not.getMessages().elements();
245          msgs.hasMoreElements();) {
246       
247       if (arrivalsCounter == Long.MAX_VALUE)
248         arrivalsCounter = 0;
249
250       msg = (Message) msgs.nextElement();
251       msg.order = arrivalsCounter++;
252
253       outTable.put(msg.getIdentifier(), msg);
254
255       try {
256         jmsModule.send(msg);
257       }
258       catch (Exception JavaDoc exc) {
259         outTable.remove(msg.getIdentifier());
260         ClientMessages deadM;
261         deadM = new ClientMessages(not.getClientContext(), not.getRequestId());
262         deadM.addMessage(msg);
263         sendToDMQ(deadM, not.getDMQId());
264       }
265     }
266   }
267
268   /**
269    * Method specifically processing a
270    * <code>fr.dyade.aaa.agent.DeleteNot</code> instance.
271    * <p>
272    * This method closes the JMS resources used for connecting to the foreign
273    * JMS server.
274    */

275   protected void doProcess(DeleteNot not)
276   {
277     jmsModule.close();
278     super.doProcess(not);
279   }
280
281   /** Deserializes a <code>BridgeTopicImpl</code> instance. */
282   private void readObject(java.io.ObjectInputStream JavaDoc in)
283                throws java.io.IOException JavaDoc, ClassNotFoundException JavaDoc
284   {
285     in.defaultReadObject();
286
287     // Re-launching the JMS module.
288
try {
289       jmsModule.connect();
290
291       if (! subscribers.isEmpty())
292         jmsModule.setMessageListener();
293
294       // Re-emetting the pending messages:
295
Message msg;
296       Vector JavaDoc outMessages = new Vector JavaDoc();
297       Message currentMsg;
298       for (Enumeration JavaDoc keys = outTable.keys(); keys.hasMoreElements();) {
299         msg = (Message) outTable.get(keys.nextElement());
300   
301         int i = 0;
302         while (i < outMessages.size()) {
303           currentMsg = (Message) outMessages.get(i);
304   
305           if (msg.order < currentMsg.order)
306             break;
307   
308           i++;
309         }
310         outMessages.insertElementAt(msg, i);
311       }
312
313       while (! outMessages.isEmpty()) {
314         msg = (Message) outMessages.remove(0);
315         jmsModule.send(msg);
316       }
317     }
318     catch (Exception JavaDoc exc) {
319       if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
320         MomTracing.dbgDestination.log(BasicLevel.ERROR, "" + exc);
321     }
322   }
323 }
324
Popular Tags