KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > loader > tcp > TcpCacheServer


1 package org.jboss.cache.loader.tcp;
2
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheImpl;
import org.jboss.cache.Fqn;
import org.jboss.cache.Modification;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.factories.XmlConfigurationParser;
import org.jboss.cache.jmx.JmxUtil;
import org.jboss.cache.loader.DelegatingCacheLoader;

import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
34
35 /**
36  * TCP-IP based CacheServer, configure TcpDelegatingCacheLoader with host and port of this server
37  *
38  * @author Bela Ban
39  * @version $Id: TcpCacheServer.java,v 1.24 2007/01/04 05:35:41 msurtani Exp $
40  */

41 public class TcpCacheServer implements TcpCacheServerMBean
42 {
43    ServerSocket JavaDoc srv_sock;
44    InetAddress JavaDoc bind_addr = null;
45    int port = 7500;
46    CacheImpl cache;
47    //TreeCacheMBean cache;
48
ObjectName JavaDoc cache_name;
49    String JavaDoc config;
50    boolean running = true;
51    final List JavaDoc<Connection> conns = new LinkedList JavaDoc<Connection>();
52    String JavaDoc agendId;
53    Thread JavaDoc serverThread;
54    /**
55     * whether or not to start the server thread as a daemon. Should be false if started from the command line, true if started as an MBean.
56     */

57    boolean daemon = true;
58    static Log mylog = LogFactory.getLog(TcpCacheServer.class);
59
60
61    public TcpCacheServer()
62    {
63    }
64
65    public String JavaDoc getBindAddress()
66    {
67       return bind_addr != null ? bind_addr.toString() : "n/a";
68    }
69
70    public void setBindAddress(String JavaDoc bind_addr) throws UnknownHostException JavaDoc
71    {
72       if (bind_addr != null)
73       {
74          this.bind_addr = InetAddress.getByName(bind_addr);
75       }
76    }
77
78    public int getPort()
79    {
80       return port;
81    }
82
83    public void setPort(int port)
84    {
85       this.port = port;
86    }
87
88    public String JavaDoc getMBeanServerName()
89    {
90       return agendId;
91    }
92
93    public void setMBeanServerName(String JavaDoc name)
94    {
95       agendId = name;
96    }
97
98    public String JavaDoc getConfig()
99    {
100       return config;
101    }
102
103    public void setConfig(String JavaDoc config)
104    {
105       this.config = config;
106    }
107
108    //public TreeCacheMBean getCache() {
109
public CacheImpl getCache()
110    {
111       return cache;
112    }
113
114    public void setCache(CacheImpl cache)
115    {
116       //public void setCache(TreeCacheMBean cache) {
117
this.cache = cache;
118    }
119
120    public String JavaDoc getCacheName()
121    {
122       return cache_name != null ? cache_name.toString() : "n/a";
123    }
124
125    public void setCacheName(String JavaDoc cache_name) throws MalformedObjectNameException JavaDoc
126    {
127       this.cache_name = new ObjectName JavaDoc(cache_name);
128    }
129
130    public void start() throws Exception JavaDoc
131    {
132       if (cache == null)
133       {
134          MBeanServer JavaDoc server = JmxUtil.getMBeanServer();
135          // 1. check whether we have an object name, pointing to the cache MBean
136
if (cache_name != null && server != null)
137          {
138             //cache = (CacheImpl) MBeanProxyExt.create(CacheImpl.class, cache_name, server);
139
cache = (CacheImpl) MBeanServerInvocationHandler.newProxyInstance(server, cache_name, CacheImpl.class, false);
140          }
141       }
142
143       if (cache == null)
144       {// still not set
145
if (config != null)
146          {
147             cache = new CacheImpl();
148             cache.setConfiguration(new XmlConfigurationParser().parseFile(this.config));
149             cache.create();
150             cache.start();
151          }
152       }
153
154       if (cache == null)
155       {
156          throw new CacheException("cache reference is not set");
157       }
158
159
160       srv_sock = new ServerSocket JavaDoc(port, 10, bind_addr);
161       System.out.println("TcpCacheServer listening on : " + srv_sock.getInetAddress() + ":" + srv_sock.getLocalPort());
162       mylog.info("TcpCacheServer listening on : " + srv_sock.getInetAddress() + ":" + srv_sock.getLocalPort());
163
164       running = true;
165
166       serverThread = new Thread JavaDoc("TcpCacheServer")
167       {
168          public void run()
169          {
170             try
171             {
172                while (running)
173                {
174                   Socket JavaDoc client_sock = srv_sock.accept();
175                   Connection conn = new Connection(client_sock, cache);
176                   conns.add(conn);
177                   conn.start();
178                }
179             }
180             catch (SocketException JavaDoc se)
181             {
182                if (!running)
183                {
184                   // this is because of the stop() lifecycle method being called.
185
// ignore.
186
mylog.info("Shutting down TcpCacheServer");
187                }
188                else
189                {
190                   mylog.error("Caught exception! Shutting down server thread.", se);
191                }
192             }
193             catch (IOException JavaDoc e)
194             {
195                mylog.error("Caught exception! Shutting down server thread.", e);
196             }
197          }
198       };
199       serverThread.setDaemon(daemon);
200       serverThread.start();
201
202    }
203
204    public void stop()
205    {
206       running = false;
207       for (Connection conn : conns)
208       {
209          conn.close();
210       }
211       conns.clear();
212
213       if (srv_sock != null)
214       {
215          try
216          {
217             srv_sock.close();
218             srv_sock = null;
219          }
220          catch (IOException JavaDoc e)
221          {
222             // nada
223
}
224       }
225    }
226
227
228    public String JavaDoc getConnections()
229    {
230       StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
231       sb.append(conns.size()).append(" connections:\n");
232       for (Connection c : conns)
233       {
234          sb.append(c).append("\n");
235       }
236       return sb.toString();
237    }
238
239
240    public void create()
241    {
242    }
243
244    public void destroy()
245    {
246    }
247
248
249    private class Connection implements Runnable JavaDoc
250    {
251       Socket JavaDoc sock = null;
252       ObjectInputStream JavaDoc input = null;
253       ObjectOutputStream JavaDoc output = null;
254       CacheImpl c;
255       Thread JavaDoc t = null;
256
257       public Connection(Socket JavaDoc sock, CacheImpl cache) throws IOException JavaDoc
258       {
259          this.sock = sock;
260
261          output = new ObjectOutputStream JavaDoc(new BufferedOutputStream JavaDoc(sock.getOutputStream()));
262          output.flush();
263
264          input = new ObjectInputStream JavaDoc(new BufferedInputStream JavaDoc(sock.getInputStream()));
265
266          c = cache;
267       }
268
269
270       public void start()
271       {
272          t = new Thread JavaDoc(this, "TcpCacheServer.Connection");
273          t.setDaemon(true);
274          t.start();
275       }
276
277       public void close()
278       {
279          t = null;
280          try
281          {
282             if (output != null) output.close();
283          }
284          catch (Throwable JavaDoc th)
285          {
286             // nada
287
}
288          try
289          {
290             if (input != null) input.close();
291          }
292          catch (Throwable JavaDoc th)
293          {
294             // nada
295
}
296          try
297          {
298             if (sock != null) sock.close();
299          }
300          catch (Throwable JavaDoc th)
301          {
302             // nada
303
}
304
305          // remove self from connections list
306
conns.remove(this);
307       }
308
309       public void run()
310       {
311          int op;
312          Fqn fqn;
313          Object JavaDoc key, val, retval;
314          NodeSPI n;
315          boolean flag;
316
317          while (t != null && Thread.currentThread().equals(t))
318          {
319             try
320             {
321                op = input.readInt();
322             }
323             catch (IOException JavaDoc e)
324             {
325                mylog.debug("Client closed socket");
326                close();
327                break;
328             }
329
330             try
331             {
332                output.reset();
333                switch (op)
334                {
335                   case DelegatingCacheLoader.delegateGetChildrenNames:
336                      fqn = (Fqn) input.readObject();
337                      Set JavaDoc children = c.getChildrenNames(fqn);
338                      output.writeObject(children);// this may be null - that's okay
339
break;
340                   case DelegatingCacheLoader.delegateGetKey:
341                      fqn = (Fqn) input.readObject();
342                      key = input.readObject();
343                      retval = c.get(fqn, key);
344                      output.writeObject(retval);
345                      break;
346                   case DelegatingCacheLoader.delegateGet:
347                      fqn = (Fqn) input.readObject();
348                      n = (NodeSPI) c.get(fqn);
349                      if (n == null)
350                      {// node doesn't exist - return null
351
output.writeObject(n);
352                         break;
353                      }
354                      Map JavaDoc map = n.getDataDirect();
355                      if (map == null) map = new HashMap JavaDoc();
356                      output.writeObject(map);
357                      break;
358                   case DelegatingCacheLoader.delegateExists:
359                      fqn = (Fqn) input.readObject();
360                      flag = c.exists(fqn);
361                      output.writeObject(Boolean.valueOf(flag));
362                      break;
363                   case DelegatingCacheLoader.delegatePutKeyVal:
364                      fqn = (Fqn) input.readObject();
365                      key = input.readObject();
366                      val = input.readObject();
367                      retval = c.put(fqn, key, val);
368                      output.writeObject(retval);
369                      break;
370                   case DelegatingCacheLoader.delegatePut:
371                      fqn = (Fqn) input.readObject();
372                      map = (Map JavaDoc) input.readObject();
373                      c.put(fqn, map);
374                      output.writeObject(Boolean.TRUE);
375                      break;
376
377                   case DelegatingCacheLoader.putList:
378                      int length = input.readInt();
379                      retval = Boolean.TRUE;
380                      if (length > 0)
381                      {
382                         Modification mod;
383                         List JavaDoc<Modification> mods = new ArrayList JavaDoc<Modification>(length);
384                         for (int i = 0; i < length; i++)
385                         {
386                            mod = new Modification();
387                            mod.readExternal(input);
388                            mods.add(mod);
389                         }
390                         try
391                         {
392                            handleModifications(mods);
393                         }
394                         catch (Exception JavaDoc ex)
395                         {
396                            retval = ex;
397                         }
398                      }
399                      output.writeObject(retval);
400                      break;
401                   case DelegatingCacheLoader.delegateRemoveKey:
402                      fqn = (Fqn) input.readObject();
403                      key = input.readObject();
404                      retval = c.remove(fqn, key);
405                      output.writeObject(retval);
406                      break;
407                   case DelegatingCacheLoader.delegateRemove:
408                      fqn = (Fqn) input.readObject();
409                      c.remove(fqn);
410                      output.writeObject(Boolean.TRUE);
411                      break;
412                   case DelegatingCacheLoader.delegateRemoveData:
413                      fqn = (Fqn) input.readObject();
414                      c.removeData(fqn);
415                      output.writeObject(Boolean.TRUE);
416                      break;
417                   case DelegatingCacheLoader.delegateLoadEntireState:
418                      ObjectOutputStream JavaDoc os = (ObjectOutputStream JavaDoc) input.readObject();
419                      if (c.getCacheLoader() != null)
420                      {
421                         c.getCacheLoader().loadEntireState(os);
422                      }
423                      output.writeObject(Boolean.TRUE);
424                      break;
425                   case DelegatingCacheLoader.delegateStoreEntireState:
426                      ObjectInputStream JavaDoc is = (ObjectInputStream JavaDoc) input.readObject();
427                      if (c.getCacheLoader() != null)
428                      {
429                         c.getCacheLoader().storeEntireState(is);
430                      }
431                      output.writeObject(Boolean.TRUE);
432                      break;
433                   default:
434                      mylog.error("Operation " + op + " unknown");
435                      break;
436                }
437                output.flush();
438             }
439             catch (Exception JavaDoc e)
440             {
441                try
442                {
443                   output.writeObject(e);
444                   output.flush();
445                }
446                catch (IOException JavaDoc e1)
447                {
448                   e1.printStackTrace();
449                }
450             }
451          }
452       }
453
454
455       public String JavaDoc toString()
456       {
457          StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
458          if (sock != null)
459          {
460             sb.append(sock.getRemoteSocketAddress());
461          }
462          return sb.toString();
463       }
464
465       protected void handleModifications(List JavaDoc<Modification> modifications) throws CacheException
466       {
467
468          for (Modification m : modifications)
469          {
470             switch (m.getType())
471             {
472                case PUT_DATA:
473                   c.put(m.getFqn(), m.getData());
474                   break;
475                case PUT_DATA_ERASE:
476                   c.put(m.getFqn(), m.getData());
477                   break;
478                case PUT_KEY_VALUE:
479                   c.put(m.getFqn(), m.getKey(), m.getValue());
480                   break;
481                case REMOVE_DATA:
482                   c.removeData(m.getFqn());
483                   break;
484                case REMOVE_KEY_VALUE:
485                   c.remove(m.getFqn(), m.getKey());
486                   break;
487                case REMOVE_NODE:
488                   c.remove(m.getFqn());
489                   break;
490                case MOVE:
491                   c.move(m.getFqn(), m.getFqn2());
492                   break;
493                default:
494                   mylog.error("modification type " + m.getType() + " not known");
495                   break;
496             }
497          }
498       }
499
500
501    }
502
503
504    public static void main(String JavaDoc[] args) throws Exception JavaDoc
505    {
506       String JavaDoc bind_addr = null;
507       int port = 7500;
508       TcpCacheServer server;
509       String JavaDoc config = null;
510
511       for (int i = 0; i < args.length; i++)
512       {
513          if (args[i].equals("-bind_addr"))
514          {
515             bind_addr = args[++i];
516             continue;
517          }
518          if (args[i].equals("-port"))
519          {
520             port = Integer.parseInt(args[++i]);
521             continue;
522          }
523          if (args[i].equals("-config"))
524          {
525             config = args[++i];
526             continue;
527          }
528          help();
529          return;
530       }
531       server = new TcpCacheServer();
532       server.daemon = false;
533       server.setBindAddress(bind_addr);
534       server.setPort(port);
535       server.setConfig(config);
536       server.create();
537       server.start();
538    }
539
540
541    private static void help()
542    {
543       System.out.println("TcpCacheServer [-bind_addr <address>] [-port <port>] [-config <config file>] [-help]");
544    }
545 }
546
Popular Tags