KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > enterprise > connectors > inflow > ConnectorMessageBeanClient


1 /*
2  * The contents of this file are subject to the terms
3  * of the Common Development and Distribution License
4  * (the License). You may not use this file except in
5  * compliance with the License.
6  *
7  * You can obtain a copy of the license at
8  * https://glassfish.dev.java.net/public/CDDLv1.0.html or
9  * glassfish/bootstrap/legal/CDDLv1.0.txt.
10  * See the License for the specific language governing
11  * permissions and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL
14  * Header Notice in each file and include the License file
15  * at glassfish/bootstrap/legal/CDDLv1.0.txt.
16  * If applicable, add the following below the CDDL Header,
17  * with the fields enclosed by brackets [] replaced by
18  * you own identifying information:
19  * "Portions Copyrighted [year] [name of copyright owner]"
20  *
21  * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
22  */

23 package com.sun.enterprise.connectors.inflow;
24
25 import com.sun.ejb.MessageBeanClient;
26 import com.sun.ejb.MessageBeanListener;
27 import com.sun.ejb.MessageBeanProtocolManager;
28 import com.sun.enterprise.deployment.ConnectorDescriptor;
29 import com.sun.enterprise.deployment.EjbMessageBeanDescriptor;
30 import com.sun.enterprise.deployment.BundleDescriptor;
31 import com.sun.enterprise.deployment.EnvironmentProperty;
32 import com.sun.enterprise.deployment.MessageListener;
33 import com.sun.enterprise.resource.ResourceHandle;
34 import com.sun.enterprise.connectors.util.*;
35 import com.sun.enterprise.connectors.ConnectorRegistry;
36 import com.sun.enterprise.connectors.ConnectorRuntime;
37 import com.sun.enterprise.connectors.ConnectorRuntimeException;
38 import com.sun.enterprise.connectors.ActiveResourceAdapter;
39 import com.sun.enterprise.connectors.ActiveInboundResourceAdapter;
40 import com.sun.enterprise.connectors.ActiveOutboundResourceAdapter;
41 import com.sun.enterprise.connectors.system.ActiveJmsResourceAdapter;
42 import com.sun.logging.LogDomains;
43 import java.lang.reflect.Method JavaDoc;
44 import java.lang.reflect.Proxy JavaDoc;
45 import java.security.AccessController JavaDoc;
46 import java.security.PrivilegedAction JavaDoc;
47 import java.util.HashSet JavaDoc;
48 import java.util.Iterator JavaDoc;
49 import java.util.Set JavaDoc;
50 import java.util.logging.Level JavaDoc;
51 import java.util.logging.Logger JavaDoc;
52 import javax.naming.InitialContext JavaDoc;
53 import javax.resource.spi.ActivationSpec JavaDoc;
54 import javax.resource.spi.ResourceAdapter JavaDoc;
55 import javax.resource.spi.endpoint.MessageEndpoint JavaDoc;
56 import javax.resource.spi.endpoint.MessageEndpointFactory JavaDoc;
57 import javax.resource.spi.UnavailableException JavaDoc;
58 import javax.transaction.xa.XAResource JavaDoc;
59
60 /**
61  * Main helper implementation for message-beans associated with
62  * a queue. Uses connection consumer for concurrent message
63  * delivery.
64  *
65  * @author Qingqing Ouyang
66  */

