1 22 package org.objectweb.petals.binding.xquarebc.listeners; 23 24 import java.sql.Connection ; 25 import java.util.HashMap ; 26 import java.util.Map ; 27 import java.util.Properties ; 28 import java.util.logging.Level ; 29 import java.util.logging.Logger ; 30 31 import javax.jbi.JBIException; 32 import javax.jbi.component.ComponentContext; 33 import javax.jbi.messaging.DeliveryChannel; 34 import javax.jbi.messaging.ExchangeStatus; 35 import javax.jbi.messaging.InOnly; 36 import javax.jbi.messaging.InOut; 37 import javax.jbi.messaging.NormalizedMessage; 38 import javax.jbi.servicedesc.ServiceEndpoint; 39 import javax.sql.DataSource ; 40 import javax.xml.namespace.QName ; 41 import javax.xml.transform.Source ; 42 import org.objectweb.petals.binding.xquarebc.XQuareBCException; 43 import org.objectweb.petals.binding.xquarebc.XQuareSUHandler; 44 import org.objectweb.petals.component.common.util.SourceHelper; 45 import org.objectweb.petals.component.common.util.WSDLHelper; 46 import org.w3c.dom.Document ; 47 48 126 public class XQuareBCListener implements Runnable { 127 128 private ComponentContext context; 129 private DeliveryChannel channel; 130 private Logger logger; 131 private Map <String , Properties > serviceToPropertiesMap; 132 133 protected XQuareBCJBIProcessor processor; 134 135 private boolean running; 136 137 138 146 public XQuareBCListener(ComponentContext context, DeliveryChannel channel, 147 Logger logger, HashMap <String , Properties > serviceToPropertiesMap, 148 XQuareBCJBIProcessor processor) { 149 super(); 150 this.context = context; 151 this.channel = channel; this.logger = logger; 153 this.serviceToPropertiesMap = serviceToPropertiesMap; 154 this.processor = processor; } 156 157 158 161 public void run() { 162 163 running = true; 164 165 while (running) { 166 try { 167 Map <String , Properties > myServiceToPropertiesMap = new HashMap <String , Properties >(serviceToPropertiesMap); 170 for (Properties serviceProps : myServiceToPropertiesMap.values()) { 171 process(serviceProps); 172 173 } 179 try { 180 Thread.sleep(2000); } catch (InterruptedException e) { 182 this.logger.log(Level.WARNING, e.getMessage()); 183 } 184 } catch (RuntimeException e) { 185 this.logger.log(Level.WARNING, "Error while processing " 186 + "configured emission service properties", e); 187 } 188 } 189 190 } 191 192 193 197 public void stopProcessing() { 198 running = false; 199 } 200 201 202 203 211 public boolean handleInOnlyMessage(Source bodySource, QName service, 212 String operation) throws XQuareBCException { 213 214 try { 215 InOnly msg = null; 216 msg = channel.createExchangeFactory().createInOnlyExchange(); 217 218 NormalizedMessage inNm = msg.createMessage(); 220 inNm.setContent(bodySource); 222 msg.setMessage(inNm, "IN"); 223 msg.setService(service); 224 msg.setOperation(new QName (operation)); 225 226 channel.sendSync(msg); 227 if (ExchangeStatus.DONE.equals(msg.getStatus())) { 228 return true; 229 } else { 230 return false; 231 } 232 233 } catch (Exception e) { 234 String msg = "Error while emitting data as an InOnly message " 235 + "to service " + service + " and operation " + operation; 236 logger.log(Level.SEVERE, msg, e); 237 throw new XQuareBCException(msg, e); 238 } 239 } 240 241 249 public boolean handleInOutMessage(Source bodySource, QName service, 250 String operation) throws XQuareBCException { 251 InOut msg = null; 252 String response = ""; 253 try { 254 255 msg = channel.createExchangeFactory().createInOutExchange(); 256 257 NormalizedMessage inNm = msg.createMessage(); 258 inNm.setContent(bodySource); 260 msg.setMessage(inNm, "IN"); 261 msg.setService(service); 262 msg.setOperation(new QName (operation)); 263 264 channel.sendSync(msg); 265 266 if (msg.getStatus().equals(ExchangeStatus.ERROR)) { 268 response = SourceHelper.createString(msg.getFault() 271 .getContent()); 272 logger.log(Level.SEVERE, "XQuareBC data emission " 273 + "to service " + service + " and operation " + operation 274 + " received error response :" + response); 275 } else if (msg.getMessage("OUT") != null) { 276 NormalizedMessage outNm = msg.getMessage("OUT"); 279 response = SourceHelper.createString(outNm.getContent()); 280 logger.log(Level.FINE, "XQuareBC data emission " 281 + "to service " + service + " and operation " + operation 282 + " received response :" + response); 283 } else { 284 logger.log(Level.WARNING, "XQuareBC data emission " 285 + "to service " + service + " and operation " + operation 286 + " received no (null) response message"); 287 } 288 msg.setStatus(ExchangeStatus.DONE); 290 channel.send(msg); 291 return true; 292 293 } catch (Exception e) { 294 String errMsg = "Error while emitting data as an InOut message " 295 + "to service " + service + " and operation " + operation; 296 logger.log(Level.SEVERE, errMsg, e); 297 throw new XQuareBCException(errMsg, e); 298 } 299 } 300 301 302 311 public void handleNewData(boolean inOut, QName service, 312 String operation, Properties serviceProps) throws Exception { 313 314 DataSource ds = processor.getDataSource(serviceProps); 316 Connection conn = ds.getConnection(); 317 319 NewDataStrategy newDataStrategy = new DelimiterNewDataStrategy(processor, 321 conn, serviceProps, XQuareSUHandler.LISTENER_PROP_PREFIX, 322 logger); 323 324 if (!newDataStrategy.detectNewData()) { 326 return; 328 } 329 Source bodySource = newDataStrategy.getNewData(); 331 logger.log(Level.FINE, "XQuareBC data emission " 332 + "to service " + service + " and operation " + operation 333 + " sent data :" + SourceHelper.createString(bodySource)); 334 335 boolean written = false; 336 try { 337 if (!inOut) { 338 written = handleInOnlyMessage(bodySource, service, operation); 339 } else { 340 written = handleInOutMessage(bodySource, service, operation); 342 } 343 } catch (XQuareBCException e) { 344 this.logger.log(Level.SEVERE, e.getMessage()); 345 } 346 if (written) { 347 newDataStrategy.updateDataKnown(); 349 } 350 } 351 352 353 358 public void notFoundService(String response, String errorMessage) { 359 this.logger.log(Level.SEVERE, "Service not found : " + errorMessage); 360 } 361 362 363 369 protected void process(Properties serviceProps) { 370 boolean found = false; 371 boolean isInOut = false; 372 373 String serviceName = serviceProps.getProperty( 374 XQuareSUHandler.LISTENER_SERVICE_PROP); 375 String operation = serviceProps.getProperty( 376 XQuareSUHandler.LISTENER_OPERATION_PROP); 377 378 if (serviceName == null || serviceName.length() == 0 379 || operation == null || operation.length() == 0) { 380 return; 382 } 383 384 QName service = QName.valueOf(serviceName); 386 String errorMessage = "Looking for endpoints for service " + service 387 + " with operation " + operation + " : "; 388 ServiceEndpoint[] serviceEndpoints = this.context.getEndpointsForService(service); 389 ServiceEndpoint serviceEndpoint = null; 390 if (serviceEndpoints.length > 0) { 391 serviceEndpoint = serviceEndpoints[0]; 393 Document serviceDesc; 394 try { 395 serviceDesc = context.getEndpointDescriptor(serviceEndpoint); 396 397 try { 400 found = WSDLHelper.hasOperationNamed(serviceDesc, 401 operation, service); 402 if (!found) { 403 errorMessage += "its WSDL definition (for endpoint " 404 + serviceEndpoint.getEndpointName() + ") has no such operation"; 405 } 406 isInOut = WSDLHelper.isInOutOperation(serviceDesc, operation, 407 QName.valueOf(serviceName)); 408 } catch (Exception e) { 409 found = false; 410 isInOut = false; 411 errorMessage += "Error while looking in the wsdl definition " 412 + "for the required operation and its MEP for the endpoint " 413 + serviceEndpoint.getEndpointName(); 414 this.logger.log(Level.FINE, errorMessage , e); 415 } 416 417 } catch (JBIException e) { 418 errorMessage += "Error getting descriptor of endpoint " 419 + serviceEndpoint.getEndpointName(); 420 this.logger.log(Level.FINE, errorMessage, e); 421 } 422 } else { 423 errorMessage += "No endpoint found"; 424 } 425 if (!found) { 426 String response = SourceHelper.createSoapFault( 427 new XQuareBCException(errorMessage), "300"); 428 notFoundService(response, errorMessage); 429 return; 430 } 431 432 try { 434 handleNewData(isInOut, service, operation, serviceProps); 435 } catch (Exception e) { 436 this.logger.log(Level.SEVERE, "Error while handling new data " 437 + "for target service " + service + " with operation " 438 + operation, e); 439 } 440 441 } 442 443 } 444 | Popular Tags |