KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > aspects > versioned > DistributedTxCache


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.aspects.versioned;
23 import java.io.Serializable JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.LinkedHashMap JavaDoc;
26 import java.util.List JavaDoc;
27 import java.util.Map JavaDoc;
28 import java.util.Set JavaDoc;
29
30 import javax.management.MBeanServer JavaDoc;
31 import javax.management.ObjectInstance JavaDoc;
32 import javax.management.Query JavaDoc;
33 import javax.management.QueryExp JavaDoc;
34
35 import org.jboss.aop.InstanceAdvised;
36 import org.jboss.ha.framework.interfaces.HAPartition;
37 import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
38 import org.jboss.ha.framework.server.ClusterPartitionMBean;
39 import org.jboss.logging.Logger;
40 import org.jboss.mx.util.MBeanProxyExt;
41 import org.jboss.mx.util.MBeanServerLocator;
42 import org.jboss.util.id.GUID;
43 /**
44  * This is a LRU cache. The TxCache itself is not transactional
45  * but any accesses to objects within the cache ARE transactional.
46  */

47 public class DistributedTxCache implements HAPartitionStateTransfer
48 {
49
50    private static class LRUCache extends LinkedHashMap JavaDoc
51    {
52       private static final long serialVersionUID = -402696519285213913L;
53
54       private int maxSize;
55       public LRUCache(int max)
56       {
57          super(16, 0.75F, true);
58          this.maxSize = max;
59       }
60       protected boolean removeEldestEntry(Map.Entry JavaDoc eldest)
61       {
62          return this.size() > maxSize;
63       }
64    }
65
66    protected static Logger log = Logger.getLogger(DistributedTxCache.class);
67    protected long lockTimeout;
68    protected DistributedSynchronizationManager synchManager;
69    protected DistributedVersionManager versionManager;
70    protected String JavaDoc partitionName;
71    protected HAPartition partition;
72    protected String JavaDoc cacheName;
73    protected LRUCache cache = null;
74    protected int maxSize;
75
76    public DistributedTxCache(int maxSize, long lockTimeout, String JavaDoc cacheName)
77    {
78       this(maxSize, lockTimeout, cacheName, "DefaultPartition");
79    }
80
81    public DistributedTxCache(int maxSize, long lockTimeout, String JavaDoc cacheName, String JavaDoc pName)
82    {
83       this.lockTimeout = lockTimeout;
84       this.partitionName = pName;
85       this.maxSize = maxSize;
86       this.cacheName = "DistributedTxCache/" + cacheName;
87    }
88
89    // HAPartition.HAPartitionStateTransfer Implementation --------------------------------------------------------
90

91    protected HAPartition findHAPartitionWithName (String JavaDoc name) throws Exception JavaDoc
92    {
93       HAPartition result = null;
94       MBeanServer JavaDoc server = MBeanServerLocator.locate();
95       // Class name match does not work with the AOP proxy :(
96
// QueryExp classEQ = Query.eq(Query.classattr(),
97
// Query.value(ClusterPartition.class.getName()));
98
QueryExp JavaDoc exp = Query.and(
99                         Query.match(
100                            Query.attr("Name"),
101                            Query.value("ClusterPartition")
102                         ),
103                         Query.match(
104                            Query.attr("PartitionName"),
105                            Query.value(name)
106                         )
107                       );
108
109       Set JavaDoc mbeans = server.queryMBeans (null, exp);
110       if (mbeans != null && mbeans.size () > 0)
111       {
112          for (Iterator JavaDoc iter = mbeans.iterator(); iter.hasNext();)
113          {
114             ObjectInstance JavaDoc inst = (ObjectInstance JavaDoc) iter.next();
115             try
116             {
117                ClusterPartitionMBean cp = (ClusterPartitionMBean) MBeanProxyExt.create (
118                                                    ClusterPartitionMBean.class,
119                                                    inst.getObjectName (),
120                                                    server);
121                result = cp.getHAPartition();
122                break;
123             }
124             catch (Exception JavaDoc e) {}
125          }
126       }
127       
128       return result;
129    }
130
131    public void create() throws Exception JavaDoc
132    {
133       this.partition = findHAPartitionWithName(partitionName);
134       //REVISIT: doesn't really buy us anything until JGroups synchronizes
135
// initial state correctly
136
//partition.subscribeToStateTransferEvents(cacheName, this);
137
//REVISIT AGAIN: Actually I talked to Bela about this. I can change the
138
//Clustering framework to do state transfer correctly
139
partition.registerRPCHandler(cacheName, this);
140       synchManager = new DistributedSynchronizationManager(cacheName, null, partition);
141       versionManager = new DistributedVersionManager(lockTimeout, synchManager);
142       synchManager.versionManager = versionManager;
143       synchManager.create();
144    }
145
146    public synchronized void start() throws Exception JavaDoc
147    {
148       synchManager.start();
149       pullState();
150       if (cache == null) cache = new LRUCache(maxSize);
151    }
152
153    protected void pullState() throws Exception JavaDoc
154    {
155       Object JavaDoc[] args = {};
156       List JavaDoc rsp = partition.callMethodOnCluster(cacheName, "getCurrentState", args, true);
157       if (rsp.size() > 0)
158       {
159          setCurrentState((Serializable JavaDoc)rsp.get(0));
160       }
161    }
162
163
164    public synchronized void _insert(Object JavaDoc key, Object JavaDoc obj)
165    {
166       cache.put(key, obj);
167    }
168
169    public void insert(Object JavaDoc key, Object JavaDoc obj) throws Exception JavaDoc
170    {
171       try
172       {
173          obj = versionManager.makeVersioned(obj);
174          if (versionManager.isVersioned(obj))
175          {
176             log.trace("Inserting versioned object");
177             obj = VersionManager.getGUID((InstanceAdvised)obj);
178          }
179          else
180          {
181             log.trace("Inserting a non-Versioned object");
182          }
183          Object JavaDoc[] args = {key, obj};
184          partition.callMethodOnCluster(cacheName, "_insert", args, false);
185       }
186       catch (Exception JavaDoc ex)
187       {
188          ex.printStackTrace();
189          throw ex;
190       }
191    }
192
193    public synchronized void _remove(Object JavaDoc key)
194    {
195       cache.remove(key);
196    }
197
198    public void remove(Object JavaDoc key)
199    {
200       Object JavaDoc[] args = {key};
201       try
202       {
203          partition.callMethodOnCluster(cacheName, "_remove", args, false);
204       }
205       catch (Exception JavaDoc ex)
206       {
207          throw new RuntimeException JavaDoc(ex);
208       }
209    }
210
211    
212    public synchronized void _flush()
213    {
214       cache.clear();
215    }
216
217    public void flush(Object JavaDoc key)
218    {
219       Object JavaDoc[] args = {};
220       try
221       {
222          partition.callMethodOnCluster(cacheName, "_flush", args, false);
223       }
224       catch (Exception JavaDoc ex)
225       {
226          throw new RuntimeException JavaDoc(ex);
227       }
228    }
229
230    
231    public synchronized Object JavaDoc get(Object JavaDoc key)
232    {
233       Object JavaDoc obj = cache.get(key);
234       if (obj instanceof GUID)
235       {
236          GUID guid = (GUID)obj;
237          obj = synchManager.getObject(guid);
238       }
239       return obj;
240    }
241
242    public Serializable JavaDoc getCurrentState()
243    {
244       log.trace("getCurrentState called on cache");
245       return cache;
246    }
247
248    public void setCurrentState(Serializable JavaDoc newState)
249    {
250       log.trace("setCurrentState called on cache");
251       synchronized (this)
252       {
253          this.cache = (LRUCache)newState;
254       }
255    }
256
257 }
258
Popular Tags