KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > bridge > BridgeQueueImpl


/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2004 - 2007 ScalAgent Distributed Technologies
 * Copyright (C) 2003 - 2004 Bull SA
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (Bull SA)
 * Contributor(s): ScalAgent Distributed Technologies
 */

24 package org.objectweb.joram.mom.dest.bridge;
25
26 import java.io.IOException JavaDoc;
27 import java.util.Enumeration JavaDoc;
28 import java.util.Hashtable JavaDoc;
29 import java.util.Properties JavaDoc;
30 import java.util.Vector JavaDoc;
31
32 import org.objectweb.joram.mom.dest.QueueImpl;
33 import org.objectweb.joram.mom.messages.Message;
34 import org.objectweb.joram.mom.notifications.ClientMessages;
35 import org.objectweb.joram.mom.notifications.QueueMsgReply;
36 import org.objectweb.joram.mom.notifications.ReceiveRequest;
37 import org.objectweb.joram.shared.JoramTracing;
38 import org.objectweb.joram.shared.excepts.AccessException;
39 import org.objectweb.joram.shared.selectors.Selector;
40 import org.objectweb.util.monolog.api.BasicLevel;
41
42 import fr.dyade.aaa.agent.AgentId;
43 import fr.dyade.aaa.agent.DeleteNot;
44
/**
 * The <code>BridgeQueueImpl</code> class implements a specific queue which
 * forwards the messages it receives to a foreign JMS destination, and
 * gets the messages it is requested to deliver from the same foreign
 * destination.
 * <p>
 * This queue is in fact a bridge linking JORAM and a foreign JMS server.
 */

53 public class BridgeQueueImpl extends QueueImpl {
54   /** The JMS module for accessing the foreign JMS destination. */
55   private BridgeModule jmsModule;
56   /**
57    * Table persisting the outgoing messages until acknowledgement by the
58    * bridge module.
59    * <p>
60    * <b>Key:</b> message identifier<br>
61    * <b>Value:</b> message
62    */

63   private Hashtable JavaDoc outTable;
64
65   /**
66    * Constructs a <code>BridgeQueueImpl</code> instance.
67    *
68    * @param destId Identifier of the agent hosting the queue.
69    * @param adminId Identifier of the administrator of the queue.
70    * @param prop The initial set of properties.
71    */

72   public BridgeQueueImpl(AgentId destId, AgentId adminId, Properties JavaDoc prop) {
73     super(destId, adminId, prop);
74     outTable = new Hashtable JavaDoc();
75     jmsModule = new BridgeModule();
76
77     // Initializing the JMS module.
78
jmsModule.init(destId, prop);
79   }
80
81   public String JavaDoc toString() {
82     return "BridgeQueueImpl:" + destId.toString();
83   }
84
85   /**
86    * Reacts to <code>BridgeDeliveryNot</code> notifications holding a message
87    * received from the foreign JMS server.
88    *
89    * @param from AgentId
90    * @param not BridgeDeliveryNot
91    */

92   public void bridgeDelivery(AgentId from, BridgeDeliveryNot not) {
93     ClientMessages clientMessages = new ClientMessages();
94     clientMessages.addMessage(not.getMessage());
95     // it come from bridge, so set destId for from
96
//(do not preProcess this ClientMessage).
97
super.doClientMessages(destId, clientMessages);
98   }
99
100   /**
101    * Reacts to <code>BridgeAckNot</code> notifications holding the identifier
102    * of a message successfuly delivered to the foreign JMS server.
103    *
104    * @param not BridgeAckNot
105    */

106   public void bridgeAck(BridgeAckNot not) {
107     outTable.remove(not.getIdentifier());
108   }
109   
110   /**
111    * Method specializing the reaction to a <code>ReceiveRequest</code>
112    * instance, requesting a message.
113    * <p>
114    * This method stores the request and requests a message through the JMS
115    * interface.
116    *
117    * @exception AccessException If the sender is not a reader.
118    */

119   public void receiveRequest(AgentId from, ReceiveRequest not)
120                  throws AccessException {
121     // If client is not a reader, sending an exception.
122
if (! isReader(from))
123       throw new AccessException("READ right not granted");
124
125     // Storing the request:
126
not.requester = from;
127     not.setExpiration(System.currentTimeMillis());
128     requests.add(not);
129
130     // Launching a delivery sequence for this request:
131
int reqIndex = requests.size() - 1;
132     deliverMessages(reqIndex);
133
134     // If the request has not been answered:
135
if ((requests.size() - 1) == reqIndex) {
136       // If it is an immediate delivery request, requesting the foreign JMS
137
// destination for an immediate delivery.
138
if (not.getTimeOut() == -1) {
139         requests.remove(reqIndex);
140
141         org.objectweb.joram.shared.messages.Message message = null;
142
143         try {
144           message = jmsModule.receiveNoWait();
145         } catch (Exception JavaDoc exc) {
146           // JMS module not properly initialized.
147
if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
148             JoramTracing.dbgDestination.log(BasicLevel.ERROR,
149                                             "Failing receive request on remote destination: ", exc);
150         }
151
152         // If message not null but not selected, setting it to null.
153
if ((message != null) &&
154             ! Selector.matches(message, not.getSelector()))
155           message = null;
156
157         QueueMsgReply reply = new QueueMsgReply(not);
158         reply.addMessage(message);
159         forward(from, reply);
160
161         if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
162           JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
163                                         "Receive answered.");
164       } else {
165         // Else, requesting the foreign JMS destination for a delivery:
166
try {
167           jmsModule.receive();
168         } catch (Exception JavaDoc exc) {
169           // JMS module not properly initialized.
170
if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
171             JoramTracing.dbgDestination.log(BasicLevel.ERROR,
172                                             "Failing receive request on remote destination: ", exc);
173         }
174       }
175     }
176   }
177
178   /**
179    * Method specializing the processing of a <code>ClientMessages</code>
180    * instance.
181    * <p>
182    * This method sends the messages to the foreign JMS destination.
183    */

