KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > wsif > base > WSIFDefaultCorrelationService


1 /*
2  * The Apache Software License, Version 1.1
3  *
4  *
5  * Copyright (c) 2002 The Apache Software Foundation. All rights
6  * reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Apache Software Foundation (http://www.apache.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "WSIF" and "Apache Software Foundation" must
28  * not be used to endorse or promote products derived from this
29  * software without prior written permission. For written
30  * permission, please contact apache@apache.org.
31  *
32  * 5. Products derived from this software may not be called "Apache",
33  * nor may "Apache" appear in their name, without prior written
34  * permission of the Apache Software Foundation.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
40  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
41  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
42  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
43  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
44  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
45  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
46  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
47  * SUCH DAMAGE.
48  * ====================================================================
49  *
50  * This software consists of voluntary contributions made by many
51  * individuals on behalf of the Apache Software Foundation and was
52  * originally based on software copyright (c) 2001, 2002, International
53  * Business Machines, Inc., http://www.apache.org. For more
54  * information on the Apache Software Foundation, please see
55  * <http://www.apache.org/>.
56  */

57
58 package org.apache.wsif.base;
59
60 import java.io.ByteArrayInputStream JavaDoc;
61 import java.io.ByteArrayOutputStream JavaDoc;
62 import java.io.IOException JavaDoc;
63 import java.io.InputStream JavaDoc;
64 import java.io.ObjectOutputStream JavaDoc;
65 import java.io.ObjectStreamClass JavaDoc;
66 import java.io.Serializable JavaDoc;
67 import java.io.StreamCorruptedException JavaDoc;
68 import java.util.ArrayList JavaDoc;
69 import java.util.ConcurrentModificationException JavaDoc;
70 import java.util.HashMap JavaDoc;
71 import java.util.Iterator JavaDoc;
72
73 import org.apache.wsif.WSIFConstants;
74 import org.apache.wsif.WSIFCorrelationId;
75 import org.apache.wsif.WSIFCorrelationService;
76 import org.apache.wsif.WSIFException;
77 import org.apache.wsif.logging.MessageLogger;
78 import org.apache.wsif.logging.Trc;
79
80 /**
81  * WSIFDefaultCorrelationService provides a default implementation of a
82  * WSIFCorrelationService using a Hashmap as the backing store.
83  * @author Ant Elder <antelder@apache.org>
84  */

