KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > XAResourceMngr


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - 2006 ScalAgent Distributed Technologies
4  * Copyright (C) 2004 - 2000 Bull SA
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (Bull SA)
22  * Contributor(s): ScalAgent Distributed Technologies
23  */

24 package org.objectweb.joram.client.jms;
25
26 import java.util.Enumeration JavaDoc;
27 import java.util.Hashtable JavaDoc;
28 import java.util.Vector JavaDoc;
29
30 import javax.jms.JMSException JavaDoc;
31
32 import javax.transaction.xa.XAException JavaDoc;
33 import javax.transaction.xa.XAResource JavaDoc;
34 import javax.transaction.xa.Xid JavaDoc;
35
36 import org.objectweb.joram.shared.client.ProducerMessages;
37 import org.objectweb.joram.shared.client.SessAckRequest;
38 import org.objectweb.joram.shared.client.XACnxPrepare;
39 import org.objectweb.joram.shared.client.XACnxCommit;
40 import org.objectweb.joram.shared.client.XACnxRecoverReply;
41 import org.objectweb.joram.shared.client.XACnxRecoverRequest;
42 import org.objectweb.joram.shared.client.XACnxRollback;
43
44 import org.objectweb.util.monolog.api.BasicLevel;
45 import org.objectweb.joram.shared.JoramTracing;
46
47 /**
48  * Utility class used by XA connections for managing XA resources.
49  */

