package org.objectweb.joram.client.jms;

import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;

import javax.jms.JMSException;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.objectweb.joram.shared.client.ProducerMessages;
import org.objectweb.joram.shared.client.SessAckRequest;
import org.objectweb.joram.shared.client.XACnxPrepare;
import org.objectweb.joram.shared.client.XACnxCommit;
import org.objectweb.joram.shared.client.XACnxRecoverReply;
import org.objectweb.joram.shared.client.XACnxRecoverRequest;
import org.objectweb.joram.shared.client.XACnxRollback;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.joram.shared.JoramTracing;

/**
 * Keeps track of the XA transactions handled through one connection and
 * forwards the prepare, commit, rollback and recover requests to the server
 * through that connection.
 * <p>
 * All transaction operations are <code>synchronized</code> on this manager.
 */
public class XAResourceMngr {
  /** Status set by {@link #start}: the transaction is started. */
  public static final int STARTED = 0;
  /** Status set by {@link #end} with <code>TMSUSPEND</code>. */
  public static final int SUSPENDED = 1;
  /** Status set by {@link #end} with <code>TMSUCCESS</code>. */
  public static final int SUCCESS = 2;
  /**
   * Status set by {@link #end} with <code>TMFAIL</code>, or after a failed
   * prepare, commit or rollback.
   */
  public static final int ROLLBACK_ONLY = 3;
  /**
   * Status set once {@link #prepare} succeeded, and given to the
   * transactions returned by {@link #recover}.
   */
  public static final int PREPARED = 4;

  /**
   * The table of known transactions.
   * <p>
   * <b>Key:</b> transaction identifier (<code>Xid</code>)<br>
   * <b>Object:</b> <code>XAContext</code>
   */
  private final Hashtable transactions;

  /** The connection used for sending the requests to the server. */
  Connection cnx;

  /**
   * The table of sessions currently enlisted in a transaction.
   * <p>
   * <b>Key:</b> transaction identifier (<code>Xid</code>)<br>
   * <b>Object:</b> <code>Session</code>
   */
  Hashtable sessionTable;

