KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > il > oil > OILServerILService


1 /*
2  * JBoss, the OpenSource J2EE webOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.mq.il.oil;
8
9 import java.io.BufferedInputStream JavaDoc;
10 import java.io.BufferedOutputStream JavaDoc;
11 import java.io.EOFException JavaDoc;
12 import java.io.IOException JavaDoc;
13 import java.io.ObjectInputStream JavaDoc;
14 import java.io.ObjectOutputStream JavaDoc;
15 import java.lang.reflect.Method JavaDoc;
16 import java.net.InetAddress JavaDoc;
17 import java.net.ServerSocket JavaDoc;
18 import java.net.Socket JavaDoc;
19 import java.net.SocketException JavaDoc;
20 import java.net.UnknownHostException JavaDoc;
21 import java.rmi.RemoteException JavaDoc;
22 import java.util.Properties JavaDoc;
23
24 import javax.jms.Destination JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.naming.InitialContext JavaDoc;
27 import javax.net.ServerSocketFactory;
28
29 import org.jboss.logging.Logger;
30 import org.jboss.mq.AcknowledgementRequest;
31 import org.jboss.mq.ConnectionToken;
32 import org.jboss.mq.DurableSubscriptionID;
33 import org.jboss.mq.SpyDestination;
34 import org.jboss.mq.SpyMessage;
35 import org.jboss.mq.Subscription;
36 import org.jboss.mq.TransactionRequest;
37 import org.jboss.mq.il.Invoker;
38 import org.jboss.mq.il.ServerIL;
39 import org.jboss.security.SecurityDomain;
40 import org.jboss.system.server.ServerConfigUtil;
41
42 /**
43  * Implements the ServerILJMXService which is used to manage the JVM IL.
44  *
45  * @author Hiram Chirino (Cojonudo14@hotmail.com)
46  * @version $Revision: 1.30 $
47  *
48  * @jmx:mbean extends="org.jboss.mq.il.ServerILJMXServiceMBean"
49  */