50 public class XAResourceMngr {
51   /** Transaction active. */
52   public static final int STARTED = 0;
53   /** Transaction suspended. */
54   public static final int SUSPENDED = 1;
55   /** Transaction successful. */
56   public static final int SUCCESS = 2;
57   /** Failed transaction. */
58   public static final int ROLLBACK_ONLY = 3;
59   /** Prepared transaction. */
60   public static final int PREPARED = 4;
61
62   /**
63    * The table of known transactions.
64    * <p>
65    * <b>Key:</b> transaction identifier<br>
66    * <b>Object:</b> <code>XAContext</code> instance
67    */

68   private Hashtable JavaDoc transactions;
69
70   /** The connection this manager belongs to. */
71   Connection cnx;
72
73   /** table of Session (key Xid). */
74   Hashtable JavaDoc sessionTable;
75
76
77   /**
78    * Creates a <code>XAResourceMngr</code> instance.
79    *
80    * @param cnx The connection this manager belongs to.
81    */

82   public XAResourceMngr(Connection cnx) {
83     this.cnx = cnx;
84     transactions = new Hashtable JavaDoc();
85     sessionTable = new Hashtable JavaDoc();
86
87     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
88       JoramTracing.dbgClient.log(BasicLevel.DEBUG,
89                                  " XAResourceMngr cnx = " + cnx);
90   }
91
92   /**
93    * Notifies the RM that a transaction is starting.
94    *
95    * @exception XAException If the specified transaction is already known by
96    * the RM in an incompatible state with the start
97    * request.
98    */

99   synchronized void start(Xid JavaDoc xid, int flag, Session sess)
100     throws XAException JavaDoc {
101     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
102       JoramTracing.dbgClient.log(BasicLevel.DEBUG,
103                                  " XAResourceMngr start(" + xid +
104                                  ", " + flag +
105                                  ", " + sess +")");
106
107     sess.setTransacted(true); // for XAResource.TMRESUME
108
sessionTable.put(xid,sess);
109
110     // New transaction.
111
if (flag == XAResource.TMNOFLAGS) {
112       if (transactions.containsKey(xid))
113         throw new XAException JavaDoc("Can't start transaction already known by RM.");
114
115       transactions.put(xid, new XAContext());
116
117       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
118         JoramTracing.dbgClient.log(BasicLevel.DEBUG,
119                                    "--- "
120                                    + this
121                                    + ": involved in transaction "
122                                    + xid.toString());
123     }
124     // Resumed transaction.
125
else if (flag == XAResource.TMRESUME) {
126       if (! transactions.containsKey(xid))
127         throw new XAException JavaDoc("Can't resume unknown transaction.");
128
129       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
130         JoramTracing.dbgClient.log(BasicLevel.DEBUG,
131                                    "--- "
132                                    + this
133                                    + ": resumes transaction "
134                                    + xid.toString());
135     }
136     // Already known transaction.
137
else if (flag == XAResource.TMJOIN) {
138       if (! transactions.containsKey(xid))
139         throw new XAException JavaDoc("Can't join unknown transaction.");
140
141       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
142         JoramTracing.dbgClient.log(BasicLevel.DEBUG,
143                                    "--- "
144                                    + this
145                                    + ": joins transaction "
146                                    + xid.toString());
147     }
148     else
149       throw new XAException JavaDoc("Invalid flag: " + flag);
150
151     setStatus(xid, STARTED);
152   }
153
154   /**
155    * Notifies the RM that a transaction is ended.
156    *
157    * @exception XAException If the specified transaction is in an
158    * incompatible state with the end request.
159    */

160   synchronized void end(Xid JavaDoc xid, int flag, Session sess)
161     throws XAException JavaDoc {
162     boolean saveResourceState = true;
163
164     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
165       JoramTracing.dbgClient.log(BasicLevel.DEBUG,
166                                  "--- "
167                                  + this
168                                  + ": end(" + xid
169                                  + ", " + flag
170                                  + ", " + sess + ")");
171     
172     if (flag == XAResource.TMSUSPEND) {
173       if (getStatus(xid) != STARTED)
174         throw new XAException JavaDoc("Can't suspend non started transaction.");
175
176       setStatus(xid, SUSPENDED);
177     }
178     else {
179       if (getStatus(xid) != STARTED && getStatus(xid) != SUSPENDED)
180         throw new XAException JavaDoc("Can't end non active or non "
181                               + "suspended transaction.");
182
183       // No need to save the resource's state as it has already been done
184
// when suspending it.
185
if (getStatus(xid) == SUSPENDED)
186         saveResourceState = false;
187
188       if (flag == XAResource.TMSUCCESS)
189         setStatus(xid, SUCCESS);
190       else if (flag == XAResource.TMFAIL)
191         setStatus(xid, ROLLBACK_ONLY);
192       else
193         throw new XAException JavaDoc("Invalid flag: " + flag);
194     }
195
196     if (saveResourceState) {
197       XAContext xaC = (XAContext) transactions.get(xid);
198       xaC.addSendings(sess.sendings);
199       xaC.addDeliveries(sess.deliveries);
200     }
201
202     Session session = (Session) sessionTable.get(xid);
203     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
204       JoramTracing.dbgClient.log(BasicLevel.DEBUG,
205                                  "--- "
206                                  + this
207                                  + ": end(...) session="
208                                  + session);
209
210     if (session != null) {
211       session.setTransacted(false);
212       sessionTable.remove(xid);
213     }
214   }
215
216   /**
217    * Notifies the RM that a transaction is prepared.
218    *
219    * @exception XAException If the specified transaction is in an
220    * incompatible state with the prepare request,
221    * or if the request fails.
222    */

223   synchronized void prepare(Xid JavaDoc xid)
224     throws XAException JavaDoc {
225
226     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
227       JoramTracing.dbgClient.log(BasicLevel.DEBUG,
228                                  "--- "
229                                  + this
230                                  + ": prepare(" + xid + ")");
231     
232     try {
233       if (getStatus(xid) == ROLLBACK_ONLY)
234         throw new XAException JavaDoc("Can't prepare resource in ROLLBACK_ONLY state.");
235
236       XAContext xaC = (XAContext) transactions.get(xid);
237
238       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
239         JoramTracing.dbgClient.log(BasicLevel.DEBUG,
240                                    "--- "
241                                    + this
242                                    + ": prepares transaction "
243                                    + xid.toString());
244
245       Enumeration JavaDoc targets;
246       String JavaDoc target;
247       Vector JavaDoc pMs = new Vector JavaDoc();
248       MessageAcks acks;
249       Vector JavaDoc sessAcks = new Vector JavaDoc();
250
251       // Getting all the ProducerMessages to send:
252
targets = xaC.sendings.keys();
253       while (targets.hasMoreElements()) {
254         target = (String JavaDoc) targets.nextElement();
255         pMs.add(xaC.sendings.remove(target));
256       }
257
258       // Getting all the SessAckRequest to send:
259
targets = xaC.deliveries.keys();
260       while (targets.hasMoreElements()) {
261         target = (String JavaDoc) targets.nextElement();
262         acks = (MessageAcks) xaC.deliveries.remove(target);
263         sessAcks.add(new SessAckRequest(target, acks.getIds(),
264                                         acks.getQueueMode()));
265       }
266
267       // Sending to the proxy:
268
cnx.syncRequest(new XACnxPrepare(xid.getBranchQualifier(),
269                                         xid.getFormatId(),
270                                         xid.getGlobalTransactionId(),
271                                         pMs,
272                                         sessAcks));
273
274       setStatus(xid, PREPARED);
275
276     } catch (JMSException JavaDoc exc) {
277       setStatus(xid, ROLLBACK_ONLY);
278       throw new XAException JavaDoc("Prepare request failed: " + exc);
279     } catch (XAException JavaDoc exc) {
280       setStatus(xid, ROLLBACK_ONLY);
281       throw exc;
282     }
283   }
284
285   /**
286    * Notifies the RM that a transaction is commited.
287    *
288    * @exception XAException If the specified transaction is in an
289    * incompatible state with the commit request,
290    * or if the request fails.
291    */

292   synchronized void commit(Xid JavaDoc xid)
293     throws XAException JavaDoc {
294
295     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
296       JoramTracing.dbgClient.log(BasicLevel.DEBUG,
297                                  "--- "
298                                  + this
299                                  + ": commit(" + xid + ")");
300
301     try {
302       if (getStatus(xid) != PREPARED)
303         throw new XAException JavaDoc("Can't commit non prepared transaction.");
304
305       XAContext xaC = (XAContext) transactions.get(xid);
306
307       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
308         JoramTracing.dbgClient.log(BasicLevel.DEBUG,
309                                    "--- "
310                                    + this
311                                    + ": commits transaction "
312                                    + xid.toString());
313
314       cnx.syncRequest(new XACnxCommit(xid.getBranchQualifier(),
315                                        xid.getFormatId(),
316                                        xid.getGlobalTransactionId()));
317
318       transactions.remove(xid);
319       Session session = (Session) sessionTable.get(xid);
320       if (session != null)
321         session.setTransacted(false);
322
323     } catch (JMSException JavaDoc exc) {
324       setStatus(xid, ROLLBACK_ONLY);
325       throw new XAException JavaDoc("Commit request failed: " + exc);
326     } catch (XAException JavaDoc exc) {
327       setStatus(xid, ROLLBACK_ONLY);
328       throw exc;
329     }
330   }
331
332   /**
333    * Notifies the RM that a transaction is rolled back.
334    *
335    * @exception XAException If the specified transaction is in an
336    * incompatible state with the rollback request,
337    * or if the request fails.
338    */

339   synchronized void rollback(Xid JavaDoc xid)
340     throws XAException JavaDoc {
341
342     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
343       JoramTracing.dbgClient.log(BasicLevel.DEBUG,
344                                  "--- "
345                                  + this
346                                  + ": rollback(" + xid + ")");
347
348     try {
349       XAContext xaC = (XAContext) transactions.get(xid);
350
351       if (xaC == null)
352         throw new XAException JavaDoc("Unknown transaction.");
353
354       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
355         JoramTracing.dbgClient.log(BasicLevel.DEBUG,
356                                    "--- "
357                                    + this
358                                    + ": rolls back transaction "
359                                    + xid.toString());
360
361       Enumeration JavaDoc targets;
362       String JavaDoc target;
363       MessageAcks acks;
364
365       XACnxRollback rollbackRequest;
366   
367       targets = xaC.deliveries.keys();
368   
369       rollbackRequest = new XACnxRollback(xid.getBranchQualifier(),
370                                            xid.getFormatId(),
371                                            xid.getGlobalTransactionId());
372
373       while (targets.hasMoreElements()) {
374         target = (String JavaDoc) targets.nextElement();
375         acks = (MessageAcks) xaC.deliveries.remove(target);
376         rollbackRequest.add(target, acks.getIds(), acks.getQueueMode());
377       }
378
379       // Sending to the proxy:
380
cnx.syncRequest(rollbackRequest);
381
382       transactions.remove(xid);
383       Session session = (Session) sessionTable.get(xid);
384       if (session != null) {
385         session.setTransacted(false);
386         sessionTable.remove(xid);
387       }
388     } catch (JMSException JavaDoc exc) {
389       setStatus(xid, ROLLBACK_ONLY);
390       throw new XAException JavaDoc("Rollback request failed: " + exc);
391     } catch (XAException JavaDoc exc) {
392       setStatus(xid, ROLLBACK_ONLY);
393       throw exc;
394     }
395   }
396
397   /**
398    * Notifies the RM to recover the prepared transactions.
399    *
400    * @exception XAException If the specified flag is invalid, or if the
401    * request fails.
402    */

403   synchronized Xid JavaDoc[] recover(int flag) throws XAException JavaDoc
404   {
405     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
406       JoramTracing.dbgClient.log(BasicLevel.DEBUG,
407                                  "--- "
408                                  + this
409                                  + ": recovers transactions.");
410
411     if (flag == XAResource.TMSTARTRSCAN || flag == XAResource.TMENDRSCAN)
412       throw new XAException JavaDoc("Non supported recovery flag: " + flag);
413
414     try {
415       XACnxRecoverReply reply =
416         (XACnxRecoverReply) cnx.syncRequest(new XACnxRecoverRequest());
417
418       Xid JavaDoc[] xids = new Xid JavaDoc[reply.getSize()];
419
420       for (int i = 0; i < reply.getSize(); i++) {
421         xids[i] = new XidImpl(reply.getBranchQualifier(i),
422                               reply.getFormatId(i),
423                               reply.getGlobalTransactionId(i));
424         transactions.put(xids[i], new XAContext());
425         setStatus(xids[i], PREPARED);
426       }
427       return xids;
428     }
429     catch (Exception JavaDoc exc) {
430       throw new XAException JavaDoc("Recovery request failed: " + exc.getMessage());
431     }
432   }
433
434   /**
435    * Sets the status of a transaction.
436    *
437    * @exception XAException If the transaction is unknown.
438    */

439   private void setStatus(Xid JavaDoc xid, int status) throws XAException JavaDoc
440   {
441     XAContext xac = (XAContext) transactions.get(xid);
442
443     if (xac == null)
444       throw new XAException JavaDoc("Unknown transaction.");
445
446     xac.status = status;
447   }
448
449   /**
450    * Gets the status of a transaction.
451    *
452    * @exception XAException If the transaction is unknown.
453    */

454   private int getStatus(Xid JavaDoc xid) throws XAException JavaDoc
455   {
456     XAContext xac = (XAContext) transactions.get(xid);
457
458     if (xac == null)
459       throw new XAException JavaDoc("Unknown transaction.");
460
461     return xac.status;
462   }
463
464   /** Resource managers are equal if they belong to the same connection. */
465   public boolean equals(Object JavaDoc o) {
466     if (! (o instanceof XAResourceMngr))
467       return false;
468
469     XAResourceMngr other = (XAResourceMngr) o;
470
471     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
472       JoramTracing.dbgClient.log(BasicLevel.DEBUG,
473                                  this + ": equals other = " + other.cnx +
474                                  ", this.cnx = " + cnx +
475                                  ", equals = " + cnx.equals(other.cnx));
476     
477     return cnx.equals(other.cnx);
478   }
479 }
480
481 /**
482  * Utility class holding a resource's state during transaction progress.
483  */