  /**
   * Creates a <code>XAResourceMngr</code> instance.
   *
   * @param cnx  The connection this manager sends its requests through.
   */
  public XAResourceMngr(Connection cnx) {
    this.cnx = cnx;
    transactions = new Hashtable();
    sessionTable = new Hashtable();

    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                 " XAResourceMngr cnx = " + cnx);
    }
  }

  /**
   * Notifies that a session starts, resumes or joins a transaction.
   * <p>
   * The request is validated before any state is modified, so that a
   * rejected call neither flags the session as transacted nor replaces the
   * session already registered for a known transaction.
   *
   * @param xid   The transaction identifier.
   * @param flag  <code>XAResource.TMNOFLAGS</code>,
   *              <code>XAResource.TMRESUME</code> or
   *              <code>XAResource.TMJOIN</code>.
   * @param sess  The session getting involved in the transaction.
   *
   * @exception XAException  If the flag is invalid, if a new transaction is
   *              already known, or if a resumed or joined transaction is
   *              unknown.
   */
  synchronized void start(Xid xid, int flag, Session sess)
    throws XAException {
    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                 " XAResourceMngr start(" + xid +
                                 ", " + flag +
                                 ", " + sess +")");
    }

    // Validation only: nothing is modified until the request is accepted.
    String action;
    if (flag == XAResource.TMNOFLAGS) {
      if (transactions.containsKey(xid)) {
        throw new XAException("Can't start transaction already known by RM.");
      }
      action = ": involved in transaction ";
    } else if (flag == XAResource.TMRESUME) {
      if (! transactions.containsKey(xid)) {
        throw new XAException("Can't resume unknown transaction.");
      }
      action = ": resumes transaction ";
    } else if (flag == XAResource.TMJOIN) {
      if (! transactions.containsKey(xid)) {
        throw new XAException("Can't join unknown transaction.");
      }
      action = ": joins transaction ";
    } else {
      throw new XAException("Invalid flag: " + flag);
    }

    sess.setTransacted(true);
    sessionTable.put(xid, sess);

    // Only a brand new transaction gets a fresh context.
    if (flag == XAResource.TMNOFLAGS) {
      transactions.put(xid, new XAContext());
    }

    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                 "--- "
                                 + this
                                 + action
                                 + xid.toString());
    }

    setStatus(xid, STARTED);
  }

  /**
   * Notifies that a session suspends or ends its work in a transaction.
   * <p>
   * Unless the transaction was suspended, the sendings and deliveries of
   * the session are moved into the transaction context. The session
   * registered for the transaction, if any, is then unregistered and
   * flagged as non transacted.
   *
   * @param xid   The transaction identifier.
   * @param flag  <code>XAResource.TMSUSPEND</code>,
   *              <code>XAResource.TMSUCCESS</code> or
   *              <code>XAResource.TMFAIL</code>.
   * @param sess  The session whose sendings and deliveries are saved.
   *
   * @exception XAException  If the transaction is unknown, if its status
   *              does not allow the operation, or if the flag is invalid.
   */
  synchronized void end(Xid xid, int flag, Session sess)
    throws XAException {
    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                 "--- "
                                 + this
                                 + ": end(" + xid
                                 + ", " + flag
                                 + ", " + sess + ")");
    }

    boolean saveResourceState = true;
    int status = getStatus(xid);

    if (flag == XAResource.TMSUSPEND) {
      if (status != STARTED) {
        throw new XAException("Can't suspend non started transaction.");
      }
      setStatus(xid, SUSPENDED);
    } else {
      if (status != STARTED && status != SUSPENDED) {
        throw new XAException("Can't end non active or non "
                              + "suspended transaction.");
      }

      // The state of a suspended transaction is not saved again here.
      if (status == SUSPENDED) {
        saveResourceState = false;
      }

      if (flag == XAResource.TMSUCCESS) {
        setStatus(xid, SUCCESS);
      } else if (flag == XAResource.TMFAIL) {
        setStatus(xid, ROLLBACK_ONLY);
      } else {
        throw new XAException("Invalid flag: " + flag);
      }
    }

    if (saveResourceState) {
      XAContext xaC = (XAContext) transactions.get(xid);
      xaC.addSendings(sess.sendings);
      xaC.addDeliveries(sess.deliveries);
    }

    Session session = (Session) sessionTable.remove(xid);
    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                 "--- "
                                 + this
                                 + ": end(...) session="
                                 + session);
    }

    if (session != null) {
      session.setTransacted(false);
    }
  }

  /**
   * Sends the sendings and the acknowledgements saved for a transaction to
   * the server in a prepare request, then sets the transaction status to
   * <code>PREPARED</code>.
   * <p>
   * On failure the transaction status is set to <code>ROLLBACK_ONLY</code>.
   *
   * @param xid  The transaction identifier.
   *
   * @exception XAException  If the transaction is unknown, if it is in the
   *              <code>ROLLBACK_ONLY</code> state, or if the request fails.
   */
  synchronized void prepare(Xid xid)
    throws XAException {
    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                 "--- "
                                 + this
                                 + ": prepare(" + xid + ")");
    }

    try {
      if (getStatus(xid) == ROLLBACK_ONLY) {
        throw new XAException("Can't prepare resource in ROLLBACK_ONLY state.");
      }

      XAContext xaC = (XAContext) transactions.get(xid);

      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
        JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                   "--- "
                                   + this
                                   + ": prepares transaction "
                                   + xid.toString());
      }

      // Drains the sendings of the context into the request.
      Vector pMs = new Vector();
      Enumeration targets = xaC.sendings.keys();
      while (targets.hasMoreElements()) {
        String target = (String) targets.nextElement();
        pMs.add(xaC.sendings.remove(target));
      }

      // Drains the deliveries of the context, one ack request per target.
      Vector sessAcks = new Vector();
      targets = xaC.deliveries.keys();
      while (targets.hasMoreElements()) {
        String target = (String) targets.nextElement();
        MessageAcks acks = (MessageAcks) xaC.deliveries.remove(target);
        sessAcks.add(new SessAckRequest(target, acks.getIds(),
                                        acks.getQueueMode()));
      }

      cnx.syncRequest(new XACnxPrepare(xid.getBranchQualifier(),
                                       xid.getFormatId(),
                                       xid.getGlobalTransactionId(),
                                       pMs,
                                       sessAcks));

      setStatus(xid, PREPARED);
    } catch (JMSException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw failure("Prepare request failed: " + exc, exc);
    } catch (XAException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw exc;
    }
  }

  /**
   * Sends a commit request for a prepared transaction to the server, then
   * forgets the transaction and unregisters the session still registered
   * for it, if any.
   *
   * @param xid  The transaction identifier.
   *
   * @exception XAException  If the transaction is unknown or not prepared,
   *              or if the request fails.
   */
  synchronized void commit(Xid xid)
    throws XAException {
    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                 "--- "
                                 + this
                                 + ": commit(" + xid + ")");
    }

    try {
      if (getStatus(xid) != PREPARED) {
        throw new XAException("Can't commit non prepared transaction.");
      }

      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
        JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                   "--- "
                                   + this
                                   + ": commits transaction "
                                   + xid.toString());
      }

      cnx.syncRequest(new XACnxCommit(xid.getBranchQualifier(),
                                      xid.getFormatId(),
                                      xid.getGlobalTransactionId()));

      transactions.remove(xid);

      // The entry is removed, as in rollback(), so that the session of a
      // finished transaction is not kept referenced by the table.
      Session session = (Session) sessionTable.remove(xid);
      if (session != null) {
        session.setTransacted(false);
      }
    } catch (JMSException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw failure("Commit request failed: " + exc, exc);
    } catch (XAException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw exc;
    }
  }

  /**
   * Sends a rollback request carrying the deliveries saved for a
   * transaction to the server, then forgets the transaction and
   * unregisters the session still registered for it, if any.
   *
   * @param xid  The transaction identifier.
   *
   * @exception XAException  If the transaction is unknown or if the request
   *              fails.
   */
  synchronized void rollback(Xid xid)
    throws XAException {
    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                 "--- "
                                 + this
                                 + ": rollback(" + xid + ")");
    }

    try {
      XAContext xaC = (XAContext) transactions.get(xid);
      if (xaC == null) {
        throw new XAException("Unknown transaction.");
      }

      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
        JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                   "--- "
                                   + this
                                   + ": rolls back transaction "
                                   + xid.toString());
      }

      Enumeration targets = xaC.deliveries.keys();

      XACnxRollback rollbackRequest =
        new XACnxRollback(xid.getBranchQualifier(),
                          xid.getFormatId(),
                          xid.getGlobalTransactionId());

      // Drains the deliveries of the context into the request.
      while (targets.hasMoreElements()) {
        String target = (String) targets.nextElement();
        MessageAcks acks = (MessageAcks) xaC.deliveries.remove(target);
        rollbackRequest.add(target, acks.getIds(), acks.getQueueMode());
      }

      cnx.syncRequest(rollbackRequest);

      transactions.remove(xid);
      Session session = (Session) sessionTable.remove(xid);
      if (session != null) {
        session.setTransacted(false);
      }
    } catch (JMSException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw failure("Rollback request failed: " + exc, exc);
    } catch (XAException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw exc;
    }
  }

  /**
   * Requests the identifiers of the transactions to recover from the
   * server and registers each of them with the <code>PREPARED</code>
   * status.
   *
   * @param flag  The recovery flag.
   * @return  The identifiers returned by the server.
   *
   * @exception XAException  If the flag is
   *              <code>XAResource.TMSTARTRSCAN</code> or
   *              <code>XAResource.TMENDRSCAN</code>, or if the request
   *              fails.
   */
  synchronized Xid[] recover(int flag) throws XAException {
    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                 "--- "
                                 + this
                                 + ": recovers transactions.");
    }

    // NOTE(review): XAResource.recover documents TMSTARTRSCAN and TMENDRSCAN
    // as regular flags; rejecting them is kept as it was - confirm intent.
    if (flag == XAResource.TMSTARTRSCAN || flag == XAResource.TMENDRSCAN) {
      throw new XAException("Non supported recovery flag: " + flag);
    }

    try {
      XACnxRecoverReply reply =
        (XACnxRecoverReply) cnx.syncRequest(new XACnxRecoverRequest());

      Xid[] xids = new Xid[reply.getSize()];

      for (int i = 0; i < reply.getSize(); i++) {
        xids[i] = new XidImpl(reply.getBranchQualifier(i),
                              reply.getFormatId(i),
                              reply.getGlobalTransactionId(i));
        transactions.put(xids[i], new XAContext());
        setStatus(xids[i], PREPARED);
      }
      return xids;
    } catch (Exception exc) {
      throw failure("Recovery request failed: " + exc.getMessage(), exc);
    }
  }

  /**
   * Sets the status of a transaction.
   *
   * @param xid     The transaction identifier.
   * @param status  The status to set.
   *
   * @exception XAException  If the transaction is unknown.
   */
  private void setStatus(Xid xid, int status) throws XAException {
    XAContext xac = (XAContext) transactions.get(xid);

    if (xac == null) {
      throw new XAException("Unknown transaction.");
    }

    xac.status = status;
  }

  /**
   * Returns the status of a transaction.
   *
   * @param xid  The transaction identifier.
   *
   * @exception XAException  If the transaction is unknown.
   */
  private int getStatus(Xid xid) throws XAException {
    XAContext xac = (XAContext) transactions.get(xid);

    if (xac == null) {
      throw new XAException("Unknown transaction.");
    }

    return xac.status;
  }

  /**
   * Builds a <code>XAException</code> carrying the given message and
   * keeping the original failure as its cause.
   */
  private static XAException failure(String message, Throwable cause) {
    XAException xaExc = new XAException(message);
    xaExc.initCause(cause);
    return xaExc;
  }

  /**
   * Two managers are equal when their connections are equal.
   *
   * @param o  The object to compare with.
   */
  public boolean equals(Object o) {
    if (! (o instanceof XAResourceMngr)) {
      return false;
    }

    XAResourceMngr other = (XAResourceMngr) o;

    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgClient.log(BasicLevel.DEBUG,
                                 this + ": equals other = " + other.cnx +
                                 ", this.cnx = " + cnx +
                                 ", equals = " + cnx.equals(other.cnx));
    }

    return cnx.equals(other.cnx);
  }

  /**
   * Returns a hash code derived from the connection, so that managers
   * which are equal according to {@link #equals} get the same hash code.
   */
  public int hashCode() {
    return (cnx == null) ? 0 : cnx.hashCode();
  }
}

