KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > jdbc > adapter > DefaultJDBCAdapter


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.jdbc.adapter;
16
17 import java.io.IOException JavaDoc;
18 import java.sql.PreparedStatement JavaDoc;
19 import java.sql.ResultSet JavaDoc;
20 import java.sql.SQLException JavaDoc;
21 import java.sql.Statement JavaDoc;
22 import java.util.ArrayList JavaDoc;
23 import java.util.HashSet JavaDoc;
24 import java.util.Set JavaDoc;
25 import org.apache.activemq.command.ActiveMQDestination;
26 import org.apache.activemq.command.MessageId;
27 import org.apache.activemq.command.SubscriptionInfo;
28 import org.apache.activemq.store.jdbc.JDBCAdapter;
29 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
30 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
31 import org.apache.activemq.store.jdbc.Statements;
32 import org.apache.activemq.store.jdbc.TransactionContext;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35
36 /**
37  * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
38  * encouraged to override the default implementation of methods to account for differences in JDBC Driver
39  * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
40  * The databases/JDBC drivers that use this adapter are:
41  * <ul>
42  * <li></li>
43  * </ul>
44  *
45  * @org.apache.xbean.XBean element="defaultJDBCAdapter"
46  *
47  * @version $Revision: 1.10 $
48  */

49 public class DefaultJDBCAdapter implements JDBCAdapter{
50
51     private static final Log log=LogFactory.getLog(DefaultJDBCAdapter.class);
52     protected Statements statements;
53     protected boolean batchStatments=true;
54
55     protected void setBinaryData(PreparedStatement JavaDoc s,int index,byte data[]) throws SQLException JavaDoc{
56         s.setBytes(index,data);
57     }
58
59     protected byte[] getBinaryData(ResultSet JavaDoc rs,int index) throws SQLException JavaDoc{
60         return rs.getBytes(index);
61     }
62
63     public void doCreateTables(TransactionContext c) throws SQLException JavaDoc,IOException JavaDoc{
64         Statement JavaDoc s=null;
65         try{
66             // Check to see if the table already exists. If it does, then don't log warnings during startup.
67
// Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version
68
// of the table
69
boolean alreadyExists=false;
70             ResultSet JavaDoc rs=null;
71             try{
72                 rs=c.getConnection().getMetaData().getTables(null,null,statements.getFullMessageTableName(),
73                         new String JavaDoc[] { "TABLE" });
74                 alreadyExists=rs.next();
75             }catch(Throwable JavaDoc ignore){
76             }finally{
77                 close(rs);
78             }
79             s=c.getConnection().createStatement();
80             String JavaDoc[] createStatments=statements.getCreateSchemaStatements();
81             for(int i=0;i<createStatments.length;i++){
82                 // This will fail usually since the tables will be
83
// created already.
84
try{
85                     log.debug("Executing SQL: "+createStatments[i]);
86                     boolean rc=s.execute(createStatments[i]);
87                 }catch(SQLException JavaDoc e){
88                     if(alreadyExists){
89                         log.debug("Could not create JDBC tables; The message table already existed."+" Failure was: "
90                                 +createStatments[i]+" Message: "+e.getMessage()+" SQLState: "+e.getSQLState()
91                                 +" Vendor code: "+e.getErrorCode());
92                     }else{
93                         log.warn("Could not create JDBC tables; they could already exist."+" Failure was: "
94                                 +createStatments[i]+" Message: "+e.getMessage()+" SQLState: "+e.getSQLState()
95                                 +" Vendor code: "+e.getErrorCode());
96                         JDBCPersistenceAdapter.log("Failure details: ",e);
97                     }
98                 }
99             }
100             c.getConnection().commit();
101         }finally{
102             try{
103                 s.close();
104             }catch(Throwable JavaDoc e){
105             }
106         }
107     }
108
109     public void doDropTables(TransactionContext c) throws SQLException JavaDoc,IOException JavaDoc{
110         Statement JavaDoc s=null;
111         try{
112             s=c.getConnection().createStatement();
113             String JavaDoc[] dropStatments=statements.getDropSchemaStatements();
114             for(int i=0;i<dropStatments.length;i++){
115                 // This will fail usually since the tables will be
116
// created already.
117
try{
118                     boolean rc=s.execute(dropStatments[i]);
119                 }catch(SQLException JavaDoc e){
120                     log.warn("Could not drop JDBC tables; they may not exist."+" Failure was: "+dropStatments[i]
121                             +" Message: "+e.getMessage()+" SQLState: "+e.getSQLState()+" Vendor code: "
122                             +e.getErrorCode());
123                     JDBCPersistenceAdapter.log("Failure details: ",e);
124                 }
125             }
126             c.getConnection().commit();
127         }finally{
128             try{
129                 s.close();
130             }catch(Throwable JavaDoc e){
131             }
132         }
133     }
134
135     public long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException JavaDoc,IOException JavaDoc{
136         PreparedStatement JavaDoc s=null;
137         ResultSet JavaDoc rs=null;
138         try{
139             s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
140             rs=s.executeQuery();
141             long seq1=0;
142             if(rs.next()){
143                 seq1=rs.getLong(1);
144             }
145             rs.close();
146             s.close();
147             s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInAcksStatement());
148             rs=s.executeQuery();
149             long seq2=0;
150             if(rs.next()){
151                 seq2=rs.getLong(1);
152             }
153             return Math.max(seq1,seq2);
154         }finally{
155             close(rs);
156             close(s);
157         }
158     }
159
160     public void doAddMessage(TransactionContext c,MessageId messageID,ActiveMQDestination destination,byte[] data,
161             long expiration) throws SQLException JavaDoc,IOException JavaDoc{
162         PreparedStatement JavaDoc s=c.getAddMessageStatement();
163         try{
164             if(s==null){
165                 s=c.getConnection().prepareStatement(statements.getAddMessageStatement());
166                 if(batchStatments){
167                     c.setAddMessageStatement(s);
168                 }
169             }
170             s.setLong(1,messageID.getBrokerSequenceId());
171             s.setString(2,messageID.getProducerId().toString());
172             s.setLong(3,messageID.getProducerSequenceId());
173             s.setString(4,destination.getQualifiedName());
174             s.setLong(5,expiration);
175             setBinaryData(s,6,data);
176             if(batchStatments){
177                 s.addBatch();
178             }else if(s.executeUpdate()!=1){
179                 throw new SQLException JavaDoc("Failed add a message");
180             }
181         }finally{
182             if(!batchStatments){
183                 s.close();
184             }
185         }
186     }
187
188     public void doAddMessageReference(TransactionContext c,MessageId messageID,ActiveMQDestination destination,
189             long expirationTime,String JavaDoc messageRef) throws SQLException JavaDoc,IOException JavaDoc{
190         PreparedStatement JavaDoc s=c.getAddMessageStatement();
191         try{
192             if(s==null){
193                 s=c.getConnection().prepareStatement(statements.getAddMessageStatement());
194                 if(batchStatments){
195                     c.setAddMessageStatement(s);
196                 }
197             }
198             s.setLong(1,messageID.getBrokerSequenceId());
199             s.setString(2,messageID.getProducerId().toString());
200             s.setLong(3,messageID.getProducerSequenceId());
201             s.setString(4,destination.getQualifiedName());
202             s.setLong(5,expirationTime);
203             s.setString(6,messageRef);
204             if(batchStatments){
205                 s.addBatch();
206             }else if(s.executeUpdate()!=1){
207                 throw new SQLException JavaDoc("Failed add a message");
208             }
209         }finally{
210             if(!batchStatments){
211                 s.close();
212             }
213         }
214     }
215
216     public long getBrokerSequenceId(TransactionContext c,MessageId messageID) throws SQLException JavaDoc,IOException JavaDoc{
217         PreparedStatement JavaDoc s=null;
218         ResultSet JavaDoc rs=null;
219         try{
220             s=c.getConnection().prepareStatement(statements.getFindMessageSequenceIdStatement());
221             s.setString(1,messageID.getProducerId().toString());
222             s.setLong(2,messageID.getProducerSequenceId());
223             rs=s.executeQuery();
224             if(!rs.next()){
225                 return 0;
226             }
227             return rs.getLong(1);
228         }finally{
229             close(rs);
230             close(s);
231         }
232     }
233
234     public byte[] doGetMessage(TransactionContext c,long seq) throws SQLException JavaDoc,IOException JavaDoc{
235         PreparedStatement JavaDoc s=null;
236         ResultSet JavaDoc rs=null;
237         try{
238             s=c.getConnection().prepareStatement(statements.getFindMessageStatement());
239             s.setLong(1,seq);
240             rs=s.executeQuery();
241             if(!rs.next()){
242                 return null;
243             }
244             return getBinaryData(rs,1);
245         }finally{
246             close(rs);
247             close(s);
248         }
249     }
250
251     public String JavaDoc doGetMessageReference(TransactionContext c,long seq) throws SQLException JavaDoc,IOException JavaDoc{
252         PreparedStatement JavaDoc s=null;
253         ResultSet JavaDoc rs=null;
254         try{
255             s=c.getConnection().prepareStatement(statements.getFindMessageStatement());
256             s.setLong(1,seq);
257             rs=s.executeQuery();
258             if(!rs.next()){
259                 return null;
260             }
261             return rs.getString(1);
262         }finally{
263             close(rs);
264             close(s);
265         }
266     }
267
268     public void doRemoveMessage(TransactionContext c,long seq) throws SQLException JavaDoc,IOException JavaDoc{
269         PreparedStatement JavaDoc s=c.getRemovedMessageStatement();
270         try{
271             if(s==null){
272                 s=c.getConnection().prepareStatement(statements.getRemoveMessageStatment());
273                 if(batchStatments){
274                     c.setRemovedMessageStatement(s);
275                 }
276             }
277             s.setLong(1,seq);
278             if(batchStatments){
279                 s.addBatch();
280             }else if(s.executeUpdate()!=1){
281                 throw new SQLException JavaDoc("Failed to remove message");
282             }
283         }finally{
284             if(!batchStatments){
285                 s.close();
286             }
287         }
288     }
289
290     public void doRecover(TransactionContext c,ActiveMQDestination destination,JDBCMessageRecoveryListener listener)
291             throws Exception JavaDoc{
292         PreparedStatement JavaDoc s=null;
293         ResultSet JavaDoc rs=null;
294         try{
295             s=c.getConnection().prepareStatement(statements.getFindAllMessagesStatement());
296             s.setString(1,destination.getQualifiedName());
297             rs=s.executeQuery();
298             if(statements.isUseExternalMessageReferences()){
299                 while(rs.next()){
300                     listener.recoverMessageReference(rs.getString(2));
301                 }
302             }else{
303                 while(rs.next()){
304                     listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
305                 }
306             }
307         }finally{
308             close(rs);
309             close(s);
310             listener.finished();
311         }
312     }
313
314     public void doSetLastAck(TransactionContext c,ActiveMQDestination destination,String JavaDoc clientId,
315             String JavaDoc subscriptionName,long seq) throws SQLException JavaDoc,IOException JavaDoc{
316         PreparedStatement JavaDoc s=c.getAddMessageStatement();
317         try{
318             if(s==null){
319                 s=c.getConnection().prepareStatement(statements.getUpdateLastAckOfDurableSubStatement());
320                 if(batchStatments){
321                     c.setUpdateLastAckStatement(s);
322                 }
323             }
324             s.setLong(1,seq);
325             s.setString(2,destination.getQualifiedName());
326             s.setString(3,clientId);
327             s.setString(4,subscriptionName);
328             if(batchStatments){
329                 s.addBatch();
330             }else if(s.executeUpdate()!=1){
331                 throw new SQLException JavaDoc("Failed add a message");
332             }
333         }finally{
334             if(!batchStatments){
335                 s.close();
336             }
337         }
338     }
339
340     public void doRecoverSubscription(TransactionContext c,ActiveMQDestination destination,String JavaDoc clientId,
341             String JavaDoc subscriptionName,JDBCMessageRecoveryListener listener) throws Exception JavaDoc{
342         // dumpTables(c, destination.getQualifiedName(),clientId,subscriptionName);
343
PreparedStatement JavaDoc s=null;
344         ResultSet JavaDoc rs=null;
345         try{
346             s=c.getConnection().prepareStatement(statements.getFindAllDurableSubMessagesStatement());
347             s.setString(1,destination.getQualifiedName());
348             s.setString(2,clientId);
349             s.setString(3,subscriptionName);
350             rs=s.executeQuery();
351             if(statements.isUseExternalMessageReferences()){
352                 while(rs.next()){
353                     listener.recoverMessageReference(rs.getString(2));
354                 }
355             }else{
356                 while(rs.next()){
357                     listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
358                 }
359             }
360         }finally{
361             close(rs);
362             close(s);
363             listener.finished();
364         }
365     }
366
367     public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String JavaDoc clientId,
368             String JavaDoc subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception JavaDoc{
369         PreparedStatement JavaDoc s=null;
370         ResultSet JavaDoc rs=null;
371         try{
372             s=c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
373             s.setMaxRows(maxReturned);
374             s.setString(1,destination.getQualifiedName());
375             s.setString(2,clientId);
376             s.setString(3,subscriptionName);
377             s.setLong(4,seq);
378             rs=s.executeQuery();
379             int count=0;
380             if(statements.isUseExternalMessageReferences()){
381                 while(rs.next()&&count<maxReturned){
382                     listener.recoverMessageReference(rs.getString(1));
383                     count++;
384                 }
385             }else{
386                 while(rs.next()&&count<maxReturned){
387                     listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
388                     count++;
389                 }
390             }
391         }finally{
392             close(rs);
393             close(s);
394             listener.finished();
395         }
396     }
397
398     public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String JavaDoc clientId,
399             String JavaDoc subscriptionName) throws SQLException JavaDoc,IOException JavaDoc{
400         PreparedStatement JavaDoc s=null;
401         ResultSet JavaDoc rs=null;
402         int result=0;
403         try{
404             s=c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement());
405             s.setString(1,destination.getQualifiedName());
406             s.setString(2,clientId);
407             s.setString(3,subscriptionName);
408             rs=s.executeQuery();
409             if(rs.next()){
410                 result=rs.getInt(1);
411             }
412         }finally{
413             close(rs);
414             close(s);
415         }
416         return result;
417     }
418
419     /**
420      * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
421      * org.apache.activemq.service.SubscriptionInfo)
422      */

