1 22 package org.jboss.ejb3.mdb; 23 24 import java.lang.reflect.InvocationTargetException ; 25 import java.lang.reflect.Method ; 26 import java.util.ArrayList ; 27 import java.util.HashMap ; 28 import java.util.HashSet ; 29 import java.util.Hashtable ; 30 import java.util.Map ; 31 32 import javax.ejb.ActivationConfigProperty ; 33 import javax.ejb.EJBException ; 34 import javax.ejb.MessageDriven ; 35 import javax.ejb.Timer ; 36 import javax.ejb.TimerService ; 37 import javax.ejb.TransactionAttribute ; 38 import javax.ejb.TransactionAttributeType ; 39 import javax.ejb.TransactionManagement ; 40 import javax.ejb.TransactionManagementType ; 41 import javax.jms.Connection ; 42 import javax.jms.ConnectionConsumer ; 43 import javax.jms.Destination ; 44 import javax.jms.ExceptionListener ; 45 import javax.jms.JMSException ; 46 import javax.jms.Message ; 47 import javax.jms.MessageListener ; 48 import javax.jms.ObjectMessage ; 49 import javax.jms.Queue ; 50 import javax.jms.QueueConnection ; 51 import javax.jms.ServerSessionPool ; 52 import javax.jms.Topic ; 53 import javax.jms.TopicConnection ; 54 import javax.management.MBeanServer ; 55 import javax.management.ObjectName ; 56 import javax.naming.Context ; 57 import javax.naming.NamingException ; 58 import javax.transaction.Transaction ; 59 import org.jboss.annotation.ejb.AcknowledgementMode; 60 import org.jboss.annotation.ejb.Consumer; 61 import org.jboss.annotation.ejb.DefaultActivationSpecs; 62 import org.jboss.annotation.ejb.Durability; 63 import org.jboss.annotation.ejb.Local; 64 import org.jboss.annotation.ejb.MessageProperties; 65 import org.jboss.annotation.ejb.MessagePropertiesImpl; 66 import org.jboss.annotation.ejb.Producer; 67 import org.jboss.annotation.ejb.Producers; 68 import org.jboss.aop.AspectManager; 69 import org.jboss.aop.MethodInfo; 70 import org.jboss.aop.advice.Interceptor; 71 import org.jboss.aop.joinpoint.Invocation; 72 import org.jboss.aop.joinpoint.InvocationResponse; 73 import org.jboss.aop.joinpoint.MethodInvocation; 74 import org.jboss.aop.util.MethodHashing; 75 import org.jboss.aop.util.PayloadKey; 76 import org.jboss.deployment.DeploymentException; 77 import org.jboss.ejb3.Container; 78 import org.jboss.ejb3.EJBContainer; 79 import org.jboss.ejb3.EJBContainerInvocation; 80 import org.jboss.ejb3.Ejb3Module; 81 import org.jboss.ejb3.ProxyFactoryHelper; 82 import org.jboss.ejb3.ThreadLocalENCFactory; 83 import org.jboss.ejb3.Ejb3Deployment; 84 import org.jboss.ejb3.interceptor.InterceptorInfoRepository; 85 import org.jboss.ejb3.mdb.inflow.JBossMessageEndpointFactory; 86 import org.jboss.ejb3.timerservice.TimedObjectInvoker; 87 import org.jboss.ejb3.timerservice.TimerServiceFactory; 88 import org.jboss.ejb3.tx.TxUtil; 89 import org.jboss.jms.ConnectionFactoryHelper; 90 import org.jboss.jms.asf.ServerSessionPoolFactory; 91 import org.jboss.jms.asf.StdServerSessionPool; 92 import org.jboss.jms.jndi.JMSProviderAdapter; 93 import org.jboss.logging.Logger; 94 95 101 public class ConsumerContainer extends MessagingContainer 102 { 103 private static final Logger log = Logger.getLogger(ConsumerContainer.class); 104 105 protected Class messagingType = null; 106 protected Method ON_MESSAGE; 107 108 protected ArrayList <ProducerFactory> producers = new ArrayList <ProducerFactory>(); 109 110 115 protected final static String DEFAULT_DESTINATION_TYPE = "javax.jms.Topic"; 116 117 118 122 public static final String CONSUMER_MESSAGE = "CONSUMER_MESSAGE"; 123 124 125 public ConsumerContainer(String ejbName, AspectManager manager, ClassLoader cl, String beanClassName, 126 Hashtable ctxProperties, InterceptorInfoRepository interceptorRepository, 127 Ejb3Deployment deployment) 128 { 129 super(ejbName, manager, cl, beanClassName, ctxProperties, interceptorRepository, deployment); 130 } 131 132 public InvocationResponse dynamicInvoke(Invocation invocation) throws Throwable 133 { 134 ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); 135 EJBContainerInvocation newSi = null; 136 ThreadLocalENCFactory.push(enc); 137 try 138 { 139 Thread.currentThread().setContextClassLoader(classloader); 140 MethodInvocation si = (MethodInvocation) invocation; 141 MethodInfo info = (MethodInfo) methodInterceptors.get(si.getMethodHash()); 142 if (info == null) 143 { 144 throw new RuntimeException ("Could not resolve beanClass method from proxy call"); 145 } 146 147 Interceptor[] aspects = info.getInterceptors(); 148 newSi = new EJBContainerInvocation(info, aspects); 149 newSi.setArguments(si.getArguments()); 150 newSi.setMetaData(si.getMetaData()); 151 newSi.setAdvisor(this); 152 153 InvocationResponse response = new InvocationResponse(newSi.invokeNext()); 154 response.setContextInfo(newSi.getResponseContextInfo()); 155 return response; 156 } 157 finally 158 { 159 Thread.currentThread().setContextClassLoader(oldLoader); 160 ThreadLocalENCFactory.pop(); 161 } 162 } 163 164 protected Method getOnMessage() 165 { 166 if (ON_MESSAGE != null) 167 return ON_MESSAGE; 168 169 try 170 { 171 final Class arg = Message .class; 172 ON_MESSAGE = javax.jms.MessageListener .class.getMethod("onMessage", new Class []{arg}); 173 } 174 catch (Exception e) 175 { 176 e.printStackTrace(); 177 throw new ExceptionInInitializerError (e); 178 } 179 180 return ON_MESSAGE; 181 } 182 183 public Object localInvoke(MethodInfo info, Object [] args) throws Throwable 184 { 185 if (info.getAdvisedMethod().equals(getOnMessage())) 186 { 187 ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); 188 ThreadLocalENCFactory.push(enc); 189 190 try 191 { 192 Message message = (Message )args[0]; 193 MethodInvocation invocation = (MethodInvocation) ((ObjectMessage ) message).getObject(); 194 invocation.getMetaData().addMetaData(CONSUMER_MESSAGE, CONSUMER_MESSAGE, message, PayloadKey.TRANSIENT); 195 return this.dynamicInvoke(invocation); 196 } 197 finally 198 { 199 Thread.currentThread().setContextClassLoader(oldLoader); 200 ThreadLocalENCFactory.pop(); 201 } 202 } 203 else 204 return super.localInvoke(info, args); 205 } 206 207 public Class getMessagingType() 208 { 209 return javax.jms.MessageListener .class; 210 } 211 212 public MethodInfo getMethodInfo(Method method) 213 { 214 MethodInfo info = new MethodInfo(); 215 info.setAdvisor(this); 216 info.setAdvisedMethod(method); 217 info.setUnadvisedMethod(method); 218 219 return info; 220 } 221 222 public Map getActivationConfigProperties() 223 { 224 HashMap result = new HashMap (); 225 Consumer annotation = (Consumer ) resolveAnnotation(Consumer .class); 226 for (ActivationConfigProperty property : annotation.activationConfig()) 227 { 228 addActivationSpecProperty(result, property); 229 } 230 231 DefaultActivationSpecs defaultSpecsAnnotation = (DefaultActivationSpecs)resolveAnnotation(DefaultActivationSpecs.class); 232 if (defaultSpecsAnnotation != null) 233 { 234 for (ActivationConfigProperty property : defaultSpecsAnnotation.value()) 235 { 236 addActivationSpecProperty(result, property); 237 } 238 } 239 240 return result; 241 } 242 243 244 253 public void start() throws Exception 254 { 255 super.start(); 256 257 registerProducers(); 258 } 259 260 public Class [] getProducerInterfaces(Container container1) 261 { 262 Class beanClass = container1.getBeanClass(); 263 Class [] interfaces = beanClass.getInterfaces(); 264 if (interfaces.length == 0) throw new RuntimeException ("Bean class must implement at least one interface: " + beanClass.getName()); 265 if (interfaces.length == 1) 266 { 267 return interfaces; 268 } 269 ArrayList localInterfaces = new ArrayList (); 270 for (int i = 0; i < interfaces.length; i++) 271 { 272 if (interfaces[i].isAnnotationPresent(Producer.class)) 273 { 274 localInterfaces.add(interfaces[i]); 275 } 276 } 277 Producer annotation = (Producer)resolveAnnotation(Producer.class); 278 if (annotation != null) 279 { 280 Class producer = annotation.producer(); 281 if (producer != null) 282 localInterfaces.add(producer); 283 } 284 285 Producers producersAnnotation = (Producers)resolveAnnotation(Producers.class); 286 if (producersAnnotation != null) 287 { 288 for (Producer producerAnnotation : producersAnnotation.value()) 289 { 290 Class producer = producerAnnotation.producer(); 291 if (producer != null) 292 localInterfaces.add(producer); 293 } 294 } 295 296 if (localInterfaces.size() == 0) return null; 297 interfaces = (Class []) localInterfaces.toArray(new Class [localInterfaces.size()]); 298 return interfaces; 299 } 300 301 protected void registerProducers() throws Exception 302 { 303 Destination dest = (Destination ) getInitialContext().lookup(getDestination()); 304 Class [] producers = getProducerInterfaces(this); 305 MessageProperties props = (MessageProperties) resolveAnnotation(MessageProperties.class); 306 if (props == null) props = new MessagePropertiesImpl(); 307 for (Class producer : producers) 308 { 309 log.debug("Producer: " + producer.getName()); 310 ProducerFactory producerFactory = null; 311 if (producer.isAnnotationPresent(Local.class)) 312 { 313 producerFactory = new LocalProducerFactory(this, producer, props, dest, getInitialContext(), initialContextProperties); 314 } 315 else 316 { 317 producerFactory = new RemoteProducerFactory(this, producer, props, dest, getInitialContext(), initialContextProperties); 318 } 319 this.producers.add(producerFactory); 320 producerFactory.start(); 321 } 322 } 323 324 protected void unregisterProducers() throws Exception 325 { 326 for (ProducerFactory factory : producers) 327 { 328 factory.stop(); 329 } 330 } 331 332 protected void populateActivationSpec() 333 { 334 DefaultActivationSpecs defaultSpecs = (DefaultActivationSpecs) resolveAnnotation(DefaultActivationSpecs.class); 335 if (defaultSpecs != null) 336 { 337 activationSpec.merge(defaultSpecs.value()); 338 } 339 340 Consumer md = (Consumer ) resolveAnnotation(Consumer .class); 341 activationSpec.merge(md.activationConfig()); 342 } 343 344 public void stop() throws Exception 345 { 346 super.stop(); 347 unregisterProducers(); 348 } 349 } 350 | Popular Tags |