KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > jdbc > JdbcMessageDispatcher


1 /*
2  * $Id: JdbcMessageDispatcher.java 3937 2006-11-20 16:04:25Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.jdbc;
12
13 import org.apache.commons.lang.StringUtils;
14 import org.mule.config.i18n.Message;
15 import org.mule.config.i18n.Messages;
16 import org.mule.impl.MuleMessage;
17 import org.mule.providers.AbstractMessageDispatcher;
18 import org.mule.transaction.TransactionCoordination;
19 import org.mule.umo.UMOEvent;
20 import org.mule.umo.UMOException;
21 import org.mule.umo.UMOMessage;
22 import org.mule.umo.UMOTransaction;
23 import org.mule.umo.endpoint.UMOImmutableEndpoint;
24 import org.mule.umo.provider.ConnectorException;
25 import org.mule.umo.provider.UMOMessageAdapter;
26
27 import java.sql.Connection JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.List JavaDoc;
30
31 /**
32  * The Jdbc Message dispatcher is responsible for executing SQL queries against a
33  * database.
34  */

35 public class JdbcMessageDispatcher extends AbstractMessageDispatcher
36 {
37
38     private JdbcConnector connector;
39
40     public JdbcMessageDispatcher(UMOImmutableEndpoint endpoint)
41     {
42         super(endpoint);
43         this.connector = (JdbcConnector)endpoint.getConnector();
44     }
45
46     /*
47      * (non-Javadoc)
48      *
49      * @see org.mule.providers.AbstractMessageDispatcher#doDispose()
50      */

51     protected void doDispose()
52     {
53         // template method
54
}
55
56     /*
57      * (non-Javadoc)
58      *
59      * @see org.mule.providers.AbstractMessageDispatcher#doDispatch(org.mule.umo.UMOEvent)
60      */

61     protected void doDispatch(UMOEvent event) throws Exception JavaDoc
62     {
63         if (logger.isDebugEnabled())
64         {
65             logger.debug("Dispatch event: " + event);
66         }
67
68         UMOImmutableEndpoint endpoint = event.getEndpoint();
69         String JavaDoc writeStmt = endpoint.getEndpointURI().getAddress();
70         String JavaDoc str;
71         if ((str = this.connector.getQuery(endpoint, writeStmt)) != null)
72         {
73             writeStmt = str;
74         }
75         writeStmt = StringUtils.trimToEmpty(writeStmt);
76         if (StringUtils.isBlank(writeStmt))
77         {
78             throw new IllegalArgumentException JavaDoc("Missing a write statement");
79         }
80         if (!"insert".equalsIgnoreCase(writeStmt.substring(0, 6))
81             && !"update".equalsIgnoreCase(writeStmt.substring(0, 6))
82             && !"delete".equalsIgnoreCase(writeStmt.substring(0, 6)))
83         {
84             throw new IllegalArgumentException JavaDoc(
85                 "Write statement should be an insert / update / delete sql statement");
86         }
87         List JavaDoc paramNames = new ArrayList JavaDoc();
88         writeStmt = connector.parseStatement(writeStmt, paramNames);
89
90         Object JavaDoc[] paramValues = connector.getParams(endpoint, paramNames, new MuleMessage(
91             event.getTransformedMessage()));
92
93         UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
94         Connection JavaDoc con = null;
95         try
96         {
97             con = this.connector.getConnection();
98
99             int nbRows = connector.createQueryRunner().update(con, writeStmt, paramValues);
100             if (nbRows != 1)
101             {
102                 logger.warn("Row count for write should be 1 and not " + nbRows);
103             }
104             if (tx == null)
105             {
106                 JdbcUtils.commitAndClose(con);
107             }
108             logger.debug("Event dispatched succesfuly");
109         }
110         catch (Exception JavaDoc e)
111         {
112             logger.debug("Error dispatching event: " + e.getMessage(), e);
113             if (tx == null)
114             {
115                 JdbcUtils.rollbackAndClose(con);
116             }
117             throw e;
118         }
119     }
120
121     /*
122      * (non-Javadoc)
123      *
124      * @see org.mule.providers.AbstractMessageDispatcher#doSend(org.mule.umo.UMOEvent)
125      */

126     protected UMOMessage doSend(UMOEvent event) throws Exception JavaDoc
127     {
128         doDispatch(event);
129         return event.getMessage();
130     }
131
132     /**
133      * Make a specific request to the underlying transport
134      *
135      * @param endpoint the endpoint to use when connecting to the resource
136      * @param timeout the maximum time the operation should block before returning.
137      * The call should return immediately if there is data available. If
138      * no data becomes available before the timeout elapses, null will be
139      * returned
140      * @return the result of the request wrapped in a UMOMessage object. Null will be
141      * returned if no data was avaialable
142      * @throws Exception if the call to the underlying protocal cuases an exception
143      */

144     protected UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception JavaDoc
145     {
146         if (logger.isDebugEnabled())
147         {
148             logger.debug("Trying to receive a message with a timeout of " + timeout);
149         }
150
151         String JavaDoc[] stmts = this.connector.getReadAndAckStatements(endpoint);
152         String JavaDoc readStmt = stmts[0];
153         String JavaDoc ackStmt = stmts[1];
154         List JavaDoc readParams = new ArrayList JavaDoc();
155         List JavaDoc ackParams = new ArrayList JavaDoc();
156         readStmt = connector.parseStatement(readStmt, readParams);
157         ackStmt = connector.parseStatement(ackStmt, ackParams);
158
159         Connection JavaDoc con = null;
160         long t0 = System.currentTimeMillis();
161         try
162         {
163             con = this.connector.getConnection();
164             if (timeout < 0)
165             {
166                 timeout = Long.MAX_VALUE;
167             }
168             Object JavaDoc result;
169             do
170             {
171                 result = connector.createQueryRunner().query(con, readStmt,
172                     connector.getParams(endpoint, readParams, null), connector.createResultSetHandler());
173                 if (result != null)
174                 {
175                     if (logger.isDebugEnabled())
176                     {
177                         logger.debug("Received: " + result);
178                     }
179                     break;
180                 }
181                 long sleep = Math.min(this.connector.getPollingFrequency(),
182                     timeout - (System.currentTimeMillis() - t0));
183                 if (sleep > 0)
184                 {
185                     if (logger.isDebugEnabled())
186                     {
187                         logger.debug("No results, sleeping for " + sleep);
188                     }
189                     Thread.sleep(sleep);
190                 }
191                 else
192                 {
193                     logger.debug("Timeout");
194                     return null;
195                 }
196             }
197             while (true);
198             if (ackStmt != null)
199             {
200                 int nbRows = connector.createQueryRunner().update(con, ackStmt,
201                     connector.getParams(endpoint, ackParams, result));
202                 if (nbRows != 1)
203                 {
204                     logger.warn("Row count for ack should be 1 and not " + nbRows);
205                 }
206             }
207             UMOMessageAdapter msgAdapter = this.connector.getMessageAdapter(result);
208             UMOMessage message = new MuleMessage(msgAdapter);
209             JdbcUtils.commitAndClose(con);
210             return message;
211         }
212         catch (Exception JavaDoc e)
213         {
214             JdbcUtils.rollbackAndClose(con);
215             throw e;
216         }
217     }
218
219     protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception JavaDoc
220     {
221         // template method
222
}
223
224     protected void doDisconnect() throws Exception JavaDoc
225     {
226         // template method
227
}
228
229     /*
230      * (non-Javadoc)
231      *
232      * @see org.mule.umo.provider.UMOMessageDispatcher#getDelegateSession()
233      */

234     public Object JavaDoc getDelegateSession() throws UMOException
235     {
236         try
237         {
238             return connector.getConnection();
239         }
240         catch (Exception JavaDoc e)
241         {
242             throw new ConnectorException(new Message(Messages.FAILED_TO_CREATE_X, "Jdbc Connection"),
243                 connector, e);
244         }
245     }
246
247 }
248
Popular Tags