KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > requestmanager > distributed > ControllerFailureCleanupThread


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2006 Continuent, Inc.
4  * Contact: sequoia@continuent.org
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * Initial developer(s): Emmanuel Cecchet.
19  * Contributor(s): ______________________.
20  */

21
22 package org.continuent.sequoia.controller.requestmanager.distributed;
23
24 import java.sql.SQLException JavaDoc;
25 import java.util.ArrayList JavaDoc;
26 import java.util.Collection JavaDoc;
27 import java.util.ConcurrentModificationException JavaDoc;
28 import java.util.HashMap JavaDoc;
29 import java.util.Hashtable JavaDoc;
30 import java.util.Iterator JavaDoc;
31 import java.util.LinkedList JavaDoc;
32 import java.util.List JavaDoc;
33 import java.util.Map JavaDoc;
34
35 import org.continuent.hedera.adapters.MulticastRequestAdapter;
36 import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
37 import org.continuent.sequoia.common.log.Trace;
38 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
39 import org.continuent.sequoia.controller.requests.AbstractRequest;
40 import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
41 import org.continuent.sequoia.controller.virtualdatabase.protocol.FlushGroupCommunicationMessages;
42
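/**
 * Thread started to clean up after the failure of a controller. It flushes
 * the pending group communication messages, waits for the clients of the
 * failed controller to fail over and then rolls back the transactions and
 * closes the persistent connections for which no failover was requested.
 *
 * @author <a HREF="mailto:emmanuel.cecchet@continuent.com">Emmanuel Cecchet</a>
 * @version 1.0
 */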

49 public class ControllerFailureCleanupThread extends Thread JavaDoc
50 {
51   private Hashtable JavaDoc cleanupThreadsList;
52   private HashMap JavaDoc writesFlushed;
53   private DistributedVirtualDatabase dvdb;
54   private DistributedRequestManager drm;
55   private long failedControllerId;
56   private long failoverTimeoutInMs;
57   private List JavaDoc persistentConnectionsToRecover;
58   private List JavaDoc transactionsToRecover;
59   private Trace logger = Trace
60                                                 .getLogger("org.continuent.sequoia.controller.requestmanager.cleanup");
61
/**
 * Builds the cleanup thread in charge of one failed controller. The thread is
 * only configured here; the cleanup itself happens once the thread is
 * started.
 *
 * @param distributedVirtualDatabase the distributed virtual database on which
 *          the cleanup is performed; its request manager is expected to be a
 *          <code>DistributedRequestManager</code>
 * @param failedControllerId id of the controller that failed
 * @param failoverTimeoutInMs time in milliseconds to wait for clients to
 *          fail over before proceeding to the cleanup
 * @param cleanupThreads table of cleanup threads from which this thread
 *          removes itself when it terminates
 * @param writesFlushed map of controller id to the "writes flushed" flag of
 *          that controller
 */
public ControllerFailureCleanupThread(
    DistributedVirtualDatabase distributedVirtualDatabase,
    long failedControllerId, long failoverTimeoutInMs,
    Hashtable cleanupThreads, HashMap writesFlushed)
{
  super("ControllerFailureCleanupThread for controller " + failedControllerId);
  this.failedControllerId = failedControllerId;
  this.failoverTimeoutInMs = failoverTimeoutInMs;
  this.writesFlushed = writesFlushed;
  this.cleanupThreadsList = cleanupThreads;
  this.dvdb = distributedVirtualDatabase;
  this.drm = (DistributedRequestManager) distributedVirtualDatabase
      .getRequestManager();
}
86
87   /**
88    * {@inheritDoc}
89    *
90    * @see java.lang.Runnable#run()
91    */

92   public void run()
93   {
94     try
95     {
96       doRun();
97     }
98     catch (Throwable JavaDoc t)
99     {
100       logger.fatal("Cleanup failed", t);
101     }
102   }
103
104   /**
105    * @see java.lang.Thread#run()
106    */

107   public void doRun()
108   {
109     Long JavaDoc controllerIdKey = new Long JavaDoc(failedControllerId);
110     try
111     {
112       if (failoverTimeoutInMs == 0)
113       {
114         synchronized (writesFlushed)
115         {
116           writesFlushed.put(controllerIdKey, Boolean.TRUE);
117           writesFlushed.notifyAll();
118         }
119         return;
120       }
121
122       // Set "writes flushed" flag to false
123
synchronized (writesFlushed)
124       {
125         if (!writesFlushed.containsKey(controllerIdKey))
126           writesFlushed.put(controllerIdKey, Boolean.FALSE);
127       }
128
129       synchronized (this)
130       {
131
132         try
133         {
134           /*
135            * Send a FlushGroupCommunicationMessages message to ourselves to be
136            * sure that everything gets flushed (especially commit/rollback
137            * before building the open transaction list)
138            */

139           MulticastRequestAdapter multicastRequestAdapter = dvdb
140               .getMulticastRequestAdapter();
141           ArrayList JavaDoc dest = new ArrayList JavaDoc();
142
143           if ((multicastRequestAdapter != null)
144               && (multicastRequestAdapter.getChannel() != null))
145           {
146             dest.add(multicastRequestAdapter.getChannel().getLocalMembership());
147             Object JavaDoc ret = multicastRequestAdapter.multicastMessage(dest,
148                 new FlushGroupCommunicationMessages(failedControllerId),
149                 MulticastRequestAdapter.WAIT_ALL, 0);
150             if (ret instanceof Exception JavaDoc)
151               throw (Exception JavaDoc) ret;
152           }
153         }
154         catch (Throwable JavaDoc e)
155         {
156           String JavaDoc errorMessage = "Failed to send flush message in ControllerFailureCleanupThread";
157           logger.error(errorMessage, e);
158         }
159
160         // Set "writes flushed" flag to true and notify blocked vdb worker
161
// threads
162
synchronized (writesFlushed)
163         {
164           writesFlushed.put(controllerIdKey, Boolean.TRUE);
165           writesFlushed.notifyAll();
166         }
167
168         notifyAll();
169       }
170
171       // Wait for clients to failover
172
try
173       {
174         synchronized (this)
175         {
176           if (logger.isInfoEnabled())
177             logger.info("Waiting " + failoverTimeoutInMs
178                 + "ms for client of controller " + controllerIdKey
179                 + " to failover");
180           wait(failoverTimeoutInMs);
181         }
182       }
183       catch (InterruptedException JavaDoc ignore)
184       {
185       }
186
187       /*
188        * Notify queries that were expecting an answer from the failed controller
189        * that this will never happen.
190        */

191       drm.cleanupAllFailedQueriesFromController(controllerIdKey.longValue());
192
193       /*
194        * Build list of transactions and persistent connections for which
195        * failover did not happen. Start with pending requests in the scheduler
196        * then look for active resources in the distributed request manager.
197        */

198       transactionsToRecover = parseTransactionMetadataListForControllerId(drm
199           .getScheduler().getActiveTransactions());
200       rollbackInactiveTransactions(controllerIdKey);
201       persistentConnectionsToRecover = parsePersistentConnections(drm
202           .getScheduler().getOpenPersistentConnections());
203       // getTransactionAndPersistentConnectionsFromRequests(drm.getScheduler()
204
// .getActiveReadRequests());
205
getTransactionAndPersistentConnectionsFromRequests(drm.getScheduler()
206           .getActiveWriteRequests());
207
208       // If both lists are empty there is nothing to cleanup
209
if ((transactionsToRecover.isEmpty())
210           && (persistentConnectionsToRecover.isEmpty()))
211         return;
212
213       /*
214        * Ok, now everything should have been recovered. Cleanup all remaining
215        * objects. The following methods will take care of not cleaning up
216        * resources for which failover was notified through
217        * notifyTransactionFailover or notifyPersistentConnectionFailover.
218        */

219       // abortRemainingTransactions(controllerIdKey);
220
closeRemainingPersistentConnections(controllerIdKey);
221     }
222     finally
223     {
224       logger.info("Cleanup for controller " + failedControllerId
225           + " failure is completed.");
226
227       // Set "writes flushed" flag to true (may be overkill if it was done
228
// earlier in this method) and notify blocked vdb worker threads
229
synchronized (writesFlushed)
230       {
231         writesFlushed.put(controllerIdKey, Boolean.TRUE);
232         writesFlushed.notifyAll();
233       }
234
235       // Remove ourselves from the thread list
236
cleanupThreadsList.remove(this);
237     }
238   }
239
240   /**
241    * Rollback all the transactions for clients that did not reconnect. The
242    * transaction can exist at this point for 3 reasons.
243    * <ul>
244    * <li>The client did not have an active request when the controller failed
245    * <li>The client had an active request that completed after the failure
246    * <li>The active request is blocked by a transaction in the first state.
247    * <ul>
248    * In the first two cases the, it is safe to immediately rollback the
249    * transaction, because there is no current activity. In the third case, we
250    * need to wait until the request complete before doing the rollback. So we go
251    * through the list of transactions needing recovery repeatly rollbacking the
252    * inactive transactions. Rollingback transactions should let the other
253    * pending transactions complete.
254    *
255    * @param controllerIdKey the controller id
256    */

257   private void rollbackInactiveTransactions(Long JavaDoc controllerIdKey)
258   {
259     List JavaDoc transactionsRecovered = dvdb.getTransactionsRecovered(controllerIdKey);
260     Map JavaDoc readRequests = drm.getScheduler().getActiveReadRequests();
261     Map JavaDoc writeRequests = drm.getScheduler().getActiveWriteRequests();
262     while (!transactionsToRecover.isEmpty())
263     {
264       int waitingForCompletion = 0;
265       // Iterate on the list of active transactions (based on scheduler
266
// knowledge) that were started by the failed controller.
267
for (Iterator JavaDoc iter = transactionsToRecover.iterator(); iter.hasNext();)
268       {
269         Long JavaDoc lTid = (Long JavaDoc) iter.next();
270
271         if ((transactionsRecovered == null)
272             || !transactionsRecovered.contains(lTid))
273         {
274           if (!hasRequestForTransaction(lTid.longValue(), readRequests)
275               && !hasRequestForTransaction(lTid.longValue(), writeRequests))
276           {
277             if (logger.isInfoEnabled())
278               logger.info("Rollingback transaction " + lTid
279                   + " started by dead controller " + failedControllerId
280                   + " since client did not ask for failover");
281
282             try
283             {
284               boolean logRollback = dvdb.getRecoveryLog()
285                   .hasLoggedBeginForTransaction(lTid);
286               dvdb.rollback(lTid.longValue(), logRollback);
287             }
288             catch (SQLException JavaDoc e)
289             {
290               logger.error("Failed to rollback transaction " + lTid
291                   + " started by dead controller " + failedControllerId, e);
292             }
293             catch (VirtualDatabaseException e)
294             {
295               logger.error("Failed to rollback transaction " + lTid
296                   + " started by dead controller " + failedControllerId, e);
297             }
298
299             iter.remove();
300           }
301           else
302           {
303             waitingForCompletion++;
304             if (logger.isDebugEnabled())
305               logger.debug("Waiting for activity to complete for " + lTid
306                   + " started by dead controller " + failedControllerId
307                   + " since client did not ask for failover");
308           }
309         }
310       }
311
312       if (waitingForCompletion == 0)
313         break;
314       try
315       {
316         synchronized (writeRequests)
317         {
318           if (!writeRequests.isEmpty())
319           {
320             writeRequests.wait(500);
321             continue;
322           }
323         }
324         synchronized (readRequests)
325         {
326           if (!readRequests.isEmpty())
327           {
328             readRequests.wait(500);
329             continue;
330           }
331         }
332       }
333       catch (InterruptedException JavaDoc e)
334       {
335       }
336     } // while (!transactionsToRecover.isEmpty())
337
}
338
339   /**
340    * Returns true if the given map contains an AbstractRequest value that
341    * belongs to the given transaction.
342    *
343    * @param transactionId transaction id to look for
344    * @param map map of Long(transaction id) -> AbstractRequest
345    * @return true if a request in the map matches the transaction id
346    */

347   private boolean hasRequestForTransaction(long transactionId, Map JavaDoc map)
348   {
349     synchronized (map)
350     {
351       for (Iterator JavaDoc iter = map.values().iterator(); iter.hasNext();)
352       {
353         AbstractRequest request = (AbstractRequest) iter.next();
354         if (transactionId == request.getTransactionId())
355           return true;
356       }
357     }
358
359     return false;
360   }
361
362   /**
363    * Shutdown this thread and wait for its completion (the thread will try to
364    * end asap and skip work if possible).
365    */

366   public synchronized void shutdown()
367   {
368     notifyAll();
369     try
370     {
371       this.join();
372     }
373     catch (InterruptedException JavaDoc e)
374     {
375       logger
376           .warn("Controller cleanup thread may not have completed before it was terminated");
377     }
378   }
379
380   private void closeRemainingPersistentConnections(Long JavaDoc controllerId)
381   {
382     List JavaDoc persistentConnectionsRecovered = dvdb
383         .getControllerPersistentConnectionsRecovered(controllerId);
384     for (Iterator JavaDoc iter = persistentConnectionsToRecover.iterator(); iter
385         .hasNext();)
386     {
387       Long JavaDoc lConnectionId = (Long JavaDoc) iter.next();
388
389       if ((persistentConnectionsRecovered == null)
390           || !persistentConnectionsRecovered.contains(lConnectionId))
391       {
392         if (logger.isInfoEnabled())
393           logger.info("Closing persistent connection " + lConnectionId
394               + " started by dead controller " + failedControllerId
395               + " since client did not ask for failover");
396
397         drm.closePersistentConnection(lConnectionId);
398       }
399     }
400   }
401
402   /**
403    * Update the transactionsToRecover and persistentConnectionsToRecover lists
404    * with the requests found in the HashMap that are matching our failed
405    * controller id.
406    *
407    * @param map the map to parse
408    */

409   private void getTransactionAndPersistentConnectionsFromRequests(Map JavaDoc map)
410   {
411     synchronized (map)
412     {
413       for (Iterator JavaDoc iter = map.keySet().iterator(); iter.hasNext();)
414       {
415         Long JavaDoc lTid = (Long JavaDoc) iter.next();
416         if ((lTid.longValue() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId)
417         { // Request id matches the failed controller
418
AbstractRequest request = (AbstractRequest) map.get(lTid);
419           if (!request.isAutoCommit())
420           {
421             /*
422              * Re-check transaction id in case this was already a failover from
423              * another controller
424              */

425             if ((request.getTransactionId() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId)
426             {
427               Long JavaDoc tidLong = new Long JavaDoc(request.getTransactionId());
428               if (!transactionsToRecover.contains(tidLong))
429               {
430                 transactionsToRecover.add(tidLong);
431               }
432             }
433           }
434           if (request.isPersistentConnection())
435           {
436             /*
437              * Re-check persistent connection id in case this was a failover
438              * from another controller
439              */

440             Long JavaDoc connIdLong = new Long JavaDoc(request.getPersistentConnectionId());
441             if ((request.getPersistentConnectionId() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId)
442               if (!persistentConnectionsToRecover.contains(connIdLong))
443               {
444                 persistentConnectionsToRecover.add(connIdLong);
445               }
446           }
447         }
448       }
449     }
450   }
451
452   private List JavaDoc parsePersistentConnections(Map JavaDoc map)
453   {
454     LinkedList JavaDoc result = new LinkedList JavaDoc();
455     synchronized (map)
456     {
457       for (Iterator JavaDoc iter = map.keySet().iterator(); iter.hasNext();)
458       {
459         Long JavaDoc persistentConnectionId = (Long JavaDoc) iter.next();
460         if ((persistentConnectionId.longValue() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId)
461         { // Request id matches the failed controller
462
result.add(persistentConnectionId);
463         }
464       }
465       return result;
466     }
467   }
468
469   /**
470    * Parse the list containing transaction metadata and return a list containing
471    * all transaction ids matching the controllerId this thread is taking care
472    * of.
473    *
474    * @param list transaction metadata list to parse
475    * @return sublist containing ids matching failedControllerId
476    */

477   private List JavaDoc parseTransactionMetadataListForControllerId(Collection JavaDoc list)
478   {
479     LinkedList JavaDoc result = new LinkedList JavaDoc();
480     synchronized (list)
481     {
482       boolean retry = true;
483       while (retry)
484       {
485         try
486         {
487           for (Iterator JavaDoc iter = list.iterator(); iter.hasNext();)
488           {
489             TransactionMetaData tm = (TransactionMetaData) iter.next();
490             if ((tm.getTransactionId() & DistributedRequestManager.CONTROLLER_ID_BIT_MASK) == failedControllerId)
491               result.addLast(new Long JavaDoc(tm.getTransactionId()));
492           }
493
494           retry = false;
495         }
496         catch (ConcurrentModificationException JavaDoc e)
497         {
498           // TODO: handle exception
499
}
500       }
501     }
502     return result;
503   }
504
505 }
506
Popular Tags