KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > ResourceManager


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: ResourceManager.java,v 1.2 2005/03/18 03:58:39 tanderson Exp $
44  */

45 package org.exolab.jms.messagemgr;
46
47 import java.io.File JavaDoc;
48 import java.io.FilenameFilter JavaDoc;
49 import java.sql.Connection JavaDoc;
50 import java.util.Comparator JavaDoc;
51 import java.util.HashMap JavaDoc;
52 import java.util.Iterator JavaDoc;
53 import java.util.LinkedList JavaDoc;
54 import java.util.TreeSet JavaDoc;
55 import java.util.Vector JavaDoc;
56
57 import javax.jms.JMSException JavaDoc;
58 import javax.transaction.xa.XAException JavaDoc;
59 import javax.transaction.xa.XAResource JavaDoc;
60 import javax.transaction.xa.Xid JavaDoc;
61
62 import org.apache.commons.logging.Log;
63 import org.apache.commons.logging.LogFactory;
64
65 import org.exolab.jms.service.BasicService;
66 import org.exolab.jms.service.ServiceException;
67 import org.exolab.jms.service.ServiceState;
68 import org.exolab.jms.common.uuid.UUID;
69 import org.exolab.jms.message.MessageImpl;
70 import org.exolab.jms.persistence.DatabaseService;
71 import org.exolab.jms.persistence.PersistenceException;
72 import org.exolab.jms.persistence.SQLHelper;
73 import org.exolab.jms.tranlog.ExternalXid;
74 import org.exolab.jms.tranlog.StateTransactionLogEntry;
75 import org.exolab.jms.tranlog.TransactionLog;
76 import org.exolab.jms.tranlog.TransactionLogException;
77 import org.exolab.jms.tranlog.TransactionState;
78 import org.exolab.jms.tranlog.DataTransactionLogEntry;
79
80
81 /**
82  * The resource manager provides XA support for the JMS Server.
83  * <p>
84  * The resource manager is responsible for managing the various transaction
85  * identifiers and managing the association between transaction ids and
86  * connections.
87  * <p>
88  * The resource manager will store the global XID's and their state in the
89  * database for recovery purposes.
90  * <p>
91  * Messages that arrive, and are associated with an XID are not processed
92  * through the {@link MessageMgr}. Instead they are routed to this resource
93  * manager, where they are cached until the associated XID is committed or
94  * rolled back. If the transaction is successfully committed, through the 2PC
95  * protocol the messages will pass through the system.
96  * <p>
97  * Similarly, messages that are sent to consumers, either synchronously or
98  * asynchronously are also cached by the resource manager until the global
99  * transaction completes.
100  * <p>
101  * On startup the resource manager will read all incomplete transactions
102  * into memory. It will then process transactions that have
103  * timed out.
104  * <p>
105  * The transaction manager will call the {@link #recover} method and obtain a
106  * list of incomplete transaction for the purpose of completing them where
107  * possible.
108  *
109  * @version $Revision: 1.2 $ $Date: 2005/03/18 03:58:39 $
110  * @author <a HREF="mailto:jima@intalio.com">Jim Alateras</a>
111  */

