KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > petals > binding > xquarebc > listeners > XQuareBCListener


1 /**
2  * PETALS - PETALS Services Platform.
3  * Copyright (c) 2005 EBM Websourcing, http://www.ebmwebsourcing.com/
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  * -------------------------------------------------------------------------
19  * $Id : $
20  * -------------------------------------------------------------------------
21  */

22 package org.objectweb.petals.binding.xquarebc.listeners;
23
24 import java.sql.Connection JavaDoc;
25 import java.util.HashMap JavaDoc;
26 import java.util.Map JavaDoc;
27 import java.util.Properties JavaDoc;
28 import java.util.logging.Level JavaDoc;
29 import java.util.logging.Logger JavaDoc;
30
31 import javax.jbi.JBIException;
32 import javax.jbi.component.ComponentContext;
33 import javax.jbi.messaging.DeliveryChannel;
34 import javax.jbi.messaging.ExchangeStatus;
35 import javax.jbi.messaging.InOnly;
36 import javax.jbi.messaging.InOut;
37 import javax.jbi.messaging.NormalizedMessage;
38 import javax.jbi.servicedesc.ServiceEndpoint;
39 import javax.sql.DataSource JavaDoc;
40 import javax.xml.namespace.QName JavaDoc;
41 import javax.xml.transform.Source JavaDoc;
42 import org.objectweb.petals.binding.xquarebc.XQuareBCException;
43 import org.objectweb.petals.binding.xquarebc.XQuareSUHandler;
44 import org.objectweb.petals.component.common.util.SourceHelper;
45 import org.objectweb.petals.component.common.util.WSDLHelper;
46 import org.w3c.dom.Document JavaDoc;
47
48 /**
49  * This listener "listens" to new data in any XQuareBC service unit's target database
50  * and builds upon the XQuery capabilities of the companion XQuareBCJBIProcessor in
51  * order to get them, format them as XML and send it as a message to the configured
52  * service. The "listening" part is done by regularly looking for "new data" as the
53  * results of a configured SQL query.
54  * <p/>
55  *
56  * WARNING : polling is not a best practice to integrate databases. Databases are
57  * usually application-driven, and those applications (or their users) should drive
58  * the "new data" lookup process. This implementation is provided as fitting the most
59  * generic needs.
60  *
61  * <h4>How it works</h4>
62  * <p/>
63  * This default implementation identifies "new data" by a given field that must be
64  * incremental - that is, be different and always greater in newly inserted records.
65  * The usual case for it is being a record identifier or primary key, but it works
66  * well with other kind of fields as well, like timestamps. Concretely, it executes
67  * the configured "oldDataDelimiterSelect" to get the old "new data delimiting value",
68  * then it executes the configured "newDataDelimiterSelect" to get the new "new data
69  * delimiting value" as available in the actual data. If they are different,
70  * the configured XQuery is then executed through the XQuareBCJBIProcessor and its
71  * results formatted as an XML document and sent as a JBI message to the configured
72  * JBI service. Finally the "old data delimiting value" is updated to the new one.
73  * <br/>
74  * Notes :
75  * <br/>
76  * This requires a new table to be added to the target database, that will contain
77  * a single record with a single field being the last seen "maximum data delimiting
78  * value". This table may start empty, in which case the "initDataDelimiter" will be
79  * used the first time.
80  * <br/>
81  * Before executing the XQuery, in its code the ##OLD_DATA_DELIMITER## and
82  * ##NEW_DATA_DELIMITER## expressions are replaced by the values found in the
83  * preceding executions of "oldDataDelimiterSelect" and "newDataDelimiterSelect".
84  * <br/>
85  * Many such listeners may be deployed in different service units to listen
86  * to different kinds of "new data" in the same database.
87  * <br/>
88  * Since it uses the XQuareBCJBIProcessor's "query" capabilities, look at its
89  * documentation for more details on how it behaves.
90  * <p/>
91  *
92  * <h4>Configuration</h4>
93  * The datasource to the target database and the XQuareBCJBIProcessor's XQuery
94  * capabilities are configured through the XQuareBCJBIProcessor (look at its
95  * javadoc for details).
96  * The configuration specific to this listener is configured through the same
97  * *.properties file provided by a given service unit, and uses the following
98  * additional properties :
99  *
100  * <ul>
101  * <li>newdatalistener.service : The fully qualified name of the service to which the new data
102  * messages will be sent. Ex. {http://petals.objectweb.org/}HelloworldService</li>
103  * <li>newdatalistener.operation : This service's operation. Ex. sayHello</li>
104  * <li>newdatalistener.delimiter.oldDataDelimiterSelect : An SQL query that must return the
105  * single value of the saved, old data delimiter. Ex. select max(bid_date) from bids_xqbc</li>
106  * <li>newdatalistener.delimiter.newDataDelimiterSelect : An SQL query that must return the
107  * single value of the current "new data" delimiter. Ex. select max(bid_date) from bids</li>
108  * <li>newdatalistener.delimiter.initDataDelimiter : The new data delimiter initial value if
109  * none in its own table and field. Ex. 1970-01-01</li>
110  * <li>newdatalistener.delimiter.newDataDelimiterUpdates : An SQL insert that must update the
111  * value of the "new data" delimiter from "old" to "new". Note that the ##NEW_DATA_DELIMITER##
112  * expression is replaced within the SQL code before its execution. Ex. delete from bids_xqbc;\
113  * insert into bids_xqbc (bid_date) values ("##NEW_DATA_DELIMITER##")</li>
114  * <li>newdatalistener.delimiter.newDataXquery : An XQuery that must return and format the
115  * current new data. Note that the ##OLD_DATA_DELIMITER## and ##NEW_DATA_DELIMITER##
116  * expressions are replaced within the XQuery code before its execution. Ex. for $i in collection("bids")/bids \
117  * where $i/bid_date > xs:date("##OLD_DATA_DELIMITER##") and $i/bid_date <= xs:date("##NEW_DATA_DELIMITER##") return \
118  * <bid>{ $i/bid_date }</bid></li>
119  * </ul>
120  *
121  * @version $Rev: 250 $Date: {date}
122  * @since Petals 1.0
123  * @author Marc Dutoo - Open Wide
124  *
125  */

