KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > jdbc > JdbcMessageReceiver


1 /*
2  * $Id: JdbcMessageReceiver.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.jdbc;
12
13 import org.mule.MuleManager;
14 import org.mule.impl.MuleMessage;
15 import org.mule.providers.ConnectException;
16 import org.mule.providers.TransactedPollingMessageReceiver;
17 import org.mule.transaction.TransactionCoordination;
18 import org.mule.umo.UMOComponent;
19 import org.mule.umo.UMOMessage;
20 import org.mule.umo.UMOTransaction;
21 import org.mule.umo.endpoint.UMOEndpoint;
22 import org.mule.umo.lifecycle.InitialisationException;
23 import org.mule.umo.provider.UMOConnector;
24 import org.mule.umo.provider.UMOMessageAdapter;
25
26 import java.sql.Connection JavaDoc;
27 import java.sql.SQLException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.List JavaDoc;
30
31 /**
32  * TODO
33  */

34 public class JdbcMessageReceiver extends TransactedPollingMessageReceiver
35 {
36
37     protected JdbcConnector connector;
38     protected String JavaDoc readStmt;
39     protected String JavaDoc ackStmt;
40     protected List JavaDoc readParams;
41     protected List JavaDoc ackParams;
42
43     public JdbcMessageReceiver(UMOConnector connector,
44                                UMOComponent component,
45                                UMOEndpoint endpoint,
46                                String JavaDoc readStmt,
47                                String JavaDoc ackStmt) throws InitialisationException
48     {
49         super(connector, component, endpoint, new Long JavaDoc(((JdbcConnector)connector).getPollingFrequency()));
50
51         this.receiveMessagesInTransaction = false;
52         this.connector = (JdbcConnector)connector;
53
54         this.readParams = new ArrayList JavaDoc();
55         this.readStmt = this.connector.parseStatement(readStmt, this.readParams);
56         this.ackParams = new ArrayList JavaDoc();
57         this.ackStmt = this.connector.parseStatement(ackStmt, this.ackParams);
58     }
59
60     public void doConnect() throws Exception JavaDoc
61     {
62         Connection JavaDoc con = null;
63         try
64         {
65             con = this.connector.getConnection();
66         }
67         catch (Exception JavaDoc e)
68         {
69             throw new ConnectException(e, this);
70         }
71         finally
72         {
73             JdbcUtils.close(con);
74         }
75     }
76
77     public void doDisconnect() throws ConnectException
78     {
79         // noop
80
}
81
82     public void processMessage(Object JavaDoc message) throws Exception JavaDoc
83     {
84         Connection JavaDoc con = null;
85         UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
86         try
87         {
88             con = this.connector.getConnection();
89             UMOMessageAdapter msgAdapter = this.connector.getMessageAdapter(message);
90             UMOMessage umoMessage = new MuleMessage(msgAdapter);
91             if (this.ackStmt != null)
92             {
93
94                 Object JavaDoc[] ackParams = connector.getParams(endpoint, this.ackParams, umoMessage);
95                 int nbRows = connector.createQueryRunner().update(con, this.ackStmt, ackParams);
96                 if (nbRows != 1)
97                 {
98                     logger.warn("Row count for ack should be 1 and not " + nbRows);
99                 }
100             }
101             routeMessage(umoMessage, tx, tx != null || endpoint.isSynchronous());
102
103         }
104         catch (Exception JavaDoc ex)
105         {
106             if (tx != null)
107             {
108                 tx.setRollbackOnly();
109             }
110
111             // rethrow
112
throw ex;
113         }
114         finally
115         {
116             if (MuleManager.getInstance().getTransactionManager() != null || tx == null)
117             {
118                 // We are running in an XA transaction.
119
// This call is required here for compatibility with strict XA
120
// DataSources
121
// implementations, as is the case for WebSphere AS and Weblogic.
122
// Failure to do it here may result in a connection leak.
123
// The close() call will NOT close the connection, neither will it
124
// return it to the pool.
125
// It will notify the XA driver's ConnectionEventListener that the XA
126
// connection
127
// is no longer used by the application and is ready for the 2PC
128
// commit.
129
JdbcUtils.close(con);
130             }
131         }
132     }
133
134     public List JavaDoc getMessages() throws Exception JavaDoc
135     {
136         Connection JavaDoc con = null;
137         try
138         {
139             try
140             {
141                 con = this.connector.getConnection();
142             }
143             catch (SQLException JavaDoc e)
144             {
145                 throw new ConnectException(e, this);
146             }
147
148             Object JavaDoc[] readParams = connector.getParams(endpoint, this.readParams, null);
149             Object JavaDoc results = connector.createQueryRunner().query(con, this.readStmt, readParams,
150                 connector.createResultSetHandler());
151             return (List JavaDoc)results;
152         }
153         finally
154         {
155             JdbcUtils.close(con);
156         }
157     }
158
159 }
160
Popular Tags