package org.objectweb.joram.client.connector;

import javax.jms.*;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.CommException;
import javax.resource.spi.SecurityException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.WorkManager;

import java.util.Vector;

import org.objectweb.util.monolog.api.BasicLevel;

/**
 * An <code>InboundConsumer</code> sets up a JMS connection consumer on a
 * destination and acts as the <code>ServerSessionPool</code> that this
 * connection consumer draws its <code>ServerSession</code>s from.
 * <p>
 * Sessions are created lazily by {@link #getServerSession()} and handed back
 * through {@link #releaseSession(InboundSession)}. All pool state
 * (<code>pool</code> and <code>serverSessions</code>) is guarded by the
 * monitor of <code>pool</code>.
 */
class InboundConsumer implements javax.jms.ServerSessionPool
{
  /** Work manager handed to every <code>InboundSession</code> created here. */
  private final WorkManager workManager;
  /** Endpoint factory handed to every <code>InboundSession</code> created here. */
  private final MessageEndpointFactory endpointFactory;

  /** Connection the consumer is created on; closed by {@link #close()}. */
  private final XAConnection cnx;
  /** Durable subscription name; <code>null</code> unless the consumer is durable. */
  private String subName = null;

  /** Transacted flag handed to every <code>InboundSession</code> created here. */
  private final boolean transacted;

  /**
   * Maximum number of server sessions this pool creates; <code>0</code>
   * means the pool creates a new session whenever none is free.
   */
  private final int maxWorks;
  /** Acknowledgement mode handed to every <code>InboundSession</code> created here. */
  private final int ackMode;

  /** If <code>true</code>, {@link #close()} unsubscribes the durable subscription. */
  private final boolean closeDurSub;

  /** Connection consumer registered on the destination. */
  private ConnectionConsumer cnxConsumer;
  /** Number of sessions created so far; only touched while holding <code>pool</code>. */
  private int serverSessions = 0;

  /** Released sessions waiting to be reused; also the monitor guarding the pool. */
  private final Vector pool;


  /**
   * Creates the connection consumer on the given destination and starts the
   * connection.
   *
   * @param workManager      work manager passed to the created sessions.
   * @param endpointFactory  endpoint factory passed to the created sessions.
   * @param cnx              connection to create the consumer on.
   * @param dest             destination to consume from.
   * @param selector         message selector passed to the connection consumer.
   * @param durable          <code>true</code> to create a durable subscription.
   * @param subName          durable subscription name; required when
   *                         <code>durable</code> is <code>true</code>.
   * @param transacted       transacted flag passed to the created sessions.
   * @param maxWorks         maximum number of sessions; negative values are
   *                         treated as <code>0</code> (no limit).
   * @param maxMessages      value passed as <code>maxMessages</code> to the
   *                         connection consumer.
   * @param ackMode          acknowledgement mode passed to the created sessions.
   * @param closeDurSub      <code>true</code> to unsubscribe the durable
   *                         subscription when this consumer is closed.
   *
   * @exception NotSupportedException  if a durable subscription is requested
   *              on a destination that is not a topic, or without a name.
   * @exception SecurityException      if the JMS provider raises a
   *              <code>JMSSecurityException</code>.
   * @exception CommException          if the JMS provider raises an
   *              <code>IllegalStateException</code>.
   * @exception ResourceException      for any other JMS failure.
   */
  InboundConsumer(WorkManager workManager,
                  MessageEndpointFactory endpointFactory,
                  XAConnection cnx,
                  Destination dest,
                  String selector,
                  boolean durable,
                  String subName,
                  boolean transacted,
                  int maxWorks,
                  int maxMessages,
                  int ackMode,
                  boolean closeDurSub) throws ResourceException {
    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, "InboundConsumer(" + workManager +
                                    ", " + endpointFactory +
                                    ", " + cnx +
                                    ", " + dest +
                                    ", " + selector +
                                    ", " + durable +
                                    ", " + subName +
                                    ", " + transacted +
                                    ", " + maxWorks +
                                    ", " + maxMessages +
                                    "," + ackMode +
                                    "," + closeDurSub + ")");

    this.workManager = workManager;
    this.endpointFactory = endpointFactory;
    this.cnx = cnx;
    this.transacted = transacted;
    this.ackMode = ackMode;
    // Previously never stored, which left the field false and made close()
    // skip the unsubscription whatever the caller asked for.
    this.closeDurSub = closeDurSub;

    this.maxWorks = Math.max(maxWorks, 0);

    pool = new Vector(this.maxWorks);

