KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > connector > JoramAdapter


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - 2007 ScalAgent Distributed Technologies
4  * Copyright (C) 2004 - 2006 Bull SA
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (Bull SA)
22  * Contributor(s): ScalAgent Distributed Technologies
23  * Benoit Pelletier (Bull SA)
24  */

25 package org.objectweb.joram.client.connector;
26
27 import fr.dyade.aaa.agent.AgentServer;
28 import com.scalagent.jmx.JMXServer;
29 import org.objectweb.joram.client.jms.Queue;
30 import org.objectweb.joram.client.jms.Topic;
31 import org.objectweb.joram.client.jms.admin.AdminException;
32 import org.objectweb.joram.client.jms.admin.JoramAdmin;
33 import org.objectweb.joram.client.jms.admin.User;
34 import org.objectweb.joram.client.jms.admin.DeadMQueue;
35 import org.objectweb.joram.client.jms.ha.local.XAHALocalConnectionFactory;
36 import org.objectweb.joram.client.jms.ha.tcp.XAHATcpConnectionFactory;
37 import org.objectweb.joram.client.jms.ha.local.TopicHALocalConnectionFactory;
38 import org.objectweb.joram.client.jms.ha.tcp.TopicHATcpConnectionFactory;
39
40 import org.objectweb.joram.client.jms.local.TopicLocalConnectionFactory;
41 import org.objectweb.joram.client.jms.local.XALocalConnectionFactory;
42 import org.objectweb.joram.client.jms.tcp.TopicTcpConnectionFactory;
43 import org.objectweb.joram.client.jms.tcp.XATcpConnectionFactory;
44 import org.objectweb.joram.client.jms.ConnectionMetaData;
45
46 import java.io.BufferedReader JavaDoc;
47 import java.io.File JavaDoc;
48 import java.io.FileReader JavaDoc;
49 import java.io.IOException JavaDoc;
50 import java.lang.reflect.Method JavaDoc;
51 import java.net.ConnectException JavaDoc;
52 import java.util.Enumeration JavaDoc;
53 import java.util.Hashtable JavaDoc;
54 import java.util.List JavaDoc;
55 import java.util.Properties JavaDoc;
56 import java.util.StringTokenizer JavaDoc;
57 import java.util.Vector JavaDoc;
58
59 import javax.jms.Destination JavaDoc;
60 import javax.jms.Session JavaDoc;
61 import javax.jms.TopicConnectionFactory JavaDoc;
62 import javax.jms.XAConnection JavaDoc;
63 import javax.jms.XAConnectionFactory JavaDoc;
64 import javax.management.MBeanServer JavaDoc;
65 import javax.management.MBeanServerFactory JavaDoc;
66 import javax.management.ObjectName JavaDoc;
67 import javax.naming.Context JavaDoc;
68 import javax.naming.InitialContext JavaDoc;
69 import javax.resource.NotSupportedException JavaDoc;
70 import javax.resource.ResourceException JavaDoc;
71 import javax.resource.spi.ActivationSpec JavaDoc;
72 import javax.resource.spi.BootstrapContext JavaDoc;
73 import javax.resource.spi.CommException JavaDoc;
74 import javax.resource.spi.IllegalStateException JavaDoc;
75 import javax.resource.spi.ResourceAdapterInternalException JavaDoc;
76 import javax.resource.spi.endpoint.MessageEndpointFactory JavaDoc;
77 import javax.resource.spi.work.WorkManager JavaDoc;
78 import javax.transaction.xa.XAResource JavaDoc;
79
80 import org.objectweb.util.monolog.api.BasicLevel;
81
82 /**
83  * A <code>JoramAdapter</code> instance manages connectivities to an
84  * underlying JORAM server: outbound connectivity (JCA connection
85  * management contract) and inbound connectivity (asynchronous message
86  * delivery as specified by the JCA message inflow contract).
87  */

