1 23 package com.sun.enterprise.connectors.inflow; 24 25 import com.sun.ejb.MessageBeanClient; 26 import com.sun.ejb.MessageBeanListener; 27 import com.sun.ejb.MessageBeanProtocolManager; 28 import com.sun.enterprise.deployment.ConnectorDescriptor; 29 import com.sun.enterprise.deployment.EjbMessageBeanDescriptor; 30 import com.sun.enterprise.deployment.BundleDescriptor; 31 import com.sun.enterprise.deployment.EnvironmentProperty; 32 import com.sun.enterprise.deployment.MessageListener; 33 import com.sun.enterprise.resource.ResourceHandle; 34 import com.sun.enterprise.connectors.util.*; 35 import com.sun.enterprise.connectors.ConnectorRegistry; 36 import com.sun.enterprise.connectors.ConnectorRuntime; 37 import com.sun.enterprise.connectors.ConnectorRuntimeException; 38 import com.sun.enterprise.connectors.ActiveResourceAdapter; 39 import com.sun.enterprise.connectors.ActiveInboundResourceAdapter; 40 import com.sun.enterprise.connectors.ActiveOutboundResourceAdapter; 41 import com.sun.enterprise.connectors.system.ActiveJmsResourceAdapter; 42 import com.sun.logging.LogDomains; 43 import java.lang.reflect.Method ; 44 import java.lang.reflect.Proxy ; 45 import java.security.AccessController ; 46 import java.security.PrivilegedAction ; 47 import java.util.HashSet ; 48 import java.util.Iterator ; 49 import java.util.Set ; 50 import java.util.logging.Level ; 51 import java.util.logging.Logger ; 52 import javax.naming.InitialContext ; 53 import javax.resource.spi.ActivationSpec ; 54 import javax.resource.spi.ResourceAdapter ; 55 import javax.resource.spi.endpoint.MessageEndpoint ; 56 import javax.resource.spi.endpoint.MessageEndpointFactory ; 57 import javax.resource.spi.UnavailableException ; 58 import javax.transaction.xa.XAResource ; 59 60 67 public class ConnectorMessageBeanClient 68 implements MessageBeanClient, MessageEndpointFactory { 69 70 private static String MESSAGE_ENDPOINT = 71 "javax.resource.spi.endpoint.MessageEndpoint"; 72 73 private ConnectorRegistry registry_; 74 75 private MessageBeanProtocolManager messageBeanPM_; 76 private EjbMessageBeanDescriptor descriptor_; 77 private BasicResourceAllocator allocator_; 78 private boolean started_ = false; 79 80 private final int CREATED = 0; 81 private final int BLOCKED = 1; 82 private final int UNBLOCKED = 2; 83 private int myState=CREATED; 84 85 private final long WAIT_TIME = 60000; 86 87 private String beanID_; 90 private static Logger logger = 91 LogDomains.getLogger(LogDomains.RSR_LOGGER); 92 93 98 public ConnectorMessageBeanClient(EjbMessageBeanDescriptor descriptor) { 99 descriptor_ = descriptor; 100 allocator_ = new BasicResourceAllocator(); 101 102 String appName = descriptor.getApplication().getName(); 103 104 String moduleID = 105 descriptor.getEjbBundleDescriptor().getModuleID(); 106 107 String beanName = descriptor.getName(); 108 109 beanID_ = appName + ":" + moduleID + ":" + beanName; 110 111 try { 112 registry_ = ConnectorRegistry.getInstance(); 113 } catch (Exception e) { 114 } 115 116 } 117 118 130 public void setup(MessageBeanProtocolManager messageBeanPM) 131 throws Exception { 132 boolean d = true; 133 134 messageBeanPM_ = messageBeanPM; 135 136 String resourceAdapterMid = descriptor_.getResourceAdapterMid(); 137 ActiveResourceAdapter activeRar = null; 138 if (resourceAdapterMid == null) { 139 resourceAdapterMid = ConnectorRuntime.DEFAULT_JMS_ADAPTER; 140 } 141 activeRar = registry_.getActiveResourceAdapter(resourceAdapterMid); 142 143 if(activeRar == null && 144 resourceAdapterMid.equals(ConnectorRuntime.DEFAULT_JMS_ADAPTER)) { 145 ConnectorRuntime crt = ConnectorRuntime.getRuntime(); 146 crt.loadDeferredResourceAdapter(resourceAdapterMid); 147 activeRar = registry_.getActiveResourceAdapter(resourceAdapterMid); 148 } 149 150 if (activeRar == null) { 151 String msg = "Resource adapter "+resourceAdapterMid+ " is not deployed"; 152 throw new ConnectorRuntimeException(msg); 153 } 154 155 if (activeRar instanceof ActiveJmsResourceAdapter) { 156 ActiveJmsResourceAdapter jmsRa = (ActiveJmsResourceAdapter) activeRar; 157 jmsRa.updateMDBRuntimeInfo(descriptor_, 158 messageBeanPM_.getPoolDescriptor()); 159 } 160 161 if (!(activeRar instanceof ActiveInboundResourceAdapter)) { 162 throw new Exception ("Resource Adapter selected doesn't support Inflow"); 163 } 164 ActiveInboundResourceAdapter rar = (ActiveInboundResourceAdapter) activeRar; 165 166 ResourceAdapter ra = rar.getResourceAdapter(); 168 169 ConnectorDescriptor desc = rar.getDescriptor(); 170 171 String msgListenerType = getDescriptor().getMessageListenerType(); 172 if (msgListenerType == null || "".equals(msgListenerType)) 173 msgListenerType = "javax.jms.MessageListener"; 174 175 Iterator i = 176 desc.getInboundResourceAdapter().getMessageListeners().iterator(); 177 178 MessageListener msgListener = null; 179 while (i.hasNext()) { 180 msgListener = (MessageListener) i.next(); 181 if (msgListenerType.equals(msgListener.getMessageListenerType())) 182 break; 183 } 184 185 String activationSpecClassName = null; 186 if (msgListener != null) { 187 activationSpecClassName = msgListener.getActivationSpecClass(); 188 } 189 190 191 if (activationSpecClassName != null) { 192 if (logger.isLoggable(Level.FINEST)) { 193 String msg = 194 "ActivationSpecClassName = " + activationSpecClassName; 195 logger.log(Level.FINEST, msg); 196 } 197 198 try { 199 ClassLoader cl = rar.getClassLoader(); 200 Class aClass = cl.loadClass(activationSpecClassName); 201 202 if (logger.isLoggable(Level.FINEST)) { 203 logger.log(Level.FINEST, "classloader = " 204 + aClass.getClassLoader()); 205 logger.log(Level.FINEST, "classloader parent = " 206 + aClass.getClassLoader().getParent()); 207 } 208 209 ActivationSpec activationSpec = 210 (ActivationSpec ) aClass.newInstance(); 211 Set props = RARUtils.getMergedActivationConfigProperties(getDescriptor()); 212 213 AccessController.doPrivileged 214 (new SetMethodAction(activationSpec, props)); 215 216 activationSpec.setResourceAdapter(ra); 217 218 226 227 boolean validate = 228 "true".equals(System.getProperty("validate.jms.ra")); 229 if (validate) { 230 try { 231 activationSpec.validate(); 232 } catch (Exception ex) { 233 logger.log(Level.SEVERE, 234 "endpointfactory.as_validate_Failed", ex); 235 } 236 } 237 238 myState=BLOCKED; 239 ra.endpointActivation(this, activationSpec); 240 241 rar.addEndpointFactoryInfo(beanID_, 242 new MessageEndpointFactoryInfo(this, activationSpec)); 243 244 245 } catch (Exception ex) { 246 247 ex.printStackTrace(); 248 throw (Exception ) (new Exception ()).initCause(ex); 249 } 250 } else { 251 throw new Exception ("Unsupported message listener type"); 253 } 254 } 255 256 262 public void start() throws Exception { 263 logger.logp(Level.FINEST, 264 "ConnectorMessageBeanClient", "start", "called..."); 265 started_ = true; 266 myState=UNBLOCKED; 267 synchronized (this) { 268 notifyAll(); 269 } 270 } 271 272 277 public void close() { 278 logger.logp(Level.FINEST, 279 "ConnectorMessageBeanClient", "close", "called..."); 280 281 started_ = false; 283 284 String resourceAdapterMid = getDescriptor().getResourceAdapterMid(); 285 286 ActiveResourceAdapter activeRar = 287 registry_.getActiveResourceAdapter(resourceAdapterMid); 288 289 if (activeRar instanceof ActiveInboundResourceAdapter) { ActiveInboundResourceAdapter rar = 291 (ActiveInboundResourceAdapter) activeRar; 292 MessageEndpointFactoryInfo info = 293 rar.getEndpointFactoryInfo(beanID_); 294 295 if (info != null) { 296 rar.getResourceAdapter().endpointDeactivation( 297 info.getEndpointFactory(), info.getActivationSpec()); 298 299 rar.removeEndpointFactoryInfo(beanID_); 300 } else { 301 logger.log(Level.FINE,"Not de-activating the end point, since it is not activated"); 302 } 303 } 304 } 305 306 private EjbMessageBeanDescriptor getDescriptor() { 307 return descriptor_; 308 } 309 310 322 public MessageEndpoint 323 createEndpoint (XAResource xa) throws UnavailableException { 324 325 synchronized (this) { 329 if (myState == BLOCKED) { 330 try { 331 wait(WAIT_TIME); 332 }catch (Exception e) { 333 }finally { 335 336 myState = UNBLOCKED; 340 } 341 } 342 } 343 344 if (!started_) { 345 logger.log(Level.WARNING, "endpointfactory.unavailable"); 346 throw new UnavailableException ( 347 "EndpointFactory is currently not available"); 348 } 349 350 MessageEndpoint endpoint = null; 351 try { 352 ResourceHandle resourceHandle = allocator_.createResource(xa); 353 354 MessageBeanListener listener = 355 messageBeanPM_.createMessageBeanListener(resourceHandle); 356 357 String moduleID = getDescriptor().getApplication().getModuleID(); 362 Class endpointClass = null; 363 ClassLoader loader = null; 364 try { 365 BundleDescriptor moduleDesc = 366 getDescriptor().getEjbBundleDescriptor(); 367 loader = moduleDesc.getClassLoader(); 368 }catch(Exception e){ 369 logger.log(Level.WARNING, "endpointfactory.loader_not_found",e); 370 } 371 372 if (loader == null) { 373 loader = Thread.currentThread().getContextClassLoader(); 374 } 375 376 endpointClass = loader.loadClass(MESSAGE_ENDPOINT); 377 378 379 String msgListenerType = getDescriptor().getMessageListenerType(); 380 if (msgListenerType == null || "".equals(msgListenerType)) 381 msgListenerType = "javax.jms.MessageListener"; 382 383 Class listenerClass = loader.loadClass(msgListenerType); 384 385 MessageEndpointInvocationHandler handler = 386 new MessageEndpointInvocationHandler(listener, messageBeanPM_); 387 endpoint = (MessageEndpoint ) Proxy.newProxyInstance 388 (loader, new Class [] {endpointClass, listenerClass}, handler); 389 390 } catch (Exception ex) { 391 throw (UnavailableException ) 392 (new UnavailableException ()).initCause(ex); 393 } 394 395 return endpoint; 396 } 397 398 403 public boolean isDeliveryTransacted(Method method) { 404 return messageBeanPM_.isDeliveryTransacted(method); 405 } 406 407 410 public String toString() { 411 return beanID_; 412 } 413 414 } 415 | Popular Tags |