KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > ojb > broker > core > PersistenceBrokerFactorySyncImpl


1 package org.apache.ojb.broker.core;
2
3 /* Copyright 2004-2005 The Apache Software Foundation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 import javax.transaction.RollbackException JavaDoc;
19 import javax.transaction.Status JavaDoc;
20 import javax.transaction.Synchronization JavaDoc;
21 import javax.transaction.SystemException JavaDoc;
22 import javax.transaction.Transaction JavaDoc;
23 import javax.transaction.TransactionManager JavaDoc;
24 import java.lang.reflect.Field JavaDoc;
25 import java.util.ArrayList JavaDoc;
26 import java.util.Collections JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.List JavaDoc;
30 import java.util.Map JavaDoc;
31 import java.util.WeakHashMap JavaDoc;
32
33 import org.apache.commons.pool.KeyedObjectPool;
34 import org.apache.ojb.broker.PBFactoryException;
35 import org.apache.ojb.broker.PBKey;
36 import org.apache.ojb.broker.PersistenceBrokerInternal;
37 import org.apache.ojb.broker.TransactionAbortedException;
38 import org.apache.ojb.broker.TransactionInProgressException;
39 import org.apache.ojb.broker.TransactionNotInProgressException;
40 import org.apache.ojb.broker.accesslayer.ConnectionManagerIF;
41 import org.apache.ojb.broker.transaction.tm.TransactionManagerFactoryException;
42 import org.apache.ojb.broker.transaction.tm.TransactionManagerFactoryFactory;
43 import org.apache.ojb.broker.util.BrokerHelper;
44 import org.apache.ojb.broker.util.logging.Logger;
45 import org.apache.ojb.broker.util.logging.LoggerFactory;
46
47 /**
48  * Workaround for participate the PB-api in JTA {@link javax.transaction.Transaction transaction} by
49  * implementing the {@link javax.transaction.Synchronization} interface.
50  * <br/>
51  * This may will be deprecated when we implemented a full JCA compliant connector.
52  * <br/>
53  * When a new {@link org.apache.ojb.broker.PersistenceBroker} instance is created in method
54  * {@link #wrapBrokerWithPoolingHandle}
55  * the given PB instance is wrapped with {@link PersistenceBrokerSyncImpl} before it was put to the PB-pool.
56  * When a PB instance was requested class try to lookup the current JTA transaction in
57  * {@link #wrapRequestedBrokerInstance} before the pooled PB instance was wrapped with the PB handle.
58  * If a running tx was found the PB instance was registered with the transaction using the
59  * {@link Synchronization} interface.
60  *
61  * @author <a HREF="mailto:armin@codeAuLait.de">Armin Waibel</a>
62  * @version $Id: PersistenceBrokerFactorySyncImpl.java,v 1.7.2.8 2005/12/21 22:25:01 tomdz Exp $
63  */