88 public class JoramAdapter
89   implements javax.resource.spi.ResourceAdapter JavaDoc,
90              java.io.Serializable JavaDoc, JoramAdapterMBean {
91   /** <code>WorkManager</code> instance provided by the application server. */
92   private transient WorkManager JavaDoc workManager;
93
94   /**
95    * Table holding the adapter's <code>InboundConsumer</code> instances,
96    * for inbound messaging.
97    * <p>
98    * <b>Key:</b> <code>ActivationSpec</code> instance<br>
99    * <b>Value:</b> <code>InboundConsumer</code> instance
100    */

101   private transient Hashtable JavaDoc consumers;
102   /**
103    * Vector holding the <code>ManagedConnectionImpl</code> instances for
104    * managed outbound messaging.
105    */

106   private transient Vector JavaDoc producers;
107   /**
108    * Table holding the adapter's <code>XAConnection</code> instances used for
109    * recovering the XA resources.
110    * <p>
111    * <b>Key:</b> user name<br>
112    * <b>Value:</b> <code>XAConnection</code> instance
113    */

114   private transient Hashtable JavaDoc connections;
115
116   /** <code>true</code> if the adapter has been started. */
117   private boolean started = false;
118   /** <code>true</code> if the adapter has been stopped. */
119   private boolean stopped = false;
120
121   /** <code>true</code> if the underlying JORAM server is collocated. */
122   boolean collocated = false;
123
124   /** <code>true</code> if the underlying a JORAM HA server is defined */
125   boolean isHa = false;
126
127   /** Host name or IP of the underlying JORAM server. */
128   String JavaDoc hostName = "localhost";
129   /** Port number of the underlying JORAM server. */
130   int serverPort = 16010;
131
132   /** Identifier of the JORAM server to start. */
133   short serverId = 0;
134
135   /** Identifier of the JORAM replica to start in case of HA. */
136   short clusterId = AgentServer.NULL_ID;
137
138   /** Platform servers identifiers. */
139   List JavaDoc platformServersIds = null;
140
141   /**
142    * Path to the directory containing JORAM's configuration files
143    * (<code>a3servers.xml</code>, <code>a3debug.cfg</code>
144    * and admin file), needed when starting the collocated JORAM server.
145    */

146   private String JavaDoc platformConfigDir;
147   /** <code>true</code> if the JORAM server to start is persistent. */
148   private boolean persistentPlatform = false;
149   /**
150    * Path to the file containing a description of the administered objects to
151    * create and bind.
152    */

153   private String JavaDoc adminFile = "joram-admin.cfg";
154   private String JavaDoc adminFileXML = "joramAdmin.xml";
155
156
157   /**
158    * Path to the file containing a description of the exported administered objects (destination)
159    */

160   private String JavaDoc adminFileExportXML = "joramAdminExport.xml";
161
162   /** Name of the JORAM server to start. */
163   private String JavaDoc serverName = "s0";
164
165   /** Names of the bound objects. */
166   private static Vector JavaDoc boundNames = new Vector JavaDoc();
167   /** Standard JMSResource MBean ObjectName. */
168   private static ObjectName JavaDoc jmsResourceON;
169   /** Local MBean server. */
170   private static MBeanServer JavaDoc mbs = null;
171
172   /**
173    * Duration in seconds during which connecting is attempted (connecting
174    * might take time if the server is temporarily not reachable); the 0 value
175    * is set for connecting only once and aborting if connecting failed.
176    */

177   public int connectingTimer = 0;
178   /**
179    * Duration in seconds during which a JMS transacted (non XA) session might
180    * be pending; above that duration the session is rolled back and closed;
181    * the 0 value means "no timer".
182    */

183   public int txPendingTimer = 0;
184   /**
185    * Period in milliseconds between two ping requests sent by the client
186    * connection to the server; if the server does not receive any ping
187    * request during more than 2 * cnxPendingTimer, the connection is
188    * considered as dead and processed as required.
189    */

190   public int cnxPendingTimer = 0;
191
192   /**
193    * The maximum number of messages that can be
194    * read at once from a queue.
195    *
196    * Default value is 2 in order to compensate
197    * the former subscription mechanism.
198    */

199   public int queueMessageReadMax = 2;
200
201   /**
202    * The maximum number of acknowledgements
203    * that can be buffered in
204    * Session.DUPS_OK_ACKNOWLEDGE mode when listening to a topic.
205    * Default is 0.
206    */

207   public int topicAckBufferMax = 0;
208
209   /**
210    * This threshold is the maximum messages
211    * number over
212    * which the subscription is passivated.
213    * Default is Integer.MAX_VALUE.
214    */

215   public int topicPassivationThreshold = Integer.MAX_VALUE;
216
217   /**
218    * This threshold is the minimum
219    * messages number below which
220    * the subscription is activated.
221    * Default is 0.
222    */

223   public int topicActivationThreshold = 0;
224
225   /**
226    * Determines whether the produced messages are asynchronously
227    * sent or not (without or with acknowledgement)
228    * Default is false (with ack).
229    */

230   public boolean asyncSend = false;
231
232   /**
233    * Determines whether client threads
234    * which are using the same connection
235    * are synchronized in order to group
236    * together the requests they send.
237    * Default is false.
238    */

239   public boolean multiThreadSync = false;
240
241   /**
242    * The maximum time the threads hang if 'multiThreadSync' is true.
243    * Either they wake up (wait time out) or they are notified (by the
244    * first woken up thread).
245    *
246    * Default is 1 ms.
247    */

248   public int multiThreadSyncDelay = 1;
249
250   /**
251    * Determine whether durablesubscription must be deleted or not
252    * at close time of the InboundConsumer.
253    * Default is false.
254    */

255   public boolean deleteDurableSubscription = false;
256
257   public JMXServer jmxServer;
258
259   private transient JoramAdmin joramAdmin;
260
261   /**
262    * Constructs a <code>JoramAdapter</code> instance.
263    */

264   public JoramAdapter() {
265     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
266       AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
267                                     "JORAM adapter instantiated.");
268
269     consumers = new Hashtable JavaDoc();
270     producers = new Vector JavaDoc();
271
272     java.util.ArrayList JavaDoc array = MBeanServerFactory.findMBeanServer(null);
273     if (!array.isEmpty())
274       mbs = (MBeanServer JavaDoc) array.get(0);
275     jmxServer = new JMXServer(mbs,"JoramAdapter");
276   }
277
278   /**
279    * Initializes the adapter; starts, if needed, a collocated JORAM server,
280    * and if needed again, administers it.
281    *
282    * @exception ResourceAdapterInternalException If the adapter could not be
283    * initialized.
284    */