50 public final class OILServerILService
51    extends org.jboss.mq.il.ServerILJMXService
52    implements java.lang.Runnable JavaDoc, OILServerILServiceMBean
53 {
54    /**
55     * logger instance.
56     */

57    final static private Logger log = Logger.getLogger(OILServerILService.class);
58
59    /**
60     * The default timeout for the server socket. This is
61     * set so the socket will periodically return to check
62     * the running flag.
63     */

64    private final static int SO_TIMEOUT = 5000;
65
66    /**
67     * The JMS server where requests are forwarded to.
68     */

69    //private static JMSServerInvoker server;
70
private Invoker server;
71
72    /**
73     * If the TcpNoDelay option should be used on the socket.
74     */

75    private boolean enableTcpNoDelay = false;
76
77    /**
78     * The read timeout
79     */

80    private int readTimeout = 0;
81
82    /** The security domain name to use with SSL aware socket factories.
83     */

84    private String JavaDoc securityDomain;
85
86    /* The javax.net.SocketFactory implementation class to use on the client.
87     */

88    private String JavaDoc clientSocketFactoryName;
89    /** The socket factory used to obtain the server socket.
90     */

91    private ServerSocketFactory serverSocketFactory;
92    /**
93     * The listening socket that receives incomming connections
94     * for servicing.
95     */

96    private ServerSocket JavaDoc serverSocket;
97
98    /**
99     * The managed serverIL.
100     */

101    private OILServerIL serverIL;
102
103    /**
104     * The running flag that all worker and server
105     * threads check to determine if the service should
106     * be stopped.
107     */

108    private volatile boolean running;
109
110    /**
111     * The server port to bind to.
112     */

113    private int serverBindPort = 0;
114
115    /**
116     * The internet address to bind to by
117     * default.
118     */

119    private InetAddress JavaDoc bindAddress = null;
120
121    /**
122     * Number of OIL Worker threads started.
123     */

124    private int threadNumber = 0;
125
126    /**
127     * The connection properties passed to the client to connect to this IL
128     */

129    private Properties JavaDoc connectionProperties;
130
131    /**
132     * This class is used to encapsulate the basic connection and
133     * work for a connected client thread. The run() method of this
134     * class processes requests and sends responses to and from a
135     * single client. All requests are forwarded to the outer class'
136     * JMS server instance.
137     *
138     * @author Brian Weaver (weave@opennms.org)
139     */

140    private final class Client implements Runnable JavaDoc
141    {
142       /**
143        * The TCP/IP socket for communications.
144        */

145       private Socket JavaDoc sock;
146
147       /**
148        * The object output stream running on top of the
149        * socket's output stream.
150        */

151       private ObjectOutputStream JavaDoc out;
152
153       /**
154        * The objec5t input stream running on top of the
155        * socket's input stream
156        */

157       private ObjectInputStream JavaDoc in;
158
159       /**
160        * Allocates a new runnable instance to process requests from
161        * the server's client and pass them to the JMS server. The
162        * passed socket is used for all communications between the
163        * service and the client.
164        *
165        * @param s The socket used for communications.
166        *
167        * @throws java.io.IOException Thrown if an I/O error occurs
168        * constructing the object streams.
169        */

170       Client(Socket JavaDoc s) throws IOException JavaDoc
171       {
172          this.sock = s;
173          this.out = new ObjectOutputStream JavaDoc(new BufferedOutputStream JavaDoc(this.sock.getOutputStream()));
174          this.out.flush();
175          this.in = new ObjectInputStream JavaDoc(new BufferedInputStream JavaDoc(this.sock.getInputStream()));
176          sock.setTcpNoDelay(enableTcpNoDelay);
177          if (log.isTraceEnabled())
178             log.trace("Setting TcpNoDelay Option to:" + enableTcpNoDelay);
179       }
180
181       /**
182        * The main threads processing routine. This loop processes
183        * requests from the server and sends the appropriate responses
184        * based upon the results.
185        */

186       public void run()
187       {
188          int code = 0;
189          boolean closed = false;
190          ConnectionToken connectionToken = null;
191
192          while (!closed && running)
193          {
194             try
195             {
196                // read in the next request/directive
197
// from the client
198
//
199
code = in.readByte();
200             }
201             catch (EOFException JavaDoc e)
202             {
203                // end of file, exit processing loop
204
//
205
break;
206             }
207             catch (IOException JavaDoc e)
208             {
209                if (closed || !running)
210                {
211                   // exit out of the loop if the connection
212
// is closed or the service is shutdown
213
//
214
break;
215                }
216                log.warn("Connection failure (1).", e);
217                break;
218             }
219
220             // now based upon the input directive, preform the
221
// requested action. Any exceptions are processed
222
// and potentially returned to the client.
223
//
224
try
225             {
226                Object JavaDoc result = null;
227
228                switch (code)
229                {
230                   case OILConstants.SET_SPY_DISTRIBUTED_CONNECTION :
231                      // assert connectionToken == null
232
connectionToken = (ConnectionToken) in.readObject();
233                      break;
234
235                   case OILConstants.ACKNOWLEDGE :
236                      AcknowledgementRequest ack = new AcknowledgementRequest();
237                      ack.readExternal(in);
238                      server.acknowledge(connectionToken, ack);
239                      break;
240
241                   case OILConstants.ADD_MESSAGE :
242                      server.addMessage(connectionToken, SpyMessage.readMessage(in));
243                      break;
244
245                   case OILConstants.BROWSE :
246                      result = server.browse(connectionToken, (Destination JavaDoc) in.readObject(), (String JavaDoc) in.readObject());
247                      break;
248
249                   case OILConstants.CHECK_ID :
250                      String JavaDoc ID = (String JavaDoc) in.readObject();
251                      server.checkID(ID);
252                      if (connectionToken != null)
253                         connectionToken.setClientID(ID);
254                      break;
255
256                   case OILConstants.CONNECTION_CLOSING :
257                      server.connectionClosing(connectionToken);
258                      closed = true;
259                      break;
260
261                   case OILConstants.CREATE_QUEUE :
262                      result = server.createQueue(connectionToken, (String JavaDoc) in.readObject());
263                      break;
264
265                   case OILConstants.CREATE_TOPIC :
266                      result = server.createTopic(connectionToken, (String JavaDoc) in.readObject());
267                      break;
268
269                   case OILConstants.DELETE_TEMPORARY_DESTINATION :
270                      server.deleteTemporaryDestination(connectionToken, (SpyDestination) in.readObject());
271                      break;
272
273                   case OILConstants.GET_ID :
274                      result = server.getID();
275                      if (connectionToken != null)
276                         connectionToken.setClientID((String JavaDoc) result);
277                      break;
278
279                   case OILConstants.GET_TEMPORARY_QUEUE :
280                      result = server.getTemporaryQueue(connectionToken);
281                      break;
282
283                   case OILConstants.GET_TEMPORARY_TOPIC :
284                      result = server.getTemporaryTopic(connectionToken);
285                      break;
286
287                   case OILConstants.RECEIVE :
288                      result = server.receive(connectionToken, in.readInt(), in.readLong());
289                      break;
290
291                   case OILConstants.SET_ENABLED :
292                      server.setEnabled(connectionToken, in.readBoolean());
293                      break;
294
295                   case OILConstants.SUBSCRIBE :
296                      server.subscribe(connectionToken, (Subscription) in.readObject());
297                      break;
298
299                   case OILConstants.TRANSACT :
300                      TransactionRequest trans = new TransactionRequest();
301                      trans.readExternal(in);
302                      server.transact(connectionToken, trans);
303                      break;
304
305                   case OILConstants.UNSUBSCRIBE :
306                      server.unsubscribe(connectionToken, in.readInt());
307                      break;
308
309                   case OILConstants.DESTROY_SUBSCRIPTION :
310                      server.destroySubscription(connectionToken, (DurableSubscriptionID) in.readObject());
311                      break;
312
313                   case OILConstants.CHECK_USER :
314                      result = server.checkUser((String JavaDoc) in.readObject(), (String JavaDoc) in.readObject());
315                      break;
316
317                   case OILConstants.PING :
318                      server.ping(connectionToken, in.readLong());
319                      break;
320
321                   case OILConstants.AUTHENTICATE :
322                      result = server.authenticate((String JavaDoc) in.readObject(), (String JavaDoc) in.readObject());
323                      break;
324
325                   default :
326                      throw new RemoteException JavaDoc("Bad method code !");
327                }
328
329                //Everthing was OK, otherwise
330
//an exception would have prevented us from getting
331
//to this point in the code ;-)
332
//
333
try
334                {
335                   if (result == null)
336                   {
337                      out.writeByte(OILConstants.SUCCESS);
338                   }
339                   else
340                   {
341                      out.writeByte(OILConstants.SUCCESS_OBJECT);
342                      out.writeObject(result);
343                      out.reset();
344                   }
345                   out.flush();
346                }
347                catch (IOException JavaDoc e)
348                {
349                   if (closed)
350                      break;
351
352                   log.warn("Connection failure (2).", e);
353                   break;
354                }
355
356             }
357             catch (Exception JavaDoc e)
358             {
359                // this processes any exceptions from the actually switch
360
// statement processing
361
//
362
if (closed)
363                   break;
364
365                log.warn("Client request resulted in a server exception: ", e);
366
367                try
368                {
369                   out.writeByte(OILConstants.EXCEPTION);
370                   out.writeObject(e);
371                   out.reset();
372                   out.flush();
373                }
374                catch (IOException JavaDoc e2)
375                {
376                   if (closed)
377                      break;
378
379                   log.warn("Connection failure (3).", e);
380                   break;
381                }
382             } // end catch of try { switch(code) ... }
383
} // end whlie loop
384

385          try
386          {
387             if (!closed)
388             {
389                try
390                {
391                   server.connectionClosing(connectionToken);
392                }
393                catch (JMSException JavaDoc e)
394                {
395                   // do nothing
396
}
397             }
398             in.close();
399             out.close();
400          }
401          catch (IOException JavaDoc e)
402          {
403             log.warn("Connection failure during connection close.", e);
404          }
405          finally
406          {
407             try
408             {
409                sock.close();
410             }
411             catch (IOException JavaDoc e)
412             {
413                log.warn("Connection failure during connection close.", e);
414             }
415          }
416       } // end run method
417
}
418
419    /**
420     * Used to construct the GenericConnectionFactory (bindJNDIReferences()
421     * builds it) Sets up the connection properties need by a client to use this
422     * IL
423     *
424     * @return The ClientConnectionProperties value
425     */

426    public java.util.Properties JavaDoc getClientConnectionProperties()
427    {
428       return connectionProperties;
429    }
430
431    /**
432     * Gives this JMX service a name.
433     *
434     * @return The Name value
435     */

436    public String JavaDoc getName()
437    {
438       return "JBossMQ-OILServerIL";
439    }
440
441    /**
442     * Used to construct the GenericConnectionFactory (bindJNDIReferences()
443     * builds it)
444     *
445     * @return The ServerIL value
446     * @returns ServerIL the instance of this IL
447     */

448    public ServerIL getServerIL()
449    {
450       return serverIL;
451    }
452
453    /**
454     * Main processing method for the OILServerILService object
455     */

456    public void run()
457    {
458       try
459       {
460          while (running)
461          {
462             Socket JavaDoc socket = null;
463             try
464             {
465                socket = serverSocket.accept();
466             }
467             catch (java.io.InterruptedIOException JavaDoc e)
468             {
469                continue;
470             }
471
472             // it's possible that the service is no longer
473
// running but it got a connection, no point in
474
// starting up a thread!
475
//
476
if (!running)
477             {
478                if (socket != null)
479                {
480                   try
481                   {
482                      socket.close();
483                   }
484                   catch (Exception JavaDoc e)
485                   {
486                      // do nothing
487
}
488                }
489                return;
490             }
491
492             try
493             {
494                socket.setSoTimeout(readTimeout);
495                new Thread JavaDoc(new Client(socket), "OIL Worker-" + threadNumber++).start();
496             }
497             catch (IOException JavaDoc ie)
498             {
499                if (log.isDebugEnabled())
500                {
501                   log.debug("IOException processing client connection", ie);
502                   log.debug("Dropping client connection, server will not terminate");
503                }
504             }
505          }
506       }
507       catch (SocketException JavaDoc e)
508       {
509          // There is no easy way (other than string comparison) to
510
// determine if the socket exception is caused by connection
511
// reset by peer. In this case, it's okay to ignore both
512
// SocketException and IOException.
513
if (running)
514             log.warn("SocketException occured (Connection reset by peer?). Cannot initialize the OILServerILService.");
515       }
516       catch (IOException JavaDoc e)
517       {
518          if (running)
519             log.warn("IOException occured. Cannot initialize the OILServerILService.", e);
520       }
521       catch (Throwable JavaDoc t)
522       {
523          log.warn("Unexpected error occured. Cannot initialize the OILServerILService.", t);
524       }
525       try
526       {
527          serverSocket.close();
528       }
529       catch (Exception JavaDoc e)
530       {
531          log.debug("error closing server socket", e);
532       }
533       return;
534    }
535
536    /**
537     * Starts this IL, and binds it to JNDI
538     *
539     * @exception Exception Description of Exception
540     */

541    public void startService() throws Exception JavaDoc
542    {
543       super.startService();
544
545       running = true;
546       this.server = lookupJMSServer();
547
548       // Use the default javax.net.ServerSocketFactory if none was set
549
if (serverSocketFactory == null)
550          serverSocketFactory = ServerSocketFactory.getDefault();
551
552       /* See if the server socket supports setSecurityDomain(SecurityDomain)
553       if an securityDomain was specified
554       */

555       if (securityDomain != null)
556       {
557          try
558          {
559             InitialContext JavaDoc ctx = new InitialContext JavaDoc();
560             Class JavaDoc ssfClass = serverSocketFactory.getClass();
561             SecurityDomain domain = (SecurityDomain) ctx.lookup(securityDomain);
562             Class JavaDoc[] parameterTypes = { SecurityDomain.class };
563             Method JavaDoc m = ssfClass.getMethod("setSecurityDomain", parameterTypes);
564             Object JavaDoc[] args = { domain };
565             m.invoke(serverSocketFactory, args);
566          }
567          catch (NoSuchMethodException JavaDoc e)
568          {
569             log.error("Socket factory does not support setSecurityDomain(SecurityDomain)");
570          }
571          catch (Exception JavaDoc e)
572          {
573             log.error("Failed to setSecurityDomain=" + securityDomain + " on socket factory");
574          }
575       }
576
577       // Create the server socket using the socket factory
578
serverSocket = serverSocketFactory.createServerSocket(serverBindPort, 50, bindAddress);
579       serverSocket.setSoTimeout(SO_TIMEOUT);
580
581       InetAddress JavaDoc socketAddress = serverSocket.getInetAddress();
582       if (log.isInfoEnabled())
583          log.info("JBossMQ OIL service available at : " + socketAddress + ":" + serverSocket.getLocalPort());
584
585       new Thread JavaDoc(server.getThreadGroup(), this, "OIL Worker Server").start();
586
587       /* We need to check the socketAddress against "0.0.0.0/0.0.0.0"
588          because this is not a valid address on Win32 while it is for
589          *NIX. See BugParade bug #4343286.
590          */

591       socketAddress = ServerConfigUtil.fixRemoteAddress(socketAddress);
592
593       serverIL = new OILServerIL(socketAddress, serverSocket.getLocalPort(),
594          clientSocketFactoryName, enableTcpNoDelay);
595
596       // Initialize the connection poperties using the base class.
597
connectionProperties = super.getClientConnectionProperties();
598       connectionProperties.setProperty(
599          OILServerILFactory.CLIENT_IL_SERVICE_KEY,
600          "org.jboss.mq.il.oil.OILClientILService");
601       connectionProperties.setProperty(OILServerILFactory.OIL_PORT_KEY, "" + serverSocket.getLocalPort());
602       connectionProperties.setProperty(OILServerILFactory.OIL_ADDRESS_KEY, "" + socketAddress.getHostAddress());
603       connectionProperties.setProperty(OILServerILFactory.OIL_TCPNODELAY_KEY, enableTcpNoDelay ? "yes" : "no");
604
605       bindJNDIReferences();
606
607    }
608
609    /**
610     * Stops this IL, and unbinds it from JNDI.
611     */

612    public void stopService()
613    {
614       try
615       {
616          unbindJNDIReferences();
617       }
618       catch (Exception JavaDoc e)
619       {
620          log.error("Exception unbinding from JNDI", e);
621       }
622       try
623       {
624          running = false;
625          if (serverSocket != null)
626             serverSocket.close();
627       }
628       catch (Exception JavaDoc e)
629       {
630          log.debug("Exception stopping server thread", e);
631       }
632    }
633
634    /**
635     * Get the OIL server listening port
636     *
637     * @return Value of property serverBindPort.
638     *
639     * @jmx:managed-attribute
640     */

641    public int getServerBindPort()
642    {
643       return serverBindPort;
644    }
645
646    /**
647     * Set the OIL server listening port
648     *
649     * @param serverBindPort New value of property serverBindPort.
650     *
651     * @jmx:managed-attribute
652     */

653    public void setServerBindPort(int serverBindPort)
654    {
655       this.serverBindPort = serverBindPort;
656    }
657
658    /**
659     * Get the interface address the OIL server bind its listening port on.
660     *
661     * @return The hostname or dotted decimal address that the service is
662     * bound to.
663     *
664     * @jmx:managed-attribute
665     */

666    public String JavaDoc getBindAddress()
667    {
668       String JavaDoc addr = "0.0.0.0";
669       if (bindAddress != null)
670          addr = bindAddress.getHostName();
671       return addr;
672    }
673    /**
674     * Set the interface address the OIL server bind its listening port on.
675     *
676     * @param host The host address to bind to, if any.
677     *
678     * @throws java.net.UnknownHostException Thrown if the hostname cannot
679     * be resolved to an InetAddress object.
680     *
681     * @jmx:managed-attribute
682     */

683    public void setBindAddress(String JavaDoc host) throws UnknownHostException JavaDoc
684    {
685       // If host is null or empty use any address
686
if (host == null || host.length() == 0)
687          bindAddress = null;
688       else
689          bindAddress = InetAddress.getByName(host);
690    }
691
692    /**
693     * Gets the enableTcpNoDelay.
694     * @return Returns a boolean
695     *
696     * @jmx:managed-attribute
697     */

698    public boolean getEnableTcpNoDelay()
699    {
700       return enableTcpNoDelay;
701    }
702
703    /**
704     * Sets the enableTcpNoDelay.
705     * @param enableTcpNoDelay The enableTcpNoDelay to set
706     *
707     * @jmx:managed-attribute
708     */

709    public void setEnableTcpNoDelay(boolean enableTcpNoDelay)
710    {
711       this.enableTcpNoDelay = enableTcpNoDelay;
712    }
713
714    /**
715     * Gets the socket read timeout.
716     * @return Returns the read timeout in milli-seconds
717     *
718     * @jmx:managed-attribute
719     */

720    public int getReadTimeout()
721    {
722       return readTimeout;
723    }
724
725    /**
726     * Sets the read time out.
727     * @param timeout The read time out in milli seconds
728     *
729     * @jmx:managed-attribute
730     */

731    public void setReadTimeout(int timeout)
732    {
733       this.readTimeout = timeout;
734    }
735
736    /** Get the javax.net.SocketFactory implementation class to use on the
737     *client.
738     * @jmx:managed-attribute
739     */

740    public String JavaDoc getClientSocketFactory()
741    {
742       return clientSocketFactoryName;
743    }
744    /** Set the javax.net.SocketFactory implementation class to use on the
745     *client.
746     * @jmx:managed-attribute
747     */

748    public void setClientSocketFactory(String JavaDoc name)
749    {
750       this.clientSocketFactoryName = name;
751    }
752
753    /** Set the javax.net.ServerSocketFactory implementation class to use to
754     *create the service SocketFactory.
755     *@jmx:managed-attribute
756     */

757    public void setServerSocketFactory(String JavaDoc name) throws Exception JavaDoc
758    {
759       ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
760       Class JavaDoc ssfClass = loader.loadClass(name);
761       serverSocketFactory = (ServerSocketFactory) ssfClass.newInstance();
762    }
763    /** Get the javax.net.ServerSocketFactory implementation class to use to
764     *create the service SocketFactory.
765     *@jmx:managed-attribute
766     */

767    public String JavaDoc getServerSocketFactory()
768    {
769       String JavaDoc name = null;
770       if (serverSocketFactory != null)
771          name = serverSocketFactory.getClass().getName();
772       return name;
773    }
774
775    /** Set the security domain name to use with SSL aware socket factories
776     *@jmx:managed-attribute
777     */

778    public void setSecurityDomain(String JavaDoc domainName)
779    {
780       this.securityDomain = domainName;
781    }
782    /** Get the security domain name to use with SSL aware socket factories
783     *@jmx:managed-attribute
784     */

785    public String JavaDoc getSecurityDomain()
786    {
787       return this.securityDomain;
788    }
789 }
790 // vim:expandtab:tabstop=3:shiftwidth=3
791
Popular Tags