KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > router > PersistentAckHandler


1 /**
2  * This file is part of Presumo.
3  *
4  * Presumo is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * Presumo is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with Presumo; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  *
19  * Copyright 2001 Dan Greff
20  */

21 package com.presumo.jms.router;
22
23
24 import com.presumo.jms.message.JmsMessage;
25 import com.presumo.jms.message.MessageStateListener;
26 import com.presumo.jms.resources.Resources;
27 import com.presumo.util.log.Logger;
28 import com.presumo.util.log.LoggerFactory;
29
30 import java.util.Hashtable JavaDoc;
31 import java.util.StringTokenizer JavaDoc;
32
33 import javax.jms.DeliveryMode JavaDoc;
34 import javax.jms.JMSException JavaDoc;
35
36 /**
37  *
38  * @author Dan Greff
39  */

40 class PersistentAckHandler
41         implements MessageStateListener
42 {
43
44   /** List of messages received from the remote destionation **/
45   private final Hashtable JavaDoc checkDupsMap = new Hashtable JavaDoc();
46
47   /** List of messages sent to the remote destination **/
48   private final Hashtable JavaDoc sentToMap = new Hashtable JavaDoc();
49
50
51   private StringBuffer JavaDoc msgSafeAcks = new StringBuffer JavaDoc();
52   private StringBuffer JavaDoc msgRoutedAcks = new StringBuffer JavaDoc();
53   private StringBuffer JavaDoc msgDeletedAcks = new StringBuffer JavaDoc();
54
55   private RemoteSession session;
56
57     /////////////////////////////////////////////////////////////////////////
58
// Constructors //
59
/////////////////////////////////////////////////////////////////////////
60

61   PersistentAckHandler(RemoteSession session)
62   {
63     this.session = session;
64   }
65
66
67     /////////////////////////////////////////////////////////////////////////
68
// Package methods //
69
/////////////////////////////////////////////////////////////////////////
70

71
72   /**
73    * Called when the Router hands a message to the RemoteSession
74    * to send to another router. Nothing is done if the message is
75    * NON-PERSISTENT. However, if the message is PERSISTENT then
76    * this RemoteSession represented by this instance must track
77    * the message state through Presumo's acknowledgement
78    * methodology.
79    */

80   final void handleOutgoingMsg(JmsMessage msg)
81   {
82     if (logger.isDebugEnabled())
83       logger.entry("handleOutgoingMsg: " + msg);
84
85     if (msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT)
86     {
87       sentToMap.put(msg.getJMSMessageID(), msg);
88       msg.getAckHelper().addDeletionListener(this);
89     }
90   }
91
92
93   /**
94    * Called when the RemoteSession receives messages from the
95    * remote orginator and has ALREADY stored the messages safely in
96    * the router.
97    */

98   final synchronized void handleIncomingMsgs(JmsMessage [] msgs)
99   {
100     for (int i=0; i < msgs.length; ++i) {
101       JmsMessage msg = msgs[i];
102       if (msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT)
103       {
104         checkDupsMap.put(msg.getJMSMessageID(), msg);
105         addMsgSafeAck(msg.getJMSMessageID());
106       }
107     }
108   }
109
110
111   /**
112    * Constructs and returns Ack string to be sent to the remote
113    * destination.
114    *
115    * @return Ack string to send or null if there are currently no
116    * acks to send
117    */

118   final synchronized String JavaDoc getAckString()
119   {
120     logger.entry("getAckString");
121     
122     String JavaDoc retval = null;
123     if (msgSafeAcks.length() > 0 ||
124         msgRoutedAcks.length() > 0 ||
125         msgDeletedAcks.length() > 0 )
126     {
127       msgSafeAcks.append('#');
128       msgSafeAcks.append(msgRoutedAcks.toString());
129       msgSafeAcks.append('#');
130       msgSafeAcks.append(msgDeletedAcks);
131       
132       retval = msgSafeAcks.toString();
133       
134       msgSafeAcks.setLength(0);
135       msgRoutedAcks.setLength(0);
136       msgDeletedAcks.setLength(0);
137     }
138     
139     logger.exit("getAckString: "+ retval);
140     return retval;
141   }
142
143   /**
144    * Determines if there are acks available to be sent.
145    *
146    * @return true if there is an ack available, false otherwise.
147    */

148   final boolean acksAvailable()
149   {
150     return (msgSafeAcks.length() > 0 ||
151             msgRoutedAcks.length() > 0 ||
152             msgDeletedAcks.length() > 0 );
153   }
154
155
156   /**
157    * Checks to see if the incoming message from the remote originator
158    * has already been sent. If it is a duplicate than there is a chance
159    * the remote originator did not get the MSG_SAFE ack so it will
160    * be resent.
161    * <p>
162    * Two MSG_SAFE acks being received should not affect anything at
163    * the remote destination.
164    */

165   final synchronized boolean isDuplicate(JmsMessage msg)
166   {
167     boolean dup = (checkDupsMap.get(msg.getJMSMessageID()) != null);
168     
169     if (dup) {
170       addMsgSafeAck(msg.getJMSMessageID());
171     }
172
173     return dup;
174   }
175
176   
177   /**
178    * Sets this implementation of MessageStateListener as the
179    * orginator. Called when a message is received and is about to
180    * be routed to other destinations.
181    */

182   final void setOriginator(JmsMessage msg)
183   {
184     if (msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT)
185       msg.getAckHelper().setOriginator(this);
186   }
187
188
189   /**
190    * Parses and handles string of acks sent by the remote destination
191    */

192   final synchronized void handleAcks(String JavaDoc acks)
193   {
194     logger.entry("handleAcks: " +acks);
195     
196     if (acks == null)
197       return;
198     
199     StringBuffer JavaDoc currentValue = new StringBuffer JavaDoc();
200     int currentState = 0;
201     
202     char [] data = acks.toCharArray();
203     
204     for (int i=0; i < data.length; ++i) {
205       char c = data[i];
206       
207       switch (c)
208       {
209         case('#'):
210           ++currentState;
211           break;
212         case(':'):
213           if (currentValue.length() != 0) {
214             JmsMessage msg = null;
215             logger.debug("--------> parsed: " + currentValue);
216             switch (currentState) {
217               case(0):
218                 msg = (JmsMessage) sentToMap.get(currentValue.toString());
219                 msg.getAckHelper().safeAck(this);
220                 break;
221               case(1):
222                 msg = (JmsMessage) sentToMap.get(currentValue.toString());
223                 msg.getAckHelper().routedAck(this);
224                 break;
225               default:
226                 msg = (JmsMessage) checkDupsMap.remove(currentValue.toString());
227                 msg.getAckHelper().deleteAck(this);
228                 break;
229             }
230             currentValue.setLength(0);
231           }
232           break;
233         default:
234           currentValue.append(c);
235       }
236       
237     }
238
239     logger.exit("handleAcks");
240   }
241
242
243   /**
244    * Callback from the actual Message when it has received all
245    * necessary aks (MSG_ROUTED Acks for where the message was sent and
246    * MSG_DELETED ack from the orginator of the message). The message
247    * has deleted itself from persistent storage and is telling this
248    * handler to queue up a MSG_DELETE_ACK to send on to the neighbor.
249    *
250    * @see MessageStateListener#messageDeleted
251    */

252   public void messageDeleted(JmsMessage msg)
253   {
254     logger.entry("messageDeleted: " + msg);
255
256     sentToMap.remove(msg.getJMSMessageID());
257
258     // Queue up a MSG_DELETED ACK
259
msgDeletedAcks.append(msg.getJMSMessageID());
260     msgDeletedAcks.append(':');
261
262     // Notify remote session that there are acks available to be sent.
263
session.acksAvailable();
264
265     logger.exit("messageDeleted");
266   }
267   
268
269   /**
270    * Callback from the actual Message when it has received all necessary
271    * acks indicating that every neighbor now safely has the message.
272    * A MSG_ROUTED ack will be sent back to the originator.
273    *
274    * @see MessageStateListener#messageRouted
275    */

276   public void messageRouted(JmsMessage msg)
277   {
278     logger.entry("messageRouted: "+msg);
279
280     // QUEUE up a MSG_ROUTED ack
281
msgRoutedAcks.append(msg.getJMSMessageID());
282     msgRoutedAcks.append(':');
283
284     // Notify remote session that there are acks available to be sent.
285
session.acksAvailable();
286
287     logger.exit("messageRouted");
288   }
289
290
291
292     /////////////////////////////////////////////////////////////////////////
293
// Private methods //
294
/////////////////////////////////////////////////////////////////////////
295

296   /*
297    * Queue up a MSG_SAFE ack
298    */

299   private void addMsgSafeAck(String JavaDoc msgID)
300   {
301     logger.entry("addMsgSafeAck: " +msgID);
302     
303     msgSafeAcks.append(msgID);
304     msgSafeAcks.append(':');
305
306     // Notify remote session that there are acks available to be sent.
307
session.acksAvailable();
308
309     logger.exit("addMsgSafeAck: " + msgSafeAcks);
310   }
311   
312   ////////////////////////////// Misc stuff ////////////////////////////////
313

314   private static Logger logger =
315    LoggerFactory.getLogger(PersistentAckHandler.class, Resources.getBundle());
316
317   ///////////////////////////////////////////////////////////////////////////
318
}
319
Popular Tags