KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > SpyXAResourceManager


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq;
23
24 import java.io.Serializable JavaDoc;
25 import java.util.Map JavaDoc;
26 import java.util.ArrayList JavaDoc;
27
28 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
29
30 import javax.jms.JMSException JavaDoc;
31 import javax.transaction.xa.XAException JavaDoc;
32 import javax.transaction.xa.XAResource JavaDoc;
33 import javax.transaction.xa.Xid JavaDoc;
34
35 import org.jboss.logging.Logger;
36
37 /**
38  * This class implements the ResourceManager used for the XAResources used in
39  * JBossMQ.
40  *
41  * @author Hiram Chirino (Cojonudo14@hotmail.com)
42  * @author <a HREF="mailto:adrian@jboss.org">Adrian Brock</a>
43  * @version $Revision: 45317 $
44  */

45 public class SpyXAResourceManager implements Serializable JavaDoc
46 {
47    /** The serialVersionUID */
48    static final long serialVersionUID = -6268132972627753772L;
49    /** The log */
50    private static final Logger log = Logger.getLogger(SpyXAResourceManager.class);
51    /** Whether trace is enabled */
52    private static boolean trace = log.isTraceEnabled();
53
54    //Valid tx states:
55
private final static byte TX_OPEN = 0;
56    private final static byte TX_ENDED = 1;
57    private final static byte TX_PREPARED = 3;
58    private final static byte TX_COMMITED = 4;
59    private final static byte TX_ROLLEDBACK = 5;
60    private final static byte TX_READONLY = 6;
61
62    /** The connection */
63    private Connection connection;
64    /** The transactions */
65    private Map JavaDoc transactions = new ConcurrentReaderHashMap();
66    /** The next xid */
67    private long nextInternalXid = Long.MIN_VALUE;
68    
/**
 * Creates a resource manager that sends its requests through the given connection.
 *
 * @param conn the connection
 */
public SpyXAResourceManager(Connection conn)
{
   this.connection = conn;
}
79
80    /**
81     * Acknowledge a message
82     *
83     * @param xid the xid
84     * @param msg the message
85     * @throws JMSException for any error
86     */

87    public void ackMessage(Object JavaDoc xid, SpyMessage msg) throws JMSException JavaDoc
88    {
89       if (xid == null)
90       {
91          if (trace)
92             log.trace("No Xid, acking message " + msg.header.jmsMessageID);
93          msg.doAcknowledge();
94          return;
95       }
96
97       if (trace)
98          log.trace("Adding acked message xid=" + xid + " " + msg.header.jmsMessageID);
99
100       TXState state = (TXState) transactions.get(xid);
101       if (state == null)
102          throw new JMSException JavaDoc("Invalid transaction id.");
103       AcknowledgementRequest item = msg.getAcknowledgementRequest(true);
104       state.ackedMessages.add(item);
105    }
106
107    public void addMessage(Object JavaDoc xid, SpyMessage msg) throws JMSException JavaDoc
108    {
109       if (xid == null)
110       {
111          if (trace)
112             log.trace("No Xid, sending message to server " + msg.header.jmsMessageID);
113          connection.sendToServer(msg);
114          return;
115       }
116       
117       if (trace)
118          log.trace("Adding message xid=" + xid + ", message=" + msg.header.jmsMessageID);
119
120       TXState state = (TXState) transactions.get(xid);
121       if (trace)
122          log.trace("TXState=" + state);
123
124       if (state == null)
125          throw new JMSException JavaDoc("Invalid transaction id.");
126
127       state.sentMessages.add(msg);
128    }
129
130    public void commit(Object JavaDoc xid, boolean onePhase) throws XAException JavaDoc, JMSException JavaDoc
131    {
132       if (trace)
133          log.trace("Commiting xid=" + xid + ", onePhase=" + onePhase);
134
135       TXState state = (TXState) transactions.remove(xid);
136       if (state == null)
137       {
138          XAException JavaDoc e = new XAException JavaDoc("Unknown transaction during commit " + xid);
139          e.errorCode = XAException.XAER_NOTA;
140          throw e;
141       }
142
143       if (onePhase)
144       {
145          if (state.isReadOnly())
146          {
147             if (trace)
148                log.trace("Nothing to do for " + xid);
149          }
150          
151          TransactionRequest transaction = new TransactionRequest();
152          transaction.requestType = TransactionRequest.ONE_PHASE_COMMIT_REQUEST;
153          transaction.xid = null;
154          if (state.sentMessages.size() != 0)
155          {
156             SpyMessage job[] = new SpyMessage[state.sentMessages.size()];
157             job = (SpyMessage[]) state.sentMessages.toArray(job);
158             transaction.messages = job;
159          }
160          if (state.ackedMessages.size() != 0)
161          {
162             AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()];
163             job = (AcknowledgementRequest[]) state.ackedMessages.toArray(job);
164             transaction.acks = job;
165          }
166          connection.send(transaction);
167       }
168       else
169       {
170          if (state.txState == TX_READONLY)
171          {
172             if (trace)
173                log.trace("Nothing to do for " + xid);
174             return;
175          }
176          if (state.txState != TX_PREPARED)
177          {
178             XAException JavaDoc e = new XAException JavaDoc("Cannot complete 2 phase commit, the transaction has not been prepared " + xid);
179             e.errorCode = XAException.XAER_PROTO;
180             throw e;
181          }
182          TransactionRequest transaction = new TransactionRequest();
183          transaction.xid = xid;
184          transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_COMMIT_REQUEST;
185          connection.send(transaction);
186       }
187       state.txState = TX_COMMITED;
188    }
189
190    public void endTx(Object JavaDoc xid, boolean success) throws XAException JavaDoc
191    {
192       if (trace)
193          log.trace("Ending xid=" + xid + ", success=" + success);
194
195       TXState state = (TXState) transactions.get(xid);
196       if (state == null)
197       {
198          XAException JavaDoc e = new XAException JavaDoc("Unknown transaction during delist " + xid);
199          e.errorCode = XAException.XAER_NOTA;
200          throw e;
201       }
202       state.txState = TX_ENDED;
203    }
204
205    public Object JavaDoc joinTx(Xid JavaDoc xid) throws XAException JavaDoc
206    {
207       if (trace)
208          log.trace("Joining tx xid=" + xid);
209
210       if (!transactions.containsKey(xid))
211       {
212          XAException JavaDoc e = new XAException JavaDoc("Unknown transaction during join " + xid);
213          e.errorCode = XAException.XAER_NOTA;
214          throw e;
215       }
216       return xid;
217    }
218
219    public int prepare(Object JavaDoc xid) throws XAException JavaDoc, JMSException JavaDoc
220    {
221       if (trace)
222          log.trace("Preparing xid=" + xid);
223
224       TXState state = (TXState) transactions.get(xid);
225       if (state == null)
226       {
227          XAException JavaDoc e = new XAException JavaDoc("Unknown transaction during prepare " + xid);
228          e.errorCode = XAException.XAER_NOTA;
229          throw e;
230       }
231
232       if (state.isReadOnly())
233       {
234          if (trace)
235             log.trace("Vote read only for " + xid);
236          state.txState = TX_READONLY;
237          return XAResource.XA_RDONLY;
238       }
239       
240       TransactionRequest transaction = new TransactionRequest();
241       transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_PREPARE_REQUEST;
242       transaction.xid = xid;
243       if (state.sentMessages.size() != 0)
244       {
245          SpyMessage job[] = new SpyMessage[state.sentMessages.size()];
246          job = (SpyMessage[]) state.sentMessages.toArray(job);
247          transaction.messages = job;
248       }
249       if (state.ackedMessages.size() != 0)
250       {
251          AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()];
252          job = (AcknowledgementRequest[]) state.ackedMessages.toArray(job);
253          transaction.acks = job;
254       }
255       connection.send(transaction);
256       state.txState = TX_PREPARED;
257       return XAResource.XA_OK;
258    }
259
260    public Object JavaDoc resumeTx(Xid JavaDoc xid) throws XAException JavaDoc
261    {
262       if (trace)
263          log.trace("Resuming tx xid=" + xid);
264
265       if (!transactions.containsKey(xid))
266       {
267          XAException JavaDoc e = new XAException JavaDoc("Unknown transaction during resume " + xid);
268          e.errorCode = XAException.XAER_NOTA;
269          throw e;
270       }
271       return xid;
272    }
273
274    public void rollback(Object JavaDoc xid) throws XAException JavaDoc, JMSException JavaDoc
275    {
276       if (trace)
277          log.trace("Rolling back xid=" + xid);
278
279       TXState state = (TXState) transactions.remove(xid);
280       if (state == null)
281       {
282          XAException JavaDoc e = new XAException JavaDoc("Unknown transaction during rollback " + xid);
283          e.errorCode = XAException.XAER_NOTA;
284          throw e;
285       }
286       if (state.txState == TX_READONLY)
287       {
288          if (trace)
289             log.trace("Nothing to do for " + xid);
290          return;
291       }
292       if (state.txState != TX_PREPARED)
293       {
294          TransactionRequest transaction = new TransactionRequest();
295          transaction.requestType = TransactionRequest.ONE_PHASE_COMMIT_REQUEST;
296          transaction.xid = null;
297          if (state.ackedMessages.size() != 0)
298          {
299             AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()];
300             job = (AcknowledgementRequest[]) state.ackedMessages.toArray(job);
301             transaction.acks = job;
302             //Neg Acknowlege all consumed messages
303
for (int i = 0; i < transaction.acks.length; i++)
304             {
305                transaction.acks[i].isAck = false;
306             }
307          }
308          connection.send(transaction);
309       }
310       else
311       {
312          TransactionRequest transaction = new TransactionRequest();
313          transaction.xid = xid;
314          transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_ROLLBACK_REQUEST;
315          connection.send(transaction);
316       }
317       state.txState = TX_ROLLEDBACK;
318    }
319
320    public Xid JavaDoc[] recover(int arg) throws XAException JavaDoc, JMSException JavaDoc
321    {
322       if (trace)
323          log.trace("Recover arg=" + arg);
324       
325       Xid JavaDoc[] xids = connection.recover(arg);
326
327       // Make sure we have a reference to each xid
328
for (int i = 0; i < xids.length; ++i)
329       {
330          if (transactions.containsKey(xids[i]) == false)
331          {
332             TXState state = new TXState();
333             state.txState = TX_PREPARED;
334             transactions.put(xids[i], state);
335          }
336       }
337       return xids;
338    }
339
340    public void forget(Xid JavaDoc xid) throws XAException JavaDoc, JMSException JavaDoc
341    {
342       if (trace)
343          log.trace("Forget xid=" + xid);
344
345       TXState state = (TXState) transactions.get(xid);
346       if (state == null)
347          return;
348       if (state.txState != TX_PREPARED)
349          transactions.remove(xid);
350       rollback(xid);
351    }
352    
353    public synchronized Long JavaDoc getNewXid()
354    {
355       return new Long JavaDoc(nextInternalXid++);
356    }
357
358    public Object JavaDoc startTx()
359    {
360       Long JavaDoc newXid = getNewXid();
361       transactions.put(newXid, new TXState());
362
363       if (trace)
364          log.trace("Starting tx with new xid=" + newXid);
365
366       return newXid;
367    }
368
369    public Object JavaDoc startTx(Xid JavaDoc xid) throws XAException JavaDoc
370    {
371       if (trace)
372          log.trace("Starting tx xid=" + xid);
373
374       if (transactions.containsKey(xid))
375       {
376          XAException JavaDoc e = new XAException JavaDoc("Duplicate transaction id during enlist " + xid);
377          e.errorCode = XAException.XAER_DUPID;
378          throw e;
379       }
380       transactions.put(xid, new TXState());
381       return xid;
382    }
383
384    public Object JavaDoc suspendTx(Xid JavaDoc xid) throws XAException JavaDoc
385    {
386       if (trace)
387          log.trace("Suppending tx xid=" + xid);
388
389       if (!transactions.containsKey(xid))
390       {
391          XAException JavaDoc e = new XAException JavaDoc("Unknown transaction during suspend " + xid);
392          e.errorCode = XAException.XAER_NOTA;
393          throw e;
394       }
395       return xid;
396    }
397
398    public Object JavaDoc convertTx(Long JavaDoc anonXid, Xid JavaDoc xid) throws XAException JavaDoc
399    {
400       if (trace)
401          log.trace("Converting tx anonXid=" + anonXid + ", xid=" + xid);
402
403       if (!transactions.containsKey(anonXid))
404       {
405          XAException JavaDoc e = new XAException JavaDoc("Unknown transaction during convert " + anonXid);
406          e.errorCode = XAException.XAER_NOTA;
407          throw e;
408       }
409       if (transactions.containsKey(xid))
410       {
411          XAException JavaDoc e = new XAException JavaDoc("Duplicate transaction during convert " + xid);
412          e.errorCode = XAException.XAER_DUPID;
413          throw e;
414       }
415       TXState s = (TXState) transactions.remove(anonXid);
416
417       transactions.put(xid, s);
418       return xid;
419    }
420
421    /**
422     * The transaction state
423     */

424    static class TXState
425    {
426       byte txState = TX_OPEN;
427       ArrayList JavaDoc sentMessages = new ArrayList JavaDoc();
428       ArrayList JavaDoc ackedMessages = new ArrayList JavaDoc();
429
430       public boolean isReadOnly()
431       {
432          return sentMessages.size() == 0 && ackedMessages.size() == 0;
433       }
434       
435       public String JavaDoc toString()
436       {
437          StringBuffer JavaDoc buffer = new StringBuffer JavaDoc(100);
438          buffer.append("TxState txState=").append(txState);
439          buffer.append(" sent=").append(sentMessages);
440          buffer.append(" acks=").append(ackedMessages);
441          return buffer.toString();
442       }
443    }
444 }
445
Popular Tags