1 22 package org.jboss.mq; 23 24 import java.io.Serializable ; 25 import java.util.Map ; 26 import java.util.ArrayList ; 27 28 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; 29 30 import javax.jms.JMSException ; 31 import javax.transaction.xa.XAException ; 32 import javax.transaction.xa.XAResource ; 33 import javax.transaction.xa.Xid ; 34 35 import org.jboss.logging.Logger; 36 37 45 public class SpyXAResourceManager implements Serializable 46 { 47 48 static final long serialVersionUID = -6268132972627753772L; 49 50 private static final Logger log = Logger.getLogger(SpyXAResourceManager.class); 51 52 private static boolean trace = log.isTraceEnabled(); 53 54 private final static byte TX_OPEN = 0; 56 private final static byte TX_ENDED = 1; 57 private final static byte TX_PREPARED = 3; 58 private final static byte TX_COMMITED = 4; 59 private final static byte TX_ROLLEDBACK = 5; 60 private final static byte TX_READONLY = 6; 61 62 63 private Connection connection; 64 65 private Map transactions = new ConcurrentReaderHashMap(); 66 67 private long nextInternalXid = Long.MIN_VALUE; 68 69 74 public SpyXAResourceManager(Connection conn) 75 { 76 super(); 77 connection = conn; 78 } 79 80 87 public void ackMessage(Object xid, SpyMessage msg) throws JMSException 88 { 89 if (xid == null) 90 { 91 if (trace) 92 log.trace("No Xid, acking message " + msg.header.jmsMessageID); 93 msg.doAcknowledge(); 94 return; 95 } 96 97 if (trace) 98 log.trace("Adding acked message xid=" + xid + " " + msg.header.jmsMessageID); 99 100 TXState state = (TXState) transactions.get(xid); 101 if (state == null) 102 throw new JMSException ("Invalid transaction id."); 103 AcknowledgementRequest item = msg.getAcknowledgementRequest(true); 104 state.ackedMessages.add(item); 105 } 106 107 public void addMessage(Object xid, SpyMessage msg) throws JMSException 108 { 109 if (xid == null) 110 { 111 if (trace) 112 log.trace("No Xid, sending message to server " + msg.header.jmsMessageID); 113 connection.sendToServer(msg); 114 return; 115 } 116 117 if (trace) 118 log.trace("Adding message xid=" + xid + ", message=" + msg.header.jmsMessageID); 119 120 TXState state = (TXState) transactions.get(xid); 121 if (trace) 122 log.trace("TXState=" + state); 123 124 if (state == null) 125 throw new JMSException ("Invalid transaction id."); 126 127 state.sentMessages.add(msg); 128 } 129 130 public void commit(Object xid, boolean onePhase) throws XAException , JMSException 131 { 132 if (trace) 133 log.trace("Commiting xid=" + xid + ", onePhase=" + onePhase); 134 135 TXState state = (TXState) transactions.remove(xid); 136 if (state == null) 137 { 138 XAException e = new XAException ("Unknown transaction during commit " + xid); 139 e.errorCode = XAException.XAER_NOTA; 140 throw e; 141 } 142 143 if (onePhase) 144 { 145 if (state.isReadOnly()) 146 { 147 if (trace) 148 log.trace("Nothing to do for " + xid); 149 } 150 151 TransactionRequest transaction = new TransactionRequest(); 152 transaction.requestType = TransactionRequest.ONE_PHASE_COMMIT_REQUEST; 153 transaction.xid = null; 154 if (state.sentMessages.size() != 0) 155 { 156 SpyMessage job[] = new SpyMessage[state.sentMessages.size()]; 157 job = (SpyMessage[]) state.sentMessages.toArray(job); 158 transaction.messages = job; 159 } 160 if (state.ackedMessages.size() != 0) 161 { 162 AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()]; 163 job = (AcknowledgementRequest[]) state.ackedMessages.toArray(job); 164 transaction.acks = job; 165 } 166 connection.send(transaction); 167 } 168 else 169 { 170 if (state.txState == TX_READONLY) 171 { 172 if (trace) 173 log.trace("Nothing to do for " + xid); 174 return; 175 } 176 if (state.txState != TX_PREPARED) 177 { 178 XAException e = new XAException ("Cannot complete 2 phase commit, the transaction has not been prepared " + xid); 179 e.errorCode = XAException.XAER_PROTO; 180 throw e; 181 } 182 TransactionRequest transaction = new TransactionRequest(); 183 transaction.xid = xid; 184 transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_COMMIT_REQUEST; 185 connection.send(transaction); 186 } 187 state.txState = TX_COMMITED; 188 } 189 190 public void endTx(Object xid, boolean success) throws XAException 191 { 192 if (trace) 193 log.trace("Ending xid=" + xid + ", success=" + success); 194 195 TXState state = (TXState) transactions.get(xid); 196 if (state == null) 197 { 198 XAException e = new XAException ("Unknown transaction during delist " + xid); 199 e.errorCode = XAException.XAER_NOTA; 200 throw e; 201 } 202 state.txState = TX_ENDED; 203 } 204 205 public Object joinTx(Xid xid) throws XAException 206 { 207 if (trace) 208 log.trace("Joining tx xid=" + xid); 209 210 if (!transactions.containsKey(xid)) 211 { 212 XAException e = new XAException ("Unknown transaction during join " + xid); 213 e.errorCode = XAException.XAER_NOTA; 214 throw e; 215 } 216 return xid; 217 } 218 219 public int prepare(Object xid) throws XAException , JMSException 220 { 221 if (trace) 222 log.trace("Preparing xid=" + xid); 223 224 TXState state = (TXState) transactions.get(xid); 225 if (state == null) 226 { 227 XAException e = new XAException ("Unknown transaction during prepare " + xid); 228 e.errorCode = XAException.XAER_NOTA; 229 throw e; 230 } 231 232 if (state.isReadOnly()) 233 { 234 if (trace) 235 log.trace("Vote read only for " + xid); 236 state.txState = TX_READONLY; 237 return XAResource.XA_RDONLY; 238 } 239 240 TransactionRequest transaction = new TransactionRequest(); 241 transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_PREPARE_REQUEST; 242 transaction.xid = xid; 243 if (state.sentMessages.size() != 0) 244 { 245 SpyMessage job[] = new SpyMessage[state.sentMessages.size()]; 246 job = (SpyMessage[]) state.sentMessages.toArray(job); 247 transaction.messages = job; 248 } 249 if (state.ackedMessages.size() != 0) 250 { 251 AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()]; 252 job = (AcknowledgementRequest[]) state.ackedMessages.toArray(job); 253 transaction.acks = job; 254 } 255 connection.send(transaction); 256 state.txState = TX_PREPARED; 257 return XAResource.XA_OK; 258 } 259 260 public Object resumeTx(Xid xid) throws XAException 261 { 262 if (trace) 263 log.trace("Resuming tx xid=" + xid); 264 265 if (!transactions.containsKey(xid)) 266 { 267 XAException e = new XAException ("Unknown transaction during resume " + xid); 268 e.errorCode = XAException.XAER_NOTA; 269 throw e; 270 } 271 return xid; 272 } 273 274 public void rollback(Object xid) throws XAException , JMSException 275 { 276 if (trace) 277 log.trace("Rolling back xid=" + xid); 278 279 TXState state = (TXState) transactions.remove(xid); 280 if (state == null) 281 { 282 XAException e = new XAException ("Unknown transaction during rollback " + xid); 283 e.errorCode = XAException.XAER_NOTA; 284 throw e; 285 } 286 if (state.txState == TX_READONLY) 287 { 288 if (trace) 289 log.trace("Nothing to do for " + xid); 290 return; 291 } 292 if (state.txState != TX_PREPARED) 293 { 294 TransactionRequest transaction = new TransactionRequest(); 295 transaction.requestType = TransactionRequest.ONE_PHASE_COMMIT_REQUEST; 296 transaction.xid = null; 297 if (state.ackedMessages.size() != 0) 298 { 299 AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()]; 300 job = (AcknowledgementRequest[]) state.ackedMessages.toArray(job); 301 transaction.acks = job; 302 for (int i = 0; i < transaction.acks.length; i++) 304 { 305 transaction.acks[i].isAck = false; 306 } 307 } 308 connection.send(transaction); 309 } 310 else 311 { 312 TransactionRequest transaction = new TransactionRequest(); 313 transaction.xid = xid; 314 transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_ROLLBACK_REQUEST; 315 connection.send(transaction); 316 } 317 state.txState = TX_ROLLEDBACK; 318 } 319 320 public Xid [] recover(int arg) throws XAException , JMSException 321 { 322 if (trace) 323 log.trace("Recover arg=" + arg); 324 325 Xid [] xids = connection.recover(arg); 326 327 for (int i = 0; i < xids.length; ++i) 329 { 330 if (transactions.containsKey(xids[i]) == false) 331 { 332 TXState state = new TXState(); 333 state.txState = TX_PREPARED; 334 transactions.put(xids[i], state); 335 } 336 } 337 return xids; 338 } 339 340 public void forget(Xid xid) throws XAException , JMSException 341 { 342 if (trace) 343 log.trace("Forget xid=" + xid); 344 345 TXState state = (TXState) transactions.get(xid); 346 if (state == null) 347 return; 348 if (state.txState != TX_PREPARED) 349 transactions.remove(xid); 350 rollback(xid); 351 } 352 353 public synchronized Long getNewXid() 354 { 355 return new Long (nextInternalXid++); 356 } 357 358 public Object startTx() 359 { 360 Long newXid = getNewXid(); 361 transactions.put(newXid, new TXState()); 362 363 if (trace) 364 log.trace("Starting tx with new xid=" + newXid); 365 366 return newXid; 367 } 368 369 public Object startTx(Xid xid) throws XAException 370 { 371 if (trace) 372 log.trace("Starting tx xid=" + xid); 373 374 if (transactions.containsKey(xid)) 375 { 376 XAException e = new XAException ("Duplicate transaction id during enlist " + xid); 377 e.errorCode = XAException.XAER_DUPID; 378 throw e; 379 } 380 transactions.put(xid, new TXState()); 381 return xid; 382 } 383 384 public Object suspendTx(Xid xid) throws XAException 385 { 386 if (trace) 387 log.trace("Suppending tx xid=" + xid); 388 389 if (!transactions.containsKey(xid)) 390 { 391 XAException e = new XAException ("Unknown transaction during suspend " + xid); 392 e.errorCode = XAException.XAER_NOTA; 393 throw e; 394 } 395 return xid; 396 } 397 398 public Object convertTx(Long anonXid, Xid xid) throws XAException 399 { 400 if (trace) 401 log.trace("Converting tx anonXid=" + anonXid + ", xid=" + xid); 402 403 if (!transactions.containsKey(anonXid)) 404 { 405 XAException e = new XAException ("Unknown transaction during convert " + anonXid); 406 e.errorCode = XAException.XAER_NOTA; 407 throw e; 408 } 409 if (transactions.containsKey(xid)) 410 { 411 XAException e = new XAException ("Duplicate transaction during convert " + xid); 412 e.errorCode = XAException.XAER_DUPID; 413 throw e; 414 } 415 TXState s = (TXState) transactions.remove(anonXid); 416 417 transactions.put(xid, s); 418 return xid; 419 } 420 421 424 static class TXState 425 { 426 byte txState = TX_OPEN; 427 ArrayList sentMessages = new ArrayList (); 428 ArrayList ackedMessages = new ArrayList (); 429 430 public boolean isReadOnly() 431 { 432 return sentMessages.size() == 0 && ackedMessages.size() == 0; 433 } 434 435 public String toString() 436 { 437 StringBuffer buffer = new StringBuffer (100); 438 buffer.append("TxState txState=").append(txState); 439 buffer.append(" sent=").append(sentMessages); 440 buffer.append(" acks=").append(ackedMessages); 441 return buffer.toString(); 442 } 443 } 444 } 445 | Popular Tags |