KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > server > MessageServer


1 package com.ubermq.jms.server;
2 import java.io.*;
3 import java.lang.reflect.*;
4 import java.net.*;
5 import java.nio.channels.*;
6 import java.util.*;
7
8 import org.apache.log4j.*;
9
10 import com.ubermq.jms.client.*;
11 import com.ubermq.jms.client.unicast.*;
12 import com.ubermq.jms.common.datagram.*;
13 import com.ubermq.jms.common.datagram.impl.*;
14 import com.ubermq.jms.server.cluster.*;
15 import com.ubermq.jms.server.journal.*;
16 import com.ubermq.jms.server.journal.impl.*;
17 import com.ubermq.jms.server.proc.*;
18 import com.ubermq.jms.server.ssl.*;
19 import com.ubermq.kernel.*;
20
21 /**
22  * An executable class that provides typical message server
23  * functionality for TCP connections. This includes
24  * listening on a port, accepting new connections,
25  * processing I/O for those connections, determining
26  * which connections should receive messages, outputting
27  * messages to receipient connections, managing unacknowledged
28  * messages, providing durable subscriber and message selector
29  * functionality, and managing overflow conditions.
30  * <P>
31  * The message server also allows direct intra-process connections
32  * via Pipe objects. This functionality is provided by the
33  * <code>connectPipes</code> method. This can significantly
34  * increase performance of JMS clients that run in the same
35  * JVM as the messaging server.
36  * <P>
37  * A message server hub-and-spoke topology is very common
38  * and applicable for many enterprise messaging
39  * problems.
40  */