/**
 * Holds the state of one transaction: its status and the sendings and
 * deliveries saved for it.
 */
class XAContext {
  /** The status of the transaction. */
  int status;
  /**
   * The table of the saved sendings.
   * <p>
   * <b>Key:</b> destination name (<code>String</code>)<br>
   * <b>Object:</b> <code>ProducerMessages</code>
   */
  Hashtable sendings;
  /**
   * The table of the saved deliveries.
   * <p>
   * <b>Key:</b> target name (<code>String</code>)<br>
   * <b>Object:</b> <code>MessageAcks</code>
   */
  Hashtable deliveries;

  /**
   * Creates a context with empty sendings and deliveries tables.
   */
  XAContext() {
    sendings = new Hashtable();
    deliveries = new Hashtable();
  }

  /**
   * Moves new sendings into this context. The entries are removed from the
   * given table; messages for an already known destination are added to
   * the stored <code>ProducerMessages</code>.
   *
   * @param newSendings  The table of sendings to add, emptied by the call.
   */
  void addSendings(Hashtable newSendings) {
    Enumeration newDests = newSendings.keys();
    while (newDests.hasMoreElements()) {
      String newDest = (String) newDests.nextElement();
      ProducerMessages newPM =
        (ProducerMessages) newSendings.remove(newDest);
      ProducerMessages storedPM = (ProducerMessages) sendings.get(newDest);

      if (storedPM == null) {
        // First sendings for this destination: stored as they are.
        sendings.put(newDest, newPM);
      } else {
        // Known destination: the new messages join the stored ones.
        Vector msgs = newPM.getMessages();
        for (int i = 0; i < msgs.size(); i++) {
          storedPM.addMessage(((Message) msgs.get(i)).momMsg);
        }
      }
    }
  }

  /**
   * Moves new deliveries into this context. The entries are removed from
   * the given table; identifiers for an already known name are added to
   * the stored <code>MessageAcks</code>.
   *
   * @param newDeliveries  The table of deliveries to add, emptied by the
   *                       call.
   */
  void addDeliveries(Hashtable newDeliveries) {
    Enumeration newNames = newDeliveries.keys();
    while (newNames.hasMoreElements()) {
      String newName = (String) newNames.nextElement();
      MessageAcks newAcks = (MessageAcks) newDeliveries.remove(newName);
      MessageAcks storedAcks = (MessageAcks) deliveries.get(newName);

      if (storedAcks == null) {
        // First deliveries for this name: stored as they are.
        deliveries.put(newName, newAcks);
      } else {
        // Known name: the new identifiers join the stored ones.
        storedAcks.addIds(newAcks.getIds());
      }
    }
  }
}