184   public ClientMessages preProcess(AgentId from, ClientMessages not) {
185     if (destId.equals(from))
186       return not;
187     
188     // Sending each message:
189
Message message;
190     for (Enumeration JavaDoc msgs = not.getMessages().elements();
191          msgs.hasMoreElements();) {
192       message = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement());
193       message.order = arrivalsCounter++;
194
195       outTable.put(message.getIdentifier(), message);
196
197       try {
198         jmsModule.send(message.msg);
199       } catch (Exception JavaDoc exc) {
200         if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
201           JoramTracing.dbgDestination.log(BasicLevel.ERROR,
202                                           "Failing sending to remote destination: ", exc);
203
204         outTable.remove(message.getIdentifier());
205         ClientMessages deadM;
206         deadM = new ClientMessages(not.getClientContext(), not.getRequestId());
207         deadM.addMessage(message.msg);
208         sendToDMQ(deadM, not.getDMQId());
209       }
210     }
211     return null;
212   }
213
214   /**
215    * Method specifically processing a
216    * <code>fr.dyade.aaa.agent.DeleteNot</code> instance.
217    * <p>
218    * This method closes the JMS resources used for connecting to the foreign
219    * JMS server.
220    */

221   protected void doDeleteNot(DeleteNot not) {
222     jmsModule.close();
223     super.doDeleteNot(not);
224   }
225
226
227   /** Deserializes a <code>BridgeQueueImpl</code> instance. */
228   private void readObject(java.io.ObjectInputStream JavaDoc in)
229                throws IOException JavaDoc, ClassNotFoundException JavaDoc {
230     in.defaultReadObject();
231
232     messages = new Vector JavaDoc();
233     deliveredMsgs = new Hashtable JavaDoc();
234
235     // Retrieving the persisted messages, if any.
236
// AF: TODO
237
Vector JavaDoc persistedMsgs = null;
238 // persistedMsgs = MessagePersistenceModule.loadAll(getDestinationId());
239

240     // AF: This code is already in QueueImpl, it seems the message are
241
// loaded 2 times !!
242
if (persistedMsgs != null) {
243       Message persistedMsg;
244       AgentId consId;
245       while (! persistedMsgs.isEmpty()) {
246         persistedMsg = (Message) persistedMsgs.remove(0);
247         consId = (AgentId) consumers.get(persistedMsg.getIdentifier());
248         if (consId == null) {
249           addMessage(persistedMsg);
250         } else if (isLocal(consId)) {
251           if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
252             JoramTracing.dbgDestination.log(
253               BasicLevel.DEBUG, " -> deny " + persistedMsg.getIdentifier());
254           consumers.remove(persistedMsg.getIdentifier());
255           contexts.remove(persistedMsg.getIdentifier());
256           addMessage(persistedMsg);
257         } else {
258           deliveredMsgs.put(persistedMsg.getIdentifier(), persistedMsg);
259         }
260       }
261     }
262
263     // Re-launching the JMS module.
264
try {
265       jmsModule.connect();
266
267       // Re-emitting the receive requests:
268
for (int i = 0; i < requests.size(); i++)
269         jmsModule.receive();
270
271       // Re-emetting the pending messages:
272
Message momMsg;
273       Vector JavaDoc outMessages = new Vector JavaDoc();
274       Message currentMsg;
275       for (Enumeration JavaDoc keys = outTable.keys(); keys.hasMoreElements();) {
276         momMsg = (Message) outTable.get(keys.nextElement());
277   
278         int i = 0;
279         while (i < outMessages.size()) {
280           currentMsg = (Message) outMessages.get(i);
281   
282           if (momMsg.order < currentMsg.order)
283             break;
284   
285           i++;
286         }
287         outMessages.insertElementAt(momMsg, i);
288       }
289
290       while (! outMessages.isEmpty()) {
291         momMsg = (Message) outMessages.remove(0);
292         jmsModule.send(momMsg.msg);
293       }
294     } catch (Exception JavaDoc exc) {
295       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
296         JoramTracing.dbgDestination.log(BasicLevel.ERROR, "", exc);
297     }
298   }
299 }
300
Popular Tags