41 public class MessageServer extends KernelBasedServer implements Runnable JavaDoc, PipeEndpoint
42 {
43     private static final Logger log = Logger.getLogger(MessageServer.class);
44     private boolean started = false;
45
46     // datagram processing & protocls
47
private IMessageProcessor datagramProcessor;
48     private Set protocols = new LinkedHashSet();
49
50     // connection and I/O management
51
private ReadWriteTransformThread[] read, write;
52
53     private static final String JavaDoc DEFAULT_RW_THREAD_COUNT =
54         String.valueOf(2 * Runtime.getRuntime().availableProcessors());
55     private static final int RW_THREAD_COUNT =
56         Integer
57             .valueOf(Configurator.getProperty(ServerConfig.RW_THREAD_COUNT, DEFAULT_RW_THREAD_COUNT))
58             .intValue();
59
60     // datagram factory stuff.
61
private static final String JavaDoc DATAGRAM_FACTORY_CLASS =
62         Configurator.getProperty(ServerConfig.DATAGRAM_FACTORY_CLASS, ServerDatagramFactory.class.getName());
63     private static final String JavaDoc DATAGRAM_INSTANCE_METHOD = "getInstance";
64
65     // clustering.
66
private ClusterMembership clusterMembership;
67
68     public MessageServer(String JavaDoc[] args)
69     {
70         super(args.length > 0 ? args[0] : null);
71     }
72
73     public MessageServer(Properties props)
74     {
75         super(props);
76     }
77
78     protected void init()
79     {
80         try
81         {
82             // open journal
83
ISettingsRepository fj = new BinarySettingsRepository();
84
85             // start read/write threads
86
read = new ReadWriteTransformThread[RW_THREAD_COUNT];
87             write = new ReadWriteTransformThread[RW_THREAD_COUNT];
88             log.debug("Creating read/write pool of size " + RW_THREAD_COUNT);
89             for (int i = 0; i < RW_THREAD_COUNT; i++)
90             {
91                 read[i] = new ReadWriteTransformThread(SelectionKey.OP_READ);
92                 read[i].start();
93
94                 write[i] = new ReadWriteTransformThread(SelectionKey.OP_WRITE);
95                 write[i].start();
96             }
97
98             // create single datagram processor
99
this.datagramProcessor = createDatagramProc(fj);
100
101             // if so configured, we join a cluster.
102
if (Boolean
103                 .valueOf(Configurator.getProperty(ServerConfig.CLUSTER_ENABLE, "false"))
104                 .booleanValue())
105             {
106                 try
107                 {
108                     Class JavaDoc clusterImpl =
109                         Class.forName(
110                             Configurator.getProperty(
111                                 ServerConfig.CLUSTER_IMPLEMENTATION,
112                                 JGroupsClusterMembership.class.getName()));
113                     this.clusterMembership = (ClusterMembership)clusterImpl.newInstance();
114
115                     // join please!
116
clusterMembership.join(new PipeConnectionFactory(this));
117                 }
118                 catch (ClassCastException JavaDoc cce)
119                 {
120                     log.fatal(
121                         "Your cluster provider must implement " + ClusterMembership.class.getName(),
122                         cce);
123                 }
124             }
125
126         }
127         catch (Exception JavaDoc x)
128         {
129             log.fatal("Could not initialize the server", x);
130             throw new IllegalStateException JavaDoc(x.getMessage());
131         }
132     }
133
134     /**
135      * Resets the list of protocols to an empty set.<P>
136      *
137      * @since 2.1
138      */

139     public void resetProtocols()
140     {
141         protocols.clear();
142     }
143
144     /**
145      * Adds a protocol to the message server. This method must only be
146      * used before the <code>run</code> method is invoked. <P>
147      *
148      * @since 2.1
149      */

150     public void add(Protocol p)
151     {
152         protocols.add(p);
153     }
154
155     /**
156      * Adds the standard protocols. This includes the default UberMQ protocol,
157      * the Secure UberMQ protocol (if enabled) and the admin protocol.<P>
158      *
159      * @since 2.1
160      */

161     public void addStandardProtocols()
162     {
163         // do it.
164
add(new DefaultProtocol(getDatagramFactory(), DefaultProtocol.getConfiguredBindAddress()));
165         add(new AdminProtocol());
166         add(new SSLProtocol(getDatagramFactory(), this, SSLProtocol.getConfiguredBindAddress()));
167     }
168
169     /**
170      * Obtains a Set of <code>Protocol</code> objects describing the protocols that
171      * are active for this server.
172      * @return a Set of Protocol objects.
173      */

174     public Set getProtocols()
175     {
176         return protocols;
177     }
178
179     /**
180      * Executes the message server. This message server potentially creates an
181      * acceptor thread, if so configured, and one or more I/O threads to perform
182      * channel based I/O on behalf of connected clients.
183      */

184     protected URI exec()
185     {
186         URI serviceURI = null;
187
188         try
189         {
190             // go through registered protocols
191
Iterator iter = protocols.iterator();
192             while (iter.hasNext())
193             {
194                 Protocol p = (Protocol)iter.next();
195                 if (p.isEnabled())
196                 {
197                     p.start(datagramProcessor, new IConnectionInfo.ConnectionAcceptor()
198                     {
199                         public void acceptIncomingConnection(IConnectionInfo incoming)
200                         {
201                             int i = new Random().nextInt(read.length);
202                             write[i].register((ConnectionInfo)incoming, true);
203                             read[i].register((ConnectionInfo)incoming, true);
204                         }
205                     });
206                     log.info("Protocol " + p.toString() + " started");
207
208                     if (serviceURI == null)
209                         serviceURI = p.getServiceURI();
210                 }
211             }
212
213             // we're done.
214
started = true;
215         }
216         catch (Exception JavaDoc x)
217         {
218             log.fatal("Initialization failed", x);
219         }
220
221         // set server name and return URi
222
if (datagramProcessor instanceof DatagramProc && serviceURI != null)
223         {
224             ((DatagramProc)datagramProcessor).setServerName(serviceURI.toString());
225         }
226         return serviceURI;
227     }
228
229     /**
230      * Stops the server. This shuts down all I/O threads asynchronously, and waits
231      * for the listener thread to shutdown.
232      */

233     public void stop() throws InterruptedException JavaDoc
234     {
235         for (int i = 0; i < read.length; i++)
236         {
237             read[i].interrupt();
238         }
239         for (int i = 0; i < write.length; i++)
240         {
241             write[i].interrupt();
242         }
243
244         // stop all protocols
245
Iterator iter = protocols.iterator();
246         while (iter.hasNext())
247         {
248             Protocol p = (Protocol)iter.next();
249             p.stop();
250         }
251     }
252
253     protected IMessageProcessor createDatagramProc(ISettingsRepository settings) throws IOException
254     {
255         DatagramProc dp = new DatagramProc(settings, getDatagramFactoryHolder());
256         return dp;
257     }
258
259     /**
260      * Returns the server's datagram processor.
261      *
262      * @return an IMessageProcessor
263      */

264     protected IMessageProcessor getDatagramProc()
265     {
266         return datagramProcessor;
267     }
268
269     /**
270      * Returns the datagram factory holder to be used by this
271      * instance of the message server. The Datagram Factory Holder
272      * contains instances of all datagram factories required for
273      * normal operation.<p>
274      *
275      * The default will call <code>getDatagramFactory</code> and
276      * use that as the wire protocol. Actual datagram instances
277      * will be created by the default <code>DatagramFactory</code>
278      * implementation.<p>
279      *
280      * @see com.ubermq.jms.common.datagram.impl.DatagramFactory
281      * @return a datagram factory holder
282      */

283     protected DatagramFactoryHolder getDatagramFactoryHolder()
284     {
285         return new DatagramFactoryHolder(getDatagramFactory(), ServerDatagramFactory.getInstance());
286     }
287
288     /**
289      * Returns the datagram factory holder that clients should
290      * use to connect to this server. This may be different
291      * than the holder returned by the protected <code>getDatagramFactoryHolder</code>
292      * method.
293      *
294      * @return a DatagramFactoryHolder that can be used by clients.
295      */

296     public DatagramFactoryHolder getClientDatagramFactoryHolder()
297     {
298         return DatagramFactory.getHolder();
299     }
300
301     /**
302      * If this server is a part of a cluster, this returns the cluster
303      * membership that represents the cluster endpoint. This can be used to
304      * change the state of the server in the cluster. If the server is not part
305      * of a cluster, null is returned.
306      *
307      * @return a ClusterMembership if the server is part of a cluster, or null otherwise.
308      */

309     protected ClusterMembership getClusterMembership()
310     {
311         return clusterMembership;
312     }
313
314     /**
315      * Pipes are first class constructs in this message server implementation.
316      * A pipe connection factory can call this method on an in-process server
317      * and directly connect to the server, and receive datagrams.
318      * <P>
319      * This method creates a pipe and registers it with the server's I/O
320      * process and internal datagram processor. The caller is responsible
321      * for registering a client-side message handler and servicing the Pipe
322      * from an I/O standpoint.
323      * <P>
324      * @param upstream the Upstream pipe, representing data flowing from
325      * the client to the server.
326      * @param downstream the Downstream pipe, representing data flowing from
327      * the server to the client.
328      * @throws IOException if the pipes are not valid.
329      */

330     public PipeConnectionInfo connectPipes(Pipe upstream, Pipe downstream, IDatagramFactory df) throws java.io.IOException JavaDoc
331     {
332         return doConnectPipes(upstream, downstream, df, datagramProcessor);
333     }
334
335     protected PipeConnectionInfo doConnectPipes(
336         Pipe upstream,
337         Pipe downstream,
338         IDatagramFactory df,
339         IMessageProcessor dp)
340         throws IOException
341     {
342         // create a connection info that represents the server
343
PipeConnectionInfo ci = new PipeConnectionInfo(upstream.source(), downstream.sink(), df, dp);
344         dp.accept(ci);
345         write[0].register(ci, true);
346         read[0].register(ci, true);
347         return ci;
348     }
349
350     /**
351      * Runs the UberMQ JMS server.<P>
352      * 1. attempts to make an entry into a high availability cluster.<br>
353      * 2. recovers from a checkpointed state.<br>
354      * 3. performs I/O.<P>
355      * The I/O Theory is that we read in information
356      * i.e. messages that are then Transformed
357      * into outgoing destination lists
358      * or control metadata.
359      */

360     public static void main(String JavaDoc[] args)
361     {
362         final MessageServer s = new MessageServer(args);
363         s.addStandardProtocols();
364
365         // run it
366
s.run();
367         log.info(
368             "UberMQ "
369                 + com.ubermq.jms.client.impl.Connection.UBERMQ_PROVIDER_VERSION
370                 + " running at "
371                 + s.getServiceUrl());
372     }
373
374     /**
375      * Returns the datagram factory to be used to interpret byte streams
376      * coming from clients. The default logic looks at the properties file
377      * to determine if a new factory has been indicated at runtime. If the
378      * creation fails, the default factory is used.
379      * <P>
380      * This can be overridden by subclasses to provide a different wire protocol.
381      *
382      * @return a class that implements IDatagramFactory and IAckDatagramFactory
383      */

384     IDatagramFactory getDatagramFactory()
385     {
386         Class JavaDoc clazz = null;
387         try
388         {
389             clazz = Class.forName(DATAGRAM_FACTORY_CLASS);
390             return (IDatagramFactory)clazz.newInstance();
391         }
392         catch (ClassNotFoundException JavaDoc e)
393         {
394             // fall through
395
}
396         catch (ClassCastException JavaDoc e)
397         {
398             // fall through
399
}
400         catch (Exception JavaDoc e)
401         {
402             try
403             {
404                 Method getInstance = clazz.getMethod(DATAGRAM_INSTANCE_METHOD, null);
405                 return (IDatagramFactory)getInstance.invoke(null, null);
406             }
407             catch (Exception JavaDoc e2)
408             {
409                 // fall through
410
log.error("", e2);
411             }
412         }
413
414         return DatagramFactory.getInstance();
415     }
416 }
417
Popular Tags