112 public class ResourceManager
113     extends BasicService {
114
115     /**
116      * The name of the service
117      */

118     private final static String JavaDoc RM_SERVICE_NAME = "XAResourceManager";
119
120     /**
121      * The prefix used for all transaction log files, which are created and
122      * managed by the {@link TransactionLog}
123      */

124     private final static String JavaDoc RM_LOGFILE_PREFIX = "ojmsrm";
125
126     /**
127      * The extension for all transaction log files
128      */

129     public final static String JavaDoc RM_LOGFILE_EXTENSION = ".log";
130
131     /**
132      * This is used to indicate the garbage collection has been disabled and
133      * that the client will take responsibility for all aspects of log file
134      * management. This is useful in situations where the client wants to
135      * archive the transaction log files
136      * <p>
137      * This is the default mode for GC.
138      */

139     public static final int GC_DISABLED = 0;
140
141     /**
142      * Synchronous gabrage collection is used to remove processed log files
143      * when the last trnasaction, in that log file, has been successfully
144      * processed. This is more efficient means since the log files does not
145      * need to be scanned asynchronously to determine whether all the
146      * transactions have been processed.
147      */

148     public static final int GC_SYNCHRONOUS = 1;
149
150     /**
151      * Asynchronous garbage collection is used to remove processed log files
152      * asynchronous (i.e in a different thread context). This is rather
153      * expensive since it must manually scan each log file and determine
154      * whether all transactions, in that file, have been closed. If this is
155      * the case then it will remove the log file.
156      */

157     public static final int GC_ASYNCHRONOUS = 2;
158
159     /**
160      * Maintains a singleton instance of the gc service
161      */

162     private static ResourceManager _instance = null;
163
164     /**
165      * Used to synchronize the creation of the transaction manager
166      */

167     private static final Object JavaDoc _initializer = new Object JavaDoc();
168
169     /**
170      * This is the maximum size, in bytes, of each transaction log file. The
171      * value can be overriden by the user
172      */

173     private int _logFileSize = 1000000;
174
175     /**
176      * Maintains a collection of transaction log files currently in use by this
177      * resource manager
178      */

179     private TreeSet JavaDoc _logs = new TreeSet JavaDoc(new TranLogFileComparator());
180
181     /**
182      * Maintain a mapping between the TRID (transaction id and the log file it
183      * is associated with.
184      */

185     private HashMap JavaDoc _tridToLogCache = new HashMap JavaDoc();
186
187     /**
188      * Maintain a list of open TRIDs for a particular {@link TransactionLog}
189      */

190     private HashMap JavaDoc _logToTridCache = new HashMap JavaDoc();
191
192     /**
193      * This attribute is used to synchronize the modifications to the _tridToLog
194      * _logToTrid attributes
195      */

196     private final Object JavaDoc _cacheLock = new Object JavaDoc();
197
198     /**
199      * This maintains a cache of all open transactions and the corresponding
200      * data. The key is the transaction identifier and the object is a LinkedList
201      * transaction entries, which includes both state and data
202      */

203     private HashMap JavaDoc _activeTransactions = new HashMap JavaDoc();
204
205     /**
206      * The directory where the log files are stored. This can be set by the
207      * client
208      */

209     private String JavaDoc _logDirectory = ".";
210
211     /**
212      * This is the number of the last log file created by the ResourceManager
213      */

214     private long _lastLogNumber = 0;
215
216     /**
217      * The expiry time for transaction associated with this resource manager.
218      * This will either be configured or passed in with the transaction context
219      * The value is specified in seconds.
220      */

221     private int _txExpiryTime = 120;
222
223     /**
224      * This attribute caches the garbage collection mode for the resouce
225      * managers. Valid values are specified by the GC_* constants.
226      * <p>
227      * By default garbage collection is disabled.
228      */

229     private int _gcMode = GC_SYNCHRONOUS;
230
231     /**
232      * This is the id associated with this resource...need to work out who
233      * or what sets this.
234      */

235     private String JavaDoc _rid = UUID.next();
236
237     /**
238      * The logger
239      */

240     private static final Log _log = LogFactory.getLog(ResourceManager.class);
241
242
243     /**
244      * Return the singleton instance of the ResourceManager
245      *
246      * @return ResourceManager
247      * @throws ResourceManagerException
248      */

249     public static ResourceManager instance()
250         throws ResourceManagerException {
251         if (_instance == null) {
252             synchronized (_initializer) {
253                 // we need to check again if multiple threads
254
// have blocked on the creation of the singleton
255
if (_instance == null) {
256                     _instance = new ResourceManager();
257                 }
258             }
259         }
260
261         return _instance;
262     }
263
/**
 * Create a resource manager that keeps its log files in the "./logs"
 * directory, by delegating to {@link #ResourceManager(String)}.
 *
 * @throws ResourceManagerException if the delegate constructor fails
 */
private ResourceManager()
    throws ResourceManagerException {
    // "./logs" is the directory used when the caller supplies none
    this("./logs");
}
279
280     /**
281      * Construct a resource manager using the specified directory, which
282      * must already exist. If the directory does not exist or there is
283      * no permisssion to access it then throw a ResourceManagerException
284      *
285      * @param dir - the base directory
286      * @throws ResourceManagerException
287      */

288     public ResourceManager(String JavaDoc dir)
289         throws ResourceManagerException {
290         super(RM_SERVICE_NAME);
291         _logDirectory = dir;
292         File JavaDoc file = new File JavaDoc(dir);
293         if ((!file.exists()) ||
294             (!file.isDirectory())) {
295             throw new ResourceManagerException(dir +
296                 " does not exist or is not a directory");
297         }
298
299         // build the list of existing log files.
300
buildLogFileList();
301
302         // recover te list of log files
303
recover();
304     }
305
306     /**
307      * Set the log directory. This is the directory where all the log files
308      * are stored
309      *
310      * @param dir - the name of the directory (absolute or relative)
311      * @throws IllegalArgumentException if the string is not a directory
312      */

313     public void setLogDirectory(String JavaDoc dir)
314         throws IllegalArgumentException JavaDoc {
315         if (!(new File JavaDoc(dir)).isDirectory()) {
316             throw new IllegalArgumentException JavaDoc(dir + " is not a directory");
317         } else {
318             _logDirectory = dir;
319         }
320     }
321
322     /**
323      * Retrieve the name of the log directory
324      *
325      * @return String - log dir name
326      */

327     public String JavaDoc getLogDirectory() {
328         return _logDirectory;
329     }
330
331     /**
332      * Set the maximum size of each log file. When this size is reached a
333      * new log file is created. The size of the log file can be changed
334      * dynamically during runtime. If it is not specified it will default
335      * to 1MB.
336      * <p>
337      * The size is specified in bytes
338      *
339      * @param size - the max size of each log file
340      */

341     public void setLogFileSize(int size) {
342         _logFileSize = size;
343     }
344
345     /**
346      * Return the maximum size that each log file can grow too.
347      *
348      * @return int - log file size
349      */

350     public int getLogFileSize() {
351         return _logFileSize;
352     }
353
354     /**
355      * Set the GC mode for the resource manager. Valid values are GC_SYNCHRONOUS,
356      * GC_ASYNCHRONOUS and GC_DISABLED.
357      * <p>
358      * @param mode - one of GC_*
359      * @return boolean - if the specified mode has been correctly set
360      */

361     public boolean setGCMode(int mode) {
362         boolean result = false;
363
364         if ((mode == GC_DISABLED) ||
365             (mode == GC_SYNCHRONOUS) ||
366             (mode == GC_ASYNCHRONOUS)) {
367             _gcMode = mode;
368             result = true;
369         }
370
371         return result;
372     }
373
374     /**
375      * Return the garbage collection mode for the resource manager
376      *
377      * @return int
378      */

379     public int getGCMode() {
380         return _gcMode;
381     }
382
383     /**
384      * Check whether garbage collection has been disabled
385      *
386      * @return boolean - true if gc is disabled
387      */

388     public boolean gcDisabled() {
389         return (_gcMode == GC_DISABLED) ? true : false;
390     }
391
392     /**
393      * Log this published message so that it can be passed through the system
394      * when the associated global transaction commits.
395      *
396      * @param xid - the global transaction identity
397      * @param message - the message published
398      * @throws TransactionLogException - error adding the entry
399      * @throws ResourceManagerException - error getting the trnasaction log
400      * @throws JMSException - if there is an issue with prep'ing the message
401      */

402     public synchronized void logPublishedMessage(Xid JavaDoc xid, MessageImpl message)
403         throws TransactionLogException, ResourceManagerException, JMSException JavaDoc {
404         MessageMgr.instance().prepare(message);
405         logTransactionData(new ExternalXid(xid), _rid,
406             createPublishedMessageWrapper(message));
407     }
408
409     /**
410      * Log that this message handle was sent to the consumer within the specified
411      * global transaction identity. The message will be acknowledged when the
412      * global transaction commits. Alternatively, if the global transaction is
413      * rolled back the message handle will be returned to the destination
414      *
415      * @param xid the global transaction identity
416      * @param id the consumer receiving this message
417      * @param handle - the handle of the message received
418      * @throws TransactionLogException - error adding the entry
419      * @throws ResourceManagerException - error getting the transaction log
420      */

421     public synchronized void logReceivedMessage(Xid JavaDoc xid, long id, MessageHandle handle)
422         throws TransactionLogException, ResourceManagerException {
423         logTransactionData(new ExternalXid(xid), _rid,
424             createReceivedMessageWrapper(id, handle));
425     }
426
427     /**
428      * Add an {@link StateTransactionLogEntry} using the specified txid,
429      * rid and state
430      *
431      * @param xid - the transaction identifier
432      * @param state - the transaction log state
433      * @throws TransactionLogException - error adding the entry
434      * @throws ResourceManagerException - error getting the trnasaction log
435      */

436     public synchronized void logTransactionState(Xid JavaDoc xid, TransactionState state)
437         throws TransactionLogException, ResourceManagerException {
438         ExternalXid txid = new ExternalXid(xid);
439         switch (state.getOrd()) {
440             case TransactionState.OPENED_ORD:
441                 {
442                     TransactionLog log = getCurrentTransactionLog();
443                     addTridLogEntry(txid, log);
444                     log.logTransactionState(txid, _txExpiryTime * 1000, _rid,
445                         state);
446
447                     // cache the transaction state
448
_activeTransactions.put(txid, new LinkedList JavaDoc());
449                 }
450                 break;
451
452             case TransactionState.PREPARED_ORD:
453                 // cache the transaction state
454
LinkedList JavaDoc list = (LinkedList JavaDoc) _activeTransactions.get(txid);
455                 if (list != null) {
456                     list.add(state);
457                 } else {
458                     throw new ResourceManagerException("Trasaction " + txid +
459                         " is not active.");
460                 }
461                 break;
462
463             case TransactionState.CLOSED_ORD:
464                 {
465                     TransactionLog log = getTransactionLog(txid);
466                     log.logTransactionState(txid, _txExpiryTime * 1000, _rid,
467                         state);
468                     removeTridLogEntry(txid, log);
469
470                     // check whether this log has anymore open transactions
471
synchronized (_cacheLock) {
472                         if ((_logToTridCache.get(log) == null) &&
473                             (!isCurrentTransactionLog(log))) {
474                             log.close();
475
476                             // now check if gc mode is GC_SYNCHRONOUS. If it is
477
// remove the log file
478
if (_gcMode == GC_SYNCHRONOUS) {
479                                 try {
480                                     log.destroy();
481                                 } catch (TransactionLogException exception) {
482                                     exception.printStackTrace();
483                                 }
484                             }
485                         }
486                     }
487
488                     // we also want to remove this entry from the list
489
// of active transactions
490
_activeTransactions.remove(txid);
491                 }
492                 break;
493
494             default:
495                 throw new ResourceManagerException("Cannot process tx state " +
496                     state);
497         }
498     }
499
500     /**
501      * Add an {@link DataTransactionLogEntry} using the specified txid,
502      * rid and data
503      *
504      * @param txid - the transaction identifier
505      * @param rid - the resource identifier
506      * @throws TransactionLogException - error adding the entry
507      * @throws ResourceManagerException - error getting the trnasaction log
508      */

509     synchronized void logTransactionData(ExternalXid txid, String JavaDoc rid,
510                                          Object JavaDoc data)
511         throws ResourceManagerException, TransactionLogException {
512         getTransactionLog(txid).logTransactionData(txid, _txExpiryTime * 1000,
513             rid, data);
514
515         // we also want to add this to the transaction data for that
516
// txid
517
LinkedList JavaDoc list = (LinkedList JavaDoc) _activeTransactions.get(txid);
518         if (list != null) {
519             list.add(data);
520         } else {
521             throw new ResourceManagerException("Trasaction " + txid +
522                 " is not active.");
523         }
524     }
525
526     /**
527      * This is the entry point for the garbage collection callback. It scans
528      * through the each transaction log file and determines whether it can
529      * be garbage collected. If it can then it simply destroys the corresponding
530      * TransactionLog.
531      */

532     public void garbageCollect() {
533         try {
534             int gcfiles = 0;
535
536             // if there are no transaction log files then return
537
if (_logs.size() == 0) {
538                 return;
539             }
540
541             TreeSet JavaDoc copy = null;
542             synchronized (_logs) {
543                 copy = new TreeSet JavaDoc(_logs);
544             }
545
546             // remove the current log file, since this is likely to be the
547
// current log file
548
copy.remove(_logs.last());
549
550             // process each of the remaining log files
551
while (copy.size() > 0) {
552                 TransactionLog log = (TransactionLog) copy.first();
553                 copy.remove(log);
554                 if (log.canGarbageCollect()) {
555                     // destroy the log
556
log.destroy();
557
558                     // remove it from the log cache
559
synchronized (_logs) {
560                         _logs.remove(log);
561                     }
562
563                     // increment the number of garbafe collected files
564
++gcfiles;
565                 }
566             }
567
568             // print an informative message
569
_log.info("[RMGC] Collected " + gcfiles + " files.");
570         } catch (Exception JavaDoc exception) {
571             exception.printStackTrace();
572         }
573     }
574
575     /**
576      * Ensure that a transaction with the specified xid is currently active.
577      * If this is the case then commit the transaction based onb the value
578      * of the onePhase flag.
579      * <p>
580      * This will have the effect of passing all messages through
581      *
582      * @param id - the xa transaction identity
583      * @param onePhase - treu if it is a one phase commit
584      * @throws XAException - if there is a problem completing the call
585      */

586     public synchronized void commit(Xid JavaDoc id, boolean onePhase)
587         throws XAException JavaDoc {
588         // check that the xid is not null
589
if (id == null) {
590             throw new XAException JavaDoc(XAException.XAER_NOTA);
591         }
592
593         // covert to our internal representation of an xid
594
ExternalXid xid = new ExternalXid(id);
595
596         // check to see that the transaction is active and open. We should
597
// not be allowed to commit a committed transaction.
598
if (!isTransactionActive(xid)) {
599             throw new XAException JavaDoc(XAException.XAER_PROTO);
600         }
601
602         // process all the messages associated with this global transaction
603
// If a message has been published then sent it to the message mgr
604
// for processing. If a message has been consumed then remove it
605
// from the list of unconsumed messages.
606
Connection JavaDoc connection = null;
607         try {
608             // get a connection to the database
609
connection = DatabaseService.getConnection();
610
611             // retrieve a list of recrods for the specified global transaction
612
// and process them. Ignore the state records and only process the
613
// data records, which are of type TransacitonalObjectWrapper.
614
Object JavaDoc[] records = getTransactionRecords(xid, _rid);
615             for (int index = 0; index < records.length; index++) {
616                 if (records[index] instanceof TransactionalObjectWrapper) {
617                     TransactionalObjectWrapper wrapper =
618                         (TransactionalObjectWrapper) records[index];
619                     if (wrapper.isPublishedMessage()) {
620                         // send the published message to the message manager
621
MessageMgr.instance().add(connection,
622                             (MessageImpl) wrapper.getObject());
623
624                     } else if (wrapper.isReceivedMessage()) {
625                         // if it is a received message handle then simply
626
// delete it and mark it as acknowledged
627
MessageHandle handle = ((ReceivedMessageWrapper) (wrapper)).getMessageHandle();
628                         if (handle.isPersistent()) {
629                             handle.destroy(connection);
630                         } else {
631                             handle.destroy();
632                         }
633                     }
634                 } else {
635                     // ignore since it is a state records.
636
}
637             }
638             connection.commit();
639         } catch (PersistenceException exception) {
640             SQLHelper.rollback(connection);
641             throw new XAException JavaDoc("Failed in ResourceManager.commit : " +
642                 exception.toString());
643         } catch (Exception JavaDoc exception) {
644             throw new XAException JavaDoc("Failed in ResourceManager.commit : " +
645                 exception.toString());
646         } finally {
647             SQLHelper.close(connection);
648
649             // and now mark the transaction as closed
650
try {
651                 logTransactionState(xid, TransactionState.CLOSED);
652             } catch (Exception JavaDoc exception) {
653                 throw new XAException JavaDoc("Error processing commit : " + exception);
654             }
655         }
656     }
657
658     /**
659      * Ends the work performed on behalf of a transaction branch. The resource
660      * manager disassociates the XA resource from the transaction branch
661      * specified and let the transaction be completedCommits an XA transaction
662      * that is in progress.
663      *
664      * @param id - the xa transaction identity
665      * @param flags - one of TMSUCCESS, TMFAIL, or TMSUSPEND
666      * @throws XAException - if there is a problem completing the call
667      */

668     public synchronized void end(Xid JavaDoc id, int flags)
669         throws XAException JavaDoc {
670         //check the xid is not null
671
if (id == null) {
672             throw new XAException JavaDoc(XAException.XAER_NOTA);
673         }
674
675         // covert to our internal representation of an xid
676
ExternalXid xid = new ExternalXid(id);
677
678         // check that the flags are valid for this method
679
if ((flags != XAResource.TMSUSPEND) ||
680             (flags != XAResource.TMSUCCESS) ||
681             (flags != XAResource.TMFAIL)) {
682             throw new XAException JavaDoc(XAException.XAER_PROTO);
683         }
684
685         switch (flags) {
686             case XAResource.TMFAIL:
687                 // check that the transaction exists
688
if (!isTransactionActive(xid)) {
689                     throw new XAException JavaDoc(XAException.XAER_PROTO);
690                 }
691
692                 // do not process that associated data, simply rollback
693
rollback(xid);
694                 break;
695
696             case XAResource.TMSUSPEND:
697                 // check that the transaction is opened
698
if (!isTransactionActive(xid)) {
699                     throw new XAException JavaDoc(XAException.XAER_PROTO);
700                 }
701                 break;
702
703             case XAResource.TMSUCCESS:
704                 // nothing to do here but check that the resource manager is
705
// in a consistent state wrt to this xid. The xid should not
706
// be active if it received the commit, forget etc.
707
if (isTransactionActive(xid)) {
708                     throw new XAException JavaDoc(XAException.XAER_PROTO);
709                 }
710                 break;
711         }
712     }
713
714     /**
715      * Tell the resource manager to forget about a heuristically completed
716      * transaction branch.
717      *
718      * @param id - the xa transaction identity
719      * @throws XAException - if there is a problem completing the call
720      */

721     public synchronized void forget(Xid JavaDoc id)
722         throws XAException JavaDoc {
723         //check the xid is not null
724
if (id == null) {
725             throw new XAException JavaDoc(XAException.XAER_NOTA);
726         }
727
728         // covert to our internal representation of an xid
729
ExternalXid xid = new ExternalXid(id);
730
731         // check to see that the xid actually exists
732
if (!isTransactionActive(xid)) {
733             throw new XAException JavaDoc(XAException.XAER_PROTO);
734         }
735
736         // call rollback to complete the work
737
rollback(id);
738     }
739
740     /**
741      * Return the transaction timeout for this instance of the resource
742      * manager.
743      *
744      * @return int - the timeout in seconds
745      * @throws XAException - if there is a problem completing the call
746      */

747     public synchronized int getTransactionTimeout()
748         throws XAException JavaDoc {
749         return _txExpiryTime;
750     }
751
752     /**
753      * Ask the resource manager to prepare for a transaction commit of the
754      * transaction specified in xid
755      *
756      * @param xares
757      * @return int - XA_RDONLY or XA_OK
758      * @throws XAException - if there is a problem completing the call
759      */

760     public synchronized boolean isSameRM(XAResource JavaDoc xares)
761         throws XAException JavaDoc {
762         boolean result = false;
763
764         if ((xares == this) ||
765             ((xares instanceof ResourceManager) &&
766             (((ResourceManager) xares)._rid.equals(_rid)))) {
767             result = true;
768         }
769
770         return result;
771     }
772
773     /**
774      * Obtain a list of prepared transaction branches from a resource manager.
775      * The transaction manager calls this method during recovery to obtain the
776      * list of transaction branches that are currently in prepared or
777      * heuristically completed states.
778      *
779      * @throws XAException - if there is a problem completing the call
780      */

781     public synchronized int prepare(Xid JavaDoc id)
782         throws XAException JavaDoc {
783         //check the xid is not null
784
if (id == null) {
785             throw new XAException JavaDoc(XAException.XAER_NOTA);
786         }
787
788         // covert to our internal representation of an xid
789
ExternalXid xid = new ExternalXid(id);
790
791         // check to see that the xid actually exists
792
if (!isTransactionActive(xid)) {
793             throw new XAException JavaDoc(XAException.XAER_PROTO);
794         }
795
796         // can a prepare for the same resource occur multiple times
797
// ????
798

799         try {
800             logTransactionState(xid, TransactionState.PREPARED);
801         } catch (Exception JavaDoc exception) {
802             throw new XAException JavaDoc("Error processing prepare : " + exception);
803         }
804
805         return XAResource.XA_OK;
806     }
807
808     /**
809      * Inform the resource manager to roll back work done on behalf of a
810      * transaction branch
811      *
812      * @throws XAException - if there is a problem completing the call
813      */

814     public synchronized Xid JavaDoc[] recover(int flag)
815         throws XAException JavaDoc {
816
817         Xid JavaDoc[] result = new Xid JavaDoc[0];
818
819         if ((flag == XAResource.TMNOFLAGS) ||
820             (flag == XAResource.TMSTARTRSCAN) ||
821             (flag == XAResource.TMENDRSCAN)) {
822             LinkedList JavaDoc xids = new LinkedList JavaDoc();
823             Iterator JavaDoc iter = _activeTransactions.keySet().iterator();
824             while (iter.hasNext()) {
825                 Xid JavaDoc xid = (Xid JavaDoc) iter.next();
826                 LinkedList JavaDoc list = (LinkedList JavaDoc) _activeTransactions.get(xid);
827                 if (list.size() > 1) {
828                     // need at least a start in the chain.
829
Object JavaDoc last = list.getLast();
830                     if ((last instanceof StateTransactionLogEntry) &&
831                         (((StateTransactionLogEntry) last).getState().isPrepared())) {
832                         xids.add(xid);
833                     }
834                 }
835
836             }
837             result = (Xid JavaDoc[]) xids.toArray();
838         }
839
840         return result;
841     }
842
843     /**
844      * Set the current transaction timeout value for this XAResource instance.
845      *
846      * @throws XAException - if there is a problem completing the call
847      */

848     public synchronized void rollback(Xid JavaDoc id)
849         throws XAException JavaDoc {
850         //check the xid is not null
851
if (id == null) {
852             throw new XAException JavaDoc(XAException.XAER_NOTA);
853         }
854
855         // covert to our internal representation of an xid
856
ExternalXid xid = new ExternalXid(id);
857
858         // check to see that the xid actually exists
859
if (!isTransactionActive(xid)) {
860             throw new XAException JavaDoc(XAException.XAER_PROTO);
861         }
862
863         // process the data in that transaction. If it was a published message
864
// then drop it. If it was a consumed message then return it back to
865
// the destination.
866
Connection JavaDoc connection = null;
867         try {
868             // get a connection to the database
869
connection = DatabaseService.getConnection();
870
871             // retrieve a list of recrods for the specified global transaction
872
// and process them. Ignore the state records and only process the
873
// data records, which are of type TransacitonalObjectWrapper.
874
Object JavaDoc[] records = getTransactionRecords(xid, _rid);
875             for (int index = 0; index < records.length; index++) {
876                 if (records[index] instanceof TransactionalObjectWrapper) {
877                     TransactionalObjectWrapper wrapper =
878                         (TransactionalObjectWrapper) records[index];
879                     if (wrapper.isPublishedMessage()) {
880                         // we don't need to process these messages since the
881
// global transaction has been rolled back.
882
} else if (wrapper.isReceivedMessage()) {
883                         ReceivedMessageWrapper rmsg_wrapper =
884                             (ReceivedMessageWrapper) wrapper;
885                         MessageHandle handle =
886                                 (MessageHandle) rmsg_wrapper.getObject();
887
888                         DestinationManager mgr = DestinationManager.instance();
889                         DestinationCache cache =
890                                 mgr.getDestinationCache(handle.getDestination());
891                         cache.returnMessageHandle(handle);
892                     }
893                 } else {
894                     // ignore since it is a state records.
895
}
896             }
897
898             connection.commit();
899         } catch (PersistenceException exception) {
900             if (connection != null) {
901                 try {
902                     connection.rollback();
903                 } catch (Exception JavaDoc nested) {
904                     // ignore
905
}
906             }
907             throw new XAException JavaDoc("Failed in ResourceManager.rollback : " +
908                 exception.toString());
909         } catch (Exception JavaDoc exception) {
910             throw new XAException JavaDoc("Failed in ResourceManager.rollback : " +
911                 exception.toString());
912         } finally {
913             if (connection != null) {
914                 try {
915                     connection.close();
916                 } catch (Exception JavaDoc nested) {
917                     // ignore
918
}
919             }
920
921             // and now mark the transaction as closed
922
try {
923                 logTransactionState(xid, TransactionState.CLOSED);
924             } catch (Exception JavaDoc exception) {
925                 throw new XAException JavaDoc("Error processing rollback : " + exception);
926             }
927         }
928     }
929
930     /**
931      * Start work on behalf of a transaction branch specified in xid If TMJOIN
932      * is specified, the start is for joining a transaction previously seen by
933      * the resource manager
934      *
935      * @throws XAException - if there is a problem completing the call
936      */

937     public synchronized boolean setTransactionTimeout(int seconds)
938         throws XAException JavaDoc {
939         _txExpiryTime = seconds;
940         return true;
941     }
942
943     // implementation of XAResource.start
944
public synchronized void start(Xid JavaDoc id, int flags)
945         throws XAException JavaDoc {
946
947         //check the xid is not null
948
if (id == null) {
949             throw new XAException JavaDoc(XAException.XAER_NOTA);
950         }
951
952         // covert to our internal representation of an xid
953
ExternalXid xid = new ExternalXid(id);
954
955         // check that the flags are valid for this method
956
if ((flags != XAResource.TMNOFLAGS) ||
957             (flags != XAResource.TMJOIN) ||
958             (flags != XAResource.TMRESUME)) {
959             throw new XAException JavaDoc(XAException.XAER_PROTO);
960         }
961
962         switch (flags) {
963             case XAResource.TMNOFLAGS:
964                 // check to see that the xid does not already exist
965
if (isTransactionActive(xid)) {
966                     throw new XAException JavaDoc(XAException.XAER_DUPID);
967                 }
968
969                 // otherwise log the start of the transaction
970
try {
971                     logTransactionState(xid, TransactionState.OPENED);
972                 } catch (Exception JavaDoc exception) {
973                     throw new XAException JavaDoc("Error processing start : " + exception);
974                 }
975                 break;
976
977             case XAResource.TMJOIN:
978             case XAResource.TMRESUME:
979                 // joining a transaction previously seen by the resource
980
// manager
981
if (!isTransactionActive(xid)) {
982                     throw new XAException JavaDoc(XAException.XAER_PROTO);
983                 }
984                 break;
985         }
986     }
987
988     // override ServiceManager.start
989
public void start()
990         throws ServiceException {
991         this.setState(ServiceState.RUNNING);
992     }
993
994     // override ServiceManager.stop
995
public void stop()
996         throws ServiceException {
997         this.setState(ServiceState.STOPPED);
998     }
999
1000    // override ServiceManager.run
1001
public void run() {
1002        // do nothing
1003
}
1004
1005    /**
1006     * Return the resource manager identity
1007     *
1008     * @return the resource manager identity
1009     */

1010    public String JavaDoc getResourceManagerId() {
1011        return _rid;
1012    }
1013
1014    /**
1015     * Create the next {@link TransactionLog} and add it to the list of
1016     * managed transaction logs.
1017     * <p>
1018     * The method will throw ResourceManagerException if there is a
1019     * problem completing the request.
1020     *
1021     * @throws ResourceManagerException
1022     */

1023    protected TransactionLog createNextTransactionLog()
1024        throws ResourceManagerException {
1025        TransactionLog newlog = null;
1026
1027        synchronized (_logs) {
1028            try {
1029                // get the last log number
1030
long last = 1;
1031                if (!_logs.isEmpty()) {
1032                    last = getSequenceNumber(((TransactionLog) _logs.last()).getName());
1033                }
1034
1035                // now that we have the last log number, increment it and use
1036
// it to build the name of the next log file.
1037
String JavaDoc name = _logDirectory + System.getProperty("file.separator") +
1038                    RM_LOGFILE_PREFIX + Long.toString(++last) + RM_LOGFILE_EXTENSION;
1039
1040                // create a transaction log and add it to the collection
1041
newlog = new TransactionLog(name, true);
1042                _logs.add(newlog);
1043            } catch (TransactionLogException exception) {
1044                throw new ResourceManagerException(
1045                    "Error in createNextTransactionLog " + exception);
1046            }
1047        }
1048
1049        return newlog;
1050    }
1051
1052    /**
1053     * Build a list of all log files in the specified log directory
1054     *
1055     * @throws IllegalArgumentException - if the directory does not exist.
1056     */

1057    protected void buildLogFileList() {
1058        File JavaDoc dir = new File JavaDoc(_logDirectory);
1059        if ((!dir.exists()) ||
1060            (!dir.isDirectory())) {
1061            throw new IllegalArgumentException JavaDoc(_logDirectory +
1062                " is not a directory");
1063        }
1064
1065        try {
1066            File JavaDoc[] list = dir.listFiles(new FilenameFilter JavaDoc() {
1067
1068                // implementation of FilenameFilter.accept
1069
public boolean accept(File JavaDoc dir, String JavaDoc name) {
1070                    boolean result = false;
1071
1072                    if ((name.startsWith(RM_LOGFILE_PREFIX)) &&
1073                        (name.endsWith(RM_LOGFILE_EXTENSION))) {
1074                        result = true;
1075                    }
1076
1077                    return result;
1078                }
1079            });
1080
1081            // add the files to the list
1082
synchronized (_logs) {
1083                for (int index = 0; index < list.length; index++) {
1084                    _logs.add(new TransactionLog(list[index].getPath(), false));
1085                }
1086            }
1087        } catch (Exception JavaDoc exception) {
1088            // replace this with the exception strategy
1089
exception.printStackTrace();
1090        }
1091
1092    }
1093
1094    /**
1095     * This method will process all the transaction logs, in the log diretory
1096     * and call recover on each of them.
1097     *
1098     * @throws ResourceManagerException - if there is a problem recovering
1099     */

1100    private synchronized void recover()
1101        throws ResourceManagerException {
1102        try {
1103            if (!_logs.isEmpty()) {
1104                Iterator JavaDoc iter = _logs.iterator();
1105                while (iter.hasNext()) {
1106                    TransactionLog log = (TransactionLog) iter.next();
1107                    HashMap JavaDoc records = log.recover();
1108                }
1109            }
1110        } catch (Exception JavaDoc exception) {
1111            throw new ResourceManagerException("Error in recover " +
1112                exception.toString());
1113        }
1114    }
1115
1116    /**
1117     * Retrieve the transaction log for the specified transaction id
1118     *
1119     * @param txid - the transaction identity
1120     * @return TransactionLog
1121     * @throws TransactionLogException - if there is tx log exception
1122     * @throws ResourceManagerException - if there is a resource problem.
1123     */

1124    private TransactionLog getTransactionLog(ExternalXid txid)
1125        throws TransactionLogException, ResourceManagerException {
1126        TransactionLog log = (TransactionLog) _tridToLogCache.get(txid);
1127        if (log == null) {
1128            log = getCurrentTransactionLog();
1129            addTridLogEntry(txid, log);
1130        }
1131
1132        return log;
1133    }
1134
1135    /**
1136     * Get the current transaction log. It will check the last transaction
1137     * log opened by the resource manager and determine whether there is
1138     * space enough to process another transaction.
1139     * <p>
1140     * If there is space enough then it will return that transaction,
1141     * otherwise it will create a new transaction log for the resource
1142     *
1143     * @return TransactionLog - the transaction log to use
1144     * @throws ResourceManagerException
1145     * @throws TransactionLogException
1146     */

1147    private TransactionLog getCurrentTransactionLog()
1148        throws TransactionLogException, ResourceManagerException {
1149        TransactionLog log = null;
1150
1151        synchronized (_logs) {
1152            if (_logs.size() > 0) {
1153                log = (TransactionLog) _logs.last();
1154            }
1155
1156            if ((log == null) ||
1157                (log.size() > _logFileSize)) {
1158                log = createNextTransactionLog();
1159            }
1160        }
1161
1162        return log;
1163    }
1164
1165    /**
1166     * Add an entry to the trid log cache table for the specified trid and
1167     * transaction log mapping.
1168     *
1169     * @param trid - the transaction identifier
1170     * @param log - the transaction log
1171     */

1172    private void addTridLogEntry(ExternalXid trid, TransactionLog log) {
1173        synchronized (_cacheLock) {
1174            // one to one relationship
1175
_tridToLogCache.put(trid, log);
1176
1177            // one to many relationship
1178
Vector JavaDoc trids = (Vector JavaDoc) _logToTridCache.get(log);
1179            if (trids == null) {
1180                trids = new Vector JavaDoc();
1181                _logToTridCache.put(log, trids);
1182            }
1183            trids.addElement(trid);
1184        }
1185    }
1186
1187    /**
1188     * Check whether the specified log is also the current log
1189     *
1190     * @param log - the log to check
1191     * @return boolean - true if it is
1192     */

1193    private boolean isCurrentTransactionLog(TransactionLog log) {
1194        boolean result = false;
1195
1196        if (_logs.size() > 0) {
1197            result = log.equals(_logs.last());
1198        }
1199
1200        return result;
1201    }
1202
1203    /**
1204     * Remove an entry to the trid log cache table for the specified trid and
1205     * transaction log mapping.
1206     *
1207     * @param trid - the transaction identifier
1208     * @param log - the transaction log
1209     */

1210    private void removeTridLogEntry(ExternalXid trid, TransactionLog log) {
1211        synchronized (_cacheLock) {
1212
1213            // one to one relationship
1214
_tridToLogCache.remove(trid);
1215
1216            // one to many relationship
1217
Vector JavaDoc trids = (Vector JavaDoc) _logToTridCache.get(log);
1218            if (trids != null) {
1219                trids.remove(trid);
1220                if (trids.size() == 0) {
1221                    _logToTridCache.remove(log);
1222                }
1223            }
1224        }
1225    }
1226
1227    /**
1228     * Return an arrya of records, both state and date, for the specified
1229     * global transaction
1230     *
1231     * @param xid - the global transaction id
1232     * @param rid - the resource id
1233     * @return Object[] - array of records
1234     */

1235    protected Object JavaDoc[] getTransactionRecords(ExternalXid xid, String JavaDoc rid) {
1236        Object JavaDoc[] records;
1237
1238        // we also want to add this to the transaction data for that
1239
// txid
1240
LinkedList JavaDoc list = (LinkedList JavaDoc) _activeTransactions.get(xid);
1241        if (list != null) {
1242            records = list.toArray();
1243        } else {
1244            records = new Object JavaDoc[0];
1245        }
1246
1247        return records;
1248    }
1249
1250
1251    /**
1252     * Return the sequence number of the file
1253     * files are associated with a unique number
1254     *
1255     * @param name - the file name to investigate
1256     * @return long - the transaction log number
1257     * @throws ResourceManagerException
1258     */

1259    protected long getSequenceNumber(String JavaDoc name)
1260        throws ResourceManagerException {
1261        int start = name.indexOf(RM_LOGFILE_PREFIX) +
1262            RM_LOGFILE_PREFIX.length();
1263        int end = name.indexOf(RM_LOGFILE_EXTENSION);
1264
1265        // the number must be between the start and end positions
1266
try {
1267            return Long.parseLong(name.substring(start, end));
1268        } catch (NumberFormatException JavaDoc exception) {
1269            throw new ResourceManagerException(
1270                "Invalid name assigned to resource manager file " + name);
1271        }
1272    }
1273
1274    /**
1275     * Return true if the specified transaction is active
1276     *
1277     * @param xid - the gobal transaction identifier
1278     */

1279    private synchronized boolean isTransactionActive(ExternalXid xid) {
1280        return _activeTransactions.containsKey(xid);
1281    }
1282
1283    /**
1284     * Dump the specified records to the screen
1285     */

1286    private void dumpRecovered(HashMap JavaDoc records) {
1287        Iterator JavaDoc iter = records.keySet().iterator();
1288        while (iter.hasNext()) {
1289            ExternalXid txid = (ExternalXid) iter.next();
1290            LinkedList JavaDoc list = (LinkedList JavaDoc) records.get(txid);
1291            Iterator JavaDoc oiter = list.iterator();
1292            while (oiter.hasNext()) {
1293                Object JavaDoc object = oiter.next();
1294                if (object instanceof StateTransactionLogEntry) {
1295                    System.err.println("Recovered [" + txid + "] Class " +
1296                        object.getClass().getName() + " [" +
1297                        ((StateTransactionLogEntry) object).getState().toString() + "]");
1298                } else {
1299                    System.err.println("Recovered [" + txid + "] Class " +
1300                        object.getClass().getName());
1301                }
1302            }
1303        }
1304    }
1305
1306
1307    /**
1308     * Helper and type-safe method for creating a wrapper object for published
1309     * messages
1310     *
1311     * @param message - the message published
1312     * @return PublishedMessageWrapper
1313     */

1314    private PublishedMessageWrapper createPublishedMessageWrapper(
1315        MessageImpl message) {
1316        return new PublishedMessageWrapper(message);
1317    }
1318
1319    /**
1320     * Helper and type-safe method for creating a wrapper object for received
1321     * messages
1322     *
1323     * @param id - the identity of the consumer receiving the message
1324     * @param handle - the handle of the message received
1325     * @return ReceivedMessageWrapper
1326     */

1327    private ReceivedMessageWrapper createReceivedMessageWrapper(
1328        long id, MessageHandle handle) {
1329        return new ReceivedMessageWrapper(id, handle);
1330    }
1331
1332    /**
1333     * This functor is used by various collections to order the transaction log
1334     * files created by this resource manager. The resource manager will create
1335     * log files with sequentially increasing numbers (i.e xxx01.log, xxx2.log
1336     */

1337    private class TranLogFileComparator
1338        implements Comparator JavaDoc {
1339
1340        // implementation of Comparator.comapre
1341
public int compare(Object JavaDoc o1, Object JavaDoc o2) {
1342            int result = -1;
1343
1344            try {
1345                if ((o1 instanceof TransactionLog) &&
1346                    (o2 instanceof TransactionLog)) {
1347                    long seq1 = getSequenceNumber(((TransactionLog) o1).getName());
1348                    long seq2 = getSequenceNumber(((TransactionLog) o2).getName());
1349
1350                    if (seq1 > seq2) {
1351                        result = 1;
1352                    } else if (seq1 < seq2) {
1353                        result = -1;
1354                    } else {
1355                        result = 0;
1356                    }
1357                } else {
1358                    throw new ClassCastException JavaDoc("o1 = " +
1359                        o1.getClass().getName() + " and o2 = " +
1360                        o2.getClass().getName());
1361                }
1362            } catch (Exception JavaDoc exception) {
1363                throw new RuntimeException JavaDoc("Error in ResourceManager.compare " +
1364                    exception.toString());
1365            }
1366
1367            return result;
1368        }
1369
1370        // implementation of Comparator.equals
1371
public boolean equals(Object JavaDoc obj) {
1372            if (obj instanceof TranLogFileComparator) {
1373                return true;
1374            }
1375
1376            return false;
1377        }
1378    }
1379
1380
1381    /**
1382     * This private member class is used to wrap the transactional object,
1383     * which for this particular resource manager is a published message or
1384     * a received message handle.
1385     */

1386    abstract private class TransactionalObjectWrapper {
1387
1388        /**
1389         * The transactional object instance
1390         */

1391        private Object JavaDoc _object;
1392
1393        /**
1394         * Create an instance of the wrapper using the type and the object
1395         *
1396         * @param object - the associated object
1397         */

1398        public TransactionalObjectWrapper(Object JavaDoc object) {
1399            _object = object;
1400        }
1401
1402        /**
1403         * Check whether the wrapper contains a published message. Note that a
1404         * published message has a {@link MessageImpl} a the transactional
1405         * object.
1406         *
1407         * @return boolean - true if it is
1408         */

1409        public boolean isPublishedMessage() {
1410            return this instanceof PublishedMessageWrapper;
1411        }
1412
1413        /**
1414         * Check whether the wrapper contains a received message handle. Note
1415         * that a received message contains a {@link MessageHandle} as the
1416         * transactional object.
1417         *
1418         * @return boolean - true if it does
1419         */

1420        public boolean isReceivedMessage() {
1421            return this instanceof ReceivedMessageWrapper;
1422        }
1423
1424        /**
1425         * Return the transaction object
1426         *
1427         * @return Object
1428         */

1429        public Object JavaDoc getObject() {
1430            return _object;
1431        }
1432
1433    }
1434
1435
1436    /**
1437     * This private member class is used to wrap a published message
1438     */

1439    private class PublishedMessageWrapper extends TransactionalObjectWrapper {
1440
1441        /**
1442         * Create an instance of the wrapper using the specified message
1443         *
1444         * @param message - the message to wrap
1445         */

1446        public PublishedMessageWrapper(MessageImpl message) {
1447            super(message);
1448        }
1449
1450        /**
1451         * Return an instance of the message object
1452         *
1453         * @return MessageImpl
1454         */

1455        public MessageImpl getMessage() {
1456            return (MessageImpl) super.getObject();
1457        }
1458    }
1459
1460
1461    /**
1462     * This private member class is used to wrap a received message
1463     */

1464    private class ReceivedMessageWrapper extends TransactionalObjectWrapper {
1465
1466        /**
1467         * Caches the id of the {@link ConsumerEndpoint} that is processed
1468         * this handle
1469         */

1470        private long _consumerId;
1471
1472        /**
1473         * Create an instance of the wrapper using the specified message
1474         *
1475         * @param id - the identity of the consumer endpoint
1476         * @param handle - the handle to the message
1477         */

1478        public ReceivedMessageWrapper(long id, MessageHandle handle) {
1479            super(handle);
1480            _consumerId = id;
1481        }
1482
1483        /**
1484         * Return a reference to the consumer identity
1485         *
1486         * @return String
1487         */

1488        public long getConsumerId() {
1489            return _consumerId;
1490        }
1491
1492        /**
1493         * Return an instance of the message handle
1494         *
1495         * @return MessageHandle
1496         */

1497        public MessageHandle getMessageHandle() {
1498            return (MessageHandle) super.getObject();
1499        }
1500    }
1501
1502}
1503
Popular Tags