1 10 11 package org.mule.providers.jdbc; 12 13 import org.apache.commons.dbutils.QueryRunner; 14 import org.apache.commons.dbutils.ResultSetHandler; 15 import org.apache.commons.lang.exception.ExceptionUtils; 16 import org.mule.config.ExceptionHelper; 17 import org.mule.config.i18n.Message; 18 import org.mule.config.i18n.Messages; 19 import org.mule.util.properties.BeanPropertyExtractor; 20 import org.mule.util.properties.MapPropertyExtractor; 21 import org.mule.util.properties.MessagePropertyExtractor; 22 import org.mule.util.properties.PayloadPropertyExtractor; 23 import org.mule.util.properties.PropertyExtractor; 24 import org.mule.providers.AbstractServiceEnabledConnector; 25 import org.mule.transaction.TransactionCoordination; 26 import org.mule.umo.TransactionException; 27 import org.mule.umo.UMOComponent; 28 import org.mule.umo.UMOTransaction; 29 import org.mule.umo.endpoint.UMOEndpoint; 30 import org.mule.umo.endpoint.UMOImmutableEndpoint; 31 import org.mule.umo.lifecycle.InitialisationException; 32 import org.mule.umo.provider.UMOMessageReceiver; 33 import org.mule.util.ClassUtils; 34 import org.mule.util.StringUtils; 35 36 import javax.naming.Context ; 37 import javax.naming.InitialContext ; 38 import javax.naming.NamingException ; 39 import javax.sql.DataSource ; 40 import java.sql.Connection ; 41 import java.util.Hashtable ; 42 import java.util.Iterator ; 43 import java.util.List ; 44 import java.util.Map ; 45 import java.util.Set ; 46 import java.util.HashSet ; 47 import java.util.regex.Matcher ; 48 import java.util.regex.Pattern ; 49 50 public class JdbcConnector extends AbstractServiceEnabledConnector 51 { 52 public static final String PROPERTY_POLLING_FREQUENCY = "pollingFrequency"; 55 public static final long DEFAULT_POLLING_FREQUENCY = 1000; 56 57 private static final String DEFAULT_QUERY_RUNNER = "org.apache.commons.dbutils.QueryRunner"; 58 private static final String DEFAULT_RESULTSET_HANDLER = "org.apache.commons.dbutils.handlers.MapListHandler"; 59 60 private static final Pattern STATEMENT_ARGS = Pattern.compile("\\$\\{[^\\}]*\\}"); 61 62 63 static 64 { 65 ExceptionHelper.registerExceptionReader(new SQLExceptionReader()); 66 } 67 68 protected long pollingFrequency = 0; 69 protected DataSource dataSource; 70 protected String dataSourceJndiName; 71 protected Context jndiContext; 72 protected String jndiInitialFactory; 73 protected String jndiProviderUrl; 74 protected Map providerProperties; 75 protected Map queries; 76 protected String resultSetHandler = DEFAULT_RESULTSET_HANDLER; 77 protected String queryRunner = DEFAULT_QUERY_RUNNER; 78 protected Set queryValueExtractors; 79 protected Set propertyExtractors; 80 81 86 public String getProtocol() 87 { 88 return "jdbc"; 89 } 90 91 public UMOMessageReceiver createReceiver(UMOComponent component, UMOEndpoint endpoint) throws Exception 92 { 93 Map props = endpoint.getProperties(); 94 if (props != null) 95 { 96 String tempPolling = (String ) props.get(PROPERTY_POLLING_FREQUENCY); 97 if (tempPolling != null) 98 { 99 pollingFrequency = Long.parseLong(tempPolling); 100 } 101 } 102 103 if (pollingFrequency <= 0) 104 { 105 pollingFrequency = DEFAULT_POLLING_FREQUENCY; 106 } 107 108 String [] params = getReadAndAckStatements(endpoint); 109 return getServiceDescriptor().createMessageReceiver(this, component, endpoint, params); 110 } 111 112 protected void initJndiContext() throws NamingException 113 { 114 if (this.jndiContext == null) 115 { 116 Hashtable props = new Hashtable (); 117 if (this.jndiInitialFactory != null) 118 { 119 props.put(Context.INITIAL_CONTEXT_FACTORY, this.jndiInitialFactory); 120 } 121 if (this.jndiProviderUrl != null) 122 { 123 props.put(Context.PROVIDER_URL, jndiProviderUrl); 124 } 125 if (this.providerProperties != null) 126 { 127 props.putAll(this.providerProperties); 128 } 129 this.jndiContext = new InitialContext (props); 130 } 131 132 } 133 134 protected void createDataSource() throws InitialisationException, NamingException 135 { 136 Object temp = this.jndiContext.lookup(this.dataSourceJndiName); 137 if (temp instanceof DataSource ) 138 { 139 dataSource = (DataSource )temp; 140 } 141 else 142 { 143 throw new InitialisationException(new Message(Messages.JNDI_RESOURCE_X_NOT_FOUND, 144 this.dataSourceJndiName), this); 145 } 146 } 147 148 153 public void doInitialise() throws InitialisationException 154 { 155 super.doInitialise(); 156 try 157 { 158 if (dataSource == null) 161 { 162 initJndiContext(); 163 createDataSource(); 164 } 165 if (queryValueExtractors == null) 167 { 168 queryValueExtractors = new HashSet (); 170 queryValueExtractors.add(MessagePropertyExtractor.class.getName()); 171 queryValueExtractors.add(NowPropertyExtractor.class.getName()); 172 queryValueExtractors.add(PayloadPropertyExtractor.class.getName()); 173 queryValueExtractors.add(MapPropertyExtractor.class.getName()); 174 queryValueExtractors.add(BeanPropertyExtractor.class.getName()); 175 if (ClassUtils.isClassOnPath("org.mule.util.properties.Dom4jPropertyExtractor", getClass())) 176 { 177 queryValueExtractors.add("org.mule.util.properties.Dom4jPropertyExtractor"); 178 } 179 if (ClassUtils.isClassOnPath("org.mule.util.properties.JDomPropertyExtractor", getClass())) 180 { 181 queryValueExtractors.add("org.mule.util.properties.JDomPropertyExtractor"); 182 } 183 } 184 propertyExtractors = new HashSet (); 185 for (Iterator iterator = queryValueExtractors.iterator(); iterator.hasNext();) 186 { 187 String s = (String )iterator.next(); 188 propertyExtractors.add(ClassUtils.instanciateClass(s, ClassUtils.NO_ARGS)); 189 } 190 } 191 catch (Exception e) 192 { 193 throw new InitialisationException(new Message(Messages.FAILED_TO_CREATE_X, "Jdbc Connector"), e, 194 this); 195 } 196 } 197 198 public String [] getReadAndAckStatements(UMOImmutableEndpoint endpoint) 199 { 200 String str; 201 String readStmt; 203 if ((str = (String )endpoint.getProperty("sql")) != null) 204 { 205 readStmt = str; 206 } 207 else 208 { 209 readStmt = endpoint.getEndpointURI().getAddress(); 210 } 211 String ackStmt; 213 if ((str = (String )endpoint.getProperty("ack")) != null) 214 { 215 ackStmt = str; 216 if ((str = getQuery(endpoint, ackStmt)) != null) 217 { 218 ackStmt = str; 219 } 220 } 221 else 222 { 223 ackStmt = readStmt + ".ack"; 224 if ((str = getQuery(endpoint, ackStmt)) != null) 225 { 226 ackStmt = str; 227 } 228 else 229 { 230 ackStmt = null; 231 } 232 } 233 if ((str = getQuery(endpoint, readStmt)) != null) 235 { 236 readStmt = str; 237 } 238 if (readStmt == null) 239 { 240 throw new IllegalArgumentException ("Read statement should not be null"); 241 } 242 if (!"select".equalsIgnoreCase(readStmt.substring(0, 6))) 243 { 244 throw new IllegalArgumentException ("Read statement should be a select sql statement"); 245 } 246 if (ackStmt != null) 247 { 248 if (!"insert".equalsIgnoreCase(ackStmt.substring(0, 6)) 249 && !"update".equalsIgnoreCase(ackStmt.substring(0, 6)) 250 && !"delete".equalsIgnoreCase(ackStmt.substring(0, 6))) 251 { 252 throw new IllegalArgumentException ( 253 "Ack statement should be an insert / update / delete sql statement"); 254 } 255 } 256 return new String []{readStmt, ackStmt}; 257 } 258 259 public String getQuery(UMOImmutableEndpoint endpoint, String stmt) 260 { 261 Object query = null; 262 if (endpoint != null && endpoint.getProperties() != null) 263 { 264 Object queries = endpoint.getProperties().get("queries"); 265 if (queries instanceof Map ) 266 { 267 query = ((Map )queries).get(stmt); 268 } 269 } 270 if (query == null) 271 { 272 if (this.queries != null) 273 { 274 query = this.queries.get(stmt); 275 } 276 } 277 return query == null ? null : query.toString(); 278 } 279 280 283 public DataSource getDataSource() 284 { 285 return dataSource; 286 } 287 288 291 public void setDataSource(DataSource dataSource) 292 { 293 this.dataSource = dataSource; 294 } 295 296 299 public long getPollingFrequency() 300 { 301 return pollingFrequency; 302 } 303 304 307 public void setPollingFrequency(long pollingFrequency) 308 { 309 this.pollingFrequency = pollingFrequency; 310 } 311 312 315 public Map getQueries() 316 { 317 return queries; 318 } 319 320 323 public void setQueries(Map queries) 324 { 325 this.queries = queries; 326 } 327 328 331 public String getDataSourceJndiName() 332 { 333 return dataSourceJndiName; 334 } 335 336 339 public void setDataSourceJndiName(String dataSourceJndiName) 340 { 341 this.dataSourceJndiName = dataSourceJndiName; 342 } 343 344 347 public Context getJndiContext() 348 { 349 return jndiContext; 350 } 351 352 355 public void setJndiContext(Context jndiContext) 356 { 357 this.jndiContext = jndiContext; 358 } 359 360 363 public String getJndiInitialFactory() 364 { 365 return jndiInitialFactory; 366 } 367 368 371 public void setJndiInitialFactory(String jndiInitialFactory) 372 { 373 this.jndiInitialFactory = jndiInitialFactory; 374 } 375 376 379 public String getJndiProviderUrl() 380 { 381 return jndiProviderUrl; 382 } 383 384 387 public void setJndiProviderUrl(String jndiProviderUrl) 388 { 389 this.jndiProviderUrl = jndiProviderUrl; 390 } 391 392 395 public Map getProviderProperties() 396 { 397 return providerProperties; 398 } 399 400 403 public void setProviderProperties(Map providerProperties) 404 { 405 this.providerProperties = providerProperties; 406 } 407 408 413 public Object getSessionFactory(UMOEndpoint endpoint) throws Exception 414 { 415 return dataSource; 416 } 417 418 public Connection getConnection() throws Exception 419 { 420 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 421 if (tx != null) 422 { 423 if (tx.hasResource(dataSource)) 424 { 425 logger.debug("Retrieving connection from current transaction"); 426 return (Connection )tx.getResource(dataSource); 427 } 428 } 429 logger.debug("Retrieving new connection from data source"); 430 Connection con = dataSource.getConnection(); 431 432 if (tx != null) 433 { 434 logger.debug("Binding connection to current transaction"); 435 try 436 { 437 tx.bindResource(dataSource, con); 438 } 439 catch (TransactionException e) 440 { 441 throw new RuntimeException ("Could not bind connection to current transaction", e); 442 } 443 } 444 return con; 445 } 446 447 450 public String getResultSetHandler() 451 { 452 return this.resultSetHandler; 453 } 454 455 458 public void setResultSetHandler(String resultSetHandler) 459 { 460 this.resultSetHandler = resultSetHandler; 461 } 462 463 467 protected ResultSetHandler createResultSetHandler() 468 { 469 try 470 { 471 return (ResultSetHandler) ClassUtils.instanciateClass(getResultSetHandler(), 472 ClassUtils.NO_ARGS); 473 } 474 catch (Exception e) 475 { 476 throw new IllegalArgumentException ("Error creating instance of the resultSetHandler class :" 477 + getResultSetHandler() + System.getProperty("line.separator") 478 + ExceptionUtils.getFullStackTrace(e)); 479 } 480 } 481 482 public Set getQueryValueExtractors() 483 { 484 return queryValueExtractors; 485 } 486 487 public void setQueryValueExtractors(Set queryValueExtractors) 488 { 489 this.queryValueExtractors = queryValueExtractors; 490 } 491 492 495 public String getQueryRunner() 496 { 497 return this.queryRunner; 498 } 499 500 503 public void setQueryRunner(String queryRunner) 504 { 505 this.queryRunner = queryRunner; 506 } 507 508 512 protected QueryRunner createQueryRunner() 513 { 514 try 515 { 516 return (QueryRunner) ClassUtils.instanciateClass(getQueryRunner(), 517 ClassUtils.NO_ARGS); 518 } 519 catch (Exception e) 520 { 521 throw new IllegalArgumentException ("Error creating instance of the queryRunner class :" 522 + getQueryRunner() + System.getProperty("line.separator") 523 + ExceptionUtils.getFullStackTrace(e)); 524 } 525 } 526 527 535 public String parseStatement(String stmt, List params) 536 { 537 if (stmt == null) 538 { 539 return stmt; 540 } 541 Matcher m = STATEMENT_ARGS.matcher(stmt); 542 StringBuffer sb = new StringBuffer (200); 543 while (m.find()) 544 { 545 String key = m.group(); 546 m.appendReplacement(sb, "?"); 547 params.add(key); 548 } 549 m.appendTail(sb); 550 return sb.toString(); 551 } 552 553 public Object [] getParams(UMOImmutableEndpoint endpoint, List paramNames, Object message) 554 throws Exception 555 { 556 Object [] params = new Object [paramNames.size()]; 557 for (int i = 0; i < paramNames.size(); i++) 558 { 559 String param = (String )paramNames.get(i); 560 String name = param.substring(2, param.length() - 1); 561 Object value = null; 562 boolean foundValue = false; 564 if (message != null) 565 { 566 for (Iterator iterator = propertyExtractors.iterator(); iterator.hasNext();) 567 { 568 PropertyExtractor pe = (PropertyExtractor)iterator.next(); 569 value = pe.getProperty(name, message); 570 if (value != null) 571 { 572 if (value.equals(StringUtils.EMPTY) && pe instanceof BeanPropertyExtractor) 573 { 574 value = null; 575 } 576 foundValue = true; 577 break; 578 } 579 } 580 } 581 if (!foundValue) 582 { 583 value = endpoint.getProperty(name); 584 } 585 586 params[i] = value; 595 } 596 return params; 597 } 598 } 599 | Popular Tags |