    try {
      if (durable) {
        if (! (dest instanceof javax.jms.Topic))
          throw new NotSupportedException("Can't set a durable subscription "
                                          + "on a JMS queue.");

        if (subName == null)
          throw new NotSupportedException("Missing durable "
                                          + "subscription name.");

        this.subName = subName;

        cnxConsumer =
          cnx.createDurableConnectionConsumer((javax.jms.Topic) dest,
                                              subName,
                                              selector,
                                              this,
                                              maxMessages);
      } else {
        cnxConsumer = cnx.createConnectionConsumer(dest,
                                                   selector,
                                                   this,
                                                   maxMessages);
      }

      cnx.start();
    }
    catch (JMSSecurityException exc) {
      throw new SecurityException("Target destination not readble: "
                                  + exc, exc);
    }
    catch (javax.jms.IllegalStateException exc) {
      throw new CommException("Connection with the JORAM server is lost.", exc);
    }
    catch (JMSException exc) {
      throw new ResourceException("Could not set asynchronous consumer: "
                                  + exc, exc);
    }
  }

  /**
   * Provides a <code>ServerSession</code> from the pool.
   * <p>
   * A released session is reused when one is available. Otherwise a new
   * session is created, unless <code>maxWorks</code> is positive and that
   * many sessions already exist; in that case the call blocks until a
   * session is released.
   *
   * @return a server session, removed from the pool or newly created.
   *
   * @exception JMSException  if the calling thread is interrupted while
   *              waiting, or if a runtime failure occurs. The original
   *              exception is available as the linked exception.
   */
  public ServerSession getServerSession()
    throws JMSException {
    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getServerSession()");

    try {
      synchronized (pool) {
        // Re-check the pool after every wake-up: wait() may return spuriously,
        // and removing from an empty Vector would throw.
        while (pool.isEmpty()) {
          if (maxWorks <= 0 || serverSessions < maxWorks)
            return newSession();

          if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
            AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
                                          "ServerSessionPool waits for "
                                          + "a free ServerSession.");
          pool.wait();
        }
        return (ServerSession) pool.remove(0);
      }
    } catch (InterruptedException exc) {
      // Keep the interruption visible to the caller's thread.
      Thread.currentThread().interrupt();
      throw poolError(exc);
    } catch (RuntimeException exc) {
      throw poolError(exc);
    }
  }

  /**
   * Builds the <code>JMSException</code> reported by
   * {@link #getServerSession()}, keeping the original failure as the linked
   * exception.
   */
  private static JMSException poolError(Exception cause) {
    JMSException error =
      new JMSException("Error while getting server session from pool: "
                       + cause);
    error.setLinkedException(cause);
    return error;
  }

  /**
   * Creates a new session and counts it. Must be called while holding the
   * monitor of <code>pool</code>, which guards <code>serverSessions</code>.
   */
  private InboundSession newSession() {
    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
                                    "ServerSessionPool provides "
                                    + "new ServerSession.");
    serverSessions++;
    return new InboundSession(this,
                              workManager,
                              endpointFactory,
                              cnx,
                              transacted,
                              ackMode);
  }

  /**
   * Puts a session back into the pool and wakes up one thread waiting in
   * {@link #getServerSession()}.
   *
   * @param session  the session to make available again.
   */
  void releaseSession(InboundSession session) {
    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " releaseSession(" + session + ")");

    synchronized (pool) {
      pool.add(session);
      pool.notify();
    }
  }

  /**
   * Closes the connection consumer, unsubscribes the durable subscription
   * when requested at construction, then closes the connection.
   * <p>
   * Each step is attempted even if a previous one failed, so that a failing
   * consumer close or unsubscription does not leave the connection open.
   * JMS failures are logged and not propagated.
   */
  void close() {
    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " close() unsubscribe subscription: "+ closeDurSub);

    try {
      cnxConsumer.close();
    } catch (JMSException exc) {
      warn("could not close the connection consumer", exc);
    }

    if (closeDurSub && subName != null) {
      try {
        // The session is not closed here; cnx.close() below is relied on
        // to release it.
        Session session = cnx.createSession(true, 0);
        session.unsubscribe(subName);
      } catch (JMSException exc) {
        warn("could not unsubscribe " + subName, exc);
      }
    }

    try {
      cnx.close();
    } catch (JMSException exc) {
      warn("could not close the connection", exc);
    }
  }

  /** Logs a non-fatal failure at WARN level. */
  private void warn(String what, Exception exc) {
    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN))
      AdapterTracing.dbgAdapter.log(BasicLevel.WARN, this + " " + what + ": " + exc);
  }
}