KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jivesoftware > whack > ExternalComponent


1 /**
2  * $RCSfile: ExternalComponent.java,v $
3  * $Revision: 1.7 $
4  * $Date: 2005/05/28 04:54:35 $
5  *
6  * Copyright 2005 Jive Software.
7  *
8  * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */

20
21 package org.jivesoftware.whack;
22
23 import org.dom4j.DocumentException;
24 import org.dom4j.Element;
25 import org.dom4j.io.XMLWriter;
26 import org.dom4j.io.XPPPacketReader;
27 import org.jivesoftware.whack.util.StringUtils;
28 import org.xmlpull.v1.XmlPullParser;
29 import org.xmlpull.v1.XmlPullParserException;
30 import org.xmlpull.v1.XmlPullParserFactory;
31 import org.xmpp.component.Component;
32 import org.xmpp.component.ComponentException;
33 import org.xmpp.component.ComponentManager;
34 import org.xmpp.packet.JID;
35 import org.xmpp.packet.Packet;
36 import org.xmpp.packet.StreamError;
37
38 import javax.net.SocketFactory;
39 import java.io.*;
40 import java.net.Socket JavaDoc;
41 import java.net.UnknownHostException JavaDoc;
42 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
43 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
44 import java.util.concurrent.TimeUnit JavaDoc;
45
46 /**
47  * ExternalComponents are responsible for connecting and authenticating with a remote server and
48  * for sending and processing received packets. In fact, an ExternalComponent is a wrapper on a
49  * Component that provides remote connection capabilities. The actual processing of the packets is
50  * done by the wrapped Component.
51  *
52  * @author Gaston Dombiak
53  */

54 public class ExternalComponent implements Component {
55     
56     /**
57      * The utf-8 charset for decoding and encoding XMPP packet streams.
58      */

59     private static String JavaDoc CHARSET = "UTF-8";
60
61     private Component component;
62     private ExternalComponentManager manager;
63
64     private Socket JavaDoc socket;
65     private XMLWriter xmlSerializer;
66     private XmlPullParserFactory factory = null;
67     private XPPPacketReader reader = null;
68     private Writer writer = null;
69     private boolean shutdown = false;
70
71     private String JavaDoc connectionID;
72     /**
73      * Hold the full domain of this component. The full domain is composed by the subdomain plus
74      * the domain of the server. E.g. conference.jivesoftware.com. The domain may change after a
75      * connection has been established with the server.
76      */

77     private String JavaDoc domain;
78     /**
79      * Holds the subdomain that is associated to this component. The subdomain is the initial part
80      * of the domain. The subdomain cannot be affected after establishing a connection with the
81      * server. E.g. conference.
82      */

83     private String JavaDoc subdomain;
84     /**
85      * Holds the IP address or host name where the connection must be made.
86      */

87     private String JavaDoc host;
88     private int port;
89     private SocketFactory socketFactory;
90
91     /**
92      * Pool of threads that are available for processing the requests.
93      */

94     private ThreadPoolExecutor JavaDoc threadPool;
95     /**
96      * Thread that will read the XML from the socket and ask this component to process the read
97      * packets.
98      */

99     private SocketReadThread readerThread;
100
101     public ExternalComponent(Component component, ExternalComponentManager manager) {
102         // Be default create a pool of 25 threads to process the received requests
103
this(component, manager, 25);
104     }
105
106     public ExternalComponent(Component component, ExternalComponentManager manager, int maxThreads) {
107         this.component = component;
108         this.manager = manager;
109
110         // Create a pool of threads that will process requests received by this component. If more
111
// threads are required then the command will be executed on the SocketReadThread process
112
threadPool = new ThreadPoolExecutor JavaDoc(1, maxThreads, 15, TimeUnit.SECONDS,
113                         new LinkedBlockingQueue JavaDoc<Runnable JavaDoc>(), new ThreadPoolExecutor.CallerRunsPolicy JavaDoc());
114     }
115
116     /**
117      * Generates a connection with the server and tries to authenticate. If an error occurs in any
118      * of the steps then a ComponentException is thrown.
119      *
120      * @param host the host to connect with.
121      * @param port the port to use.
122      * @param socketFactory SocketFactory to be used for generating the socket.
123      * @param subdomain the subdomain that this component will be handling.
124      * @throws ComponentException if an error happens during the connection and authentication steps.
125      */

126     public void connect(String JavaDoc host, int port, SocketFactory socketFactory, String JavaDoc subdomain)
127             throws ComponentException {
128         try {
129             // Open a socket to the server
130
this.socket = socketFactory.createSocket(host, port);
131             this.domain = subdomain + "." + manager.getServerName();
132             this.subdomain = subdomain;
133             // Keep these variables that will be used in case a reconnection is required
134
this.host= host;
135             this.port = port;
136             this.socketFactory = socketFactory;
137
138             try {
139                 factory = XmlPullParserFactory.newInstance();
140                 reader = new XPPPacketReader();
141                 reader.setXPPFactory(factory);
142
143                 reader.getXPPParser().setInput(new InputStreamReader(socket.getInputStream(),
144                         CHARSET));
145
146                 // Get a writer for sending the open stream tag
147
writer =
148                         new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(),
149                                 CHARSET));
150                 // Open the stream.
151
StringBuilder JavaDoc stream = new StringBuilder JavaDoc();
152                 stream.append("<stream:stream");
153                 stream.append(" xmlns=\"jabber:component:accept\"");
154                 stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
155                 stream.append(" to=\"" + domain + "\">");
156                 writer.write(stream.toString());
157                 writer.flush();
158                 stream = null;
159
160                 // Get the answer from the server
161
XmlPullParser xpp = reader.getXPPParser();
162                 for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
163                     eventType = xpp.next();
164                 }
165
166                 // Set the streamID returned from the server
167
connectionID = xpp.getAttributeValue("", "id");
168                 if (xpp.getAttributeValue("", "from") != null) {
169                     this.domain = xpp.getAttributeValue("", "from");
170                 }
171                 xmlSerializer = new XMLWriter(writer);
172
173                 // Handshake with the server
174
stream = new StringBuilder JavaDoc();
175                 stream.append("<handshake>");
176                 stream.append(StringUtils.hash(connectionID + manager.getSecretKey(subdomain)));
177                 stream.append("</handshake>");
178                 writer.write(stream.toString());
179                 writer.flush();
180                 stream = null;
181
182                 // Get the answer from the server
183
try {
184                     Element doc = reader.parseDocument().getRootElement();
185                     if ("error".equals(doc.getName())) {
186                         StreamError error = new StreamError(doc);
187                         // Close the connection
188
socket.close();
189                         socket = null;
190                         // throw the exception with the wrapped error
191
throw new ComponentException(error);
192                     }
193                     // Everything went fine
194
} catch (DocumentException e) {
195                     try { socket.close(); } catch (IOException ioe) {}
196                     throw new ComponentException(e);
197                 } catch (XmlPullParserException e) {
198                     try { socket.close(); } catch (IOException ioe) {}
199                     throw new ComponentException(e);
200                 }
201             } catch (XmlPullParserException e) {
202                 try { socket.close(); } catch (IOException ioe) {}
203                 throw new ComponentException(e);
204             }
205         }
206         catch (UnknownHostException JavaDoc uhe) {
207             try { if (socket != null) socket.close(); } catch (IOException e) {}
208             throw new ComponentException(uhe);
209         }
210         catch (IOException ioe) {
211             try { if (socket != null) socket.close(); } catch (IOException e) {}
212             throw new ComponentException(ioe);
213         }
214     }
215
216     public Component getComponent() {
217         return component;
218     }
219
220     public String JavaDoc getName() {
221         return component.getName();
222     }
223
224     public String JavaDoc getDescription() {
225         return component.getDescription();
226     }
227
228     /**
229      * Returns the domain provided by this component in the connected server. The domain is
230      * composed by the subdomain plus the domain of the server. E.g. conference.jivesoftware.com.
231      * The domain may change after a connection has been established with the server.
232      *
233      * @return the domain provided by this component in the connected server.
234      */

