1 18 package org.apache.activemq.state; 19 20 import java.io.IOException ; 21 import java.util.Iterator ; 22 import java.util.concurrent.ConcurrentHashMap ; 23 24 import org.apache.activemq.command.Command; 25 import org.apache.activemq.command.ConnectionId; 26 import org.apache.activemq.command.ConnectionInfo; 27 import org.apache.activemq.command.ConsumerId; 28 import org.apache.activemq.command.ConsumerInfo; 29 import org.apache.activemq.command.DestinationInfo; 30 import org.apache.activemq.command.Message; 31 import org.apache.activemq.command.MessageAck; 32 import org.apache.activemq.command.ProducerId; 33 import org.apache.activemq.command.ProducerInfo; 34 import org.apache.activemq.command.Response; 35 import org.apache.activemq.command.SessionId; 36 import org.apache.activemq.command.SessionInfo; 37 import org.apache.activemq.command.TransactionInfo; 38 import org.apache.activemq.transport.Transport; 39 import org.apache.activemq.util.IOExceptionSupport; 40 41 47 public class ConnectionStateTracker extends CommandVisitorAdapter { 48 49 private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); 50 51 private boolean trackTransactions = false; 52 53 private boolean restoreSessions=true; 54 private boolean restoreConsumers=true; 55 private boolean restoreProducers=true; 56 private boolean restoreTransaction=true; 57 58 protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap (); 59 60 private class RemoveTransactionAction implements Runnable { 61 private final TransactionInfo info; 62 public RemoveTransactionAction(TransactionInfo info) { 63 this.info = info; 64 } 65 public void run() { 66 ConnectionId connectionId = info.getConnectionId(); 67 ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); 68 cs.removeTransactionState(info.getTransactionId()); 69 } 70 } 71 72 79 public Tracked track(Command command) throws IOException { 80 try { 81 return (Tracked) command.visit(this); 82 } catch (IOException e) { 83 throw e; 84 } catch (Throwable e) { 85 throw IOExceptionSupport.create(e); 86 } 87 } 88 89 public void restore( Transport transport ) throws IOException { 90 for (Iterator iter = connectionStates.values().iterator(); iter.hasNext();) { 92 ConnectionState connectionState = (ConnectionState) iter.next(); 93 transport.oneway(connectionState.getInfo()); 94 restoreTempDestinations(transport, connectionState); 95 96 if( restoreSessions ) 97 restoreSessions(transport, connectionState); 98 99 if( restoreTransaction ) 100 restoreTransactions(transport, connectionState); 101 } 102 } 103 104 private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { 105 for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) { 106 TransactionState transactionState = (TransactionState) iter.next(); 107 for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) { 108 Command command = (Command) iterator.next(); 109 transport.oneway(command); 110 } 111 } 112 } 113 114 119 protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException { 120 for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) { 122 SessionState sessionState = (SessionState) iter2.next(); 123 transport.oneway(sessionState.getInfo()); 124 125 if( restoreProducers ) 126 restoreProducers(transport, sessionState); 127 128 if( restoreConsumers ) 129 restoreConsumers(transport, sessionState); 130 } 131 } 132 133 138 protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException { 139 for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) { 141 ConsumerState consumerState = (ConsumerState) iter3.next(); 142 transport.oneway(consumerState.getInfo()); 143 } 144 } 145 146 151 protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException { 152 for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) { 154 ProducerState producerState = (ProducerState) iter3.next(); 155 transport.oneway(producerState.getInfo()); 156 } 157 } 158 159 164 protected void restoreTempDestinations(Transport transport, ConnectionState connectionState) throws IOException { 165 for (Iterator iter2 = connectionState.getTempDesinations().iterator(); iter2.hasNext();) { 167 transport.oneway((DestinationInfo) iter2.next()); 168 } 169 } 170 171 public Response processAddDestination(DestinationInfo info) { 172 ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId()); 173 if( cs != null && info != null && info.getDestination().isTemporary() ) { 174 cs.addTempDestination(info); 175 } 176 return TRACKED_RESPONSE_MARKER; 177 } 178 179 public Response processRemoveDestination(DestinationInfo info) { 180 ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId()); 181 if( cs != null && info != null && info.getDestination().isTemporary() ) { 182 cs.removeTempDestination(info.getDestination()); 183 } 184 return TRACKED_RESPONSE_MARKER; 185 } 186 187 188 public Response processAddProducer(ProducerInfo info){ 189 if(info!=null&&info.getProducerId()!=null){ 190 SessionId sessionId=info.getProducerId().getParentId(); 191 if(sessionId!=null){ 192 ConnectionId connectionId=sessionId.getParentId(); 193 if(connectionId!=null){ 194 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 195 if(cs!=null){ 196 SessionState ss=cs.getSessionState(sessionId); 197 if(ss!=null){ 198 ss.addProducer(info); 199 } 200 } 201 } 202 } 203 } 204 return TRACKED_RESPONSE_MARKER; 205 } 206 207 public Response processRemoveProducer(ProducerId id){ 208 if(id!=null){ 209 SessionId sessionId=id.getParentId(); 210 if(sessionId!=null){ 211 ConnectionId connectionId=sessionId.getParentId(); 212 if(connectionId!=null){ 213 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 214 if(cs!=null){ 215 SessionState ss=cs.getSessionState(sessionId); 216 if(ss!=null){ 217 ss.removeProducer(id); 218 } 219 } 220 } 221 } 222 } 223 return TRACKED_RESPONSE_MARKER; 224 } 225 226 public Response processAddConsumer(ConsumerInfo info){ 227 if(info!=null){ 228 SessionId sessionId=info.getConsumerId().getParentId(); 229 if(sessionId!=null){ 230 ConnectionId connectionId=sessionId.getParentId(); 231 if(connectionId!=null){ 232 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 233 if(cs!=null){ 234 SessionState ss=cs.getSessionState(sessionId); 235 if(ss!=null){ 236 ss.addConsumer(info); 237 } 238 } 239 } 240 } 241 } 242 return TRACKED_RESPONSE_MARKER; 243 } 244 245 public Response processRemoveConsumer(ConsumerId id){ 246 if(id!=null){ 247 SessionId sessionId=id.getParentId(); 248 if(sessionId!=null){ 249 ConnectionId connectionId=sessionId.getParentId(); 250 if(connectionId!=null){ 251 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 252 if(cs!=null){ 253 SessionState ss=cs.getSessionState(sessionId); 254 if(ss!=null){ 255 ss.removeConsumer(id); 256 } 257 } 258 } 259 } 260 } 261 return TRACKED_RESPONSE_MARKER; 262 } 263 264 public Response processAddSession(SessionInfo info){ 265 if(info!=null){ 266 ConnectionId connectionId=info.getSessionId().getParentId(); 267 if(connectionId!=null){ 268 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 269 if(cs!=null){ 270 cs.addSession(info); 271 } 272 } 273 } 274 return TRACKED_RESPONSE_MARKER; 275 } 276 277 public Response processRemoveSession(SessionId id){ 278 if(id!=null){ 279 ConnectionId connectionId=id.getParentId(); 280 if(connectionId!=null){ 281 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 282 if(cs!=null){ 283 cs.removeSession(id); 284 } 285 } 286 } 287 return TRACKED_RESPONSE_MARKER; 288 } 289 290 public Response processAddConnection(ConnectionInfo info){ 291 if (info != null) { 292 connectionStates.put(info.getConnectionId(), new ConnectionState(info)); 293 } 294 return TRACKED_RESPONSE_MARKER; 295 } 296 297 public Response processRemoveConnection(ConnectionId id) throws Exception { 298 if (id != null) { 299 connectionStates.remove(id); 300 } 301 return TRACKED_RESPONSE_MARKER; 302 } 303 304 305 public Response processMessage(Message send) throws Exception { 306 if(trackTransactions&&send!=null&&send.getTransactionId()!=null){ 307 ConnectionId connectionId=send.getProducerId().getParentId().getParentId(); 308 if(connectionId!=null){ 309 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 310 if(cs!=null){ 311 TransactionState transactionState=cs.getTransactionState(send.getTransactionId()); 312 if(transactionState!=null){ 313 transactionState.addCommand(send); 314 } 315 } 316 } 317 return TRACKED_RESPONSE_MARKER; 318 } 319 return null; 320 } 321 322 public Response processMessageAck(MessageAck ack){ 323 if(trackTransactions&&ack!=null&&ack.getTransactionId()!=null){ 324 ConnectionId connectionId=ack.getConsumerId().getParentId().getParentId(); 325 if(connectionId!=null){ 326 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 327 if(cs!=null){ 328 TransactionState transactionState=cs.getTransactionState(ack.getTransactionId()); 329 if(transactionState!=null){ 330 transactionState.addCommand(ack); 331 } 332 } 333 } 334 return TRACKED_RESPONSE_MARKER; 335 } 336 return null; 337 } 338 339 public Response processBeginTransaction(TransactionInfo info){ 340 if(trackTransactions&&info!=null && info.getTransactionId() != null){ 341 ConnectionId connectionId=info.getConnectionId(); 342 if(connectionId!=null){ 343 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 344 if(cs!=null){ 345 cs.addTransactionState(info.getTransactionId()); 346 } 347 } 348 return TRACKED_RESPONSE_MARKER; 349 } 350 return null; 351 } 352 353 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 354 if(trackTransactions&&info!=null){ 355 ConnectionId connectionId=info.getConnectionId(); 356 if(connectionId!=null){ 357 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 358 if(cs!=null){ 359 TransactionState transactionState=cs.getTransactionState(info.getTransactionId()); 360 if(transactionState!=null){ 361 transactionState.addCommand(info); 362 } 363 } 364 } 365 return TRACKED_RESPONSE_MARKER; 366 } 367 return null; 368 } 369 370 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 371 if(trackTransactions&&info!=null){ 372 ConnectionId connectionId=info.getConnectionId(); 373 if(connectionId!=null){ 374 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 375 if(cs!=null){ 376 TransactionState transactionState=cs.getTransactionState(info.getTransactionId()); 377 if(transactionState!=null){ 378 transactionState.addCommand(info); 379 return new Tracked(new RemoveTransactionAction(info)); 380 } 381 } 382 } 383 } 384 return null; 385 } 386 387 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 388 if(trackTransactions&&info!=null){ 389 ConnectionId connectionId=info.getConnectionId(); 390 if(connectionId!=null){ 391 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 392 if(cs!=null){ 393 TransactionState transactionState=cs.getTransactionState(info.getTransactionId()); 394 if(transactionState!=null){ 395 transactionState.addCommand(info); 396 return new Tracked(new RemoveTransactionAction(info)); 397 } 398 } 399 } 400 } 401 return null; 402 } 403 404 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 405 if(trackTransactions&&info!=null){ 406 ConnectionId connectionId=info.getConnectionId(); 407 if(connectionId!=null){ 408 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 409 if(cs!=null){ 410 TransactionState transactionState=cs.getTransactionState(info.getTransactionId()); 411 if(transactionState!=null){ 412 transactionState.addCommand(info); 413 return new Tracked(new RemoveTransactionAction(info)); 414 } 415 } 416 } 417 } 418 return null; 419 } 420 421 public Response processEndTransaction(TransactionInfo info) throws Exception { 422 if(trackTransactions&&info!=null){ 423 ConnectionId connectionId=info.getConnectionId(); 424 if(connectionId!=null){ 425 ConnectionState cs=(ConnectionState)connectionStates.get(connectionId); 426 if(cs!=null){ 427 TransactionState transactionState=cs.getTransactionState(info.getTransactionId()); 428 if(transactionState!=null){ 429 transactionState.addCommand(info); 430 } 431 } 432 } 433 return TRACKED_RESPONSE_MARKER; 434 } 435 return null; 436 } 437 438 public boolean isRestoreConsumers() { 439 return restoreConsumers; 440 } 441 442 public void setRestoreConsumers(boolean restoreConsumers) { 443 this.restoreConsumers = restoreConsumers; 444 } 445 446 public boolean isRestoreProducers() { 447 return restoreProducers; 448 } 449 450 public void setRestoreProducers(boolean restoreProducers) { 451 this.restoreProducers = restoreProducers; 452 } 453 454 public boolean isRestoreSessions() { 455 return restoreSessions; 456 } 457 458 public void setRestoreSessions(boolean restoreSessions) { 459 this.restoreSessions = restoreSessions; 460 } 461 462 public boolean isTrackTransactions() { 463 return trackTransactions; 464 } 465 466 public void setTrackTransactions(boolean trackTransactions) { 467 this.trackTransactions = trackTransactions; 468 } 469 470 public boolean isRestoreTransaction() { 471 return restoreTransaction; 472 } 473 474 public void setRestoreTransaction(boolean restoreTransaction) { 475 this.restoreTransaction = restoreTransaction; 476 } 477 478 } 479 | Popular Tags |