285   public synchronized void start(BootstrapContext JavaDoc ctx)
286                            throws ResourceAdapterInternalException JavaDoc
287   {
288       // set HA mode if needed
289
joramAdmin.setHa(isHa);
290
291     if (started)
292       throw new ResourceAdapterInternalException JavaDoc("Adapter already started.");
293     if (stopped)
294       throw new ResourceAdapterInternalException JavaDoc("Adapter has been stopped.");
295
296     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
297       AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
298                                     "JORAM adapter starting deployment...");
299
300     workManager = ctx.getWorkManager();
301
302     // Collocated mode: starting the JORAM server.
303
if (collocated) {
304       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
305         AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
306                                       " - Collocated JORAM server is starting...");
307
308       if (persistentPlatform) {
309         System.setProperty("Transaction", "fr.dyade.aaa.util.NTransaction");
310         System.setProperty("NTNoLockFile", "true");
311       } else {
312         System.setProperty("Transaction", "fr.dyade.aaa.util.NullTransaction");
313         System.setProperty("NbMaxAgents", "" + Integer.MAX_VALUE);
314       }
315
316       if (platformConfigDir != null) {
317         System.setProperty("fr.dyade.aaa.agent.A3CONF_DIR", platformConfigDir);
318         System.setProperty("fr.dyade.aaa.DEBUG_DIR", platformConfigDir);
319       }
320
321       try {
322         AgentServer.init(serverId, serverName, null, clusterId);
323         AgentServer.start();
324         if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
325           AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
326                                         " - Collocated JORAM server has successfully started.");
327       } catch (Exception JavaDoc exc) {
328         AgentServer.stop();
329         AgentServer.reset(true);
330
331         throw new ResourceAdapterInternalException JavaDoc("Could not start "
332                                                    + "collocated JORAM "
333                                                    + " instance: " + exc);
334       }
335     }
336
337     // Starting admin.
338
try {
339       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
340         AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
341                                       " - Reading the provided admin file: " + adminFileXML);
342       JoramAdmin.executeXMLAdmin(platformConfigDir, adminFileXML);
343     } catch (Exception JavaDoc exc) {
344       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
345         AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
346                                       "JORAM ADMIN XML not found.");
347     }
348
349     // Starting an admin session...
350
try {
351       adminConnect();
352       serverId = (short) joramAdmin.getPlatformAdmin().getLocalServerId();
353     } catch (Exception JavaDoc exc) {
354       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN))
355         AdapterTracing.dbgAdapter.log(BasicLevel.WARN,
356                                       " - JORAM server not administerable: " + exc);
357     }
358
359     // Recreates the objects (backup) if the export file is present
360
if (joramAdmin != null) {
361         joramAdmin.setAdminFileExportXML(adminFileExportXML);
362
363         try {
364             if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
365                 AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
366                                       " - Reading the provided admin file: " + adminFileExportXML);
367             JoramAdmin.executeXMLAdmin(platformConfigDir, adminFileExportXML);
368
369             // redo the admin connection as the executeXMLAdmin has closed the session
370
adminConnect();
371         } catch (Exception JavaDoc exc) {
372             if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
373                 AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
374                         adminFileExportXML + " not found.");
375         }
376     }
377
378     // Administering as specified in the properties file.
379
try {
380       File JavaDoc file = null;
381
382       try {
383         if (platformConfigDir == null) {
384           java.net.URL JavaDoc url = ClassLoader.getSystemResource(adminFile);
385           file = new File JavaDoc(url.getFile());
386         }
387         else
388           file = new File JavaDoc(platformConfigDir, adminFile);
389       } catch (NullPointerException JavaDoc e) {
390         throw new java.io.FileNotFoundException JavaDoc();
391       }
392
393       FileReader JavaDoc fileReader = new FileReader JavaDoc(file);
394       BufferedReader JavaDoc reader = new BufferedReader JavaDoc(fileReader);
395
396       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
397         AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
398                                       " - Reading the provided admin file: " + file);
399
400       boolean end = false;
401       String JavaDoc line;
402       StringTokenizer JavaDoc tokenizer;
403       String JavaDoc firstToken;
404       String JavaDoc name = null;
405
406       while (! end) {
407         try {
408           line = reader.readLine();
409
410           if (line == null)
411             end = true;
412           else {
413             tokenizer = new StringTokenizer JavaDoc(line);
414
415             if (tokenizer.hasMoreTokens()) {
416               firstToken = tokenizer.nextToken();
417               if (firstToken.equalsIgnoreCase("Host")) {
418                 if (tokenizer.hasMoreTokens())
419                   hostName = tokenizer.nextToken();
420               }
421               else if (firstToken.equalsIgnoreCase("Port")) {
422                 if (tokenizer.hasMoreTokens())
423                   serverPort = Integer.parseInt(tokenizer.nextToken());
424               }
425               else if (firstToken.equalsIgnoreCase("Queue")) {
426                 if (tokenizer.hasMoreTokens()) {
427                   name = tokenizer.nextToken();
428                   createQueue(name);
429                 }
430               }
431               else if (firstToken.equalsIgnoreCase("Topic")) {
432                 if (tokenizer.hasMoreTokens()) {
433                   name = tokenizer.nextToken();
434                   createTopic(name);
435                 }
436               }
437               else if (firstToken.equalsIgnoreCase("User")) {
438                 if (tokenizer.hasMoreTokens())
439                   name = tokenizer.nextToken();
440                 if (tokenizer.hasMoreTokens()) {
441                   String JavaDoc password = tokenizer.nextToken();
442                   createUser(name, password);
443                 }
444                 else
445                   if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
446                     AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
447                                                   " - Missing password for user [" + name + "]");
448               }
449               else if (firstToken.equalsIgnoreCase("CF")) {
450                 if (tokenizer.hasMoreTokens()) {
451                   name = tokenizer.nextToken();
452                   createCF(name);
453                 }
454               }
455               else if (firstToken.equalsIgnoreCase("QCF")) {
456                 if (tokenizer.hasMoreTokens()) {
457                   name = tokenizer.nextToken();
458                   createQCF(name);
459                 }
460               }
461               else if (firstToken.equalsIgnoreCase("TCF")) {
462                 if (tokenizer.hasMoreTokens()) {
463                   name = tokenizer.nextToken();
464                   createTCF(name);
465                 }
466               }
467             }
468           }
469         }
470         // Error while reading one line.
471
catch (IOException JavaDoc exc) {
472         // Error while creating the destination.
473
} catch (AdminException exc) {
474           AdapterTracing.dbgAdapter.log(BasicLevel.ERROR,
475                                         "Creation failed",exc);
476         }
477       }
478     }
479     // No destination to deploy.
480
catch (java.io.FileNotFoundException JavaDoc fnfe) {
481       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
482         AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
483                                       " - No administration task requested.");
484     }
485
486     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
487       AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
488                                     "Server port is " + serverPort);
489
490     started = true;
491
492     // Registering MBeans...
493
try {
494       jmxServer.registerMBean(this,
495                               "joramClient",
496                               "type=JoramAdapter,version=" +
497                               ConnectionMetaData.providerVersion);
498     } catch (Exception JavaDoc e) {
499       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN))
500         AdapterTracing.dbgAdapter.log(BasicLevel.WARN,
501                                       " - Could not register JoramAdapterMBean",
502                                       e);
503     }
504
505     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
506       AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
507                                     "JORAM adapter " +
508                                     ConnectionMetaData.providerVersion +
509                                     " successfully deployed.");
510   }
511
512   /**
513    * Notifies the adapter to terminate the connections it manages, and if
514    * needed, to shut down the collocated JORAM server.
515    */

