1 10 11 package org.mule.providers.jdbc; 12 13 import org.mule.MuleManager; 14 import org.mule.impl.MuleMessage; 15 import org.mule.providers.ConnectException; 16 import org.mule.providers.TransactedPollingMessageReceiver; 17 import org.mule.transaction.TransactionCoordination; 18 import org.mule.umo.UMOComponent; 19 import org.mule.umo.UMOMessage; 20 import org.mule.umo.UMOTransaction; 21 import org.mule.umo.endpoint.UMOEndpoint; 22 import org.mule.umo.lifecycle.InitialisationException; 23 import org.mule.umo.provider.UMOConnector; 24 import org.mule.umo.provider.UMOMessageAdapter; 25 26 import java.sql.Connection ; 27 import java.sql.SQLException ; 28 import java.util.ArrayList ; 29 import java.util.List ; 30 31 34 public class JdbcMessageReceiver extends TransactedPollingMessageReceiver 35 { 36 37 protected JdbcConnector connector; 38 protected String readStmt; 39 protected String ackStmt; 40 protected List readParams; 41 protected List ackParams; 42 43 public JdbcMessageReceiver(UMOConnector connector, 44 UMOComponent component, 45 UMOEndpoint endpoint, 46 String readStmt, 47 String ackStmt) throws InitialisationException 48 { 49 super(connector, component, endpoint, new Long (((JdbcConnector)connector).getPollingFrequency())); 50 51 this.receiveMessagesInTransaction = false; 52 this.connector = (JdbcConnector)connector; 53 54 this.readParams = new ArrayList (); 55 this.readStmt = this.connector.parseStatement(readStmt, this.readParams); 56 this.ackParams = new ArrayList (); 57 this.ackStmt = this.connector.parseStatement(ackStmt, this.ackParams); 58 } 59 60 public void doConnect() throws Exception 61 { 62 Connection con = null; 63 try 64 { 65 con = this.connector.getConnection(); 66 } 67 catch (Exception e) 68 { 69 throw new ConnectException(e, this); 70 } 71 finally 72 { 73 JdbcUtils.close(con); 74 } 75 } 76 77 public void doDisconnect() throws ConnectException 78 { 79 } 81 82 public void processMessage(Object message) throws Exception 83 { 84 Connection con = null; 85 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 86 try 87 { 88 con = this.connector.getConnection(); 89 UMOMessageAdapter msgAdapter = this.connector.getMessageAdapter(message); 90 UMOMessage umoMessage = new MuleMessage(msgAdapter); 91 if (this.ackStmt != null) 92 { 93 94 Object [] ackParams = connector.getParams(endpoint, this.ackParams, umoMessage); 95 int nbRows = connector.createQueryRunner().update(con, this.ackStmt, ackParams); 96 if (nbRows != 1) 97 { 98 logger.warn("Row count for ack should be 1 and not " + nbRows); 99 } 100 } 101 routeMessage(umoMessage, tx, tx != null || endpoint.isSynchronous()); 102 103 } 104 catch (Exception ex) 105 { 106 if (tx != null) 107 { 108 tx.setRollbackOnly(); 109 } 110 111 throw ex; 113 } 114 finally 115 { 116 if (MuleManager.getInstance().getTransactionManager() != null || tx == null) 117 { 118 JdbcUtils.close(con); 130 } 131 } 132 } 133 134 public List getMessages() throws Exception 135 { 136 Connection con = null; 137 try 138 { 139 try 140 { 141 con = this.connector.getConnection(); 142 } 143 catch (SQLException e) 144 { 145 throw new ConnectException(e, this); 146 } 147 148 Object [] readParams = connector.getParams(endpoint, this.readParams, null); 149 Object results = connector.createQueryRunner().query(con, this.readStmt, readParams, 150 connector.createResultSetHandler()); 151 return (List )results; 152 } 153 finally 154 { 155 JdbcUtils.close(con); 156 } 157 } 158 159 } 160 | Popular Tags |