KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > BridgeQueueImpl


/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2004 - ScalAgent Distributed Technologies
 * Copyright (C) 2003 - Bull SA
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (Bull SA)
 * Contributor(s): ScalAgent Distributed Technologies
 */

package org.objectweb.joram.mom.dest;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.agent.DeleteNot;
import fr.dyade.aaa.agent.Notification;
import fr.dyade.aaa.agent.UnknownNotificationException;
import org.objectweb.joram.mom.MomTracing;
import org.objectweb.joram.mom.notifications.*;
import org.objectweb.joram.mom.util.*;
import org.objectweb.joram.shared.excepts.*;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.joram.shared.selectors.Selector;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Properties;
import java.util.Vector;

import org.objectweb.util.monolog.api.BasicLevel;
45
46 /**
47  * The <code>BridgeQueueImpl</code> class implements a specific queue which
48  * forwards the messages it receives to a foreign JMS destination, and
49  * gets the messages it is requested to deliver from the same foreign
50  * destination.
51  * <p>
52  * This queue is in fact a bridge linking JORAM and a foreign JMS server,
53  * and which is accessible through the PTP communication mode.
54  */

55 public class BridgeQueueImpl extends QueueImpl {
56   /** The JMS module for accessing the foreign JMS destination. */
57   private BridgeUnifiedModule jmsModule;
58   /**
59    * Table persisting the outgoing messages until acknowledgement by the
60    * bridge module.
61    * <p>
62    * <b>Key:</b> message identifier<br>
63    * <b>Value:</b> message
64    */

65   private Hashtable JavaDoc outTable;
66
67   /**
68    * Constructs a <code>BridgeQueueImpl</code> instance.
69    *
70    * @param destId Identifier of the agent hosting the queue.
71    * @param adminId Identifier of the administrator of the queue.
72    */

73   public BridgeQueueImpl(AgentId destId, AgentId adminId)
74   {
75     super(destId, adminId);
76     outTable = new Hashtable JavaDoc();
77   }
78
79
80   public String JavaDoc toString()
81   {
82     return "BridgeQueueImpl:" + destId.toString();
83   }
84
85
86   /**
87    * Initiales the queue's JMS module.
88    *
89    * @exception IllegalStateException If the provided JMS properties are
90    * invalid.
91    */

92   public void init(Properties JavaDoc prop) {
93     String JavaDoc jmsMode = (String JavaDoc) prop.get("jmsMode");
94
95     if (jmsMode == null)
96       throw new IllegalArgumentException JavaDoc("Missing 'jmsMode' property");
97
98     if (jmsMode.equalsIgnoreCase("PTP"))
99       jmsModule = new BridgePtpModule();
100     else if (jmsMode.equalsIgnoreCase("PubSub"))
101       jmsModule = new BridgePubSubModule();
102     else if (jmsMode.equalsIgnoreCase("Unified"))
103       jmsModule = new BridgeUnifiedModule();
104     else
105       throw new IllegalArgumentException JavaDoc("Invalid 'jmsMode' value: "
106                                          + jmsMode);
107
108     // Initializing the JMS module.
109
jmsModule.init(destId, prop);
110   }
111
112   /**
113    * Specializes this <code>QueueImpl</code> method for processing the
114    * specific bridge notifications.
115    */

116   public void react(AgentId from, Notification not)
117               throws UnknownNotificationException
118   {
119     if (not instanceof BridgeDeliveryNot) {
120       doReact((BridgeDeliveryNot) not);
121     }
122     else if (not instanceof BridgeAckNot)
123       doReact((BridgeAckNot) not);
124     else
125       super.react(from, not);
126   }
127
128   /**
129    * Reacts to <code>BridgeDeliveryNot</code> notifications holding a message
130    * received from the foreign JMS server.
131    */

132   protected void doReact(BridgeDeliveryNot not)
133   {
134     ClientMessages clientMessages = new ClientMessages();
135     clientMessages.addMessage(not.getMessage());
136     super.doProcess(clientMessages);
137   }
138
139   /**
140    * Reacts to <code>BridgeAckNot</code> notifications holding the identifier
141    * of a message successfuly delivered to the foreign JMS server.
142    */

143   protected void doReact(BridgeAckNot not)
144   {
145     outTable.remove(not.getIdentifier());
146   }
147   
148   /**
149    * Method specializing the reaction to a <code>ReceiveRequest</code>
150    * instance, requesting a message.
151    * <p>
152    * This method stores the request and requests a message through the JMS
153    * interface.
154    *
155    * @exception AccessException If the sender is not a reader.
156    */

157   protected void doReact(AgentId from, ReceiveRequest not)
158                  throws AccessException
159   {
160     // If client is not a reader, sending an exception.
161
if (! isReader(from))
162       throw new AccessException("READ right not granted");
163
164     // Storing the request:
165
not.requester = from;
166     not.setExpiration(System.currentTimeMillis());
167     requests.add(not);
168
169     // Launching a delivery sequence for this request:
170
int reqIndex = requests.size() - 1;
171     deliverMessages(reqIndex);
172
173     // If the request has not been answered:
174
if ((requests.size() - 1) == reqIndex) {
175       // If it is an immediate delivery request, requesting the foreign JMS
176
// destination for an immediate delivery.
177
if (not.getTimeOut() == -1) {
178         requests.remove(reqIndex);
179
180         Message msg = null;
181
182         try {
183           msg = jmsModule.receiveNoWait();
184         }
185         // JMS module not properly initialized.
186
catch (Exception JavaDoc exc) {
187           if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
188             MomTracing.dbgDestination.log(BasicLevel.ERROR,
189                                           "Failing receive request on remote "
190                                           + "destination: " + exc);
191         }
192
193         // If message not null but not selected, setting it to null.
194
if (msg != null && ! Selector.matches(msg, not.getSelector()))
195           msg = null;
196
197         QueueMsgReply reply = new QueueMsgReply(not);
198         reply.addMessage(msg);
199         Channel.sendTo(from, reply);
200
201         if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
202           MomTracing.dbgDestination.log(BasicLevel.DEBUG,
203                                         "Receive answered.");
204       }
205       // Else, requesting the foreign JMS destination for a delivery:
206
else {
207         try {
208           jmsModule.receive();
209         }
210         // JMS module not properly initialized.
211
catch (Exception JavaDoc exc) {
212           if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
213             MomTracing.dbgDestination.log(BasicLevel.ERROR,
214                                           "Failing receive request on remote "
215                                           + "destination: " + exc);
216         }
217       }
218     }
219   }
220
221   /**
222    * Method specializing the processing of a <code>ClientMessages</code>
223    * instance.
224    * <p>
225    * This method sends the messages to the foreign JMS destination.
226    */

227   protected void doProcess(ClientMessages not)
228   {
229     // Sending each message:
230
Message msg;
231     for (Enumeration JavaDoc msgs = not.getMessages().elements();
232          msgs.hasMoreElements();) {
233
234       if (arrivalsCounter == Long.MAX_VALUE)
235         arrivalsCounter = 0;
236
237       msg = (Message) msgs.nextElement();
238       msg.order = arrivalsCounter++;
239
240       outTable.put(msg.getIdentifier(), msg);
241
242       try {
243         jmsModule.send(msg);
244       }
245       catch (Exception JavaDoc exc) {
246         if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
247           MomTracing.dbgDestination.log(BasicLevel.ERROR,
248                                         "Failing sending to remote "
249                                         + "destination: "
250                                         + exc);
251
252         outTable.remove(msg.getIdentifier());
253         ClientMessages deadM;
254         deadM = new ClientMessages(not.getClientContext(), not.getRequestId());
255         deadM.addMessage(msg);
256         sendToDMQ(deadM, not.getDMQId());
257       }
258     }
259   }
260
261   /**
262    * Method specifically processing a
263    * <code>fr.dyade.aaa.agent.DeleteNot</code> instance.
264    * <p>
265    * This method closes the JMS resources used for connecting to the foreign
266    * JMS server.
267    */

268   protected void doProcess(DeleteNot not)
269   {
270     jmsModule.close();
271     super.doProcess(not);
272   }
273
274
275   /** Deserializes a <code>BridgeQueueImpl</code> instance. */
276   private void readObject(java.io.ObjectInputStream JavaDoc in)
277                throws IOException JavaDoc, ClassNotFoundException JavaDoc
278   {
279     in.defaultReadObject();
280
281     messages = new Vector JavaDoc();
282     deliveredMsgs = new Hashtable JavaDoc();
283
284     // Retrieving the persisted messages, if any.
285
Vector JavaDoc persistedMsgs = MessagePersistenceModule.loadAll(getDestinationId());
286
287     if (persistedMsgs != null) {
288 // persistence message are only in memory.
289
// MessagePersistenceModule.deleteAll(getDestinationId());
290
Message persistedMsg;
291       AgentId consId;
292       while (! persistedMsgs.isEmpty()) {
293         persistedMsg = (Message) persistedMsgs.remove(0);
294         consId = (AgentId) consumers.get(persistedMsg.getIdentifier());
295         if (consId == null)
296           storeMessage(persistedMsg);
297         else {
298           deliveredMsgs.put(persistedMsg.getIdentifier(), persistedMsg);
299           persistedMsg.save(getDestinationId());
300         }
301       }
302     }
303
304     // Re-launching the JMS module.
305
try {
306       jmsModule.connect();
307
308       // Re-emitting the receive requests:
309
for (int i = 0; i < requests.size(); i++)
310         jmsModule.receive();
311
312       // Re-emetting the pending messages:
313
Message msg;
314       Vector JavaDoc outMessages = new Vector JavaDoc();
315       Message currentMsg;
316       for (Enumeration JavaDoc keys = outTable.keys(); keys.hasMoreElements();) {
317         msg = (Message) outTable.get(keys.nextElement());
318   
319         int i = 0;
320         while (i < outMessages.size()) {
321           currentMsg = (Message) outMessages.get(i);
322   
323           if (msg.order < currentMsg.order)
324             break;
325   
326           i++;
327         }
328         outMessages.insertElementAt(msg, i);
329       }
330
331       while (! outMessages.isEmpty()) {
332         msg = (Message) outMessages.remove(0);
333         jmsModule.send(msg);
334       }
335     }
336     catch (Exception JavaDoc exc) {
337       if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
338         MomTracing.dbgDestination.log(BasicLevel.ERROR, "" + exc);
339     }
340   }
341 }
342
Popular Tags