KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > DeadMQueueImpl


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2007 ScalAgent Distributed Technologies
4  * Copyright (C) 1996 - 2000 Dyade
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (INRIA)
22  * Contributor(s): ScalAgent Distributed Technologies
23  */

24 package org.objectweb.joram.mom.dest;
25
26 import java.util.Enumeration JavaDoc;
27 import java.util.Vector JavaDoc;
28 import java.util.Properties JavaDoc;
29
30 import fr.dyade.aaa.agent.AgentId;
31 import fr.dyade.aaa.agent.Notification;
32 import fr.dyade.aaa.agent.UnknownAgent;
33
34 import org.objectweb.joram.mom.notifications.*;
35 import org.objectweb.joram.shared.excepts.*;
36 import org.objectweb.joram.mom.messages.Message;
37 import org.objectweb.joram.shared.selectors.Selector;
38
39 import org.objectweb.joram.shared.JoramTracing;
40 import org.objectweb.util.monolog.api.BasicLevel;
41
42 /**
43  * The <code>DeadMQueueImpl</code> class implements the MOM dead message queue
44  * behaviour, basically storing dead messages and delivering them upon clients
45  * requests.
46  */

47 public class DeadMQueueImpl extends QueueImpl {
48   /** Static value holding the default DMQ identifier for a server. */
49   static AgentId id = null;
50   /** Static value holding the default threshold for a server. */
51   static Integer JavaDoc threshold = null;
52
53   /**
54    * Constructs a <code>DeadMQueueImpl</code> instance.
55    *
56    * @param destId Identifier of the agent hosting the queue.
57    * @param adminId Identifier of the administrator of the queue.
58    * @param prop The initial set of properties.
59    */

60   public DeadMQueueImpl(AgentId destId, AgentId adminId, Properties JavaDoc prop) {
61     super(destId, adminId, prop);
62   }
63
64
65   public String JavaDoc toString() {
66     return "DeadMQueueImpl:" + destId.toString();
67   }
68
69
70   /** Static method returning the default DMQ identifier. */
71   public static AgentId getId() {
72     return id;
73   }
74   
75   /** Static method returning the default threshold. */
76   public static Integer JavaDoc getDefaultThreshold() {
77     return threshold;
78   }
79
80   /**
81    * Overrides this <code>DestinationImpl</code> method; this request is
82    * not expected by a dead message queue.
83    *
84    * @exception AccessException Not thrown.
85    */

86   public void setDMQRequest(AgentId from, SetDMQRequest req)
87                  throws AccessException {
88     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN))
89       JoramTracing.dbgDestination.log(BasicLevel.WARN,
90                                     "Unexpected request: " + req);
91   }
92   
93   /**
94    * Overrides this <code>DestinationImpl</code> method; the messages carried
95    * by the <code>ClientMessages</code> instance are stored in their arrival
96    * order, WRITE right is not checked.
97    */

98   public ClientMessages preProcess(AgentId from, ClientMessages not) {
99     // Getting and persisting the messages:
100
Message msg;
101     // Storing each received message:
102
for (Enumeration JavaDoc msgs = not.getMessages().elements();
103          msgs.hasMoreElements();) {
104       msg = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement());
105       msg.setExpiration(0L);
106       msg.order = arrivalsCounter++;
107       messages.add(msg);
108       // Persisting the message.
109
setMsgTxName(msg);
110       msg.save();
111     }
112     return null;
113   }
114   
115   /**
116    * Overrides this <code>QueueImpl</code> method; this request is
117    * not expected by a dead message queue.
118    *
119    * @exception AccessException Not thrown.
120    */

121   public void setThreshRequest(AgentId from, SetThreshRequest req)
122                  throws AccessException {
123     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN))
124       JoramTracing.dbgDestination.log(BasicLevel.WARN,
125                                     "Unexpected request: " + req);
126   }
127
128   /**
129    * Overrides this <code>QueueImpl</code> method; messages matching the
130    * request's selector are actually sent as a reply; no cleaning nor DMQ
131    * sending is done.
132    *
133    * @exception AccessException If the requester is not a reader.
134    */

