KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > bridge > BridgeTopicImpl


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - 2007 ScalAgent Distributed Technologies
4  * Copyright (C) 2003 - 2004 Bull SA
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (Bull SA)
22  * Contributor(s): ScalAgent Distributed Technologies
23  */

24 package org.objectweb.joram.mom.dest.bridge;
25
26 import java.util.Enumeration JavaDoc;
27 import java.util.Hashtable JavaDoc;
28 import java.util.Properties JavaDoc;
29 import java.util.Vector JavaDoc;
30
31 import org.objectweb.joram.mom.dest.TopicForwardNot;
32 import org.objectweb.joram.mom.dest.TopicImpl;
33 import org.objectweb.joram.mom.messages.Message;
34 import org.objectweb.joram.mom.notifications.ClientMessages;
35 import org.objectweb.joram.mom.notifications.SubscribeRequest;
36 import org.objectweb.joram.mom.notifications.UnsubscribeRequest;
37 import org.objectweb.joram.shared.JoramTracing;
38 import org.objectweb.joram.shared.excepts.AccessException;
39 import org.objectweb.util.monolog.api.BasicLevel;
40
41 import fr.dyade.aaa.agent.AgentId;
42 import fr.dyade.aaa.agent.DeleteNot;
43
44 /**
45  * The <code>BridgeTopicImpl</code> class implements a specific topic which
46  * forwards the messages it receives to a foreign JMS destination, and
47  * gets the messages it is requested to deliver from the same foreign
48  * destination.
49  * <p>
50  * This topic is in fact a bridge linking JORAM and a foreign JMS server,
51  * and which is accessible through the Pub/Sub communication mode.
52  */

53 public class BridgeTopicImpl extends TopicImpl {
54   /** The JMS module for accessing the foreign JMS destination. */
55   private BridgeModule jmsModule;
56
57   /** Counter for keeping the original delivery order. */
58   private long arrivalsCounter = 0;
59
60   /**
61    * Table persisting the outgoing messages until acknowledgement by the
62    * bridge module.
63    * <p>
64    * <b>Key:</b> message identifier<br>
65    * <b>Value:</b> message
66    */

67   private Hashtable JavaDoc outTable;
68
69   /**
70    * Constructs a <code>BridgeTopicImpl</code> instance.
71    *
72    * @param destId Identifier of the agent hosting the topic.
73    * @param adminId Identifier of the administrator of the topic.
74    * @param prop The initial set of properties.
75    */

76   public BridgeTopicImpl(AgentId destId, AgentId adminId, Properties JavaDoc prop) {
77     super(destId, adminId, prop);
78     outTable = new Hashtable JavaDoc();
79     jmsModule = new BridgeModule();
80
81     // Initializing the JMS module.
82
jmsModule.init(destId, prop);
83   }
84
85   public String JavaDoc toString() {
86     return "BridgeTopicImpl:" + destId.toString();
87   }
88
89   /**
90    * Reacts to <code>BridgeDeliveryNot</code> notifications holding a message
91    * received from the foreign JMS server.
92    */

93   public void bridgeDeliveryNot(AgentId from, BridgeDeliveryNot not) {
94     ClientMessages clientMessages = new ClientMessages();
95     clientMessages.addMessage(not.getMessage());
96     super.doClientMessages(destId, clientMessages);
97   }
98
99   /**
100    * Reacts to <code>BridgeAckNot</code> notifications holding the identifier
101    * of a message successfuly delivered to the foreign JMS server.
102    */

103   public void bridgeAckNot(BridgeAckNot not) {
104     outTable.remove(not.getIdentifier());
105   }
106   
107   /**
108    * Method specializing the reaction to a <code>SubscribeRequest</code>
109    * instance.
110    * <p>
111    * This method sets, if needed, a JMS listener on the foreign JMS consumer.
112    *
113    * @exception AccessException If the sender is not a READER.
114    */

115   public void postSubscribe(SubscribeRequest not) {
116
117     // First subscription: setting a listener on the foreign JMS consumer.
118
try {
119       if (subscribers.size() == 1)
120         jmsModule.setMessageListener();
121     } catch (Exception JavaDoc exc) {
122       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
123         JoramTracing.dbgDestination.log(BasicLevel.ERROR,
124                                         "Failing subscribe request on remote destination: ", exc);
125     }
126   }
127
128   /**
129    * Method specializing the reaction to an <code>UnsubscribeRequest</code>
130    * instance.
131    * <p>
132    * This method unsets, if needed, the JMS listener on the foreign
133    * JMS consumer.
134    *
135    */

136   public void preUnsubscribe(UnsubscribeRequest not) {
137     // Last subscription: removing the JMS listener.
138
if (subscribers.isEmpty())
139       jmsModule.unsetMessageListener();
140   }
141
142   /**
143    * Method specializing the reaction to a <code>TopicForwardNot</code>
144    * instance, carrying messages forwarded by a cluster fellow or a
145    * hierarchical son.
146    * <p>
147    * This method forwards the messages, if needed, to the hierarchical father,
148    * and to the foreign JMS destination.
149    */

150   public void topicForwardNot(AgentId from, TopicForwardNot not) {
151     // If the forward comes from a son, forwarding it to the father, if any.
152
if (not.toFather && fatherId != null)
153       forward(fatherId, not);
154     
155     // Sending the received messages to the foreign JMS destination:
156
Message message;
157     for (Enumeration JavaDoc msgs = not.messages.getMessages().elements();
158          msgs.hasMoreElements();) {
159       // AF: TODO it seems not usefull to transform the message !!
160
message = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement());
161       message.order = arrivalsCounter++;
162
163       outTable.put(message.getIdentifier(), message);
164
165       try {
166         jmsModule.send(message.msg);
167       } catch (Exception JavaDoc exc) {
168         outTable.remove(message.getIdentifier());
169         ClientMessages deadM;
170         deadM = new ClientMessages();
171         deadM.addMessage(message.msg);
172         sendToDMQ(deadM, null);
173       }
174     }
175   }
176
177   /**
178    * Method specializing the reaction to a <code>ClientMessages</code>
179    * instance.
180    * <p>
181    * This method may forward the messages to the topic father if any, or
182    * to the cluster fellows if any, and to the foreign JMS destination.
183    */