67 public class ConnectorMessageBeanClient
68     implements MessageBeanClient, MessageEndpointFactory JavaDoc {
69
70     private static String JavaDoc MESSAGE_ENDPOINT =
71     "javax.resource.spi.endpoint.MessageEndpoint";
72     
73     private ConnectorRegistry registry_;
74    
75     private MessageBeanProtocolManager messageBeanPM_;
76     private EjbMessageBeanDescriptor descriptor_;
77     private BasicResourceAllocator allocator_;
78     private boolean started_ = false;
79
80     private final int CREATED = 0;
81     private final int BLOCKED = 1;
82     private final int UNBLOCKED = 2;
83     private int myState=CREATED;
84
85     private final long WAIT_TIME = 60000;
86
87     //unique identify a message-driven bean
88
private String JavaDoc beanID_; //appName:modlueID:beanName
89

90     private static Logger JavaDoc logger =
91     LogDomains.getLogger(LogDomains.RSR_LOGGER);
92
93     /**
94      * Creates an instance of <code>ConnectorMessageBeanClient</code>
95      *
96      * @param descriptor <code>EjbMessageBeanDescriptor</code> object.
97      */

98     public ConnectorMessageBeanClient(EjbMessageBeanDescriptor descriptor) {
99         descriptor_ = descriptor;
100         allocator_ = new BasicResourceAllocator();
101         
102         String JavaDoc appName = descriptor.getApplication().getName();
103         
104         String JavaDoc moduleID =
105             descriptor.getEjbBundleDescriptor().getModuleID();
106         
107         String JavaDoc beanName = descriptor.getName();
108         
109         beanID_ = appName + ":" + moduleID + ":" + beanName;
110
111     try {
112             registry_ = ConnectorRegistry.getInstance();
113         } catch (Exception JavaDoc e) {
114         }
115
116     }
117     
118     /**
119      * Gets executed as part of message bean deployment. Creates the
120      * <code>ActivationSpec</code> javabean and does endpointfactory
121      * activation with the resource adapter. This code also converts
122      * all J2EE 1.3 MDB properties to MQ resource adapter activation
123      * spec properties, if user doesnt specifies resource adapter
124      * module name in sun-ejb-jar.xml of the MDB. This is done using
125      * <code>com.sun.enterprise.connector.system.ActiveJmsResourceAdapter
126      * </code>
127      *
128      * @param pm <code>MessageBeanProtocolManager</code> object.
129      */

130     public void setup(MessageBeanProtocolManager messageBeanPM)
131         throws Exception JavaDoc {
132         boolean d = true;
133
134         messageBeanPM_ = messageBeanPM;
135         
136         String JavaDoc resourceAdapterMid = descriptor_.getResourceAdapterMid();
137         ActiveResourceAdapter activeRar = null;
138         if (resourceAdapterMid == null) {
139             resourceAdapterMid = ConnectorRuntime.DEFAULT_JMS_ADAPTER;
140         }
141         activeRar = registry_.getActiveResourceAdapter(resourceAdapterMid);
142         
143         if(activeRar == null &&
144               resourceAdapterMid.equals(ConnectorRuntime.DEFAULT_JMS_ADAPTER)) {
145             ConnectorRuntime crt = ConnectorRuntime.getRuntime();
146             crt.loadDeferredResourceAdapter(resourceAdapterMid);
147             activeRar = registry_.getActiveResourceAdapter(resourceAdapterMid);
148         }
149
150         if (activeRar == null) {
151         String JavaDoc msg = "Resource adapter "+resourceAdapterMid+ " is not deployed";
152         throw new ConnectorRuntimeException(msg);
153         }
154
155         if (activeRar instanceof ActiveJmsResourceAdapter) {
156             ActiveJmsResourceAdapter jmsRa = (ActiveJmsResourceAdapter) activeRar;
157             jmsRa.updateMDBRuntimeInfo(descriptor_,
158                                        messageBeanPM_.getPoolDescriptor());
159         }
160
161         if (!(activeRar instanceof ActiveInboundResourceAdapter)) {
162             throw new Exception JavaDoc("Resource Adapter selected doesn't support Inflow");
163         }
164         ActiveInboundResourceAdapter rar = (ActiveInboundResourceAdapter) activeRar;
165
166         //the resource adapter this MDB client is deployed to
167
ResourceAdapter ra = rar.getResourceAdapter();
168         
169         ConnectorDescriptor desc = rar.getDescriptor();
170         
171         String JavaDoc msgListenerType = getDescriptor().getMessageListenerType();
172         if (msgListenerType == null || "".equals(msgListenerType))
173           msgListenerType = "javax.jms.MessageListener";
174
175         Iterator JavaDoc i =
176             desc.getInboundResourceAdapter().getMessageListeners().iterator();
177
178         MessageListener msgListener = null;
179         while (i.hasNext()) {
180             msgListener = (MessageListener) i.next();
181             if (msgListenerType.equals(msgListener.getMessageListenerType()))
182                 break;
183         }
184
185         String JavaDoc activationSpecClassName = null;
186         if (msgListener != null) {
187             activationSpecClassName = msgListener.getActivationSpecClass();
188         }
189
190     
191         if (activationSpecClassName != null) {
192             if (logger.isLoggable(Level.FINEST)) {
193                 String JavaDoc msg =
194                     "ActivationSpecClassName = " + activationSpecClassName;
195                 logger.log(Level.FINEST, msg);
196             }
197             
198             try {
199                 ClassLoader JavaDoc cl = rar.getClassLoader();
200                 Class JavaDoc aClass = cl.loadClass(activationSpecClassName);
201                 
202                 if (logger.isLoggable(Level.FINEST)) {
203                     logger.log(Level.FINEST, "classloader = "
204                             + aClass.getClassLoader());
205                     logger.log(Level.FINEST, "classloader parent = "
206                             + aClass.getClassLoader().getParent());
207                 }
208
209                 ActivationSpec JavaDoc activationSpec =
210                     (ActivationSpec JavaDoc) aClass.newInstance();
211                 Set JavaDoc props = RARUtils.getMergedActivationConfigProperties(getDescriptor());
212
213                 AccessController.doPrivileged
214                     (new SetMethodAction(activationSpec, props));
215                     
216                 activationSpec.setResourceAdapter(ra);
217                 
218                 /*
219                   AccessController.doPrivileged(new PrivilegedAction() {
220                   public java.lang.Object run() {
221                   activationSpec.setResourceAdapter(ra);
222                   return null;
223                   }
224                   });
225                 */

226                 
227                 boolean validate =
228                     "true".equals(System.getProperty("validate.jms.ra"));
229                 if (validate) {
230                     try {
231                         activationSpec.validate();
232                     } catch (Exception JavaDoc ex) {
233                         logger.log(Level.SEVERE,
234                                 "endpointfactory.as_validate_Failed", ex);
235                     }
236                 }
237                 
238                 myState=BLOCKED;
239                 ra.endpointActivation(this, activationSpec);
240             
241                 rar.addEndpointFactoryInfo(beanID_,
242                         new MessageEndpointFactoryInfo(this, activationSpec));
243         
244                 
245             } catch (Exception JavaDoc ex) {
246         
247                 ex.printStackTrace();
248                 throw (Exception JavaDoc) (new Exception JavaDoc()).initCause(ex);
249             }
250         } else {
251             //FIXME throw some exception here.
252
throw new Exception JavaDoc("Unsupported message listener type");
253         }
254     }
255
256     /**
257      * Marks the completion of MDB deployment. Unblocks the createEndPoint
258      * method call.
259      *
260      * @throws Exception
261      */

262     public void start() throws Exception JavaDoc {
263         logger.logp(Level.FINEST,
264                 "ConnectorMessageBeanClient", "start", "called...");
265         started_ = true;
266     myState=UNBLOCKED;
267         synchronized (this) {
268         notifyAll();
269         }
270     }
271
272     /**
273      * Does endpoint deactivation with the resource adapter.
274      * Also remove sthe <code>MessageEndpointFactoryInfo</code>
275      * from house keeping.
276      */

277     public void close() {
278         logger.logp(Level.FINEST,
279                 "ConnectorMessageBeanClient", "close", "called...");
280         
281         started_ = false; //no longer available
282

283         
284         String JavaDoc resourceAdapterMid = getDescriptor().getResourceAdapterMid();
285         
286         ActiveResourceAdapter activeRar =
287             registry_.getActiveResourceAdapter(resourceAdapterMid);
288
289         if (activeRar instanceof ActiveInboundResourceAdapter) { //in case the RA is already undeployed
290
ActiveInboundResourceAdapter rar =
291            (ActiveInboundResourceAdapter) activeRar;
292             MessageEndpointFactoryInfo info =
293                 rar.getEndpointFactoryInfo(beanID_);
294             
295         if (info != null) {
296                 rar.getResourceAdapter().endpointDeactivation(
297                     info.getEndpointFactory(), info.getActivationSpec());
298             
299                 rar.removeEndpointFactoryInfo(beanID_);
300         } else {
301             logger.log(Level.FINE,"Not de-activating the end point, since it is not activated");
302         }
303         }
304     }
305
306     private EjbMessageBeanDescriptor getDescriptor() {
307         return descriptor_;
308     }
309
310     /**
311      * Creates a MessageEndpoint. This method gets blocked either until start()
312      * is called or until one minute. This is the time for completion
313      * of MDB deployment.
314      *
315      * Internally this method creates a message bean listener from the MDB
316      * container and a proxy object fo delivering messages.
317      *
318      * @return <code>MessageEndpoint</code> object.
319      * @throws <code>UnavailableException</code> In case of any failure. This
320      * should change.
321      */

322     public MessageEndpoint JavaDoc
323     createEndpoint (XAResource JavaDoc xa) throws UnavailableException JavaDoc {
324         
325     // This is a temperory workaround for blocking the the create enpoint
326
// until the deployment completes. One thread would wait for maximum a
327
// a minute.
328
synchronized (this) {
329         if (myState == BLOCKED) {
330                try {
331                wait(WAIT_TIME);
332            }catch (Exception JavaDoc e) {
333                // This exception should not affect the functionality.
334
}finally {
335
336            // Once the first thread comes out of wait, block is
337
// is removed. This makes sure that the time for which the
338
// the block remains is limited. Max 2x6000.
339
myState = UNBLOCKED;
340            }
341             }
342         }
343         
344         if (!started_) {
345             logger.log(Level.WARNING, "endpointfactory.unavailable");
346             throw new UnavailableException JavaDoc(
347                     "EndpointFactory is currently not available");
348         }
349         
350         MessageEndpoint JavaDoc endpoint = null;
351         try {
352             ResourceHandle resourceHandle = allocator_.createResource(xa);
353             
354             MessageBeanListener listener =
355                 messageBeanPM_.createMessageBeanListener(resourceHandle);
356
357             //Use the MDB's application classloader to load the
358
//message listener class. If it is generic listener
359
//class, it is expected to be packaged with the MDB application
360
//or in the system classpath.
361
String JavaDoc moduleID = getDescriptor().getApplication().getModuleID();
362             Class JavaDoc endpointClass = null;
363             ClassLoader JavaDoc loader = null;
364             try {
365         BundleDescriptor moduleDesc =
366             getDescriptor().getEjbBundleDescriptor();
367         loader = moduleDesc.getClassLoader();
368             }catch(Exception JavaDoc e){
369                 logger.log(Level.WARNING, "endpointfactory.loader_not_found",e);
370             }
371
372             if (loader == null) {
373                 loader = Thread.currentThread().getContextClassLoader();
374             }
375
376             endpointClass = loader.loadClass(MESSAGE_ENDPOINT);
377             
378
379             String JavaDoc msgListenerType = getDescriptor().getMessageListenerType();
380             if (msgListenerType == null || "".equals(msgListenerType))
381                 msgListenerType = "javax.jms.MessageListener";
382
383             Class JavaDoc listenerClass = loader.loadClass(msgListenerType);
384
385             MessageEndpointInvocationHandler handler =
386                 new MessageEndpointInvocationHandler(listener, messageBeanPM_);
387             endpoint = (MessageEndpoint JavaDoc) Proxy.newProxyInstance
388                 (loader, new Class JavaDoc[] {endpointClass, listenerClass}, handler);
389             
390         } catch (Exception JavaDoc ex) {
391             throw (UnavailableException JavaDoc)
392                 (new UnavailableException JavaDoc()).initCause(ex);
393         }
394             
395         return endpoint;
396     }
397
398     /**
399      * Checks whether the message delivery is transacted for the method.
400      *
401      * @return true or false.
402      */

403     public boolean isDeliveryTransacted(Method JavaDoc method) {
404         return messageBeanPM_.isDeliveryTransacted(method);
405     }
406     
407     /**
408      * @return beanID of the message bean client
409      */

410     public String JavaDoc toString() {
411         return beanID_;
412     }
413
414 }
415
Popular Tags