1 10 11 12 package org.mule.providers.oracle.jms; 13 14 import oracle.jms.AQjmsSession; 15 import oracle.jms.AdtMessage; 16 import oracle.xdb.XMLType; 17 import org.mule.config.i18n.Message; 18 import org.mule.config.i18n.Messages; 19 import org.mule.providers.ConnectException; 20 import org.mule.providers.jms.JmsConnector; 21 import org.mule.providers.jms.JmsConstants; 22 import org.mule.providers.jms.JmsMessageUtils; 23 import org.mule.transaction.TransactionCoordination; 24 import org.mule.umo.TransactionException; 25 import org.mule.umo.UMOTransaction; 26 import org.mule.umo.lifecycle.InitialisationException; 27 28 import javax.jms.Connection ; 29 import javax.jms.ConnectionFactory ; 30 import javax.jms.JMSException ; 31 import javax.jms.Session ; 32 import javax.naming.NamingException ; 33 import java.io.Serializable ; 34 import java.sql.SQLException ; 35 36 public abstract class AbstractOracleJmsConnector extends JmsConnector 37 { 38 44 public static final String PAYLOADFACTORY_PROPERTY = "payloadFactory"; 45 46 protected String payloadFactory = null; 47 48 54 private boolean multipleSessionsPerConnection = false; 55 56 public AbstractOracleJmsConnector() 57 { 58 super(); 59 registerSupportedProtocol("jms"); 60 } 61 62 65 public String getProtocol() 66 { 67 return "oaq"; 68 } 69 70 73 public boolean supportsProtocol(String protocol) 74 { 75 return getProtocol().equalsIgnoreCase(protocol) || super.getProtocol().equalsIgnoreCase(protocol); 80 } 81 82 public void doConnect() throws ConnectException 83 { 84 try { 85 setJndiDestinations(false); 88 setForceJndiDestinations(false); 89 90 setJmsSupport(new OracleJmsSupport(this, null, false, false)); 91 } 92 catch (Exception e) { 93 throw new ConnectException(new Message(Messages.FAILED_TO_CREATE_X, "Oracle Jms Connector"), e, 94 this); 95 } 96 97 } 102 103 111 public Session getSession(boolean transacted, boolean topic) throws JMSException { 112 113 if (multipleSessionsPerConnection) { 114 return super.getSession(transacted, topic); 115 } else { 116 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 117 118 Session session = getSessionFromTransaction(); 120 if (session != null) { 121 logger.debug("Retrieving jms session from current transaction"); 122 return session; 123 } 124 125 Connection connection; 127 try { 128 connection = createConnection(); 129 } 130 catch (NamingException e) { 131 throw new JMSException ("Unable to open new database connection: " + e.getMessage()); 132 } 133 catch (InitialisationException e) { 134 throw new JMSException ("Unable to open new database connection: " + e.getMessage()); 135 } 136 137 logger.debug("Retrieving new jms session from connection"); 139 session = getJmsSupport().createSession(connection, topic, transacted || tx != null, 140 getAcknowledgementMode(), isNoLocal()); 141 if (tx != null) { 142 logger.debug("Binding session to current transaction"); 143 try { 144 tx.bindResource(connection, session); 145 } 146 catch (TransactionException e) { 147 throw new RuntimeException ("Could not bind session to current transaction", e); 148 } 149 } 150 return session; 151 } 152 } 153 154 158 public boolean supportsProperty(String property) { 159 return (!JmsConstants.JMS_REPLY_TO.equalsIgnoreCase(property) && !JmsConstants.JMS_TYPE.equalsIgnoreCase(property)); 160 } 161 162 167 public javax.jms.Message preProcessMessage(javax.jms.Message message, Session session) throws Exception { 168 Object payload; 169 javax.jms.Message newMessage; 170 171 if (message instanceof AdtMessage) { 172 payload = ((AdtMessage) message).getAdtPayload(); 173 174 if (payload instanceof XMLType) { 175 newMessage = session.createTextMessage(((XMLType) payload).getStringVal().trim()); 176 } else if (payload instanceof Serializable ) { 177 newMessage = session.createObjectMessage((Serializable ) payload); 178 } else { 179 throw new JMSException ("The payload of the incoming AdtMessage must be serializable."); 180 } 181 JmsMessageUtils.copyJMSProperties(message, newMessage, this); 183 return newMessage; 184 } else { 185 return message; 186 } 187 } 188 189 195 public void close(Session session) throws JMSException { 196 if (session != null) { 197 java.sql.Connection conn = ((AQjmsSession) session).getDBConnection(); 198 try { 199 if (conn != null && !conn.isClosed()) { 200 conn.commit(); 201 conn.close(); 202 } 203 } 204 catch (SQLException e) { 205 JMSException ex = new JMSException (e.getMessage()); 206 ex.setLinkedException(e); 207 throw ex; 208 } 209 } 210 } 211 212 public abstract java.sql.Connection getJdbcConnection() throws JMSException ; 213 214 public boolean isMultipleSessionsPerConnection() { 215 return multipleSessionsPerConnection; 216 } 217 218 public void setMultipleSessionsPerConnection(boolean multipleSessionsPerConnection) { 219 this.multipleSessionsPerConnection = multipleSessionsPerConnection; 220 } 221 222 231 protected ConnectionFactory createConnectionFactory() throws InitialisationException, NamingException { 232 return null; 233 } 234 235 public String getPayloadFactory() 236 { 237 return payloadFactory; 238 } 239 240 public void setPayloadFactory(String payloadFactory) 241 { 242 this.payloadFactory = payloadFactory; 243 } 244 } 245 | Popular Tags |