516   public synchronized void stop()
517   {
518     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
519       AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
520                                     "JORAM adapter stopping...");
521
522     if (! started || stopped)
523       return;
524
525     // Unbinds the bound objects...
526
while (! boundNames.isEmpty())
527       unbind((String JavaDoc) boundNames.remove(0));
528
529     // Finishing the admin session.
530
joramAdmin.getPlatformAdmin().disconnect();
531
532     // Closing the outbound connections, if any.
533
while (! producers.isEmpty()) {
534       try {
535         ((ManagedConnectionImpl) producers.remove(0)).destroy();
536       }
537       catch (Exception JavaDoc exc) {}
538     }
539
540     // Closing the inbound connections, if any.
541
for (Enumeration JavaDoc keys = consumers.keys(); keys.hasMoreElements();)
542       ((InboundConsumer) consumers.get(keys.nextElement())).close();
543
544     // Browsing the recovery connections, if any.
545
if (connections != null) {
546       for (Enumeration JavaDoc keys = connections.keys(); keys.hasMoreElements();) {
547         try {
548           ((XAConnection JavaDoc) connections.get(keys.nextElement())).close();
549         }
550         catch (Exception JavaDoc exc) {}
551       }
552     }
553
554     // If JORAM server is collocated, stopping it.
555
if (collocated) {
556       try {
557         AgentServer.stop();
558       }
559       catch (Exception JavaDoc exc) {}
560     }
561
562     stopped = true;
563
564     try {
565       jmxServer.unregisterMBean("joramClient",
566                                 "type=JoramAdapter,version=" +
567                                 ConnectionMetaData.providerVersion);
568     } catch (Exception JavaDoc e) {
569       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN))
570         AdapterTracing.dbgAdapter.log(BasicLevel.WARN,
571                                       "unregisterMBean",
572                                       e);
573     }
574
575
576     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))
577       AdapterTracing.dbgAdapter.log(BasicLevel.INFO,
578                                     "JORAM adapter successfully stopped.");
579   }
580
581   /**
582    * Notifies the adapter to setup asynchronous message delivery for an
583    * application server endoint.
584    *
585    * @exception IllegalStateException If the adapter is either not started,
586    * or stopped.
587    * @exception NotSupportedException If the provided activation parameters
588    * are invalid.
589    * @exception CommException If the JORAM server is not reachable.
590    * @exception SecurityException If connecting is not allowed.
591    * @exception ResourceException Generic exception.
592    */