423     public void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String JavaDoc clientId,
424             String JavaDoc subscriptionName,String JavaDoc selector,boolean retroactive) throws SQLException JavaDoc,IOException JavaDoc{
425         // dumpTables(c, destination.getQualifiedName(), clientId, subscriptionName);
426
PreparedStatement JavaDoc s=null;
427         try{
428             long lastMessageId=-1;
429             if(!retroactive){
430                 s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
431                 ResultSet JavaDoc rs=null;
432                 try{
433                     rs=s.executeQuery();
434                     if(rs.next()){
435                         lastMessageId=rs.getLong(1);
436                     }
437                 }finally{
438                     close(rs);
439                     close(s);
440                 }
441             }
442             s=c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
443             s.setString(1,destination.getQualifiedName());
444             s.setString(2,clientId);
445             s.setString(3,subscriptionName);
446             s.setString(4,selector);
447             s.setLong(5,lastMessageId);
448             if(s.executeUpdate()!=1){
449                 throw new IOException JavaDoc("Could not create durable subscription for: "+clientId);
450             }
451         }finally{
452             close(s);
453         }
454     }
455
456     public SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String JavaDoc clientId,
457             String JavaDoc subscriptionName) throws SQLException JavaDoc,IOException JavaDoc{
458         PreparedStatement JavaDoc s=null;
459         ResultSet JavaDoc rs=null;
460         try{
461             s=c.getConnection().prepareStatement(statements.getFindDurableSubStatement());
462             s.setString(1,destination.getQualifiedName());
463             s.setString(2,clientId);
464             s.setString(3,subscriptionName);
465             rs=s.executeQuery();
466             if(!rs.next()){
467                 return null;
468             }
469             SubscriptionInfo subscription=new SubscriptionInfo();
470             subscription.setDestination(destination);
471             subscription.setClientId(clientId);
472             subscription.setSubcriptionName(subscriptionName);
473             subscription.setSelector(rs.getString(1));
474             return subscription;
475         }finally{
476             close(rs);
477             close(s);
478         }
479     }
480
481     public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,ActiveMQDestination destination)
482             throws SQLException JavaDoc,IOException JavaDoc{
483         PreparedStatement JavaDoc s=null;
484         ResultSet JavaDoc rs=null;
485         try{
486             s=c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement());
487             s.setString(1,destination.getQualifiedName());
488             rs=s.executeQuery();
489             ArrayList JavaDoc rc=new ArrayList JavaDoc();
490             while(rs.next()){
491                 SubscriptionInfo subscription=new SubscriptionInfo();
492                 subscription.setDestination(destination);
493                 subscription.setSelector(rs.getString(1));
494                 subscription.setSubcriptionName(rs.getString(2));
495                 subscription.setClientId(rs.getString(3));
496                 rc.add(subscription);
497             }
498             return (SubscriptionInfo[])rc.toArray(new SubscriptionInfo[rc.size()]);
499         }finally{
500             close(rs);
501             close(s);
502         }
503     }
504
505     public void doRemoveAllMessages(TransactionContext c,ActiveMQDestination destinationName) throws SQLException JavaDoc,
506             IOException JavaDoc{
507         PreparedStatement JavaDoc s=null;
508         try{
509             s=c.getConnection().prepareStatement(statements.getRemoveAllMessagesStatement());
510             s.setString(1,destinationName.getQualifiedName());
511             s.executeUpdate();
512             s.close();
513             s=c.getConnection().prepareStatement(statements.getRemoveAllSubscriptionsStatement());
514             s.setString(1,destinationName.getQualifiedName());
515             s.executeUpdate();
516         }finally{
517             close(s);
518         }
519     }
520
521     public void doDeleteSubscription(TransactionContext c,ActiveMQDestination destination,String JavaDoc clientId,
522             String JavaDoc subscriptionName) throws SQLException JavaDoc,IOException JavaDoc{
523         PreparedStatement JavaDoc s=null;
524         try{
525             s=c.getConnection().prepareStatement(statements.getDeleteSubscriptionStatement());
526             s.setString(1,destination.getQualifiedName());
527             s.setString(2,clientId);
528             s.setString(3,subscriptionName);
529             s.executeUpdate();
530         }finally{
531             close(s);
532         }
533     }
534
535     public void doDeleteOldMessages(TransactionContext c) throws SQLException JavaDoc,IOException JavaDoc{
536         PreparedStatement JavaDoc s=null;
537         try{
538             log.debug("Executing SQL: "+statements.getDeleteOldMessagesStatement());
539             s=c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement());
540             s.setLong(1,System.currentTimeMillis());
541             int i=s.executeUpdate();
542             log.debug("Deleted "+i+" old message(s).");
543         }finally{
544             close(s);
545         }
546     }
547     
548     public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String JavaDoc clientId, String JavaDoc subscriberName) throws SQLException JavaDoc,IOException JavaDoc{
549         PreparedStatement JavaDoc s=null;
550         ResultSet JavaDoc rs=null;
551         long result = -1;
552         try{
553             s=c.getConnection().prepareStatement(statements.getLastAckedDurableSubscriberMessageStatement());
554             s.setString(1,destination.getQualifiedName());
555             s.setString(2,clientId);
556             s.setString(3,subscriberName);
557             rs=s.executeQuery();
558             if(rs.next()){
559                 result=rs.getLong(1);
560             }
561             rs.close();
562             s.close();
563         }finally{
564             close(rs);
565             close(s);
566         }
567         return result;
568     }
569
570     static private void close(PreparedStatement JavaDoc s){
571         try{
572             s.close();
573         }catch(Throwable JavaDoc e){
574         }
575     }
576
577     static private void close(ResultSet JavaDoc rs){
578         try{
579             rs.close();
580         }catch(Throwable JavaDoc e){
581         }
582     }
583
584     public Set JavaDoc doGetDestinations(TransactionContext c) throws SQLException JavaDoc,IOException JavaDoc{
585         HashSet JavaDoc rc=new HashSet JavaDoc();
586         PreparedStatement JavaDoc s=null;
587         ResultSet JavaDoc rs=null;
588         try{
589             s=c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement());
590             rs=s.executeQuery();
591             while(rs.next()){
592                 rc.add(ActiveMQDestination.createDestination(rs.getString(1),ActiveMQDestination.QUEUE_TYPE));
593             }
594         }finally{
595             close(rs);
596             close(s);
597         }
598         return rc;
599     }
600
601     public boolean isBatchStatments(){
602         return batchStatments;
603     }
604
605     public void setBatchStatments(boolean batchStatments){
606         this.batchStatments=batchStatments;
607     }
608
609     public void setUseExternalMessageReferences(boolean useExternalMessageReferences){
610         statements.setUseExternalMessageReferences(useExternalMessageReferences);
611     }
612
613     public Statements getStatements(){
614         return statements;
615     }
616
617     public void setStatements(Statements statements){
618         this.statements=statements;
619     }
620
621     public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,ActiveMQDestination destination,
622             String JavaDoc clientId,String JavaDoc subscriberName) throws SQLException JavaDoc,IOException JavaDoc{
623         PreparedStatement JavaDoc s=null;
624         ResultSet JavaDoc rs=null;
625         try{
626             s=c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement());
627             s.setString(1,destination.getQualifiedName());
628             s.setString(2,clientId);
629             s.setString(3,subscriberName);
630             rs=s.executeQuery();
631             if(!rs.next()){
632                 return null;
633             }
634             return getBinaryData(rs,1);
635         }finally{
636             close(rs);
637             close(s);
638         }
639     }
640    
641     public int doGetMessageCount(TransactionContext c,ActiveMQDestination destination) throws SQLException JavaDoc, IOException JavaDoc{
642         PreparedStatement JavaDoc s=null;
643         ResultSet JavaDoc rs=null;
644         int result=0;
645         try{
646             s=c.getConnection().prepareStatement(statements.getDestinationMessageCountStatement());
647             s.setString(1,destination.getQualifiedName());
648             rs=s.executeQuery();
649             if(rs.next()){
650                 result=rs.getInt(1);
651             }
652         }finally{
653             close(rs);
654             close(s);
655         }
656         return result;
657     }
658
659     
660     public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception JavaDoc{
661         PreparedStatement JavaDoc s=null;
662         ResultSet JavaDoc rs=null;
663         try{
664             s=c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
665             s.setMaxRows(maxReturned);
666             s.setString(1,destination.getQualifiedName());
667             s.setLong(2,nextSeq);
668             rs=s.executeQuery();
669             int count=0;
670             if(statements.isUseExternalMessageReferences()){
671                 while(rs.next()&&count<maxReturned){
672                     listener.recoverMessageReference(rs.getString(1));
673                     count++;
674                 }
675             }else{
676                 while(rs.next()&&count<maxReturned){
677                     listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
678                     count++;
679                 }
680             }
681         }catch(Exception JavaDoc e) {
682             e.printStackTrace();
683         }finally {
684             close(rs);
685             close(s);
686             listener.finished();
687         }
688         
689     }
690     /*
691      * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
692      * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
693      * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID,
694      * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
695      * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID");
696      * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
697      * printQuery(s,System.out); }
698      *
699      * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS",
700      * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); }
701      *
702      * private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
703      * printQuery(c.prepareStatement(query), out); }
704      *
705      * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
706      *
707      * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<=
708      * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); }
709      * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|");
710      * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
711      * try { s.close(); } catch (Throwable ignore) {} } }
712      */

713
714     
715 }
716
Popular Tags