1 25 package org.objectweb.joram.client.connector; 26 27 import javax.jms.JMSException ; 28 import javax.jms.Session ; 29 import javax.jms.XAConnection ; 30 import javax.jms.XASession ; 31 import javax.resource.spi.endpoint.MessageEndpoint ; 32 import javax.resource.spi.endpoint.MessageEndpointFactory ; 33 import javax.resource.spi.work.WorkManager ; 34 import javax.transaction.xa.XAResource ; 35 36 import org.objectweb.util.monolog.api.BasicLevel; 37 38 43 class InboundSession implements javax.jms.ServerSession , 44 javax.resource.spi.work.Work , 45 javax.jms.MessageListener 46 { 47 48 private InboundConsumer consumer; 49 50 51 private WorkManager workManager; 52 53 private MessageEndpointFactory endpointFactory; 54 55 59 private Session session; 60 61 private XAResource xaResource = null; 62 63 64 75 InboundSession(InboundConsumer consumer, 76 WorkManager workManager, 77 MessageEndpointFactory endpointFactory, 78 XAConnection cnx, 79 boolean transacted, 80 int ackMode) { 81 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 82 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 83 "InboundSession(" + consumer + 84 "," + workManager + 85 "," + endpointFactory + 86 "," + cnx + 87 "," + transacted + 88 "," + ackMode + ")"); 89 90 this.consumer = consumer; 91 this.workManager = workManager; 92 this.endpointFactory = endpointFactory; 93 94 try { 95 if (transacted) { 96 session = cnx.createXASession(); 97 xaResource = ((XASession ) session).getXAResource(); 98 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 99 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 100 "InboundSession xaResource = " + xaResource); 101 } 102 else { 103 session = cnx.createSession(false, ackMode); 104 } 105 106 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 107 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 108 "InboundSession session = " + session); 109 session.setMessageListener(this); 110 } 111 catch (JMSException exc) {} 112 } 113 114 115 121 public Session getSession() throws JMSException 122 { 123 return session; 124 } 125 126 131 public void start() throws JMSException 132 { 133 try { 134 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 135 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 136 "ServerSession submits Work instance."); 137 workManager.scheduleWork(this); 138 } 139 catch (Exception exc) { 140 throw new JMSException ("Can't start the adapter session for processing " 141 + "the delivered messages: " + exc); 142 } 143 } 144 145 146 public void release() { 147 try { 148 session.close(); 149 } catch (JMSException exc) { 150 } 151 } 152 153 154 public void run() 155 { 156 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 157 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 158 "ServerSession runs wrapped Session."); 159 session.run(); 160 consumer.releaseSession(this); 161 } 162 163 164 public void onMessage(javax.jms.Message message) { 165 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 166 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 167 this + " onMessage(" + message + ")"); 168 169 MessageEndpoint endpoint = null; 170 try { 171 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 172 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 173 "ServerSession passes message to listener."); 174 endpoint = endpointFactory.createEndpoint(xaResource); 175 ((javax.jms.MessageListener ) endpoint).onMessage(message); 176 endpoint.release(); 177 } catch (Exception exc) { 178 try { 179 if (endpoint != null) endpoint.release(); 181 } catch (Exception e) { 182 } 184 throw new java.lang.IllegalStateException ("Could not get endpoint " 185 + "instance: " + exc); 186 } 187 } 188 } 189 | Popular Tags |