126 public class XQuareBCListener implements Runnable JavaDoc {
127     
128     private ComponentContext context;
129     private DeliveryChannel channel;
130     private Logger JavaDoc logger;
131     private Map JavaDoc<String JavaDoc, Properties JavaDoc> serviceToPropertiesMap;
132
133     protected XQuareBCJBIProcessor processor;
134     
135     private boolean running;
136     
137
138     /**
139      * Creates a new XQuareBCListener.
140      * @param context
141      * @param channel
142      * @param logger
143      * @param serviceToPropertiesMap
144      * @param processor
145      */

146     public XQuareBCListener(ComponentContext context, DeliveryChannel channel,
147         Logger JavaDoc logger, HashMap JavaDoc<String JavaDoc, Properties JavaDoc> serviceToPropertiesMap,
148         XQuareBCJBIProcessor processor) {
149         super();
150         this.context = context;
151         this.channel = channel; // needed to write to
152
this.logger = logger;
153         this.serviceToPropertiesMap = serviceToPropertiesMap;
154         this.processor = processor; // used to get access to datasource etc.
155
}
156     
157
158     /**
159      * Main listener "polling" method.
160      */

161     public void run() {
162
163         running = true;
164
165         while (running) {
166             try {
167                 // do it for all properties i.e. for all service units deployed
168
// NB. Using own instance of map so several xquare emitter SAs can coexist
169
Map JavaDoc<String JavaDoc, Properties JavaDoc> myServiceToPropertiesMap = new HashMap JavaDoc<String JavaDoc, Properties JavaDoc>(serviceToPropertiesMap);
170                 for (Properties JavaDoc serviceProps : myServiceToPropertiesMap.values()) {
171                     process(serviceProps);
172                    
173                    // NB. another way to do it would be :
174
//List<String> xqueries = getXqueries(serviceProps); // cache
175
//for (String xquery : xqueries) {
176
// process((String) xquery);
177
//}
178
}
179                 try {
180                     Thread.sleep(2000); // NB. could be in a prop
181
} catch (InterruptedException JavaDoc e) {
182                     this.logger.log(Level.WARNING, e.getMessage());
183                 }
184             } catch (RuntimeException JavaDoc e) {
185                 this.logger.log(Level.WARNING, "Error while processing "
186                     + "configured emission service properties", e);
187             }
188         }
189
190     }
191
192
193     /**
194      * Stops the polling loop
195      *
196      */

197     public void stopProcessing() {
198         running = false;
199     }
200
201     
202
203     /**
204      * Sends an inOnly message
205      * @param bodySource
206      * @param service
207      * @param operation
208      * @return
209      * @throws XQuareBCException
210      */

211     public boolean handleInOnlyMessage(Source JavaDoc bodySource, QName JavaDoc service,
212         String JavaDoc operation) throws XQuareBCException {
213
214         try {
215             InOnly msg = null;
216             msg = channel.createExchangeFactory().createInOnlyExchange();
217             
218             // Create and send messageExchange
219
NormalizedMessage inNm = msg.createMessage();
220             // NB. no attach
221
inNm.setContent(bodySource);
222             msg.setMessage(inNm, "IN");
223             msg.setService(service);
224             msg.setOperation(new QName JavaDoc(operation));
225                 
226             channel.sendSync(msg);
227             if (ExchangeStatus.DONE.equals(msg.getStatus())) {
228                 return true;
229             } else {
230                 return false;
231             }
232
233         } catch (Exception JavaDoc e) {
234             String JavaDoc msg = "Error while emitting data as an InOnly message "
235                 + "to service " + service + " and operation " + operation;
236             logger.log(Level.SEVERE, msg, e);
237             throw new XQuareBCException(msg, e);
238         }
239     }
240
241     /**
242      * Sends an inOut message
243      * @param bodySource
244      * @param service
245      * @param operation
246      * @return
247      * @throws XQuareBCException
248      */

249     public boolean handleInOutMessage(Source JavaDoc bodySource, QName JavaDoc service,
250         String JavaDoc operation) throws XQuareBCException {
251         InOut msg = null;
252         String JavaDoc response = "";
253         try {
254
255             msg = channel.createExchangeFactory().createInOutExchange();
256
257             NormalizedMessage inNm = msg.createMessage();
258             // NB. no attach
259
inNm.setContent(bodySource);
260             msg.setMessage(inNm, "IN");
261             msg.setService(service);
262             msg.setOperation(new QName JavaDoc(operation));
263             
264             channel.sendSync(msg);
265
266             // Verify if request was success,if not, handle the error message
267
if (msg.getStatus().equals(ExchangeStatus.ERROR)) {
268                 // fault
269
// TODO handle response !! output it as a file ??
270
response = SourceHelper.createString(msg.getFault()
271                     .getContent());
272                 logger.log(Level.SEVERE, "XQuareBC data emission "
273                     + "to service " + service + " and operation " + operation
274                     + " received error response :" + response);
275             } else if (msg.getMessage("OUT") != null) {
276                 // success
277
// if there wasn't errors , binding component handle out reponse
278
NormalizedMessage outNm = msg.getMessage("OUT");
279                 response = SourceHelper.createString(outNm.getContent());
280                 logger.log(Level.FINE, "XQuareBC data emission "
281                     + "to service " + service + " and operation " + operation
282                     + " received response :" + response);
283             } else {
284                 logger.log(Level.WARNING, "XQuareBC data emission "
285                     + "to service " + service + " and operation " + operation
286                     + " received no (null) response message");
287             }
288             // Send status DONE to end exchange
289
msg.setStatus(ExchangeStatus.DONE);
290             channel.send(msg);
291             return true;
292
293         } catch (Exception JavaDoc e) {
294             String JavaDoc errMsg = "Error while emitting data as an InOut message "
295                 + "to service " + service + " and operation " + operation;
296             logger.log(Level.SEVERE, errMsg, e);
297             throw new XQuareBCException(errMsg, e);
298         }
299     }
300
301     
302     /**
303      * Tries to find and process new data according to the configuration
304      * of the given service unit.
305      * @param inOut
306      * @param service
307      * @param operation
308      * @param serviceProps
309      * @throws Exception
310      */

311     public void handleNewData(boolean inOut, QName JavaDoc service,
312         String JavaDoc operation, Properties JavaDoc serviceProps) throws Exception JavaDoc {
313         
314         // getting the datasource
315
DataSource JavaDoc ds = processor.getDataSource(serviceProps);
316         Connection JavaDoc conn = ds.getConnection();
317         // conn.setAutoCommit(false); // would be better but...
318

319         // getting a new data detection and handling strategy
320
NewDataStrategy newDataStrategy = new DelimiterNewDataStrategy(processor,
321             conn, serviceProps, XQuareSUHandler.LISTENER_PROP_PREFIX,
322             logger);
323         
324         // now detecting new data if any :
325
if (!newDataStrategy.detectNewData()) {
326             // no data or no new data (when looping without having new data in between) : do nothing
327
return;
328         }
329         // now querying for new data and getting the formatted new data message :
330
Source JavaDoc bodySource = newDataStrategy.getNewData();
331         logger.log(Level.FINE, "XQuareBC data emission "
332             + "to service " + service + " and operation " + operation
333             + " sent data :" + SourceHelper.createString(bodySource));
334
335         boolean written = false;
336         try {
337             if (!inOut) {
338                 written = handleInOnlyMessage(bodySource, service, operation);
339             } else {
340                 // NB. in fact inOut and out very similar...
341
written = handleInOutMessage(bodySource, service, operation);
342             }
343         } catch (XQuareBCException e) {
344             this.logger.log(Level.SEVERE, e.getMessage());
345         }
346         if (written) {
347             // updating known, "old" data
348
newDataStrategy.updateDataKnown();
349         }
350     }
351     
352
353     /**
354      * Logs that the given service is not available.
355      * @param response
356      * @param service
357      */

358     public void notFoundService(String JavaDoc response, String JavaDoc errorMessage) {
359         this.logger.log(Level.SEVERE, "Service not found : " + errorMessage);
360     }
361
362     
363     /**
364      * Gets the targeted service configured in the given service unit
365      * properties. If it does not exist (or its configured operation doesn't),
366      * calls notFoundService() ; else calls handleNewData().
367      * @param serviceProps
368      */

369     protected void process(Properties JavaDoc serviceProps) {
370         boolean found = false;
371         boolean isInOut = false;
372
373         String JavaDoc serviceName = serviceProps.getProperty(
374             XQuareSUHandler.LISTENER_SERVICE_PROP);
375         String JavaDoc operation = serviceProps.getProperty(
376             XQuareSUHandler.LISTENER_OPERATION_PROP);
377         
378         if (serviceName == null || serviceName.length() == 0
379                 || operation == null || operation.length() == 0) {
380             // no newdatalistener for this xquare service
381
return;
382         }
383         
384         // looking for valid endpoint
385
QName JavaDoc service = QName.valueOf(serviceName);
386         String JavaDoc errorMessage = "Looking for endpoints for service " + service
387             + " with operation " + operation + " : ";
388         ServiceEndpoint[] serviceEndpoints = this.context.getEndpointsForService(service);
389         ServiceEndpoint serviceEndpoint = null;
390         if (serviceEndpoints.length > 0) {
391             // using first endpoint found
392
serviceEndpoint = serviceEndpoints[0];
393             Document JavaDoc serviceDesc;
394             try {
395                 serviceDesc = context.getEndpointDescriptor(serviceEndpoint);
396
397                 // Determines if the operation exists and if it is an inOnly or
398
// inOut operation
399
try {
400                     found = WSDLHelper.hasOperationNamed(serviceDesc,
401                         operation, service);
402                     if (!found) {
403                         errorMessage += "its WSDL definition (for endpoint "
404                             + serviceEndpoint.getEndpointName() + ") has no such operation";
405                     }
406                     isInOut = WSDLHelper.isInOutOperation(serviceDesc, operation,
407                         QName.valueOf(serviceName));
408                 } catch (Exception JavaDoc e) {
409                     found = false;
410                     isInOut = false;
411                     errorMessage += "Error while looking in the wsdl definition "
412                         + "for the required operation and its MEP for the endpoint "
413                         + serviceEndpoint.getEndpointName();
414                     this.logger.log(Level.FINE, errorMessage , e);
415                 }
416
417             } catch (JBIException e) {
418                 errorMessage += "Error getting descriptor of endpoint "
419                     + serviceEndpoint.getEndpointName();
420                 this.logger.log(Level.FINE, errorMessage, e);
421             }
422         } else {
423             errorMessage += "No endpoint found";
424         }
425         if (!found) {
426             String JavaDoc response = SourceHelper.createSoapFault(
427                 new XQuareBCException(errorMessage), "300");
428             notFoundService(response, errorMessage);
429             return;
430         }
431         
432         // Handle the dataTable
433
try {
434             handleNewData(isInOut, service, operation, serviceProps);
435         } catch (Exception JavaDoc e) {
436             this.logger.log(Level.SEVERE, "Error while handling new data "
437                 + "for target service " + service + " with operation "
438                 + operation, e);
439         }
440
441     }
442
443 }
444
Popular Tags