package org.apache.activemq.store.jdbc.adapter;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.jms.JMSException;

import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.ByteArrayOutputStream;

/**
 * JDBC adapter that reads and writes message payloads through
 * {@link java.sql.Blob} locators instead of binding the bytes directly to a
 * statement parameter.
 *
 * <p>The SQL text comes from the inherited <code>statements</code> object,
 * which is not declared in this file.
 */
public class BlobJDBCAdapter extends DefaultJDBCAdapter {

    /** Size of the chunk buffer used when copying a BLOB into memory. */
    private static final int COPY_BUFFER_SIZE = 4096;

    /**
     * Stores a message in three steps: insert the row with a placeholder
     * payload, select the row back to obtain its BLOB locator and write
     * <code>data</code> into it, then execute the update statement that binds
     * the filled BLOB to the row.
     *
     * @param c               connection used for all three statements
     * @param seq             sequence id bound as the row key
     * @param messageID       message id stored with the row and used in error messages
     * @param destinationName destination name stored with the row
     * @param data            payload bytes to write into the BLOB
     * @throws SQLException if a JDBC call fails or the BLOB stream cannot be written
     * @throws JMSException if the insert or update does not affect exactly one
     *                      row, or the inserted row cannot be selected back
     */
    public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data)
            throws SQLException, JMSException {
        PreparedStatement s = null;
        ResultSet rs = null;
        try {
            // Step 1: add the record. Parameter 4 gets a single-space placeholder
            // (presumably the payload column - confirm against the add statement);
            // the real bytes are written through the BLOB locator below.
            s = c.prepareStatement(statements.getAddMessageStatement());
            s.setLong(1, seq);
            s.setString(2, destinationName);
            s.setString(3, messageID);
            s.setString(4, " ");
            if (s.executeUpdate() != 1) {
                throw new JMSException("Failed to broker message: " + messageID + " in container.");
            }
            s.close();

            // Step 2: select the row back so its BLOB locator can be written.
            s = c.prepareStatement(statements.getFindMessageStatement());
            s.setLong(1, seq);
            rs = s.executeQuery();
            if (!rs.next()) {
                throw new JMSException("Failed to broker message: " + messageID + " in container.");
            }

            Blob blob = rs.getBlob(1);
            // setBinaryStream takes the 1-based position at which writing starts,
            // not the number of bytes; the payload must begin at position 1.
            OutputStream stream = blob.setBinaryStream(1L);
            try {
                stream.write(data);
            } finally {
                stream.close();
            }
            s.close();

            // Step 3: bind the filled BLOB back to the row and actually run the update.
            s = c.prepareStatement(statements.getUpdateMessageStatement());
            s.setBlob(1, blob);
            s.setLong(2, seq);
            if (s.executeUpdate() != 1) {
                throw new JMSException("Failed to broker message: " + messageID + " in container.");
            }

        } catch (IOException e) {
            throw (SQLException) new SQLException("BLOB could not be updated: " + e).initCause(e);
        } finally {
            closeQuietly(rs);
            closeQuietly(s);
        }
    }

    /**
     * Loads the payload stored for the given sequence id.
     *
     * @param c   transaction context supplying the connection
     * @param seq sequence id bound to the find statement
     * @return the BLOB content as a byte array, or <code>null</code> when the
     *         query returns no row
     * @throws SQLException if a JDBC call fails or the BLOB stream cannot be read
     */
    public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException {
        PreparedStatement s = null;
        ResultSet rs = null;
        try {
            s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
            s.setLong(1, seq);
            rs = s.executeQuery();

            if (!rs.next()) {
                return null;
            }

            Blob blob = rs.getBlob(1);
            ByteArrayOutputStream os = new ByteArrayOutputStream((int) blob.length());
            InputStream is = blob.getBinaryStream();
            try {
                // Copy in chunks rather than one byte per read() call.
                byte[] buffer = new byte[COPY_BUFFER_SIZE];
                int count;
                while ((count = is.read(buffer)) != -1) {
                    os.write(buffer, 0, count);
                }
            } finally {
                is.close();
            }
            os.close();

            return os.toByteArray();

        } catch (IOException e) {
            throw (SQLException) new SQLException("BLOB could not be read: " + e).initCause(e);
        } finally {
            closeQuietly(rs);
            closeQuietly(s);
        }
    }

    /** Closes the result set if it was opened, ignoring failures during cleanup. */
    private static void closeQuietly(ResultSet rs) {
        if (rs == null) {
            return;
        }
        try {
            rs.close();
        } catch (SQLException ignored) {
            // Best-effort cleanup: a close failure must not mask the primary outcome.
        }
    }

    /** Closes the statement if it was opened, ignoring failures during cleanup. */
    private static void closeQuietly(PreparedStatement s) {
        if (s == null) {
            return;
        }
        try {
            s.close();
        } catch (SQLException ignored) {
            // Best-effort cleanup: a close failure must not mask the primary outcome.
        }
    }

}