KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > aspects > versioned > DistributedSynchronizationManager


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.aspects.versioned;
23
24 import org.jboss.ha.framework.interfaces.HAPartition;
25 import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
26 import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
27 import org.jboss.logging.Logger;
28 import org.jboss.util.id.GUID;
29
30 import java.io.Serializable JavaDoc;
31 import java.util.ArrayList JavaDoc;
32 import java.util.Hashtable JavaDoc;
33 import java.util.Iterator JavaDoc;
34 import java.util.List JavaDoc;
35 import java.util.Vector JavaDoc;
36
37 /**
38  * Adds replication
39  *
40  * @author <a HREF="mailto:bill@jboss.org">Bill Burke</a>
41  * @version $Revision: 37406 $
42  */

43 public class DistributedSynchronizationManager extends LocalSynchronizationManager implements HAPartitionStateTransfer, HAMembershipListener
44 {
45
46    protected static Logger log = Logger.getLogger(DistributedSynchronizationManager.class);
47    protected HAPartition partition;
48    protected String JavaDoc domainName;
49    protected Hashtable JavaDoc heldLocks = new Hashtable JavaDoc();
50
51    public DistributedSynchronizationManager(String JavaDoc domainName, DistributedVersionManager versionManager, HAPartition partition)
52    {
53       super(versionManager);
54       this.partition = partition;
55       this.domainName = domainName + "/SynchManager";
56    }
57
58    public void create() throws Exception JavaDoc
59    {
60       //partition.subscribeToStateTransferEvents(domainName, this);
61
partition.registerRPCHandler(domainName, this);
62    }
63
64    public void start() throws Exception JavaDoc
65    {
66       pullState();
67    }
68
69    protected void pullState() throws Exception JavaDoc
70    {
71       Object JavaDoc[] args = {};
72       ArrayList JavaDoc rsp = partition.callMethodOnCluster(domainName, "getCurrentState", args, true);
73       if (rsp.size() > 0)
74          setCurrentState((Serializable JavaDoc)rsp.get(0));
75    }
76
77    public Serializable JavaDoc getCurrentState()
78    {
79       if(log.isTraceEnabled() )
80          log.trace("getCurrentState called");
81       return stateTable;
82    }
83
84    public void setCurrentState(Serializable JavaDoc newState)
85    {
86       if( log.isTraceEnabled() )
87          log.trace("setCurrentState called");
88       try
89       {
90          synchronized (tableLock)
91          {
92             this.stateTable = (Hashtable JavaDoc)newState;
93             log.trace("setCurrentState, size: " + stateTable.size());
94             Iterator JavaDoc it = stateTable.values().iterator();
95             while (it.hasNext())
96             {
97                DistributedState state = (DistributedState)it.next();
98                if (objectTable.containsKey(state.getGUID())) continue;
99                state.buildObject(this, versionManager);
100             }
101          }
102       }
103       catch (Exception JavaDoc ex)
104       {
105          log.error("failed to set state sent from cluster", ex);
106       }
107    }
108
109
110    public void membershipChanged(Vector JavaDoc deadMembers, Vector JavaDoc newMembers, Vector JavaDoc allMembers)
111    {
112       for (int i = 0; i < deadMembers.size(); i++)
113       {
114          Hashtable JavaDoc held = (Hashtable JavaDoc)heldLocks.remove(deadMembers.get(i));
115          if (held != null)
116          {
117             Iterator JavaDoc it = held.values().iterator();
118             while (it.hasNext())
119             {
120                List JavaDoc list = (List JavaDoc)it.next();
121                releaseHeldLocks(list);
122             }
123          }
124       }
125    }
126
127    public void sendNewObjects(List JavaDoc newObjects) throws Exception JavaDoc
128    {
129       log.trace("sending new objects");
130       try
131       {
132          Object JavaDoc[] args = {newObjects};
133          checkResponses(partition.callMethodOnCluster(domainName, "addNewObjects", args, true));
134       }
135       catch (Exception JavaDoc ex)
136       {
137          log.error("serious cache problems, data inconsistency is imminent", ex);
138          throw ex;
139       }
140
141    }
142
143    protected void sendClusterUpdatesAndRelease(GUID globalTxId, List JavaDoc clusterUpdates) throws Exception JavaDoc
144    {
145       try
146       {
147          Object JavaDoc[] args = {partition.getNodeName(), globalTxId, clusterUpdates};
148          checkResponses(partition.callMethodOnCluster(domainName, "updateObjects", args, true));
149
150       }
151       catch (Exception JavaDoc ex)
152       {
153          log.error("serious cache problems, data inconsistency is imminent", ex);
154          throw ex;
155       }
156    }
157    protected void acquireRemoteLocks(GUID globalTxId, List JavaDoc guids) throws Exception JavaDoc
158    {
159       try
160       {
161
162          Object JavaDoc[] args = {partition.getNodeName(), globalTxId, guids};
163          checkResponses(partition.callMethodOnCluster(domainName, "acquireLocks", args, true));
164       }
165       catch (Exception JavaDoc ex)
166       {
167          try
168          {
169             Object JavaDoc[] args = {partition.getNodeName()};
170             partition.callMethodOnCluster(domainName, "releaseHeldLocks", args, true);
171          }
172          catch (Exception JavaDoc ignored)
173          {
174          }
175          throw ex;
176       }
177    }
178
179    public void noTxUpdate(DistributedUpdate update) throws Exception JavaDoc
180    {
181       throw new RuntimeException JavaDoc("NOT IMPLEMENTED");
182    }
183
184    public void addNewObjects(List JavaDoc newObjects) throws Exception JavaDoc
185    {
186       // updates must be in table first
187
synchronized (tableLock)
188       {
189          for (int i = 0; i < newObjects.size(); i++)
190          {
191             DistributedState state = (DistributedState)newObjects.get(i);
192             // REVISIT synch
193
stateTable.put(state.getGUID(), state);
194          }
195          for (int i = 0; i < newObjects.size(); i++)
196          {
197             DistributedState state = (DistributedState)newObjects.get(i);
198             if (objectTable.containsKey(state.getGUID())) continue;
199             state.buildObject(this, versionManager);
200          }
201       }
202    }
203
204    public void updateObjects(String JavaDoc nodeName, GUID globalTxId, ArrayList JavaDoc updates) throws Exception JavaDoc
205    {
206       log.trace("updateObjects");
207       synchronized (tableLock)
208       {
209          for (int i = 0; i < updates.size(); i++)
210          {
211             DistributedUpdate update = (DistributedUpdate)updates.get(i);
212             // REVISIT: synch
213
DistributedState state = (DistributedState)stateTable.get(update.getGUID());
214             state.mergeState(update);
215             state.releaseWriteLock();
216          }
217       }
218       Hashtable JavaDoc table = (Hashtable JavaDoc)heldLocks.get(nodeName);
219       table.remove(globalTxId);
220       log.trace("end updateObjects");
221    }
222
223    public void releaseHeldLocks(String JavaDoc nodeName, GUID globalTxId)
224    {
225       Hashtable JavaDoc held = (Hashtable JavaDoc)heldLocks.get(nodeName);
226       if (held == null) return;
227
228       List JavaDoc locks = (List JavaDoc)held.remove(globalTxId);
229       if (locks != null) releaseHeldLocks(locks);
230    }
231
232    public void acquireLocks(String JavaDoc nodeName, GUID globalTxId, List JavaDoc list) throws Exception JavaDoc
233    {
234       log.trace("acquireLocks");
235       ArrayList JavaDoc locks = new ArrayList JavaDoc();
236       try
237       {
238          for (int i = 0; i < list.size(); i++)
239          {
240             GUID guid = (GUID)list.get(i);
241             DistributedState state = getState(guid);
242             state.acquireWriteLock();
243             locks.add(state);
244          }
245          Hashtable JavaDoc held = (Hashtable JavaDoc)heldLocks.get(nodeName);
246          if (held == null)
247          {
248             held = new Hashtable JavaDoc();
249             heldLocks.put(nodeName, held);
250          }
251          held.put(globalTxId, locks);
252       }
253       catch (Exception JavaDoc ex)
254       {
255          releaseHeldLocks(locks);
256          throw ex;
257       }
258       log.trace("end acquireLocks");
259    }
260
261    /**
262     * Checks whether any of the responses are exceptions. If yes, re-throws
263     * them (as exceptions or runtime exceptions).
264     * @param rsps
265     * @throws Exception
266     */

267    protected void checkResponses(List JavaDoc rsps) throws Exception JavaDoc {
268       Object JavaDoc rsp;
269       if(rsps != null) {
270          for(Iterator JavaDoc it=rsps.iterator(); it.hasNext();) {
271             rsp=it.next();
272             if(rsp != null) {
273                if(rsp instanceof RuntimeException JavaDoc)
274                   throw (RuntimeException JavaDoc)rsp;
275                if(rsp instanceof Exception JavaDoc)
276                   throw (Exception JavaDoc)rsp;
277             }
278          }
279       }
280    }
281
282 }
283
Popular Tags