1 57 58 package org.apache.wsif.base; 59 60 import java.io.ByteArrayInputStream ; 61 import java.io.ByteArrayOutputStream ; 62 import java.io.IOException ; 63 import java.io.InputStream ; 64 import java.io.ObjectOutputStream ; 65 import java.io.ObjectStreamClass ; 66 import java.io.Serializable ; 67 import java.io.StreamCorruptedException ; 68 import java.util.ArrayList ; 69 import java.util.ConcurrentModificationException ; 70 import java.util.HashMap ; 71 import java.util.Iterator ; 72 73 import org.apache.wsif.WSIFConstants; 74 import org.apache.wsif.WSIFCorrelationId; 75 import org.apache.wsif.WSIFCorrelationService; 76 import org.apache.wsif.WSIFException; 77 import org.apache.wsif.logging.MessageLogger; 78 import org.apache.wsif.logging.Trc; 79 80 85 public class WSIFDefaultCorrelationService implements WSIFCorrelationService { 86 87 private HashMap correlatorStore; private HashMap timeouts; private Thread timeoutWatcher; private boolean shutdown; 92 96 public WSIFDefaultCorrelationService() { 97 Trc.entry(this); 98 Trc.exit(); 99 } 100 101 110 public synchronized void put( 111 WSIFCorrelationId correlator, 112 Serializable state, 113 long timeout) 114 throws WSIFException { 115 Trc.entry(this, correlator, state, new Long (timeout)); 116 if (correlator != null && state != null) { 117 if (correlatorStore == null) { 118 initialise(); 119 } 120 try { 121 correlatorStore.put(correlator, serialize(state)); 122 if (timeout > 0) { 123 if (timeouts == null) { 124 initTimeouts(); 125 } 126 timeouts.put( 127 correlator, 128 new Long (System.currentTimeMillis() + timeout)); 129 } 130 } catch (IOException ex) { 131 Trc.exception(ex); 132 throw new WSIFException(ex.toString()); 133 } 134 } else { 135 throw new IllegalArgumentException ( 136 "cannot put null " 137 + ((correlator == null) ? "correlator" : "state")); 138 } 139 Trc.exit(); 140 } 141 142 148 public synchronized Serializable get(WSIFCorrelationId id) 149 throws WSIFException { 150 Trc.entry(this, id); 151 if (correlatorStore == null) { 152 throw new WSIFException("get called on correlation service but put never done"); 153 } else if (id == null) { 154 throw new IllegalArgumentException ("cannot get null"); 155 } else { 156 try { 157 Serializable s = 158 (Serializable ) unserialize((byte[]) correlatorStore 159 .get(id)); 160 Trc.exit(s); 161 return s; 162 } catch (Exception ex) { 163 Trc.exception(ex); 164 throw new WSIFException(ex.toString()); 165 } 166 } 167 } 168 169 173 public synchronized void remove(WSIFCorrelationId id) 174 throws WSIFException { 175 Trc.entry(this, id); 176 if (correlatorStore == null) { 177 throw new WSIFException("corelation service has been shutdown"); 178 } else if (id == null) { 179 throw new IllegalArgumentException ("cannot remove null"); 180 } else { 181 correlatorStore.remove(id); 182 if (timeouts != null) { 183 timeouts.remove(id); 184 } 185 Trc.exit(); 186 } 187 } 188 189 private synchronized void remove(ArrayList expiredKeys) { 190 Trc.entry(this, expiredKeys); 191 if (expiredKeys != null && correlatorStore != null) { 192 Serializable id; 193 for (Iterator i = expiredKeys.iterator(); i.hasNext();) { 194 id = (Serializable ) i.next(); 195 correlatorStore.remove(id); 196 timeouts.remove(id); 197 MessageLogger.log("WSIF.0008W", id); 198 } 199 } 200 Trc.exit(); 201 } 202 203 206 public void shutdown() { 207 Trc.entry(this); 208 shutdown = true; 209 Trc.exit(); 210 } 211 212 private void initialise() { 213 shutdown = false; 214 correlatorStore = new HashMap (); 215 } 216 217 private void initTimeouts() { 218 timeouts = new HashMap (); 219 timeoutWatcher = new Thread () { 220 public void run() { 221 while (!shutdown) { 222 try { 223 sleep(WSIFConstants.CORRELATION_TIMEOUT_DELAY); 224 } catch (InterruptedException ex) { 225 Trc.ignoredException(ex); 226 } 227 checkForTimeouts(); 228 } 229 if (correlatorStore != null) 230 correlatorStore = null; 231 if (timeouts != null) 232 timeouts = null; 233 } 234 }; 235 timeoutWatcher.setName("WSIFDefaultCorrelationService timeout watcher"); 236 timeoutWatcher.start(); 237 } 238 239 private void checkForTimeouts() { 240 Long expireTime; 241 Serializable key; 242 ArrayList expiredKeys = new ArrayList (); 243 Long now = new Long (System.currentTimeMillis()); 244 try { 246 for (Iterator i = timeouts.keySet().iterator(); i.hasNext();) { 247 key = (Serializable ) i.next(); 248 expireTime = (Long ) timeouts.get(key); 249 if (now.compareTo(expireTime) > 0) { 250 expiredKeys.add(key); 252 } 253 } 254 } catch (ConcurrentModificationException ex) { 255 Trc.ignoredException(ex); 256 } 258 if (expiredKeys.size() > 0) { 259 remove(expiredKeys); 260 } 261 262 } 263 264 private byte[] serialize(Object o) throws IOException { 265 if (o == null) { 266 return null; 267 } else { 268 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 269 ObjectOutputStream so = new ObjectOutputStream (baos); 270 so.writeObject(o); 271 so.flush(); 272 return baos.toByteArray(); 273 } 274 } 275 276 private Object unserialize(byte[] bytes) 277 throws IOException , ClassNotFoundException { 278 if (bytes == null) { 279 return null; 280 } else { 281 ByteArrayInputStream bais = new ByteArrayInputStream (bytes); 282 WSIFObjectInputStream si = new WSIFObjectInputStream(bais); 284 return si.readObject(); 285 } 286 } 287 288 } 289 | Popular Tags |