1 22 package org.jboss.resource.adapter.jms.inflow; 23 24 import javax.jms.Connection ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageListener ; 28 import javax.jms.ServerSession ; 29 import javax.jms.Session ; 30 import javax.jms.XAConnection ; 31 import javax.jms.XASession ; 32 import javax.resource.spi.endpoint.MessageEndpoint ; 33 import javax.resource.spi.endpoint.MessageEndpointFactory ; 34 import javax.resource.spi.work.Work ; 35 import javax.resource.spi.work.WorkEvent ; 36 import javax.resource.spi.work.WorkException ; 37 import javax.resource.spi.work.WorkListener ; 38 import javax.resource.spi.work.WorkManager ; 39 import javax.transaction.Status ; 40 import javax.transaction.Transaction ; 41 import javax.transaction.TransactionManager ; 42 import javax.transaction.xa.XAResource ; 43 44 import org.jboss.logging.Logger; 45 46 53 public class JmsServerSession implements ServerSession , MessageListener , Work , WorkListener 54 { 55 56 private static final Logger log = Logger.getLogger(JmsServerSession.class); 57 58 59 JmsServerSessionPool pool; 60 61 62 boolean transacted; 63 64 65 int acknowledge; 66 67 68 Session session; 69 70 71 XASession xaSession; 72 73 74 MessageEndpoint endpoint; 75 76 77 DLQHandler dlqHandler; 78 79 80 RuntimeErrorHandler runtimeHandler = new DefaultRuntimeErrorHandler(); 81 82 TransactionDemarcationStrategy txnStrategy; 83 84 85 90 public JmsServerSession(JmsServerSessionPool pool) 91 { 92 this.pool = pool; 93 94 } 95 96 99 public void setup() throws Exception 100 { 101 JmsActivation activation = pool.getActivation(); 102 JmsActivationSpec spec = activation.getActivationSpec(); 103 104 dlqHandler = activation.getDLQHandler(); 105 106 Connection connection = activation.getConnection(); 107 108 if (connection instanceof XAConnection && activation.isDeliveryTransacted()) 110 { 111 xaSession = ((XAConnection ) connection).createXASession(); 112 session = xaSession.getSession(); 113 } 114 else 115 { 116 transacted = spec.isSessionTransacted(); 117 acknowledge = spec.getAcknowledgeModeInt(); 118 session = connection.createSession(transacted, acknowledge); 119 } 120 121 MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory(); 123 XAResource xaResource = null; 124 125 if (activation.isDeliveryTransacted() && xaSession != null) 126 xaResource = xaSession.getXAResource(); 127 128 endpoint = endpointFactory.createEndpoint(xaResource); 129 130 session.setMessageListener(this); 132 } 133 134 137 public void teardown() 138 { 139 try 140 { 141 if (endpoint != null) 142 endpoint.release(); 143 } 144 catch (Throwable t) 145 { 146 log.debug("Error releasing endpoint " + endpoint, t); 147 } 148 149 try 150 { 151 if (xaSession != null) 152 xaSession.close(); 153 } 154 catch (Throwable t) 155 { 156 log.debug("Error releasing xaSession " + xaSession, t); 157 } 158 159 try 160 { 161 if (session != null) 162 session.close(); 163 } 164 catch (Throwable t) 165 { 166 log.debug("Error releasing session " + session, t); 167 } 168 } 169 170 public void onMessage(Message message) 171 { 172 try 173 { 174 endpoint.beforeDelivery(JmsActivation.ONMESSAGE); 175 176 try 177 { 178 if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false) 179 { 180 MessageListener listener = (MessageListener )endpoint; 181 listener.onMessage(message); 182 } 183 } 184 finally 185 { 186 endpoint.afterDelivery(); 187 188 if (dlqHandler != null) 189 dlqHandler.messageDelivered(message); 190 } 191 } 192 193 catch (Throwable t) 194 { 195 log.error("Unexpected error delivering message " + message, t); 196 197 if(txnStrategy != null) 198 txnStrategy.error(); 199 200 } 201 202 203 } 204 205 public Session getSession() throws JMSException 206 { 207 return session; 208 } 209 210 public void start() throws JMSException 211 { 212 JmsActivation activation = pool.getActivation(); 213 WorkManager workManager = activation.getWorkManager(); 214 try 215 { 216 workManager.scheduleWork(this, 0, null, this); 217 } 218 catch (WorkException e) 219 { 220 log.error("Unable to schedule work", e); 221 throw new JMSException ("Unable to schedule work: " + e.toString()); 222 } 223 } 224 225 public void run() 226 { 227 228 try 229 { 230 txnStrategy = createTransactionDemarcation(); 231 232 }catch(Throwable t) 233 { 234 log.error("Error creating transaction demarcation. Cannot continue."); 235 return; 236 } 237 238 239 try 240 { 241 session.run(); 242 } 243 catch(Throwable t) 244 { 245 if (txnStrategy != null) 246 txnStrategy.error(); 247 248 }finally 249 { 250 if(txnStrategy != null) 251 txnStrategy.end(); 252 253 txnStrategy = null; 254 } 255 256 } 257 258 private TransactionDemarcationStrategy createTransactionDemarcation() 259 { 260 return new DemarcationStrategyFactory().getStrategy(); 261 262 } 263 public void release() 264 { 265 } 266 267 public void workAccepted(WorkEvent e) 268 { 269 } 270 271 public void workCompleted(WorkEvent e) 272 { 273 pool.returnServerSession(this); 274 } 275 276 public void workRejected(WorkEvent e) 277 { 278 pool.returnServerSession(this); 279 } 280 281 282 public void workStarted(WorkEvent e) 283 { 284 } 285 286 private class DemarcationStrategyFactory 287 { 288 289 TransactionDemarcationStrategy getStrategy() 290 { 291 TransactionDemarcationStrategy current = null; 292 final JmsActivationSpec spec = pool.getActivation().getActivationSpec(); 293 final JmsActivation activation = pool.getActivation(); 294 295 if(activation.isDeliveryTransacted() && xaSession != null) 296 { 297 try 298 { 299 current = new XATransactionDemarcationStrategy(); 300 } 301 catch (Throwable t) 302 { 303 log.error(this + " error creating transaction demarcation ", t); 304 } 305 306 }else 307 { 308 309 return new LocalDemarcationStrategy(); 310 311 } 312 313 return current; 314 } 315 316 } 317 private interface TransactionDemarcationStrategy 318 { 319 void error(); 320 void end(); 321 322 } 323 324 private class LocalDemarcationStrategy implements TransactionDemarcationStrategy 325 { 326 public void end() 327 { 328 final JmsActivationSpec spec = pool.getActivation().getActivationSpec(); 329 330 if(spec.isSessionTransacted()) 331 { 332 if(session != null) 333 { 334 try 335 { 336 session.commit(); 337 } 338 catch (JMSException e) 339 { 340 log.error("Failed to commit session transaction", e); 341 } 342 } 343 } 344 } 345 346 public void error() 347 { 348 final JmsActivationSpec spec = pool.getActivation().getActivationSpec(); 349 350 if(spec.isSessionTransacted()) 351 { 352 if(session != null) 353 354 try 355 { 356 365 if(pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified()) 366 { 367 session.rollback(); 368 } 369 370 } 371 catch (JMSException e) 372 { 373 log.error("Failed to rollback session transaction", e); 374 } 375 376 } 377 } 378 379 } 380 381 private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy 382 { 383 384 boolean trace = log.isTraceEnabled(); 385 386 Transaction trans = null; 387 TransactionManager tm = pool.getActivation().getTransactionManager();; 388 389 public XATransactionDemarcationStrategy() throws Throwable 390 { 391 392 tm.begin(); 393 394 try 395 { 396 trans = tm.getTransaction(); 397 398 if (trace) 399 log.trace(JmsServerSession.this + " using tx=" + trans); 400 401 if (xaSession != null) 402 { 403 XAResource res = xaSession.getXAResource(); 404 405 if (!trans.enlistResource(res)) 406 { 407 throw new JMSException ("could not enlist resource"); 408 } 409 if (trace) 410 log.trace(JmsServerSession.this + " XAResource '" + res + "' enlisted."); 411 } 412 } 413 catch (Throwable t) 414 { 415 try 416 { 417 tm.rollback(); 418 } 419 catch (Throwable ignored) 420 { 421 log.trace(JmsServerSession.this + " ignored error rolling back after failed enlist", ignored); 422 } 423 throw t; 424 } 425 426 } 427 428 429 public void error() 430 { 431 try 433 { 434 435 if (trace) 436 log.trace(JmsServerSession.this + " using TM to mark TX for rollback tx=" + trans); 437 trans.setRollbackOnly(); 438 } 439 catch (Throwable t) 440 { 441 log.error(JmsServerSession.this + " failed to set rollback only", t); 442 } 443 444 } 445 446 public void end() 447 { 448 try 449 { 450 451 Transaction currentTx = tm.getTransaction(); 453 if (trans.equals(currentTx) == false) 454 throw new IllegalStateException ("Wrong tx association: expected " + trans + " was " + currentTx); 455 456 if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) 458 { 459 if (trace) 460 log.trace(JmsServerSession.this + " rolling back JMS transaction tx=" + trans); 461 tm.rollback(); 463 464 if (xaSession == null && pool.getActivation().isDeliveryTransacted()) 468 { 469 session.rollback(); 470 } 471 } 472 473 else if (trans.getStatus() == Status.STATUS_ACTIVE) 474 { 475 if (trace) 480 log.trace(JmsServerSession.this + " commiting the JMS transaction tx=" + trans); 481 tm.commit(); 482 483 if (xaSession == null && pool.getActivation().isDeliveryTransacted()) 486 { 487 session.commit(); 488 } 489 490 }else 491 { 492 tm.suspend(); 493 494 if (xaSession == null && pool.getActivation().isDeliveryTransacted()) 495 { 496 session.rollback(); 497 } 498 499 500 } 501 502 } 503 catch (Throwable t) 504 { 505 log.error(JmsServerSession.this + " failed to commit/rollback", t); 506 } 507 508 } 509 510 } 511 } | Popular Tags |