235     public String JavaDoc getDomain() {
236         return domain;
237     }
238
239     /**
240      * Returns the subdomain provided by this component in the connected server. E.g. conference.
241      *
242      * @return the subdomain provided by this component in the connected server.
243      */

244     public String JavaDoc getSubdomain() {
245         return subdomain;
246     }
247
248     /**
249      * Returns the ComponentManager that created this component.
250      *
251      * @return the ComponentManager that created this component.
252      */

253     ExternalComponentManager getManager() {
254         return manager;
255     }
256
257     public void processPacket(final Packet packet) {
258         threadPool.execute(new Runnable JavaDoc() {
259             public void run() {
260                 component.processPacket(packet);
261             }
262         });
263     }
264
265     public void send(Packet packet) {
266         synchronized (writer) {
267             try {
268                 xmlSerializer.write(packet.getElement());
269                 xmlSerializer.flush();
270             }
271             catch (IOException e) {
272                 // Log the exception
273
manager.getLog().error(e);
274                 // Unbind this component from the serviced subdomain
275
try {
276                     manager.removeComponent(subdomain);
277                 } catch (ComponentException e1) {
278                     manager.getLog().error(e);
279                 }
280             }
281         }
282     }
283
284     public void initialize(JID jid, ComponentManager componentManager) throws ComponentException {
285         component.initialize(jid, componentManager);
286     }
287
288     public void start() {
289         // Everything went fine so start reading packets from the server
290
readerThread = new SocketReadThread(this, reader);
291         readerThread.setDaemon(true);
292         readerThread.start();
293         // Notify the component that it will be notified of new received packets
294
component.start();
295     }
296
297     public void shutdown() {
298         shutdown = true;
299         disconnect();
300     }
301
302     private void disconnect() {
303         if (readerThread != null) {
304             readerThread.shutdown();
305         }
306         threadPool.shutdown();
307         if (socket != null && !socket.isClosed()) {
308             try {
309                 synchronized (writer) {
310                     try {
311                         writer.write("</stream:stream>");
312                         xmlSerializer.flush();
313                     }
314                     catch (IOException e) {}
315                 }
316             }
317             catch (Exception JavaDoc e) {
318                 // Do nothing
319
}
320             try {
321                 socket.close();
322             }
323             catch (Exception JavaDoc e) {
324                 manager.getLog().error(e);
325             }
326         }
327     }
328
329     /**
330      * Notification message that the connection with the server was lost unexpectedly. We will try
331      * to reestablish the connection for ever until the connection has been reestablished or this
332      * thread has been stopped.
333      */

334     public void connectionLost() {
335         readerThread = null;
336         boolean isConnected = false;
337         while (!isConnected && !shutdown) {
338             try {
339                 connect(host, port, socketFactory, subdomain);
340                 isConnected = true;
341                 // It may be possible that while a new connection was being established the
342
// component was required to shutdown so in this case we need to close the new
343
// connection
344
if (shutdown) {
345                     disconnect();
346                 }
347             } catch (ComponentException e) {
348                 manager.getLog().error("Error trying to reconnect with the server", e);
349                 // Wait for 5 seconds until the next retry
350
try {
351                     Thread.sleep(5000);
352                 } catch (InterruptedException JavaDoc e1) {}
353             }
354         }
355     }
356 }
357
Popular Tags