1 package org.jboss.cache.loader.tcp; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jboss.cache.CacheException; 6 import org.jboss.cache.CacheImpl; 7 import org.jboss.cache.Fqn; 8 import org.jboss.cache.Modification; 9 import org.jboss.cache.NodeSPI; 10 import org.jboss.cache.factories.XmlConfigurationParser; 11 import org.jboss.cache.jmx.JmxUtil; 12 import org.jboss.cache.loader.DelegatingCacheLoader; 13 14 import javax.management.MBeanServer ; 15 import javax.management.MBeanServerInvocationHandler ; 16 import javax.management.MalformedObjectNameException ; 17 import javax.management.ObjectName ; 18 import java.io.BufferedInputStream ; 19 import java.io.BufferedOutputStream ; 20 import java.io.IOException ; 21 import java.io.ObjectInputStream ; 22 import java.io.ObjectOutputStream ; 23 import java.net.InetAddress ; 24 import java.net.ServerSocket ; 25 import java.net.Socket ; 26 import java.net.SocketException ; 27 import java.net.UnknownHostException ; 28 import java.util.ArrayList ; 29 import java.util.HashMap ; 30 import java.util.LinkedList ; 31 import java.util.List ; 32 import java.util.Map ; 33 import java.util.Set ; 34 35 41 public class TcpCacheServer implements TcpCacheServerMBean 42 { 43 ServerSocket srv_sock; 44 InetAddress bind_addr = null; 45 int port = 7500; 46 CacheImpl cache; 47 ObjectName cache_name; 49 String config; 50 boolean running = true; 51 final List <Connection> conns = new LinkedList <Connection>(); 52 String agendId; 53 Thread serverThread; 54 57 boolean daemon = true; 58 static Log mylog = LogFactory.getLog(TcpCacheServer.class); 59 60 61 public TcpCacheServer() 62 { 63 } 64 65 public String getBindAddress() 66 { 67 return bind_addr != null ? bind_addr.toString() : "n/a"; 68 } 69 70 public void setBindAddress(String bind_addr) throws UnknownHostException 71 { 72 if (bind_addr != null) 73 { 74 this.bind_addr = InetAddress.getByName(bind_addr); 75 } 76 } 77 78 public int getPort() 79 { 80 return port; 81 } 82 83 public void setPort(int port) 84 { 85 this.port = port; 86 } 87 88 public String getMBeanServerName() 89 { 90 return agendId; 91 } 92 93 public void setMBeanServerName(String name) 94 { 95 agendId = name; 96 } 97 98 public String getConfig() 99 { 100 return config; 101 } 102 103 public void setConfig(String config) 104 { 105 this.config = config; 106 } 107 108 public CacheImpl getCache() 110 { 111 return cache; 112 } 113 114 public void setCache(CacheImpl cache) 115 { 116 this.cache = cache; 118 } 119 120 public String getCacheName() 121 { 122 return cache_name != null ? cache_name.toString() : "n/a"; 123 } 124 125 public void setCacheName(String cache_name) throws MalformedObjectNameException 126 { 127 this.cache_name = new ObjectName (cache_name); 128 } 129 130 public void start() throws Exception 131 { 132 if (cache == null) 133 { 134 MBeanServer server = JmxUtil.getMBeanServer(); 135 if (cache_name != null && server != null) 137 { 138 cache = (CacheImpl) MBeanServerInvocationHandler.newProxyInstance(server, cache_name, CacheImpl.class, false); 140 } 141 } 142 143 if (cache == null) 144 { if (config != null) 146 { 147 cache = new CacheImpl(); 148 cache.setConfiguration(new XmlConfigurationParser().parseFile(this.config)); 149 cache.create(); 150 cache.start(); 151 } 152 } 153 154 if (cache == null) 155 { 156 throw new CacheException("cache reference is not set"); 157 } 158 159 160 srv_sock = new ServerSocket (port, 10, bind_addr); 161 System.out.println("TcpCacheServer listening on : " + srv_sock.getInetAddress() + ":" + srv_sock.getLocalPort()); 162 mylog.info("TcpCacheServer listening on : " + srv_sock.getInetAddress() + ":" + srv_sock.getLocalPort()); 163 164 running = true; 165 166 serverThread = new Thread ("TcpCacheServer") 167 { 168 public void run() 169 { 170 try 171 { 172 while (running) 173 { 174 Socket client_sock = srv_sock.accept(); 175 Connection conn = new Connection(client_sock, cache); 176 conns.add(conn); 177 conn.start(); 178 } 179 } 180 catch (SocketException se) 181 { 182 if (!running) 183 { 184 mylog.info("Shutting down TcpCacheServer"); 187 } 188 else 189 { 190 mylog.error("Caught exception! Shutting down server thread.", se); 191 } 192 } 193 catch (IOException e) 194 { 195 mylog.error("Caught exception! Shutting down server thread.", e); 196 } 197 } 198 }; 199 serverThread.setDaemon(daemon); 200 serverThread.start(); 201 202 } 203 204 public void stop() 205 { 206 running = false; 207 for (Connection conn : conns) 208 { 209 conn.close(); 210 } 211 conns.clear(); 212 213 if (srv_sock != null) 214 { 215 try 216 { 217 srv_sock.close(); 218 srv_sock = null; 219 } 220 catch (IOException e) 221 { 222 } 224 } 225 } 226 227 228 public String getConnections() 229 { 230 StringBuffer sb = new StringBuffer (); 231 sb.append(conns.size()).append(" connections:\n"); 232 for (Connection c : conns) 233 { 234 sb.append(c).append("\n"); 235 } 236 return sb.toString(); 237 } 238 239 240 public void create() 241 { 242 } 243 244 public void destroy() 245 { 246 } 247 248 249 private class Connection implements Runnable 250 { 251 Socket sock = null; 252 ObjectInputStream input = null; 253 ObjectOutputStream output = null; 254 CacheImpl c; 255 Thread t = null; 256 257 public Connection(Socket sock, CacheImpl cache) throws IOException 258 { 259 this.sock = sock; 260 261 output = new ObjectOutputStream (new BufferedOutputStream (sock.getOutputStream())); 262 output.flush(); 263 264 input = new ObjectInputStream (new BufferedInputStream (sock.getInputStream())); 265 266 c = cache; 267 } 268 269 270 public void start() 271 { 272 t = new Thread (this, "TcpCacheServer.Connection"); 273 t.setDaemon(true); 274 t.start(); 275 } 276 277 public void close() 278 { 279 t = null; 280 try 281 { 282 if (output != null) output.close(); 283 } 284 catch (Throwable th) 285 { 286 } 288 try 289 { 290 if (input != null) input.close(); 291 } 292 catch (Throwable th) 293 { 294 } 296 try 297 { 298 if (sock != null) sock.close(); 299 } 300 catch (Throwable th) 301 { 302 } 304 305 conns.remove(this); 307 } 308 309 public void run() 310 { 311 int op; 312 Fqn fqn; 313 Object key, val, retval; 314 NodeSPI n; 315 boolean flag; 316 317 while (t != null && Thread.currentThread().equals(t)) 318 { 319 try 320 { 321 op = input.readInt(); 322 } 323 catch (IOException e) 324 { 325 mylog.debug("Client closed socket"); 326 close(); 327 break; 328 } 329 330 try 331 { 332 output.reset(); 333 switch (op) 334 { 335 case DelegatingCacheLoader.delegateGetChildrenNames: 336 fqn = (Fqn) input.readObject(); 337 Set children = c.getChildrenNames(fqn); 338 output.writeObject(children); break; 340 case DelegatingCacheLoader.delegateGetKey: 341 fqn = (Fqn) input.readObject(); 342 key = input.readObject(); 343 retval = c.get(fqn, key); 344 output.writeObject(retval); 345 break; 346 case DelegatingCacheLoader.delegateGet: 347 fqn = (Fqn) input.readObject(); 348 n = (NodeSPI) c.get(fqn); 349 if (n == null) 350 { output.writeObject(n); 352 break; 353 } 354 Map map = n.getDataDirect(); 355 if (map == null) map = new HashMap (); 356 output.writeObject(map); 357 break; 358 case DelegatingCacheLoader.delegateExists: 359 fqn = (Fqn) input.readObject(); 360 flag = c.exists(fqn); 361 output.writeObject(Boolean.valueOf(flag)); 362 break; 363 case DelegatingCacheLoader.delegatePutKeyVal: 364 fqn = (Fqn) input.readObject(); 365 key = input.readObject(); 366 val = input.readObject(); 367 retval = c.put(fqn, key, val); 368 output.writeObject(retval); 369 break; 370 case DelegatingCacheLoader.delegatePut: 371 fqn = (Fqn) input.readObject(); 372 map = (Map ) input.readObject(); 373 c.put(fqn, map); 374 output.writeObject(Boolean.TRUE); 375 break; 376 377 case DelegatingCacheLoader.putList: 378 int length = input.readInt(); 379 retval = Boolean.TRUE; 380 if (length > 0) 381 { 382 Modification mod; 383 List <Modification> mods = new ArrayList <Modification>(length); 384 for (int i = 0; i < length; i++) 385 { 386 mod = new Modification(); 387 mod.readExternal(input); 388 mods.add(mod); 389 } 390 try 391 { 392 handleModifications(mods); 393 } 394 catch (Exception ex) 395 { 396 retval = ex; 397 } 398 } 399 output.writeObject(retval); 400 break; 401 case DelegatingCacheLoader.delegateRemoveKey: 402 fqn = (Fqn) input.readObject(); 403 key = input.readObject(); 404 retval = c.remove(fqn, key); 405 output.writeObject(retval); 406 break; 407 case DelegatingCacheLoader.delegateRemove: 408 fqn = (Fqn) input.readObject(); 409 c.remove(fqn); 410 output.writeObject(Boolean.TRUE); 411 break; 412 case DelegatingCacheLoader.delegateRemoveData: 413 fqn = (Fqn) input.readObject(); 414 c.removeData(fqn); 415 output.writeObject(Boolean.TRUE); 416 break; 417 case DelegatingCacheLoader.delegateLoadEntireState: 418 ObjectOutputStream os = (ObjectOutputStream ) input.readObject(); 419 if (c.getCacheLoader() != null) 420 { 421 c.getCacheLoader().loadEntireState(os); 422 } 423 output.writeObject(Boolean.TRUE); 424 break; 425 case DelegatingCacheLoader.delegateStoreEntireState: 426 ObjectInputStream is = (ObjectInputStream ) input.readObject(); 427 if (c.getCacheLoader() != null) 428 { 429 c.getCacheLoader().storeEntireState(is); 430 } 431 output.writeObject(Boolean.TRUE); 432 break; 433 default: 434 mylog.error("Operation " + op + " unknown"); 435 break; 436 } 437 output.flush(); 438 } 439 catch (Exception e) 440 { 441 try 442 { 443 output.writeObject(e); 444 output.flush(); 445 } 446 catch (IOException e1) 447 { 448 e1.printStackTrace(); 449 } 450 } 451 } 452 } 453 454 455 public String toString() 456 { 457 StringBuffer sb = new StringBuffer (); 458 if (sock != null) 459 { 460 sb.append(sock.getRemoteSocketAddress()); 461 } 462 return sb.toString(); 463 } 464 465 protected void handleModifications(List <Modification> modifications) throws CacheException 466 { 467 468 for (Modification m : modifications) 469 { 470 switch (m.getType()) 471 { 472 case PUT_DATA: 473 c.put(m.getFqn(), m.getData()); 474 break; 475 case PUT_DATA_ERASE: 476 c.put(m.getFqn(), m.getData()); 477 break; 478 case PUT_KEY_VALUE: 479 c.put(m.getFqn(), m.getKey(), m.getValue()); 480 break; 481 case REMOVE_DATA: 482 c.removeData(m.getFqn()); 483 break; 484 case REMOVE_KEY_VALUE: 485 c.remove(m.getFqn(), m.getKey()); 486 break; 487 case REMOVE_NODE: 488 c.remove(m.getFqn()); 489 break; 490 case MOVE: 491 c.move(m.getFqn(), m.getFqn2()); 492 break; 493 default: 494 mylog.error("modification type " + m.getType() + " not known"); 495 break; 496 } 497 } 498 } 499 500 501 } 502 503 504 public static void main(String [] args) throws Exception 505 { 506 String bind_addr = null; 507 int port = 7500; 508 TcpCacheServer server; 509 String config = null; 510 511 for (int i = 0; i < args.length; i++) 512 { 513 if (args[i].equals("-bind_addr")) 514 { 515 bind_addr = args[++i]; 516 continue; 517 } 518 if (args[i].equals("-port")) 519 { 520 port = Integer.parseInt(args[++i]); 521 continue; 522 } 523 if (args[i].equals("-config")) 524 { 525 config = args[++i]; 526 continue; 527 } 528 help(); 529 return; 530 } 531 server = new TcpCacheServer(); 532 server.daemon = false; 533 server.setBindAddress(bind_addr); 534 server.setPort(port); 535 server.setConfig(config); 536 server.create(); 537 server.start(); 538 } 539 540 541 private static void help() 542 { 543 System.out.println("TcpCacheServer [-bind_addr <address>] [-port <port>] [-config <config file>] [-help]"); 544 } 545 } 546 | Popular Tags |