484 class XAContext
485 {
486   /** The transaction status. */
487   int status;
488   /**
489    * Table holding the <code>ProducerMessages</code> produced in the
490    * transaction.
491    * <p>
492    * <b>Key:</b> destination name<br>
493    * <b>Object:</b> <code>ProducerMessages</code>
494    */

495   Hashtable JavaDoc sendings;
496   /**
497    * Table holding the identifiers of the messages delivered per
498    * destination or subscription, in the transaction.
499    * <p>
500    * <b>Key:</b> destination or subscription name<br>
501    * <b>Object:</b> corresponding <code>MessageAcks</code> instance
502    */

503   Hashtable JavaDoc deliveries;
504
505
506   /**
507    * Constructs an <code>XAContext</code> instance.
508    */

509   XAContext()
510   {
511     sendings = new Hashtable JavaDoc();
512     deliveries = new Hashtable JavaDoc();
513   }
514
515
516   /**
517    * Adds new sendings performed by the resumed transaction.
518    */

519   void addSendings(Hashtable JavaDoc newSendings) {
520     String JavaDoc newDest;
521     ProducerMessages newPM;
522     ProducerMessages storedPM;
523     Vector JavaDoc msgs;
524
525     // Browsing the destinations for which messages have been produced:
526
Enumeration JavaDoc newDests = newSendings.keys();
527     while (newDests.hasMoreElements()) {
528       newDest = (String JavaDoc) newDests.nextElement();
529       newPM = (ProducerMessages) newSendings.remove(newDest);
530       storedPM = (ProducerMessages) sendings.get(newDest);
531       // If messages haven't already been produced for this destination,
532
// storing the new ProducerMessages object:
533
if (storedPM == null)
534         sendings.put(newDest, newPM);
535       // Else, adding the newly produced messages to the existing
536
// ProducerMessages:
537
else {
538         msgs = newPM.getMessages();
539         for (int i = 0; i < msgs.size(); i++)
540           storedPM.addMessage(((Message) msgs.get(i)).momMsg);
541       }
542     }
543   }
544
545   /**
546    * Adds new deliveries occured within the resumed transaction.
547    */

548   void addDeliveries(Hashtable JavaDoc newDeliveries) {
549     String JavaDoc newName;
550     MessageAcks newAcks;
551     MessageAcks storedAcks;
552
553     // Browsing the destinations or subscriptions to which messages will have
554
// to be acknowledged:
555
Enumeration JavaDoc newNames = newDeliveries.keys();
556     while (newNames.hasMoreElements()) {
557       newName = (String JavaDoc) newNames.nextElement();
558       newAcks = (MessageAcks) newDeliveries.remove(newName);
559       storedAcks = (MessageAcks) deliveries.get(newName);
560       // If there are no messages to acknowledge for this destination or
561
// subscription, storing the new vector:
562
if (storedAcks == null)
563         deliveries.put(newName, newAcks);
564       // Else, adding the new ids to the stored ones:
565
else
566         storedAcks.addIds(newAcks.getIds());
567     }
568   }
569 }
570
Popular Tags