1 10 11 package org.mule.providers.jdbc; 12 13 import org.apache.commons.lang.StringUtils; 14 import org.mule.config.i18n.Message; 15 import org.mule.config.i18n.Messages; 16 import org.mule.impl.MuleMessage; 17 import org.mule.providers.AbstractMessageDispatcher; 18 import org.mule.transaction.TransactionCoordination; 19 import org.mule.umo.UMOEvent; 20 import org.mule.umo.UMOException; 21 import org.mule.umo.UMOMessage; 22 import org.mule.umo.UMOTransaction; 23 import org.mule.umo.endpoint.UMOImmutableEndpoint; 24 import org.mule.umo.provider.ConnectorException; 25 import org.mule.umo.provider.UMOMessageAdapter; 26 27 import java.sql.Connection ; 28 import java.util.ArrayList ; 29 import java.util.List ; 30 31 35 public class JdbcMessageDispatcher extends AbstractMessageDispatcher 36 { 37 38 private JdbcConnector connector; 39 40 public JdbcMessageDispatcher(UMOImmutableEndpoint endpoint) 41 { 42 super(endpoint); 43 this.connector = (JdbcConnector)endpoint.getConnector(); 44 } 45 46 51 protected void doDispose() 52 { 53 } 55 56 61 protected void doDispatch(UMOEvent event) throws Exception 62 { 63 if (logger.isDebugEnabled()) 64 { 65 logger.debug("Dispatch event: " + event); 66 } 67 68 UMOImmutableEndpoint endpoint = event.getEndpoint(); 69 String writeStmt = endpoint.getEndpointURI().getAddress(); 70 String str; 71 if ((str = this.connector.getQuery(endpoint, writeStmt)) != null) 72 { 73 writeStmt = str; 74 } 75 writeStmt = StringUtils.trimToEmpty(writeStmt); 76 if (StringUtils.isBlank(writeStmt)) 77 { 78 throw new IllegalArgumentException ("Missing a write statement"); 79 } 80 if (!"insert".equalsIgnoreCase(writeStmt.substring(0, 6)) 81 && !"update".equalsIgnoreCase(writeStmt.substring(0, 6)) 82 && !"delete".equalsIgnoreCase(writeStmt.substring(0, 6))) 83 { 84 throw new IllegalArgumentException ( 85 "Write statement should be an insert / update / delete sql statement"); 86 } 87 List paramNames = new ArrayList (); 88 writeStmt = connector.parseStatement(writeStmt, paramNames); 89 90 Object [] paramValues = connector.getParams(endpoint, paramNames, new MuleMessage( 91 event.getTransformedMessage())); 92 93 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 94 Connection con = null; 95 try 96 { 97 con = this.connector.getConnection(); 98 99 int nbRows = connector.createQueryRunner().update(con, writeStmt, paramValues); 100 if (nbRows != 1) 101 { 102 logger.warn("Row count for write should be 1 and not " + nbRows); 103 } 104 if (tx == null) 105 { 106 JdbcUtils.commitAndClose(con); 107 } 108 logger.debug("Event dispatched succesfuly"); 109 } 110 catch (Exception e) 111 { 112 logger.debug("Error dispatching event: " + e.getMessage(), e); 113 if (tx == null) 114 { 115 JdbcUtils.rollbackAndClose(con); 116 } 117 throw e; 118 } 119 } 120 121 126 protected UMOMessage doSend(UMOEvent event) throws Exception 127 { 128 doDispatch(event); 129 return event.getMessage(); 130 } 131 132 144 protected UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception 145 { 146 if (logger.isDebugEnabled()) 147 { 148 logger.debug("Trying to receive a message with a timeout of " + timeout); 149 } 150 151 String [] stmts = this.connector.getReadAndAckStatements(endpoint); 152 String readStmt = stmts[0]; 153 String ackStmt = stmts[1]; 154 List readParams = new ArrayList (); 155 List ackParams = new ArrayList (); 156 readStmt = connector.parseStatement(readStmt, readParams); 157 ackStmt = connector.parseStatement(ackStmt, ackParams); 158 159 Connection con = null; 160 long t0 = System.currentTimeMillis(); 161 try 162 { 163 con = this.connector.getConnection(); 164 if (timeout < 0) 165 { 166 timeout = Long.MAX_VALUE; 167 } 168 Object result; 169 do 170 { 171 result = connector.createQueryRunner().query(con, readStmt, 172 connector.getParams(endpoint, readParams, null), connector.createResultSetHandler()); 173 if (result != null) 174 { 175 if (logger.isDebugEnabled()) 176 { 177 logger.debug("Received: " + result); 178 } 179 break; 180 } 181 long sleep = Math.min(this.connector.getPollingFrequency(), 182 timeout - (System.currentTimeMillis() - t0)); 183 if (sleep > 0) 184 { 185 if (logger.isDebugEnabled()) 186 { 187 logger.debug("No results, sleeping for " + sleep); 188 } 189 Thread.sleep(sleep); 190 } 191 else 192 { 193 logger.debug("Timeout"); 194 return null; 195 } 196 } 197 while (true); 198 if (ackStmt != null) 199 { 200 int nbRows = connector.createQueryRunner().update(con, ackStmt, 201 connector.getParams(endpoint, ackParams, result)); 202 if (nbRows != 1) 203 { 204 logger.warn("Row count for ack should be 1 and not " + nbRows); 205 } 206 } 207 UMOMessageAdapter msgAdapter = this.connector.getMessageAdapter(result); 208 UMOMessage message = new MuleMessage(msgAdapter); 209 JdbcUtils.commitAndClose(con); 210 return message; 211 } 212 catch (Exception e) 213 { 214 JdbcUtils.rollbackAndClose(con); 215 throw e; 216 } 217 } 218 219 protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception 220 { 221 } 223 224 protected void doDisconnect() throws Exception 225 { 226 } 228 229 234 public Object getDelegateSession() throws UMOException 235 { 236 try 237 { 238 return connector.getConnection(); 239 } 240 catch (Exception e) 241 { 242 throw new ConnectorException(new Message(Messages.FAILED_TO_CREATE_X, "Jdbc Connection"), 243 connector, e); 244 } 245 } 246 247 } 248 | Popular Tags |