1 7 package org.jboss.cache.loader; 8 9 import org.jboss.cache.Fqn; 10 import org.jboss.cache.Modification; 11 import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig; 12 13 import java.io.BufferedInputStream ; 14 import java.io.BufferedOutputStream ; 15 import java.io.IOException ; 16 import java.io.ObjectInputStream ; 17 import java.io.ObjectOutputStream ; 18 import java.net.Socket ; 19 import java.util.List ; 20 import java.util.Map ; 21 import java.util.Set ; 22 23 37 public class TcpDelegatingCacheLoader extends DelegatingCacheLoader 38 { 39 private Socket sock; 40 private TcpDelegatingCacheLoaderConfig config; 41 ObjectInputStream in; 42 ObjectOutputStream out; 43 44 45 48 public TcpDelegatingCacheLoader() 49 { 50 } 52 53 59 public TcpDelegatingCacheLoader(String host, int port) 60 { 61 this.config = new TcpDelegatingCacheLoaderConfig(host, port); 62 } 63 64 69 public void setConfig(IndividualCacheLoaderConfig base) 70 { 71 if (base instanceof TcpDelegatingCacheLoaderConfig) 72 { 73 this.config = (TcpDelegatingCacheLoaderConfig) base; 74 } 75 else 76 { 77 config = new TcpDelegatingCacheLoaderConfig(base); 78 } 79 } 80 81 public IndividualCacheLoaderConfig getConfig() 82 { 83 return config; 84 } 85 86 public void start() throws Exception 87 { 88 init(); 89 } 90 91 public void stop() 92 { 93 try 94 { 95 if (in != null) in.close(); 96 } 97 catch (IOException e) 98 { 99 } 100 try 101 { 102 if (out != null) out.close(); 103 } 104 catch (IOException e) 105 { 106 } 107 try 108 { 109 if (sock != null) sock.close(); 110 } 111 catch (IOException e) 112 { 113 } 114 } 115 116 117 private void init() throws IOException 118 { 119 sock = new Socket (config.getHost(), config.getPort()); 120 out = new ObjectOutputStream (new BufferedOutputStream (sock.getOutputStream())); 121 out.flush(); 122 in = new ObjectInputStream (new BufferedInputStream (sock.getInputStream())); 123 } 124 125 128 protected Set delegateGetChildrenNames(Fqn fqn) throws Exception 129 { 130 synchronized (out) 131 { 132 out.reset(); 133 out.writeInt(DelegatingCacheLoader.delegateGetChildrenNames); 134 out.writeObject(fqn); 135 out.flush(); 136 Object retval = in.readObject(); 137 if (retval instanceof Exception ) 138 { 139 throw (Exception ) retval; 140 } 141 return (Set ) retval; 142 } 143 } 144 145 147 150 157 160 protected Map delegateGet(Fqn name) throws Exception 161 { 162 synchronized (out) 163 { 164 out.reset(); 165 166 out.writeInt(DelegatingCacheLoader.delegateGet); 167 out.writeObject(name); 168 out.flush(); 169 Object retval = in.readObject(); 170 if (retval instanceof Exception ) 171 { 172 throw (Exception ) retval; 173 } 174 return (Map ) retval; 175 } 176 } 177 178 181 protected boolean delegateExists(Fqn name) throws Exception 182 { 183 synchronized (out) 184 { 185 out.reset(); 186 187 out.writeInt(DelegatingCacheLoader.delegateExists); 188 out.writeObject(name); 189 out.flush(); 190 Object retval = in.readObject(); 191 if (retval instanceof Exception ) 192 { 193 throw (Exception ) retval; 194 } 195 return (Boolean ) retval; 196 } 197 } 198 199 202 protected Object delegatePut(Fqn name, Object key, Object value) throws Exception 203 { 204 synchronized (out) 205 { 206 out.reset(); 207 208 out.writeInt(DelegatingCacheLoader.delegatePutKeyVal); 209 out.writeObject(name); 210 out.writeObject(key); 211 out.writeObject(value); 212 out.flush(); 213 Object retval = in.readObject(); 214 if (retval instanceof Exception ) 215 { 216 throw (Exception ) retval; 217 } 218 return retval; 219 } 220 } 221 222 225 protected void delegatePut(Fqn name, Map attributes) throws Exception 226 { 227 synchronized (out) 228 { 229 out.reset(); 230 231 out.writeInt(DelegatingCacheLoader.delegatePut); 232 out.writeObject(name); 233 out.writeObject(attributes); 234 out.flush(); 235 Object retval = in.readObject(); 236 if (retval instanceof Exception ) 237 { 238 throw (Exception ) retval; 239 } 240 } 241 } 242 243 @Override 244 public void put(List <Modification> modifications) throws Exception 245 { 246 synchronized (out) 247 { 248 out.reset(); 249 250 out.writeInt(DelegatingCacheLoader.putList); 251 int length = modifications != null ? modifications.size() : 0; 252 out.writeInt(length); 253 if (length > 0) 254 { 255 for (Modification m : modifications) 256 { 257 m.writeExternal(out); 258 } 259 } 260 out.flush(); 261 Object retval = in.readObject(); 262 if (retval instanceof Exception ) 263 { 264 throw (Exception ) retval; 265 } 266 } 267 } 268 269 272 protected Object delegateRemove(Fqn name, Object key) throws Exception 273 { 274 synchronized (out) 275 { 276 out.reset(); 277 278 out.writeInt(DelegatingCacheLoader.delegateRemoveKey); 279 out.writeObject(name); 280 out.writeObject(key); 281 out.flush(); 282 Object retval = in.readObject(); 283 if (retval instanceof Exception ) 284 { 285 throw (Exception ) retval; 286 } 287 return retval; 288 } 289 } 290 291 294 protected void delegateRemove(Fqn name) throws Exception 295 { 296 synchronized (out) 297 { 298 out.reset(); 299 300 out.writeInt(DelegatingCacheLoader.delegateRemove); 301 out.writeObject(name); 302 out.flush(); 303 Object retval = in.readObject(); 304 if (retval instanceof Exception ) 305 { 306 throw (Exception ) retval; 307 } 308 } 309 } 310 311 314 protected void delegateRemoveData(Fqn name) throws Exception 315 { 316 synchronized (out) 317 { 318 out.reset(); 319 320 out.writeInt(DelegatingCacheLoader.delegateRemoveData); 321 out.writeObject(name); 322 out.flush(); 323 Object retval = in.readObject(); 324 if (retval instanceof Exception ) 325 { 326 throw (Exception ) retval; 327 } 328 } 329 } 330 331 @Override 332 protected void delegateLoadEntireState(ObjectOutputStream os) throws Exception 333 { 334 throw new UnsupportedOperationException ("operation is not currently supported - need to define semantics first"); 335 } 336 337 @Override 338 protected void delegateLoadState(Fqn subtree, ObjectOutputStream os) throws Exception 339 { 340 throw new UnsupportedOperationException ("operation is not currently supported - need to define semantics first"); 341 } 342 343 @Override 344 protected void delegateStoreEntireState(ObjectInputStream is) throws Exception 345 { 346 throw new UnsupportedOperationException ("operation is not currently supported - need to define semantics first"); 347 } 348 349 @Override 350 protected void delegateStoreState(Fqn subtree, ObjectInputStream is) throws Exception 351 { 352 throw new UnsupportedOperationException ("operation is not currently supported - need to define semantics first"); 353 } 354 355 } 356 | Popular Tags |