KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > JmsServer


1 /**
2  * This file is part of Presumo.
3  *
4  * Presumo is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * Presumo is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with Presumo; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  *
19  * Copyright (c) 2001, 2002 Dan Greff
20  */

21 package com.presumo.jms;
22
23 import com.presumo.util.config.Configuration;
24 import com.presumo.util.config.Preferences;
25 import com.presumo.jms.persistence.PersistentQueue;
26 import com.presumo.jms.plugin.implementation.MemoryMessageQueue;
27 import com.presumo.jms.plugin.MessageQueue;
28 import com.presumo.jms.plugin.transport.ServerTransport;
29 import com.presumo.jms.plugin.transport.Transport;
30 import com.presumo.jms.resources.Resources;
31 import com.presumo.jms.router.RemoteSession;
32 import com.presumo.jms.router.ConnectionListener;
33 import com.presumo.jms.router.Router;
34
35 import com.presumo.util.log.Logger;
36 import com.presumo.util.log.LoggerFactory;
37
38 import java.io.BufferedReader JavaDoc;
39 import java.io.InputStreamReader JavaDoc;
40 import java.io.IOException JavaDoc;
41 import java.io.File JavaDoc;
42 import java.io.FileInputStream JavaDoc;
43
44 import java.util.HashMap JavaDoc;
45 import java.util.Map JavaDoc;
46 import java.util.Set JavaDoc;
47 import java.util.StringTokenizer JavaDoc;
48 import java.util.Properties JavaDoc;
49
50 import javax.jms.JMSException JavaDoc;
51
52 /**
53  * <p>
54  * Encapsulation of a Presumo JMS server. Any application may start
55  * a JMS server within their application with the following steps.
56  * <ol>
57  * <li>Instantiate the server with the constructor that is appropriate
58  * to your application's needs.
59  * <li>Start the server with <code>startup()</code>
60  * <li>Start every type of server listener that is needed. To start
61  * a tcp socket listener on <code>127.0.0.1:2323</code> call:
62  * <ul><li>
63  * <code>startServerTransport("tcp://127.0.0.1:2323")</code>.
64  * </li></ul>
65  * <li>Connect the server to other servers if needed. To connect
66  * to a server at 192.168.1.1 running a TCP listener on port 2323
67  * you would call:
68  * <ul><li>
69  * <code>startClientTransport("tcp://192.168.1.1:2323")</code>
70  * </li></ul>
71  * You may connect to multiple servers.
72  * </ol>
73  * <p>
74  * Currently, only TCP/IP connections are supported.
75  * <p>
76  * If you want to use Presumo for intra-JVM communication you do not
77  * need to create an instance of this class. All message consumers
78  * and producers created from the same <code>javax.jms.Connection</code>
79  * can communicate with each other without a server. Only producers
80  * and consumers created from different <code>javax.jms.Connection's</code>
81  * will need a server to communicate.
82  * </p>
83  *
84  * @author Dan Greff
85  */