593   public void endpointActivation(MessageEndpointFactory JavaDoc endpointFactory,
594                                  ActivationSpec JavaDoc spec)
595               throws ResourceException JavaDoc {
596     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
597       AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
598                                     this + " endpointActivation(" + endpointFactory +
599                                     ", " + spec + ")");
600
601     if (! started)
602       throw new IllegalStateException JavaDoc("Non started resource adapter.");
603     if (stopped)
604       throw new IllegalStateException JavaDoc("Stopped resource adapter.");
605
606     if (! (spec instanceof ActivationSpecImpl))
607       throw new ResourceException JavaDoc("Provided ActivationSpec instance is not "
608                                   + "a JORAM activation spec.");
609
610     ActivationSpecImpl specImpl = (ActivationSpecImpl) spec;
611
612     if (! specImpl.getResourceAdapter().equals(this))
613       throw new ResourceException JavaDoc("Supplied ActivationSpec instance "
614                                   + "associated to an other ResourceAdapter.");
615
616     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
617       AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
618                                     "Activating Endpoint on JORAM adapter.");
619
620     boolean durable =
621       specImpl.getSubscriptionDurability() != null
622       && specImpl.getSubscriptionDurability().equalsIgnoreCase("Durable");
623
624     boolean transacted = false;
625     try {
626       Class JavaDoc listenerClass = Class.forName("javax.jms.MessageListener");
627       Class JavaDoc[] parameters = { Class.forName("javax.jms.Message") };
628       Method JavaDoc meth = listenerClass.getMethod("onMessage", parameters);
629       transacted = endpointFactory.isDeliveryTransacted(meth);
630     }
631     catch (Exception JavaDoc exc) {
632       throw new ResourceException JavaDoc("Could not determine transactional "
633                                   + "context: " + exc);
634     }
635
636     int maxWorks = 10;
637     try {
638       maxWorks = Integer.parseInt(specImpl.getMaxNumberOfWorks());
639     } catch (Exception JavaDoc exc) {
640       throw new ResourceException JavaDoc("Invalid max number of works instances "
641                                   + "number: " + exc);
642     }
643
644     int maxMessages = 10;
645     try {
646       maxMessages = Integer.parseInt(specImpl.getMaxMessages());
647     } catch (Exception JavaDoc exc) {
648       throw new ResourceException JavaDoc("Invalid max messages "
649                                   + "number: " + exc);
650     }
651
652     int ackMode;
653     try {
654       if (ActivationSpecImpl.AUTO_ACKNOWLEDGE.equals(specImpl
655           .getAcknowledgeMode())) {
656         ackMode = Session.AUTO_ACKNOWLEDGE;
657       } else if (ActivationSpecImpl.AUTO_ACKNOWLEDGE.equals(specImpl
658           .getAcknowledgeMode())) {
659         ackMode = Session.DUPS_OK_ACKNOWLEDGE;
660       } else {
661         ackMode = Session.AUTO_ACKNOWLEDGE;
662       }
663     } catch (Exception JavaDoc exc) {
664       throw new ResourceException JavaDoc("Invalid acknowledge mode: " + exc);
665     }
666
667     String JavaDoc destType = specImpl.getDestinationType();
668     String JavaDoc destName = specImpl.getDestination();
669
670     try {
671       Destination JavaDoc dest;
672
673       if (destType.equals("javax.jms.Queue"))
674         dest = createQueue(destName);
675       else if (destType.equals("javax.jms.Topic"))
676         dest = createTopic(destName);
677       else
678         throw new NotSupportedException JavaDoc("Invalid destination type provided "
679                                         + "as activation parameter: "
680                                         + destType);
681
682       String JavaDoc userName = specImpl.getUserName();
683       String JavaDoc password = specImpl.getPassword();
684
685       createUser(userName, password);
686
687       XAConnectionFactory JavaDoc connectionFactory = null;
688
689       if (isHa) {
690           if (collocated)
691               connectionFactory = XAHALocalConnectionFactory.create();
692           else {
693               String JavaDoc urlHa = "hajoram://" + hostName + ":" + serverPort;
694               connectionFactory = XAHATcpConnectionFactory.create(urlHa);
695           }
696       } else {
697
698       if (collocated)
699         connectionFactory = XALocalConnectionFactory.create();
700       else
701               connectionFactory = XATcpConnectionFactory.create(hostName, serverPort);
702       }
703
704       ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().connectingTimer = connectingTimer;
705       ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().cnxPendingTimer = cnxPendingTimer;
706       ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().txPendingTimer = txPendingTimer;
707
708       if (queueMessageReadMax > 0) {
709         ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory)
710             .getParameters().queueMessageReadMax = queueMessageReadMax;
711       }
712
713       if (topicAckBufferMax > 0) {
714         ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory)
715             .getParameters().topicAckBufferMax = topicAckBufferMax;
716       }
717
718       if (topicPassivationThreshold > 0) {
719         ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory)
720             .getParameters().topicPassivationThreshold = topicPassivationThreshold;
721       }
722
723       if (topicActivationThreshold > 0) {
724         ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory)
725             .getParameters().topicActivationThreshold = topicActivationThreshold;
726       }
727
728       XAConnection JavaDoc cnx =
729         connectionFactory.createXAConnection(userName, password);
730
731       if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
732         AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
733                                       this + " endpointActivation cnx = " + cnx);
734
735       // Creating and registering a consumer instance for this endpoint.
736
InboundConsumer consumer =
737         new InboundConsumer(workManager,
738                             endpointFactory,
739                             cnx,
740                             dest,
741                             specImpl.getMessageSelector(),
742                             durable,
743                             specImpl.getSubscriptionName(),
744                             transacted,
745                             maxWorks,
746                             maxMessages,
747                             ackMode,
748                             deleteDurableSubscription);
749
750       consumers.put(specImpl, consumer);
751     }
752     catch (javax.jms.JMSSecurityException JavaDoc exc) {
753       throw new SecurityException JavaDoc("Invalid user identification: " + exc);
754     }
755     catch (javax.jms.JMSException JavaDoc exc) {
756       throw new CommException JavaDoc("Could not connect to the JORAM server: "
757                               + exc);
758     }
759     catch (AdminException exc) {
760       throw new ResourceException JavaDoc("Problem when handling the JORAM "
761                                   + "destinations: " + exc);
762     }
763   }
764
765   /**
766    * Notifies the adapter to deactivate message delivery for a given endpoint.
767    */

768   public void endpointDeactivation(MessageEndpointFactory JavaDoc endpointFactory,
769                                    ActivationSpec JavaDoc spec) {
770     if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
771       AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
772                                     this + " endpointDeactivation(" + endpointFactory + ", " + spec + ")");
773    &