184   public ClientMessages preProcess(AgentId from, ClientMessages not) {
185     if (destId.equals(from))
186       return not;
187     
188     // Forwarding the messages to the father or the cluster fellows, if any:
189
forwardMessages(not);
190
191     // Sending the received messages to the foreign JMS destination:
192
Message message;
193     for (Enumeration JavaDoc msgs = not.getMessages().elements();
194          msgs.hasMoreElements();) {
195       message = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement());
196       message.order = arrivalsCounter++;
197
198       outTable.put(message.getIdentifier(), message);
199
200       try {
201         jmsModule.send(message.msg);
202       } catch (Exception JavaDoc exc) {
203         outTable.remove(message.getIdentifier());
204         ClientMessages deadM;
205         deadM = new ClientMessages(not.getClientContext(), not.getRequestId());
206         deadM.addMessage(message.msg);
207         sendToDMQ(deadM, not.getDMQId());
208       }
209     }
210     return null;
211   }
212
213   /**
214    * Method specifically processing a
215    * <code>fr.dyade.aaa.agent.DeleteNot</code> instance.
216    * <p>
217    * This method closes the JMS resources used for connecting to the foreign
218    * JMS server.
219    */

220   protected void doDeleteNot(DeleteNot not) {
221     jmsModule.close();
222     super.doDeleteNot(not);
223   }
224
225   /** Deserializes a <code>BridgeTopicImpl</code> instance. */
226   private void readObject(java.io.ObjectInputStream JavaDoc in)
227                throws java.io.IOException JavaDoc, ClassNotFoundException JavaDoc {
228     in.defaultReadObject();
229
230     // Re-launching the JMS module.
231
try {
232       jmsModule.connect();
233
234       if (! subscribers.isEmpty())
235         jmsModule.setMessageListener();
236
237       // Re-emetting the pending messages:
238
Message momMsg;
239       Vector JavaDoc outMessages = new Vector JavaDoc();
240       Message currentMsg;
241       for (Enumeration JavaDoc keys = outTable.keys(); keys.hasMoreElements();) {
242         momMsg = (Message) outTable.get(keys.nextElement());
243   
244         int i = 0;
245         while (i < outMessages.size()) {
246           currentMsg = (Message) outMessages.get(i);
247   
248           if (momMsg.order < currentMsg.order)
249             break;
250   
251           i++;
252         }
253         outMessages.insertElementAt(momMsg, i);
254       }
255
256       while (! outMessages.isEmpty()) {
257         momMsg = (Message) outMessages.remove(0);
258         jmsModule.send(momMsg.msg);
259       }
260     }
261     catch (Exception JavaDoc exc) {
262       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
263         JoramTracing.dbgDestination.log(BasicLevel.ERROR, "", exc);
264     }
265   }
266 }
267
Popular Tags