135   public void browseRequest(AgentId from, BrowseRequest not)
136                  throws AccessException {
137     // If client is not a reader, throwing an exception.
138
if (! isReader(from))
139       throw new AccessException("READ right not granted");
140
141     // Building the reply:
142
BrowseReply rep = new BrowseReply(not);
143
144     // Adding the messages to it:
145
Message message;
146     for (int i = 0; i < messages.size(); i++) {
147       message = (Message) messages.get(i);
148       // Message matching the selector: adding it.
149
if (Selector.matches(message.msg, not.getSelector()))
150         rep.addMessage(message.msg);
151     }
152     // Delivering the reply:
153
forward(from, rep);
154
155     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
156       JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Request answered.");
157   }
158
159   /**
160    * Overrides this <code>QueueImpl</code> method;
161    * <code>AcknowledgeRequest</code> requests are actually not processed
162    * in dead message queues.
163    */

164   public void acknowledgeRequest(AgentId from, AcknowledgeRequest not) {}
165  
166   /**
167    * Overrides this <code>QueueImpl</code> method;
168    * <code>DenyRequest</code> requests are actually not processed
169    * in dead message queues.
170    */

171   public void denyRequest(AgentId from, DenyRequest not) {}
172
173   /**
174    * Overrides this <code>QueueImpl</code> method; if the sent notification
175    * was a <code>QueueMsgReply</code> instance, putting the sent message back
176    * in queue.
177    */

178   protected void doUnknownAgent(UnknownAgent uA) {
179     AgentId client = uA.agent;
180     Notification not = uA.not;
181
182     // If the notification is not a delivery, doing nothing.
183
if (! (not instanceof QueueMsgReply))
184       return;
185
186     // Putting the message back in queue:
187
Vector JavaDoc msgList = ((QueueMsgReply) not).getMessages();
188     for (int i = 0; i < msgList.size(); i++) {
189       Message msg = (Message)msgList.elementAt(i);
190       msg.order = arrivalsCounter++;
191       messages.add(msg);
192       // Persisting the message.
193
setMsgTxName(msg);
194       msg.save();
195     }
196     // Launching a delivery sequence:
197
deliverMessages(0);
198   }
199
200   /**
201    * Overrides this <code>QueueImpl</code> method; delivered messages are not
202    * kept for acknowledgement or denying; validity of messages is
203    * not checked and message fields are not updated; also, no sending to
204    * any DMQ.
205    */

206   protected void deliverMessages(int index) {
207     ReceiveRequest notRec = null;
208     boolean replied;
209     int j = 0;
210     Message message;
211     QueueMsgReply notMsg;
212
213     // Processing each request as long as there are deliverable messages:
214
while (! messages.isEmpty() && index < requests.size()) {
215       notRec = (ReceiveRequest) requests.get(index);
216       replied = false;
217       notMsg = new QueueMsgReply(notRec);
218
219       // Checking the deliverable messages:
220
while (j < messages.size()) {
221         message = (Message) messages.get(j);
222         
223         // If the selector matches, sending it:
224
if (Selector.matches(message.msg, notRec.getSelector())) {
225           notMsg.addMessage(message.msg);
226           
227           if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
228             JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Message "
229                                           + message.getIdentifier()
230                                           + " sent to "
231                                           + notRec.requester
232                                           + " as a reply to "
233                                           + notRec.getRequestId());
234
235           // Removing the message:
236
messages.remove(j);
237           message.delete();
238           // Removing the request.
239
replied = true;
240           requests.remove(index);
241           break;
242         }
243         // If selector does not match: going on
244
else
245           j++;
246       }
247
248       if (notMsg.getSize() > 0) {
249         forward(notRec.requester, notMsg);
250       }
251
252       // Next request:
253
j = 0;
254       if (! replied)
255         index++;
256     }
257   }
258
259   /**
260    * Overwrites this <code>DestinationImpl</code> method so that no messages
261    * may be sent by the DMQ to itself.
262    */

263   protected void sendToDMQ(Vector JavaDoc deadMessages, AgentId dmqId) {}
264 }
265
Popular Tags