1 14 15 package org.apache.activemq.store.jdbc.adapter; 16 17 import java.io.IOException ; 18 import java.sql.PreparedStatement ; 19 import java.sql.ResultSet ; 20 import java.sql.SQLException ; 21 import java.sql.Statement ; 22 import java.util.ArrayList ; 23 import java.util.HashSet ; 24 import java.util.Set ; 25 import org.apache.activemq.command.ActiveMQDestination; 26 import org.apache.activemq.command.MessageId; 27 import org.apache.activemq.command.SubscriptionInfo; 28 import org.apache.activemq.store.jdbc.JDBCAdapter; 29 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; 30 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 31 import org.apache.activemq.store.jdbc.Statements; 32 import org.apache.activemq.store.jdbc.TransactionContext; 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 36 49 public class DefaultJDBCAdapter implements JDBCAdapter{ 50 51 private static final Log log=LogFactory.getLog(DefaultJDBCAdapter.class); 52 protected Statements statements; 53 protected boolean batchStatments=true; 54 55 protected void setBinaryData(PreparedStatement s,int index,byte data[]) throws SQLException { 56 s.setBytes(index,data); 57 } 58 59 protected byte[] getBinaryData(ResultSet rs,int index) throws SQLException { 60 return rs.getBytes(index); 61 } 62 63 public void doCreateTables(TransactionContext c) throws SQLException ,IOException { 64 Statement s=null; 65 try{ 66 boolean alreadyExists=false; 70 ResultSet rs=null; 71 try{ 72 rs=c.getConnection().getMetaData().getTables(null,null,statements.getFullMessageTableName(), 73 new String [] { "TABLE" }); 74 alreadyExists=rs.next(); 75 }catch(Throwable ignore){ 76 }finally{ 77 close(rs); 78 } 79 s=c.getConnection().createStatement(); 80 String [] createStatments=statements.getCreateSchemaStatements(); 81 for(int i=0;i<createStatments.length;i++){ 82 try{ 85 log.debug("Executing SQL: "+createStatments[i]); 86 boolean rc=s.execute(createStatments[i]); 87 }catch(SQLException e){ 88 if(alreadyExists){ 89 log.debug("Could not create JDBC tables; The message table already existed."+" Failure was: " 90 +createStatments[i]+" Message: "+e.getMessage()+" SQLState: "+e.getSQLState() 91 +" Vendor code: "+e.getErrorCode()); 92 }else{ 93 log.warn("Could not create JDBC tables; they could already exist."+" Failure was: " 94 +createStatments[i]+" Message: "+e.getMessage()+" SQLState: "+e.getSQLState() 95 +" Vendor code: "+e.getErrorCode()); 96 JDBCPersistenceAdapter.log("Failure details: ",e); 97 } 98 } 99 } 100 c.getConnection().commit(); 101 }finally{ 102 try{ 103 s.close(); 104 }catch(Throwable e){ 105 } 106 } 107 } 108 109 public void doDropTables(TransactionContext c) throws SQLException ,IOException { 110 Statement s=null; 111 try{ 112 s=c.getConnection().createStatement(); 113 String [] dropStatments=statements.getDropSchemaStatements(); 114 for(int i=0;i<dropStatments.length;i++){ 115 try{ 118 boolean rc=s.execute(dropStatments[i]); 119 }catch(SQLException e){ 120 log.warn("Could not drop JDBC tables; they may not exist."+" Failure was: "+dropStatments[i] 121 +" Message: "+e.getMessage()+" SQLState: "+e.getSQLState()+" Vendor code: " 122 +e.getErrorCode()); 123 JDBCPersistenceAdapter.log("Failure details: ",e); 124 } 125 } 126 c.getConnection().commit(); 127 }finally{ 128 try{ 129 s.close(); 130 }catch(Throwable e){ 131 } 132 } 133 } 134 135 public long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException ,IOException { 136 PreparedStatement s=null; 137 ResultSet rs=null; 138 try{ 139 s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement()); 140 rs=s.executeQuery(); 141 long seq1=0; 142 if(rs.next()){ 143 seq1=rs.getLong(1); 144 } 145 rs.close(); 146 s.close(); 147 s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInAcksStatement()); 148 rs=s.executeQuery(); 149 long seq2=0; 150 if(rs.next()){ 151 seq2=rs.getLong(1); 152 } 153 return Math.max(seq1,seq2); 154 }finally{ 155 close(rs); 156 close(s); 157 } 158 } 159 160 public void doAddMessage(TransactionContext c,MessageId messageID,ActiveMQDestination destination,byte[] data, 161 long expiration) throws SQLException ,IOException { 162 PreparedStatement s=c.getAddMessageStatement(); 163 try{ 164 if(s==null){ 165 s=c.getConnection().prepareStatement(statements.getAddMessageStatement()); 166 if(batchStatments){ 167 c.setAddMessageStatement(s); 168 } 169 } 170 s.setLong(1,messageID.getBrokerSequenceId()); 171 s.setString(2,messageID.getProducerId().toString()); 172 s.setLong(3,messageID.getProducerSequenceId()); 173 s.setString(4,destination.getQualifiedName()); 174 s.setLong(5,expiration); 175 setBinaryData(s,6,data); 176 if(batchStatments){ 177 s.addBatch(); 178 }else if(s.executeUpdate()!=1){ 179 throw new SQLException ("Failed add a message"); 180 } 181 }finally{ 182 if(!batchStatments){ 183 s.close(); 184 } 185 } 186 } 187 188 public void doAddMessageReference(TransactionContext c,MessageId messageID,ActiveMQDestination destination, 189 long expirationTime,String messageRef) throws SQLException ,IOException { 190 PreparedStatement s=c.getAddMessageStatement(); 191 try{ 192 if(s==null){ 193 s=c.getConnection().prepareStatement(statements.getAddMessageStatement()); 194 if(batchStatments){ 195 c.setAddMessageStatement(s); 196 } 197 } 198 s.setLong(1,messageID.getBrokerSequenceId()); 199 s.setString(2,messageID.getProducerId().toString()); 200 s.setLong(3,messageID.getProducerSequenceId()); 201 s.setString(4,destination.getQualifiedName()); 202 s.setLong(5,expirationTime); 203 s.setString(6,messageRef); 204 if(batchStatments){ 205 s.addBatch(); 206 }else if(s.executeUpdate()!=1){ 207 throw new SQLException ("Failed add a message"); 208 } 209 }finally{ 210 if(!batchStatments){ 211 s.close(); 212 } 213 } 214 } 215 216 public long getBrokerSequenceId(TransactionContext c,MessageId messageID) throws SQLException ,IOException { 217 PreparedStatement s=null; 218 ResultSet rs=null; 219 try{ 220 s=c.getConnection().prepareStatement(statements.getFindMessageSequenceIdStatement()); 221 s.setString(1,messageID.getProducerId().toString()); 222 s.setLong(2,messageID.getProducerSequenceId()); 223 rs=s.executeQuery(); 224 if(!rs.next()){ 225 return 0; 226 } 227 return rs.getLong(1); 228 }finally{ 229 close(rs); 230 close(s); 231 } 232 } 233 234 public byte[] doGetMessage(TransactionContext c,long seq) throws SQLException ,IOException { 235 PreparedStatement s=null; 236 ResultSet rs=null; 237 try{ 238 s=c.getConnection().prepareStatement(statements.getFindMessageStatement()); 239 s.setLong(1,seq); 240 rs=s.executeQuery(); 241 if(!rs.next()){ 242 return null; 243 } 244 return getBinaryData(rs,1); 245 }finally{ 246 close(rs); 247 close(s); 248 } 249 } 250 251 public String doGetMessageReference(TransactionContext c,long seq) throws SQLException ,IOException { 252 PreparedStatement s=null; 253 ResultSet rs=null; 254 try{ 255 s=c.getConnection().prepareStatement(statements.getFindMessageStatement()); 256 s.setLong(1,seq); 257 rs=s.executeQuery(); 258 if(!rs.next()){ 259 return null; 260 } 261 return rs.getString(1); 262 }finally{ 263 close(rs); 264 close(s); 265 } 266 } 267 268 public void doRemoveMessage(TransactionContext c,long seq) throws SQLException ,IOException { 269 PreparedStatement s=c.getRemovedMessageStatement(); 270 try{ 271 if(s==null){ 272 s=c.getConnection().prepareStatement(statements.getRemoveMessageStatment()); 273 if(batchStatments){ 274 c.setRemovedMessageStatement(s); 275 } 276 } 277 s.setLong(1,seq); 278 if(batchStatments){ 279 s.addBatch(); 280 }else if(s.executeUpdate()!=1){ 281 throw new SQLException ("Failed to remove message"); 282 } 283 }finally{ 284 if(!batchStatments){ 285 s.close(); 286 } 287 } 288 } 289 290 public void doRecover(TransactionContext c,ActiveMQDestination destination,JDBCMessageRecoveryListener listener) 291 throws Exception { 292 PreparedStatement s=null; 293 ResultSet rs=null; 294 try{ 295 s=c.getConnection().prepareStatement(statements.getFindAllMessagesStatement()); 296 s.setString(1,destination.getQualifiedName()); 297 rs=s.executeQuery(); 298 if(statements.isUseExternalMessageReferences()){ 299 while(rs.next()){ 300 listener.recoverMessageReference(rs.getString(2)); 301 } 302 }else{ 303 while(rs.next()){ 304 listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2)); 305 } 306 } 307 }finally{ 308 close(rs); 309 close(s); 310 listener.finished(); 311 } 312 } 313 314 public void doSetLastAck(TransactionContext c,ActiveMQDestination destination,String clientId, 315 String subscriptionName,long seq) throws SQLException ,IOException { 316 PreparedStatement s=c.getAddMessageStatement(); 317 try{ 318 if(s==null){ 319 s=c.getConnection().prepareStatement(statements.getUpdateLastAckOfDurableSubStatement()); 320 if(batchStatments){ 321 c.setUpdateLastAckStatement(s); 322 } 323 } 324 s.setLong(1,seq); 325 s.setString(2,destination.getQualifiedName()); 326 s.setString(3,clientId); 327 s.setString(4,subscriptionName); 328 if(batchStatments){ 329 s.addBatch(); 330 }else if(s.executeUpdate()!=1){ 331 throw new SQLException ("Failed add a message"); 332 } 333 }finally{ 334 if(!batchStatments){ 335 s.close(); 336 } 337 } 338 } 339 340 public void doRecoverSubscription(TransactionContext c,ActiveMQDestination destination,String clientId, 341 String subscriptionName,JDBCMessageRecoveryListener listener) throws Exception { 342 PreparedStatement s=null; 344 ResultSet rs=null; 345 try{ 346 s=c.getConnection().prepareStatement(statements.getFindAllDurableSubMessagesStatement()); 347 s.setString(1,destination.getQualifiedName()); 348 s.setString(2,clientId); 349 s.setString(3,subscriptionName); 350 rs=s.executeQuery(); 351 if(statements.isUseExternalMessageReferences()){ 352 while(rs.next()){ 353 listener.recoverMessageReference(rs.getString(2)); 354 } 355 }else{ 356 while(rs.next()){ 357 listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2)); 358 } 359 } 360 }finally{ 361 close(rs); 362 close(s); 363 listener.finished(); 364 } 365 } 366 367 public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId, 368 String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception { 369 PreparedStatement s=null; 370 ResultSet rs=null; 371 try{ 372 s=c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement()); 373 s.setMaxRows(maxReturned); 374 s.setString(1,destination.getQualifiedName()); 375 s.setString(2,clientId); 376 s.setString(3,subscriptionName); 377 s.setLong(4,seq); 378 rs=s.executeQuery(); 379 int count=0; 380 if(statements.isUseExternalMessageReferences()){ 381 while(rs.next()&&count<maxReturned){ 382 listener.recoverMessageReference(rs.getString(1)); 383 count++; 384 } 385 }else{ 386 while(rs.next()&&count<maxReturned){ 387 listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2)); 388 count++; 389 } 390 } 391 }finally{ 392 close(rs); 393 close(s); 394 listener.finished(); 395 } 396 } 397 398 public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId, 399 String subscriptionName) throws SQLException ,IOException { 400 PreparedStatement s=null; 401 ResultSet rs=null; 402 int result=0; 403 try{ 404 s=c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement()); 405 s.setString(1,destination.getQualifiedName()); 406 s.setString(2,clientId); 407 s.setString(3,subscriptionName); 408 rs=s.executeQuery(); 409 if(rs.next()){ 410 result=rs.getInt(1); 411 } 412 }finally{ 413 close(rs); 414 close(s); 415 } 416 return result; 417 } 418 419 423 public void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId, 424 String subscriptionName,String selector,boolean retroactive) throws SQLException ,IOException { 425 PreparedStatement s=null; 427 try{ 428 long lastMessageId=-1; 429 if(!retroactive){ 430 s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement()); 431 ResultSet rs=null; 432 try{ 433 rs=s.executeQuery(); 434 if(rs.next()){ 435 lastMessageId=rs.getLong(1); 436 } 437 }finally{ 438 close(rs); 439 close(s); 440 } 441 } 442 s=c.getConnection().prepareStatement(statements.getCreateDurableSubStatement()); 443 s.setString(1,destination.getQualifiedName()); 444 s.setString(2,clientId); 445 s.setString(3,subscriptionName); 446 s.setString(4,selector); 447 s.setLong(5,lastMessageId); 448 if(s.executeUpdate()!=1){ 449 throw new IOException ("Could not create durable subscription for: "+clientId); 450 } 451 }finally{ 452 close(s); 453 } 454 } 455 456 public SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId, 457 String subscriptionName) throws SQLException ,IOException { 458 PreparedStatement s=null; 459 ResultSet rs=null; 460 try{ 461 s=c.getConnection().prepareStatement(statements.getFindDurableSubStatement()); 462 s.setString(1,destination.getQualifiedName()); 463 s.setString(2,clientId); 464 s.setString(3,subscriptionName); 465 rs=s.executeQuery(); 466 if(!rs.next()){ 467 return null; 468 } 469 SubscriptionInfo subscription=new SubscriptionInfo(); 470 subscription.setDestination(destination); 471 subscription.setClientId(clientId); 472 subscription.setSubcriptionName(subscriptionName); 473 subscription.setSelector(rs.getString(1)); 474 return subscription; 475 }finally{ 476 close(rs); 477 close(s); 478 } 479 } 480 481 public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,ActiveMQDestination destination) 482 throws SQLException ,IOException { 483 PreparedStatement s=null; 484 ResultSet rs=null; 485 try{ 486 s=c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement()); 487 s.setString(1,destination.getQualifiedName()); 488 rs=s.executeQuery(); 489 ArrayList rc=new ArrayList (); 490 while(rs.next()){ 491 SubscriptionInfo subscription=new SubscriptionInfo(); 492 subscription.setDestination(destination); 493 subscription.setSelector(rs.getString(1)); 494 subscription.setSubcriptionName(rs.getString(2)); 495 subscription.setClientId(rs.getString(3)); 496 rc.add(subscription); 497 } 498 return (SubscriptionInfo[])rc.toArray(new SubscriptionInfo[rc.size()]); 499 }finally{ 500 close(rs); 501 close(s); 502 } 503 } 504 505 public void doRemoveAllMessages(TransactionContext c,ActiveMQDestination destinationName) throws SQLException , 506 IOException { 507 PreparedStatement s=null; 508 try{ 509 s=c.getConnection().prepareStatement(statements.getRemoveAllMessagesStatement()); 510 s.setString(1,destinationName.getQualifiedName()); 511 s.executeUpdate(); 512 s.close(); 513 s=c.getConnection().prepareStatement(statements.getRemoveAllSubscriptionsStatement()); 514 s.setString(1,destinationName.getQualifiedName()); 515 s.executeUpdate(); 516 }finally{ 517 close(s); 518 } 519 } 520 521 public void doDeleteSubscription(TransactionContext c,ActiveMQDestination destination,String clientId, 522 String subscriptionName) throws SQLException ,IOException { 523 PreparedStatement s=null; 524 try{ 525 s=c.getConnection().prepareStatement(statements.getDeleteSubscriptionStatement()); 526 s.setString(1,destination.getQualifiedName()); 527 s.setString(2,clientId); 528 s.setString(3,subscriptionName); 529 s.executeUpdate(); 530 }finally{ 531 close(s); 532 } 533 } 534 535 public void doDeleteOldMessages(TransactionContext c) throws SQLException ,IOException { 536 PreparedStatement s=null; 537 try{ 538 log.debug("Executing SQL: "+statements.getDeleteOldMessagesStatement()); 539 s=c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement()); 540 s.setLong(1,System.currentTimeMillis()); 541 int i=s.executeUpdate(); 542 log.debug("Deleted "+i+" old message(s)."); 543 }finally{ 544 close(s); 545 } 546 } 547 548 public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriberName) throws SQLException ,IOException { 549 PreparedStatement s=null; 550 ResultSet rs=null; 551 long result = -1; 552 try{ 553 s=c.getConnection().prepareStatement(statements.getLastAckedDurableSubscriberMessageStatement()); 554 s.setString(1,destination.getQualifiedName()); 555 s.setString(2,clientId); 556 s.setString(3,subscriberName); 557 rs=s.executeQuery(); 558 if(rs.next()){ 559 result=rs.getLong(1); 560 } 561 rs.close(); 562 s.close(); 563 }finally{ 564 close(rs); 565 close(s); 566 } 567 return result; 568 } 569 570 static private void close(PreparedStatement s){ 571 try{ 572 s.close(); 573 }catch(Throwable e){ 574 } 575 } 576 577 static private void close(ResultSet rs){ 578 try{ 579 rs.close(); 580 }catch(Throwable e){ 581 } 582 } 583 584 public Set doGetDestinations(TransactionContext c) throws SQLException ,IOException { 585 HashSet rc=new HashSet (); 586 PreparedStatement s=null; 587 ResultSet rs=null; 588 try{ 589 s=c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement()); 590 rs=s.executeQuery(); 591 while(rs.next()){ 592 rc.add(ActiveMQDestination.createDestination(rs.getString(1),ActiveMQDestination.QUEUE_TYPE)); 593 } 594 }finally{ 595 close(rs); 596 close(s); 597 } 598 return rc; 599 } 600 601 public boolean isBatchStatments(){ 602 return batchStatments; 603 } 604 605 public void setBatchStatments(boolean batchStatments){ 606 this.batchStatments=batchStatments; 607 } 608 609 public void setUseExternalMessageReferences(boolean useExternalMessageReferences){ 610 statements.setUseExternalMessageReferences(useExternalMessageReferences); 611 } 612 613 public Statements getStatements(){ 614 return statements; 615 } 616 617 public void setStatements(Statements statements){ 618 this.statements=statements; 619 } 620 621 public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,ActiveMQDestination destination, 622 String clientId,String subscriberName) throws SQLException ,IOException { 623 PreparedStatement s=null; 624 ResultSet rs=null; 625 try{ 626 s=c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement()); 627 s.setString(1,destination.getQualifiedName()); 628 s.setString(2,clientId); 629 s.setString(3,subscriberName); 630 rs=s.executeQuery(); 631 if(!rs.next()){ 632 return null; 633 } 634 return getBinaryData(rs,1); 635 }finally{ 636 close(rs); 637 close(s); 638 } 639 } 640 641 public int doGetMessageCount(TransactionContext c,ActiveMQDestination destination) throws SQLException , IOException { 642 PreparedStatement s=null; 643 ResultSet rs=null; 644 int result=0; 645 try{ 646 s=c.getConnection().prepareStatement(statements.getDestinationMessageCountStatement()); 647 s.setString(1,destination.getQualifiedName()); 648 rs=s.executeQuery(); 649 if(rs.next()){ 650 result=rs.getInt(1); 651 } 652 }finally{ 653 close(rs); 654 close(s); 655 } 656 return result; 657 } 658 659 660 public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception { 661 PreparedStatement s=null; 662 ResultSet rs=null; 663 try{ 664 s=c.getConnection().prepareStatement(statements.getFindNextMessagesStatement()); 665 s.setMaxRows(maxReturned); 666 s.setString(1,destination.getQualifiedName()); 667 s.setLong(2,nextSeq); 668 rs=s.executeQuery(); 669 int count=0; 670 if(statements.isUseExternalMessageReferences()){ 671 while(rs.next()&&count<maxReturned){ 672 listener.recoverMessageReference(rs.getString(1)); 673 count++; 674 } 675 }else{ 676 while(rs.next()&&count<maxReturned){ 677 listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2)); 678 count++; 679 } 680 } 681 }catch(Exception e) { 682 e.printStackTrace(); 683 }finally { 684 close(rs); 685 close(s); 686 listener.finished(); 687 } 688 689 } 690 713 714 715 } 716 | Popular Tags |