1 16 17 package org.springframework.jms.listener; 18 19 import java.util.Iterator ; 20 import java.util.LinkedList ; 21 import java.util.List ; 22 23 import javax.jms.Connection ; 24 import javax.jms.JMSException ; 25 26 import org.springframework.beans.factory.BeanNameAware; 27 import org.springframework.beans.factory.DisposableBean; 28 import org.springframework.context.Lifecycle; 29 import org.springframework.jms.JmsException; 30 import org.springframework.jms.connection.ConnectionFactoryUtils; 31 import org.springframework.jms.support.JmsUtils; 32 import org.springframework.jms.support.destination.JmsDestinationAccessor; 33 import org.springframework.util.Assert; 34 import org.springframework.util.ClassUtils; 35 36 62 public abstract class AbstractJmsListeningContainer extends JmsDestinationAccessor 63 implements Lifecycle, BeanNameAware, DisposableBean { 64 65 private String clientId; 66 67 private boolean autoStartup = true; 68 69 private String beanName; 70 71 private Connection sharedConnection; 72 73 private final Object sharedConnectionMonitor = new Object (); 74 75 private boolean active = false; 76 77 private boolean running = false; 78 79 private final List pausedTasks = new LinkedList (); 80 81 private final Object lifecycleMonitor = new Object (); 82 83 84 93 public void setClientId(String clientId) { 94 this.clientId = clientId; 95 } 96 97 101 protected String getClientId() { 102 return this.clientId; 103 } 104 105 110 public void setAutoStartup(boolean autoStartup) { 111 this.autoStartup = autoStartup; 112 } 113 114 public void setBeanName(String beanName) { 115 this.beanName = beanName; 116 } 117 118 122 protected final String getBeanName() { 123 return this.beanName; 124 } 125 126 127 130 public void afterPropertiesSet() { 131 super.afterPropertiesSet(); 132 validateConfiguration(); 133 initialize(); 134 } 135 136 140 protected void validateConfiguration() { 141 } 142 143 144 151 public void initialize() throws JmsException { 152 try { 153 synchronized (this.lifecycleMonitor) { 154 this.active = true; 155 this.lifecycleMonitor.notifyAll(); 156 } 157 158 if (sharedConnectionEnabled()) { 159 establishSharedConnection(); 160 } 161 162 if (this.autoStartup) { 163 doStart(); 164 } 165 166 doInitialize(); 167 } 168 catch (JMSException ex) { 169 synchronized (this.sharedConnectionMonitor) { 170 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup); 171 } 172 throw convertJmsAccessException(ex); 173 } 174 } 175 176 185 protected void establishSharedConnection() throws JMSException { 186 refreshSharedConnection(); 187 } 188 189 195 protected final void refreshSharedConnection() throws JMSException { 196 boolean running = isRunning(); 197 synchronized (this.sharedConnectionMonitor) { 198 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), running); 199 this.sharedConnection = null; 200 Connection con = createConnection(); 201 try { 202 prepareSharedConnection(con); 203 } 204 catch (JMSException ex) { 205 JmsUtils.closeConnection(con); 206 throw ex; 207 } 208 this.sharedConnection = con; 209 } 210 } 211 212 221 protected void prepareSharedConnection(Connection connection) throws JMSException { 222 String clientId = getClientId(); 223 if (clientId != null) { 224 connection.setClientID(clientId); 225 } 226 } 227 228 236 protected final Connection getSharedConnection() { 237 if (!sharedConnectionEnabled()) { 238 throw new IllegalStateException ( 239 "This listener container does not maintain a shared Connection"); 240 } 241 synchronized (this.sharedConnectionMonitor) { 242 if (this.sharedConnection == null) { 243 throw new SharedConnectionNotInitializedException( 244 "This listener container's shared Connection has not been initialized yet"); 245 } 246 return this.sharedConnection; 247 } 248 } 249 250 251 255 public void destroy() { 256 shutdown(); 257 } 258 259 264 public void shutdown() throws JmsException { 265 logger.debug("Shutting down JMS listener container"); 266 boolean wasRunning = false; 267 synchronized (this.lifecycleMonitor) { 268 wasRunning = this.running; 269 this.running = false; 270 this.active = false; 271 this.lifecycleMonitor.notifyAll(); 272 } 273 274 if (wasRunning && sharedConnectionEnabled()) { 276 try { 277 stopSharedConnection(); 278 } 279 catch (Throwable ex) { 280 logger.debug("Could not stop JMS Connection on shutdown", ex); 281 } 282 } 283 284 try { 286 doShutdown(); 287 } 288 catch (JMSException ex) { 289 throw convertJmsAccessException(ex); 290 } 291 finally { 292 if (sharedConnectionEnabled()) { 293 synchronized (this.sharedConnectionMonitor) { 294 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), false); 295 } 296 } 297 } 298 } 299 300 304 public final boolean isActive() { 305 synchronized (this.lifecycleMonitor) { 306 return this.active; 307 } 308 } 309 310 311 315 320 public void start() throws JmsException { 321 try { 322 doStart(); 323 } 324 catch (JMSException ex) { 325 throw convertJmsAccessException(ex); 326 } 327 } 328 329 334 protected void doStart() throws JMSException { 335 synchronized (this.lifecycleMonitor) { 336 this.running = true; 337 this.lifecycleMonitor.notifyAll(); 338 for (Iterator it = this.pausedTasks.iterator(); it.hasNext();) { 339 doRescheduleTask(it.next()); 340 it.remove(); 341 } 342 } 343 344 if (sharedConnectionEnabled()) { 345 startSharedConnection(); 346 } 347 } 348 349 354 protected void startSharedConnection() throws JMSException { 355 synchronized (this.sharedConnectionMonitor) { 356 if (this.sharedConnection != null) { 357 try { 358 this.sharedConnection.start(); 359 } 360 catch (javax.jms.IllegalStateException ex) { 361 logger.debug("Ignoring Connection start exception - assuming already started", ex); 362 } 363 } 364 } 365 } 366 367 372 public void stop() throws JmsException { 373 try { 374 doStop(); 375 } 376 catch (JMSException ex) { 377 throw convertJmsAccessException(ex); 378 } 379 } 380 381 386 protected void doStop() throws JMSException { 387 synchronized (this.lifecycleMonitor) { 388 this.running = false; 389 this.lifecycleMonitor.notifyAll(); 390 } 391 392 if (sharedConnectionEnabled()) { 393 stopSharedConnection(); 394 } 395 } 396 397 402 protected void stopSharedConnection() throws JMSException { 403 synchronized (this.sharedConnectionMonitor) { 404 if (this.sharedConnection != null) { 405 try { 406 this.sharedConnection.stop(); 407 } 408 catch (javax.jms.IllegalStateException ex) { 409 logger.debug("Ignoring Connection stop exception - assuming already stopped", ex); 410 } 411 } 412 } 413 } 414 415 419 public final boolean isRunning() { 420 synchronized (this.lifecycleMonitor) { 421 return this.running; 422 } 423 } 424 425 430 protected final void waitWhileNotRunning() { 431 synchronized (this.lifecycleMonitor) { 432 while (this.active && !this.running) { 433 try { 434 this.lifecycleMonitor.wait(); 435 } 436 catch (InterruptedException ex) { 437 Thread.currentThread().interrupt(); 439 } 440 } 441 } 442 } 443 444 455 protected final boolean rescheduleTaskIfNecessary(Object task) { 456 Assert.notNull(task, "Task object must not be null"); 457 synchronized (this.lifecycleMonitor) { 458 if (this.running) { 459 doRescheduleTask(task); 460 return true; 461 } 462 else if (this.active) { 463 this.pausedTasks.add(task); 464 return true; 465 } 466 else { 467 return false; 468 } 469 } 470 } 471 472 480 protected void doRescheduleTask(Object task) { 481 throw new UnsupportedOperationException ( 482 ClassUtils.getShortName(getClass()) + " does not support rescheduling of tasks"); 483 } 484 485 486 490 495 protected abstract boolean sharedConnectionEnabled(); 496 497 506 protected abstract void doInitialize() throws JMSException ; 507 508 517 protected abstract void doShutdown() throws JMSException ; 518 519 520 525 public static class SharedConnectionNotInitializedException extends RuntimeException { 526 527 531 protected SharedConnectionNotInitializedException(String msg) { 532 super(msg); 533 } 534 } 535 536 } 537 | Popular Tags |