86 public class JmsServer
87 {
88
89   /**
90    * The directory to store messages queues which will contain persistent
91    * messages. If set to <code>null</code> the JmsServer will not
92    * write anything to disk and persistent messages will not be much more
93    * reliable than non-persistent messages.
94    */

95   protected final String JavaDoc persistentDir;
96
97   /**
98    * If using persistent messages, the files created by Presumo will have
99    * this prefix.
100    */

101   protected final String JavaDoc persistentPrefix;
102
103   /**
104    * Once the persistent queue size on disk (in bytes) is greater than
105    * this value it will be compressed and internal fragmentation caused
106    * by deleted messages will be removed.
107    */

108   protected final int persistentLogSize;
109
110   private final Logger logger;
111   private Map JavaDoc serverMap;
112   private Map JavaDoc clientMap;
113   private Router router;
114
115     /////////////////////////////////////////////////////////////////////////
116
// Constructors //
117
/////////////////////////////////////////////////////////////////////////
118

119   /**
120    * Create a server with no mechanisms to transactionally store messages
121    * on the disk. All persistent and non-persistent messages will be kept
122    * in memory. You should only use the server if you are not using
123    * persistent messages.
124    */

125   public JmsServer()
126   {
127     this(null);
128   }
129
130   /**
131    * Create a server which will create transactional queues for the
132    * storage of persistent messages in the given directory. The
133    * directory must be ths same between application reboots. It
134    * is erroneous to use a temporary directory since the queues
135    * must servive a reboot.
136    *
137    * @param persistentDir Directory which persistent queues will be
138    * written to. Application must have write permissions.
139    */

140   public JmsServer(String JavaDoc persistentDir)
141   {
142     this(persistentDir, "PresumoJms");
143   }
144
145
146
147   /**
148    * Create a server which will create transactional queues for the
149    * storage of persistent messages in the given directory. The
150    * directory must be ths same between application reboots. It
151    * is erroneous to use a temporary directory since the queues
152    * must servive a reboot.
153    * <p>
154    * You may change the prefix of all files created by Presumo
155    * using this constructor. The default value is "PresumoJms".
156    * <p>
157    * The prefix must be the same across reboots and cannot change.
158    * If you change the prefix name, all stored messages using
159    * the previous prefix name will be lost.
160    * </p>
161    *
162    * @param persistentDir Directory which persistent queues will be
163    * written to. Application must have write permissions.
164    * @param persistentPrefix Prefix to use for all persistent filenames.
165    */

166   public JmsServer(String JavaDoc persistentDir,
167                    String JavaDoc persistentPrefix)
168   {
169     this(persistentDir, persistentPrefix, 100000);
170   }
171
172
173   /**
174    * Create a server which will create transactional queues for the
175    * storage of persistent messages in the given directory. The
176    * directory must be ths same between application reboots. It
177    * is erroneous to use a temporary directory since the queues
178    * must servive a reboot.
179    * <p>
180    * You may change the prefix of all files created by Presumo
181    * using this constructor. The default value is "PresumoJms".
182    * <p>
183    * The prefix must be the same across reboots and cannot change.
184    * If you change the prefix name, all stored messages using
185    * the previous prefix name will be lost.
186    * <p>
187    * You may also specify the max size in bytes the log file size
188    * will get before the persistent mechanism resolves the contents
189    * to a more permanent file.
190    * </p>
191    * The default value is 100,000 bytes which is a good number for
192    * small messages (less than 1K) but might need to be larger
193    * for larger messages.
194    *
195    * @param persistentDir Directory which persistent queues will be
196    * written to. Application must have write permissions.
197    * @param persistentPrefix Prefix to use for all persistent filenames.
198    * @param persistentLogSize Max size in bytes of the persistent log file.
199    */

200   public JmsServer(String JavaDoc persistentDir,
201                    String JavaDoc persistentPrefix,
202                    int persistentLogSize)
203   {
204     this.persistentDir = persistentDir;
205     this.persistentPrefix = persistentPrefix;
206     this.persistentLogSize = persistentLogSize;
207
208     serverMap = new HashMap JavaDoc();
209     clientMap = new HashMap JavaDoc();
210     logger = LoggerFactory.getLogger(JmsServer.class, Resources.getBundle());
211   }
212
213
214     ///////////////////////////////////////////////////////////////////////////
215
// Public Methods //
216
///////////////////////////////////////////////////////////////////////////
217

218
219   /**
220    * Starts the server, but you must specifically create server listners
221    * or connections to other servers via <code>startServerTransport()</code>
222    * and <code>startClientTransport()</code>.
223    *
224    * @see startServerTransport
225    * @see startClientTransport
226    */

227   public synchronized void startup() throws JMSException JavaDoc
228   {
229     logger.entry("startup");
230
231     if (router == null) {
232       try {
233         MessageQueue queue = null;
234         
235         if (persistentDir == null) {
236           queue = new MemoryMessageQueue();
237         } else {
238           File JavaDoc dir = new File JavaDoc(persistentDir);
239           PersistentQueue pqueue =
240             new PersistentQueue(dir, persistentPrefix, persistentLogSize);
241           pqueue.open();
242           queue = pqueue;
243         }
244     
245         router = new Router(queue);
246
247       } catch (IOException JavaDoc ioe) {
248         JMSException JavaDoc jmsex = new JMSException JavaDoc("Unable to initialize queue");
249         jmsex.setLinkedException(ioe);
250         throw jmsex;
251       }
252     }
253
254     logger.exit("startup");
255   }
256   
257   /**
258    * Stops the server. The server will disconnect with all other servers it
259    * is connected to and will disconnect all clients.
260    */

261   public synchronized void shutdown()
262   {
263     logger.entry("shutdown");
264     
265     // Clone the information to an array, since the connection information
266
// structures will change as we are shutting down.
267
Set JavaDoc serverSet = serverMap.keySet();
268     Set JavaDoc clientSet = clientMap.keySet();
269
270     String JavaDoc [] servers = new String JavaDoc[serverSet.size()];
271     String JavaDoc [] clients = new String JavaDoc[clientSet.size()];
272
273     serverSet.toArray(servers);
274     clientSet.toArray(clients);
275
276     int i;
277     for (i=0; i < servers.length; ++i) {
278       stopServerTransport(servers[i]);
279     }
280
281     for (i=0; i < clients.length; ++i) {
282       stopClientTransport(clients[i]);
283     }
284
285     router.closeRouter();
286     router = null;
287
288     logger.exit("shutdown");
289   }
290
291
292   /**
293    * Start a built-in server transport mechanism to listen for and
294    * accept connections from client transport mechanismism of the
295    * same protocal.
296    * </p>
297    * <p>
298    * The format of the passed in <code>url</code> should be:
299    * </p>
300    * <code>protocal://inetaddress:port</code>
301    * <ul>
302    * <li><b>protocal</b> - Identifier for built-in protocal.
303    * <li><b>inetaddress</b> - Address to run the protocal on. The loopback
304    * address is usually correct here (i.e. 127.0.0.1 or localhost). The one
305    * exception is a multi-homed machine.
306    * <li><b>port</b> - Port the protocal should listen on.
307    * </ul>
308    * </p>
309    * Currently the only built-in protocal is "tcp" for a TCP/IP
310    * implementation.
311    *
312    * You must start the server before creating connections to it.
313    *
314    * @see startup
315    */

316   public synchronized void startServerTransport(String JavaDoc url) throws JMSException JavaDoc
317   {
318     logger.entry("startServerTransport", url);
319
320     // Clean up when there are more supported protocals
321
if (! url.toLowerCase().startsWith("tcp")) {
322       throw new JMSException JavaDoc(Resources.getResourceString("PJMSE0004", url));
323     }
324     else if (serverMap.containsKey(url)) {
325       throw new JMSException JavaDoc(Resources.getResourceString("PJMSE0002", url));
326     }
327     else {
328       ServerTransport transport = new
329         com.presumo.jms.plugin.implementation.transport.tcp.ServerTransportImpl();
330       transport.setURL(url);
331       transport.setRouter(router);
332       transport.start();
333       serverMap.put(url, transport);
334     }
335
336     logger.exit("startServerTransport");
337   }
338
339   /**
340    * Stop the given server transport.
341    */

342   public synchronized void stopServerTransport(String JavaDoc url)
343   {
344     logger.entry("stopServerTransport", url);
345     
346     ServerTransport transport = (ServerTransport) serverMap.get(url);
347     
348     if (transport != null) {
349       transport.close();
350       serverMap.remove(url);
351     }
352     
353     logger.exit("stopServerTransport");
354   }
355   
356
357   /**
358    * Start a client transport to connect to another server.
359    */

360   public synchronized void startClientTransport(String JavaDoc url)
361     throws JMSException JavaDoc
362   {
363     logger.entry("startClientTransport", url);
364
365     // Clean up when there are more supported protocals
366
if (! url.toLowerCase().startsWith("tcp")) {
367       throw new JMSException JavaDoc(Resources.getResourceString("PJMSE0004", url));
368     }
369     else if (clientMap.containsKey(url)) {
370       throw new JMSException JavaDoc(Resources.getResourceString("PJMSE0002", url));
371     }
372     else {
373
374       String JavaDoc host = getHost(url);
375       int port = getPort(url);
376
377       try {
378         Transport transport = new
379           com.presumo.jms.plugin.implementation.transport.tcp.TransportImpl(host, port);
380         ConnectionListener cl = new ConnectionListener() {
381             public void connectionLost(RemoteSession session) {
382               // todo:: handle lost connections gracefully
383
}
384           };
385         RemoteSession session = new RemoteSession(router, transport, cl);
386         session.start();
387         
388         clientMap.put(url, transport);
389       } catch (java.io.IOException JavaDoc ioe) {
390         JMSException JavaDoc jmsex = new JMSException JavaDoc("Unable to connect to server " + host+":"+port);
391         jmsex.setLinkedException(ioe);
392         throw jmsex;
393       }
394     }
395
396     logger.exit("startClientTransport");
397   }
398
399   /**
400    * Stop a client transpot connected to another server.
401    */

402   public void stopClientTransport(String JavaDoc url)
403   {
404     logger.entry("stopClientTransport", url);
405
406     Transport transport = (Transport) clientMap.get(url);
407     
408     if (transport != null) {
409       transport.close();
410       clientMap.remove(url);
411     }
412
413     logger.exit("stopClientTransport");
414   }
415
416
417     ///////////////////////////////////////////////////////////////////////////
418
// Protected Methods //
419
///////////////////////////////////////////////////////////////////////////
420

421   /**
422    * Augment the basic functionality by shutting down the server and
423    * releasing all resources before being garbage collected.
424    */

425   protected void finalize() throws Throwable JavaDoc
426   {
427     shutdown();
428     super.finalize();
429   }
430
431     ///////////////////////////////////////////////////////////////////////////
432
// Private Methods //
433
///////////////////////////////////////////////////////////////////////////
434

435   private String JavaDoc getHost(String JavaDoc url) throws JMSException JavaDoc
436   {
437     logger.entry("getHost", url);
438     
439     int loc = url.lastIndexOf('/');
440     int loc2 = url.lastIndexOf(':');
441     
442     if (loc == -1 || loc2 == -1) {
443       throw new JMSException JavaDoc("Malformed URL: " + url);
444     }
445
446     String JavaDoc retval = url.substring(loc+1, loc2);
447     logger.exit("getHost", retval);
448     return retval;
449   }
450
451   private int getPort(String JavaDoc url) throws JMSException JavaDoc
452   {
453     logger.entry("getPort", url);
454
455     int loc = url.lastIndexOf(':');
456     if (loc == -1) {
457       throw new JMSException JavaDoc("Malformed URL: " + url);
458     }
459     
460     int retval = 0;
461     try {
462       retval = Integer.parseInt(url.substring(loc+1));
463     } catch (NumberFormatException JavaDoc nfe) {
464       JMSException JavaDoc jmsex = new JMSException JavaDoc("Malformed URL: " + url);
465       jmsex.setLinkedException(nfe);
466       throw jmsex;
467     }
468     
469     logger.exit("getPort", new Integer JavaDoc(retval));
470     return retval;
471   }
472     
473
474     ///////////////////////////////////////////////////////////////////////////
475
// Static Methods //
476
///////////////////////////////////////////////////////////////////////////
477

478   /**
479    * Used to invoke a JVM with just the Presumo JMS server running within it.
480    * <ul><li>
481    * Usage: java com.presumo.jms.JmsServer [config.properties]
482    * </li></ul>
483    * Where config.properties is the filename of a property file containing
484    * configuration for the server. If the property file is not given the
485    * server will attempt to start on localhost:2323 with no persistent
486    * mechanisms.
487    */

488   public static void main(String JavaDoc [] args) throws Exception JavaDoc
489   {
490     if (args.length > 1) {
491       System.err.println("Usage: JmsServer <server.properties>");
492       System.exit(-1);
493     }
494
495     try {
496       //
497
// Read in the test properties
498
//
499
Properties JavaDoc props = new Properties JavaDoc();
500       if (args.length == 1) {
501         FileInputStream JavaDoc fis = null;
502         try {
503           fis = new FileInputStream JavaDoc(args[0]);
504           props.load(fis);
505         } finally {
506           if (fis != null) fis.close();
507         }
508       }
509
510       String JavaDoc persistentDir = props.getProperty("PersistentDirectory");
511       String JavaDoc serverTransports = props.getProperty("ServerTransports",
512                                                   "tcp://localhost:2323");
513       String JavaDoc clientTransports = props.getProperty("ClientTransports");
514
515
516       // Start the server
517
//
518
JmsServer server = new JmsServer(persistentDir);
519       server.startup();
520       
521       if (serverTransports != null && serverTransports.length() > 0) {
522         StringTokenizer JavaDoc tokens =
523           new StringTokenizer JavaDoc(serverTransports.trim(), ";");
524         while(tokens.hasMoreTokens()) {
525           String JavaDoc serverTransport = tokens.nextToken().trim();
526           server.startServerTransport(serverTransport);
527         }
528       }
529       
530       if (clientTransports != null && clientTransports.length() > 0) {
531         StringTokenizer JavaDoc tokens =
532           new StringTokenizer JavaDoc(clientTransports.trim(), ";");
533         while(tokens.hasMoreTokens()) {
534           String JavaDoc clientTransport = tokens.nextToken().trim();
535           server.startClientTransport(clientTransport);
536         }
537       }
538       
539       System.out.println(Resources.getResourceString("STARTUP_COMPLETE")+"\n");
540
541       
542       // Wait for the user to request a shutdown.
543
//
544
BufferedReader JavaDoc input = new BufferedReader JavaDoc(new InputStreamReader JavaDoc(System.in));
545       String JavaDoc choice = "";
546       while(! choice.toLowerCase().equals("exit")) {
547         System.out.println(Resources.getResourceString("SHUTDOWN_INSTRUCTIONS"));
548         choice = input.readLine();
549       }
550     
551       // Shutdown the server
552
//
553
server.shutdown();
554       System.out.println(Resources.getResourceString("SHUTDOWN_COMPLETE"));
555
556     } catch (Throwable JavaDoc t) {
557       System.err.println("The following error occured while starting the server:");
558       t.printStackTrace();
559       System.exit(-1);
560     }
561
562   }
563
564 }
565
Popular Tags