64 public class PersistenceBrokerFactorySyncImpl extends PersistenceBrokerFactoryDefaultImpl
65 {
66     private Logger log = LoggerFactory.getLogger(PersistenceBrokerFactorySyncImpl.class);
67     private TransactionManager JavaDoc txMan;
68     private TxRegistry txRegistry;
69
70     public PersistenceBrokerFactorySyncImpl()
71     {
72         super();
73         try
74         {
75             txMan = TransactionManagerFactoryFactory.instance().getTransactionManager();
76         }
77         catch (TransactionManagerFactoryException e)
78         {
79             throw new PBFactoryException("Can't instantiate TransactionManager of managed environment", e);
80         }
81         txRegistry = new TxRegistry();
82     }
83
84     public PersistenceBrokerInternal createPersistenceBroker(PBKey pbKey) throws PBFactoryException
85     {
86         /*
87         try to find a valid PBKey, if given key does not full match
88         */

89         pbKey = BrokerHelper.crossCheckPBKey(pbKey);
90         /*
91         arminw:
92         First try to find a running JTA-tx. If a tx was found we try to find
93         an associated PB instance. This ensures that in a running tx
94         always the same PB instance was used.
95         If no tx was found we lookup a instance from pool.
96         All used PB instances always be wrapped with a "PBHandle"
97         */

98         Transaction JavaDoc tx;
99         try
100         {
101             // search for an active tx
102
tx = searchForValidTx();
103         }
104         catch (SystemException JavaDoc e)
105         {
106             throw new PBFactoryException("Can't create PB instance, failure while lookup" +
107                     " running JTA transaction",e);
108         }
109         PersistenceBrokerSyncImpl obtainedBroker = null;
110         PersistenceBrokerSyncHandle result;
111         if (tx != null)
112         {
113             // try to find a broker instance already used by current tx
114
obtainedBroker = txRegistry.findBroker(tx, pbKey);
115         }
116
117         if(obtainedBroker == null || obtainedBroker.isClosed())
118         {
119             // we have to lookup new PB instance with wrapped with handle
120
// method #wrapRequestedBrokerInstance wraps the new instance
121
// with a handle
122
result = (PersistenceBrokerSyncHandle) super.createPersistenceBroker(pbKey);
123         }
124         else
125         {
126             // we found a PB instance that was already in use within the same JTA-tx
127
// so we only return a new handle
128
result = new PersistenceBrokerSyncHandle(obtainedBroker);
129         }
130         return result;
131     }
132
133     protected PersistenceBrokerInternal wrapBrokerWithPoolingHandle(PersistenceBrokerInternal broker, KeyedObjectPool pool)
134     {
135         // wrap real PB instance with an extended version of pooling PB
136
return new PersistenceBrokerSyncImpl(broker, pool);
137     }
138
139     protected PersistenceBrokerInternal wrapRequestedBrokerInstance(PersistenceBrokerInternal broker)
140     {
141         // all PB instance should be of this type
142
if (!(broker instanceof PersistenceBrokerSyncImpl))
143         {
144             throw new PBFactoryException("Expect instance of " + PersistenceBrokerSyncImpl.class
145                     + ", found " + broker.getClass());
146         }
147         /*
148         Before we return the PB handle, we jump into the running JTA tx
149         */

150         PersistenceBrokerSyncImpl pb = (PersistenceBrokerSyncImpl) broker;
151         try
152         {
153             // search for an active tx
154
Transaction JavaDoc tx = searchForValidTx();
155             if (tx != null)
156             {
157                 txRegistry.register(tx, pb);
158                 try
159                 {
160                     pb.internBegin();
161                 }
162                 catch (Exception JavaDoc e)
163                 {
164                     /*
165                     if something going wrong with pb-tx, we rollback the
166                     whole JTA tx
167                     */

168                     log.error("Unexpected exception when start intern pb-tx", e);
169                     try
170                     {
171                         tx.setRollbackOnly();
172                     }
173                     catch (Throwable JavaDoc ignore)
174                     {
175                     }
176                     throw new PBFactoryException("Unexpected exception when start intern pb-tx", e);
177                 }
178             }
179         }
180         catch (Exception JavaDoc e)
181         {
182             if(e instanceof PBFactoryException)
183             {
184                 throw (PBFactoryException) e;
185             }
186             else
187             {
188                 throw new PBFactoryException("Error while try to participate in JTA transaction", e);
189             }
190         }
191         return new PersistenceBrokerSyncHandle(pb);
192     }
193
194     private Transaction JavaDoc searchForValidTx() throws SystemException JavaDoc
195     {
196         Transaction JavaDoc tx = txMan.getTransaction();
197         if (tx != null)
198         {
199             int status = tx.getStatus();
200             if (status != Status.STATUS_ACTIVE && status != Status.STATUS_NO_TRANSACTION)
201             {
202                 throw new PBFactoryException("Transaction synchronization failed - wrong" +
203                         " status of external JTA tx. Expected was an 'active' or 'no transaction'"
204                         + ", found status is '" + getStatusFlagAsString(status) + "'");
205             }
206         }
207         return tx;
208     }
209
210     /**
211      * Returns a string representation of the given
212      * {@link javax.transaction.Status} flag.
213      */

214     private static String JavaDoc getStatusFlagAsString(int status)
215     {
216         String JavaDoc statusName = "no match, unknown status!";
217         try
218         {
219             Field JavaDoc[] fields = Status JavaDoc.class.getDeclaredFields();
220             for (int i = 0; i < fields.length; i++)
221             {
222                 if (fields[i].getInt(null) == status)
223                 {
224                     statusName = fields[i].getName();
225                     break;
226                 }
227             }
228         }
229         catch (Exception JavaDoc e)
230         {
231             statusName = "no match, unknown status!";
232         }
233         return statusName;
234     }
235
236     //****************************************************
237
// inner class
238
//****************************************************
239
public static class PersistenceBrokerSyncImpl extends PoolablePersistenceBroker implements Synchronization JavaDoc
240     {
241         private Logger log = LoggerFactory.getLogger(PersistenceBrokerSyncImpl.class);
242         /**
243          * Used to register all handles using this PB instance
244          */

245         private List JavaDoc handleList = new ArrayList JavaDoc();
246
247         public PersistenceBrokerSyncImpl(PersistenceBrokerInternal broker, KeyedObjectPool pool)
248         {
249             super(broker, pool);
250         }
251
252         public void beforeCompletion()
253         {
254             if (log.isDebugEnabled()) log.debug("beforeCompletion was called, nothing to do");
255             if(handleList.size() > 0)
256             {
257                 for(int i = 0; i < handleList.size(); i++)
258                 {
259                     log.warn("Found unclosed PersistenceBroker handle, will do automatic close. Please make" +
260                             " sure that all used PB instances will be closed.");
261                     PersistenceBrokerHandle pbh = (PersistenceBrokerHandle) handleList.get(i);
262                     pbh.close();
263                 }
264                 handleList.clear();
265             }
266             ConnectionManagerIF cm = serviceConnectionManager();
267             if(cm.isBatchMode()) cm.executeBatch();
268             // close connection immediately when in JTA-tx to avoid bad reports from server con-pool
269
if(cm.isInLocalTransaction())
270             {
271                 // we should not be in a local tx when performing tx completion
272
log.warn("Seems the used PersistenceBroker handle wasn't closed, close the used" +
273                         " handle before the transaction completes.");
274                 // in managed environments this call will be ignored by
275
// the wrapped connection
276
cm.localCommit();
277             }
278             cm.releaseConnection();
279         }
280
281         public void afterCompletion(int status)
282         {
283             if (log.isDebugEnabled()) log.debug("afterCompletion was called");
284             /*
285             we only commit if tx was successfully committed
286             */

287             try
288             {
289                 if (status != Status.STATUS_COMMITTED)
290                 {
291                     if (status == Status.STATUS_ROLLEDBACK || status == Status.STATUS_ROLLING_BACK)
292                     {
293                         if (log.isDebugEnabled()) log.debug("Aborting PB-tx due to JTA initiated Rollback: "
294                                 + getStatusFlagAsString(status));
295                     }
296                     else
297                     {
298                         log.error("Aborting PB-tx due to inconsistent, and unexpected, status of JTA tx: "
299                                 + getStatusFlagAsString(status));
300                     }
301                     internAbort();
302                 }
303                 else
304                 {
305                     if (log.isDebugEnabled()) log.debug("Commit PB-tx");
306                     internCommit();
307                 }
308             }
309             finally
310             {
311                 // returns the underlying PB instance to pool
312
doRealClose();
313             }
314         }
315
316         private void internBegin()
317         {
318             setManaged(true);
319             super.beginTransaction();
320         }
321
322         private void internCommit()
323         {
324             super.commitTransaction();
325         }
326
327         private void internAbort()
328         {
329             super.abortTransaction();
330         }
331
332         private void doRealClose()
333         {
334             if (log.isDebugEnabled()) log.debug("Now do real close of PB instance");
335             super.close();
336         }
337
338         public boolean close()
339         {
340             if(!isInTransaction())
341             {
342                 if (log.isDebugEnabled())
343                     log.debug("PB close was called, pass the close call to underlying PB instance");
344                 /*
345                 if we not in JTA-tx, we close PB instance in a "normal" way. The PB.close()
346                 should also release the used connection.
347                 */

348                 doRealClose();
349             }
350             else
351             {
352                 // if we in tx and other handles operate on the same PB instance, do
353
// nothing, till all handles are closed.
354
if(handleList.size() > 0)
355                 {
356                     if(log.isEnabledFor(Logger.INFO)) log.info("PB.close(): Active used by " + handleList.size()
357                             + " handle objects, will skip close call");
358                 }
359                 else
360                 {
361                     /*
362                     arminw:
363                     if in JTA-tx, we don't really close the underlying PB instance (return PB
364                     instance to pool, release used connection). As recently as the JTA was
365                     completed we can return PB instance to pool. Thus after tx completion method
366                     doRealClose() was called to close (return to pool) underlying PB instance.
367
368                     But to free used resources as soon as possible, we release the used connection
369                     immediately. The JTA-tx will handle the connection status in a proper way.
370                     */

371                     if (log.isDebugEnabled())
372                         log.debug("PB close was called, only close the PB handle when in JTA-tx");
373
374                     /*
375                     TODO: workaround, in 1.1 use special method do handle this stuff
376                     arminw:
377                     needed to prevent unclosed connection Statement instances when RsIterator
378                     wasn't fully materialized in managed environment, because RsIterator is
379                     a PBStateListener and below we close the connection.
380                     */

381                     PersistenceBrokerImpl pb = ((PersistenceBrokerImpl) getInnermostDelegate());
382                     pb.fireBrokerEvent(pb.BEFORE_CLOSE_EVENT);
383
384                     ConnectionManagerIF cm = serviceConnectionManager();
385                     if(cm.isInLocalTransaction())
386                     {
387                         /*
388                         arminw:
389                         in managed environment con.commit calls will be ignored because, the JTA
390                         transaction manager control the connection status. But to make
391                         connectionManager happy we have to complete the "local tx" of the
392                         connectionManager before release the connection
393                         */

394                         cm.localCommit();
395                     }
396                     cm.releaseConnection();
397                 }
398             }
399             return true;
400         }
401
402         void registerHandle(PersistenceBrokerHandle handle)
403         {
404             handleList.add(handle);
405         }
406
407         void deregisterHandle(PersistenceBrokerHandle handle)
408         {
409             handleList.remove(handle);
410         }
411
412         public void beginTransaction() throws TransactionInProgressException, TransactionAbortedException
413         {
414             throw new UnsupportedOperationException JavaDoc("In managed environments only JTA transaction demarcation allowed");
415         }
416
417         public void commitTransaction() throws TransactionNotInProgressException, TransactionAbortedException
418         {
419             throw new UnsupportedOperationException JavaDoc("In managed environments only JTA transaction demarcation allowed");
420         }
421
422         public void abortTransaction() throws TransactionNotInProgressException
423         {
424             throw new UnsupportedOperationException JavaDoc("In managed environments only JTA transaction demarcation allowed");
425         }
426     }
427
428     //****************************************************
429
// inner class
430
//****************************************************
431
/**
432      * This class collects all PB instances requested in the scope of one transaction
433      */

434     class TransactionBox implements Synchronization JavaDoc
435     {
436         Transaction JavaDoc jtaTx;
437         Map JavaDoc syncMap = new HashMap JavaDoc();
438         boolean isLocked = false;
439         boolean isClosed = false;
440
441         public TransactionBox(Transaction JavaDoc tx)
442         {
443             this.jtaTx = tx;
444         }
445
446         PersistenceBrokerSyncImpl find(PBKey key)
447         {
448             return (PersistenceBrokerSyncImpl) syncMap.get(key);
449         }
450
451         void add(PersistenceBrokerSyncImpl syncObj)
452         {
453             if (isLocked)
454             {
455                 throw new PBFactoryException("Can't associate object with JTA transaction, because tx-completion started");
456             }
457             syncMap.put(syncObj.getPBKey(), syncObj);
458         }
459
460         public void afterCompletion(int status)
461         {
462             boolean failures = false;
463             Synchronization JavaDoc synchronization = null;
464             for (Iterator JavaDoc iterator = syncMap.values().iterator(); iterator.hasNext();)
465             {
466                 try
467                 {
468                     synchronization = (Synchronization JavaDoc) iterator.next();
469                     synchronization.afterCompletion(status);
470                 }
471                 catch (Exception JavaDoc e)
472                 {
473                     failures = true;
474                     log.error("Unexpected error when perform Synchronization#afterCompletion method" +
475                             " call on object " + synchronization, e);
476                 }
477             }
478             isClosed = true;
479             // discard association of PB instances and jta-tx
480
txRegistry.removeTxBox(jtaTx);
481             if (failures)
482             {
483                 throw new PBFactoryException("Unexpected error occured while performing" +
484                         " Synchronization#afterCompletion method");
485             }
486         }
487
488         public void beforeCompletion()
489         {
490             boolean failures = false;
491             Synchronization JavaDoc synchronization = null;
492             for (Iterator JavaDoc iterator = syncMap.values().iterator(); iterator.hasNext();)
493             {
494                 try
495                 {
496                     synchronization = (Synchronization JavaDoc) iterator.next();
497                     synchronization.beforeCompletion();
498                 }
499                 catch (Exception JavaDoc e)
500                 {
501                     failures = true;
502                     log.error("Unexpected error when perform Synchronization#beforeCompletion method" +
503                             " call on object " + synchronization, e);
504                 }
505             }
506             isLocked = true;
507             if (failures)
508             {
509                 throw new PBFactoryException("Unexpected error occured while performing" +
510                         " Synchronization#beforeCompletion method");
511             }
512         }
513     }
514
515     //****************************************************
516
// inner class
517
//****************************************************
518
/**
519      * Maps all {@link TransactionBox} instances based on {@link Transaction} object identity.
520      *
521      * TODO: Not sure if we should held TransactionBox instances per thread or per transaction object identity.
522      * As far as I know it is possible in JTA that thread A starts a tx and thread B commits the tx, thus I
523      * start with tx identity as key in registry
524      */

525     class TxRegistry
526     {
527         Map JavaDoc txBoxMap;
528
529         public TxRegistry()
530         {
531             txBoxMap = Collections.synchronizedMap(new WeakHashMap JavaDoc());
532         }
533
534         void register(Transaction JavaDoc tx, PersistenceBrokerSyncImpl syncObject) throws RollbackException JavaDoc, SystemException JavaDoc
535         {
536             TransactionBox txBox = (TransactionBox) txBoxMap.get(tx);
537             if (txBox == null || txBox.isClosed)
538             {
539                 // if environment reuse tx instances we can find closed TransactionBox instances
540
if (txBox != null) txBoxMap.remove(tx);
541                 txBox = new TransactionBox(tx);
542                 tx.registerSynchronization(txBox);
543                 txBoxMap.put(tx, txBox);
544             }
545             txBox.add(syncObject);
546         }
547
548         PersistenceBrokerSyncImpl findBroker(Transaction JavaDoc tx, PBKey pbKey)
549         {
550             PersistenceBrokerSyncImpl result = null;
551             TransactionBox txBox = (TransactionBox) txBoxMap.get(tx);
552             if(txBox != null)
553             {
554                 result = txBox.find(pbKey);
555             }
556             return result;
557         }
558
559         TransactionBox findTxBox(Transaction JavaDoc tx)
560         {
561             return (TransactionBox) txBoxMap.get(tx);
562         }
563
564         void removeTxBox(Transaction JavaDoc tx)
565         {
566             txBoxMap.remove(tx);
567         }
568     }
569
570     //****************************************************
571
// inner class
572
//****************************************************
573
/**
574      * This wrapper was used when a PB instance which was already in use by a
575      * transaction was found.
576      */

577     class PersistenceBrokerSyncHandle extends PersistenceBrokerHandle
578     {
579         /**
580          * Constructor for the handle, set itself in
581          * {@link PersistenceBrokerThreadMapping#setCurrentPersistenceBroker}
582          */

583         public PersistenceBrokerSyncHandle(PersistenceBrokerSyncImpl broker)
584         {
585             super(broker);
586             // we register handle at underlying PB instance
587
broker.registerHandle(this);
588         }
589
590         public boolean isClosed()
591         {
592             return super.isClosed();
593         }
594
595         public boolean close()
596         {
597             if(getDelegate() != null)
598             {
599                 // deregister from underlying PB instance
600
((PersistenceBrokerSyncImpl) getDelegate()).deregisterHandle(this);
601             }
602             return super.close();
603         }
604     }
605 }
606
Popular Tags