85 public class WSIFDefaultCorrelationService implements WSIFCorrelationService {
86
87     private HashMap JavaDoc correlatorStore; // associates IDs with WSIFOperators
88
private HashMap JavaDoc timeouts; // associates IDs with a timeout time
89
private Thread JavaDoc timeoutWatcher; // watches for timeout times expiring
90
private boolean shutdown; // has the correlation service been shutdown
91

92     /**
93      * WSIFCorrelationServiceLocator should be used to
94      * create a correlation service.
95      */

96     public WSIFDefaultCorrelationService() {
97         Trc.entry(this);
98         Trc.exit();
99     }
100
101     /**
102      * Adds an entry to the correlation service.
103      * @param correlator the key to associate with the state. This will be
104      * a JMS message correlation ID.
105      * @param state the state to be stored. This will be a WSIFOperation.
106      * @param timeout a timeout period after which the key and associated
107      * state will be deleted from the correlation service. A
108      * value of zero indicates there should be no timeout.
109      */

110     public synchronized void put(
111         WSIFCorrelationId correlator,
112         Serializable JavaDoc state,
113         long timeout)
114         throws WSIFException {
115         Trc.entry(this, correlator, state, new Long JavaDoc(timeout));
116         if (correlator != null && state != null) {
117             if (correlatorStore == null) {
118                 initialise();
119             }
120             try {
121                 correlatorStore.put(correlator, serialize(state));
122                 if (timeout > 0) {
123                     if (timeouts == null) {
124                         initTimeouts();
125                     }
126                     timeouts.put(
127                         correlator,
128                         new Long JavaDoc(System.currentTimeMillis() + timeout));
129                 }
130             } catch (IOException JavaDoc ex) {
131                 Trc.exception(ex);
132                 throw new WSIFException(ex.toString());
133             }
134         } else {
135             throw new IllegalArgumentException JavaDoc(
136                 "cannot put null "
137                     + ((correlator == null) ? "correlator" : "state"));
138         }
139         Trc.exit();
140     }
141
142     /**
143      * Retrieves an entry (a WSIFOperation) from the correlation service.
144      * @param id the key of the state to retrieved
145      * @return the state associated with the id, or null if there is no
146      * match for the id.
147      */

148     public synchronized Serializable JavaDoc get(WSIFCorrelationId id)
149         throws WSIFException {
150         Trc.entry(this, id);
151         if (correlatorStore == null) {
152             throw new WSIFException("get called on correlation service but put never done");
153         } else if (id == null) {
154             throw new IllegalArgumentException JavaDoc("cannot get null");
155         } else {
156             try {
157                 Serializable JavaDoc s =
158                     (Serializable JavaDoc) unserialize((byte[]) correlatorStore
159                         .get(id));
160                 Trc.exit(s);
161                 return s;
162             } catch (Exception JavaDoc ex) {
163                 Trc.exception(ex);
164                 throw new WSIFException(ex.toString());
165             }
166         }
167     }
168
169     /**
170      * Removes an entry form the correlation service.
171      * @param id the key of entry to be removed
172      */

173     public synchronized void remove(WSIFCorrelationId id)
174         throws WSIFException {
175         Trc.entry(this, id);
176         if (correlatorStore == null) {
177             throw new WSIFException("corelation service has been shutdown");
178         } else if (id == null) {
179             throw new IllegalArgumentException JavaDoc("cannot remove null");
180         } else {
181             correlatorStore.remove(id);
182             if (timeouts != null) {
183                 timeouts.remove(id);
184             }
185             Trc.exit();
186         }
187     }
188
189     private synchronized void remove(ArrayList JavaDoc expiredKeys) {
190         Trc.entry(this, expiredKeys);
191         if (expiredKeys != null && correlatorStore != null) {
192             Serializable JavaDoc id;
193             for (Iterator JavaDoc i = expiredKeys.iterator(); i.hasNext();) {
194                 id = (Serializable JavaDoc) i.next();
195                 correlatorStore.remove(id);
196                 timeouts.remove(id);
197                 MessageLogger.log("WSIF.0008W", id);
198             }
199         }
200         Trc.exit();
201     }
202
203     /**
204      * Shutsdown the correlation service.
205      */

206     public void shutdown() {
207         Trc.entry(this);
208         shutdown = true;
209         Trc.exit();
210     }
211
212     private void initialise() {
213         shutdown = false;
214         correlatorStore = new HashMap JavaDoc();
215     }
216
217     private void initTimeouts() {
218         timeouts = new HashMap JavaDoc();
219         timeoutWatcher = new Thread JavaDoc() {
220             public void run() {
221                 while (!shutdown) {
222                     try {
223                         sleep(WSIFConstants.CORRELATION_TIMEOUT_DELAY);
224                     } catch (InterruptedException JavaDoc ex) {
225                         Trc.ignoredException(ex);
226                     }
227                     checkForTimeouts();
228                 }
229                 if (correlatorStore != null)
230                     correlatorStore = null;
231                 if (timeouts != null)
232                     timeouts = null;
233             }
234         };
235         timeoutWatcher.setName("WSIFDefaultCorrelationService timeout watcher");
236         timeoutWatcher.start();
237     }
238
239     private void checkForTimeouts() {
240         Long JavaDoc expireTime;
241         Serializable JavaDoc key;
242         ArrayList JavaDoc expiredKeys = new ArrayList JavaDoc();
243         Long JavaDoc now = new Long JavaDoc(System.currentTimeMillis());
244         // add to expiredKeys all the keys whose timouts have expired
245
try {
246             for (Iterator JavaDoc i = timeouts.keySet().iterator(); i.hasNext();) {
247                 key = (Serializable JavaDoc) i.next();
248                 expireTime = (Long JavaDoc) timeouts.get(key);
249                 if (now.compareTo(expireTime) > 0) {
250                     // now greater than expireTime
251
expiredKeys.add(key);
252                 }
253             }
254         } catch (ConcurrentModificationException JavaDoc ex) {
255             Trc.ignoredException(ex);
256         } // ignore this, get the others next time
257

258         if (expiredKeys.size() > 0) {
259             remove(expiredKeys);
260         }
261
262     }
263
264     private byte[] serialize(Object JavaDoc o) throws IOException JavaDoc {
265         if (o == null) {
266             return null;
267         } else {
268             ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
269             ObjectOutputStream JavaDoc so = new ObjectOutputStream JavaDoc(baos);
270             so.writeObject(o);
271             so.flush();
272             return baos.toByteArray();
273         }
274     }
275
276     private Object JavaDoc unserialize(byte[] bytes)
277         throws IOException JavaDoc, ClassNotFoundException JavaDoc {
278         if (bytes == null) {
279             return null;
280         } else {
281             ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(bytes);
282             //ObjectInputStream si = new ObjectInputStream(bais);
283
WSIFObjectInputStream si = new WSIFObjectInputStream(bais);
284             return si.readObject();
285         }
286     }
287
288 }
289
Popular Tags