KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jivesoftware > messenger > net > SocketReadThread


1 /**
2  * $RCSfile: SocketReadThread.java,v $
3  * $Revision: 1.27 $
4  * $Date: 2005/04/11 21:12:34 $
5  *
6  * Copyright (C) 2004 Jive Software. All rights reserved.
7  *
8  * This software is published under the terms of the GNU Public License (GPL),
9  * a copy of which is included in this distribution.
10  */

11
12 package org.jivesoftware.messenger.net;
13
14 import org.dom4j.Element;
15 import org.dom4j.io.XPPPacketReader;
16 import org.jivesoftware.messenger.*;
17 import org.jivesoftware.messenger.auth.UnauthorizedException;
18 import org.jivesoftware.messenger.interceptor.InterceptorManager;
19 import org.jivesoftware.messenger.interceptor.PacketRejectedException;
20 import org.jivesoftware.util.LocaleUtils;
21 import org.jivesoftware.util.Log;
22 import org.xmlpull.v1.XmlPullParser;
23 import org.xmlpull.v1.XmlPullParserException;
24 import org.xmlpull.v1.XmlPullParserFactory;
25 import org.xmpp.packet.*;
26
27 import java.io.EOFException JavaDoc;
28 import java.io.IOException JavaDoc;
29 import java.io.InputStreamReader JavaDoc;
30 import java.io.Writer JavaDoc;
31 import java.net.Socket JavaDoc;
32 import java.net.SocketException JavaDoc;
33
34 /**
35  * Reads XMPP XML from a socket.
36  *
37  * @author Derek DeMoro
38  */

39 public class SocketReadThread extends Thread JavaDoc {
40
41     /**
42      * The utf-8 charset for decoding and encoding Jabber packet streams.
43      */

44     private static String JavaDoc CHARSET = "UTF-8";
45     /**
46      * Reuse the same factory for all the connections.
47      */

48     private static XmlPullParserFactory factory = null;
49
50     private Socket JavaDoc socket;
51     private Session session;
52     private SocketConnection connection;
53     private String JavaDoc serverName;
54     /**
55      * Router used to route incoming packets to the correct channels.
56      */

57     private PacketRouter router;
58     private boolean clearSignout = false;
59     XPPPacketReader reader = null;
60
61     static {
62         try {
63             factory = XmlPullParserFactory.newInstance();
64         }
65         catch (XmlPullParserException e) {
66             Log.error("Error creating a parser factory", e);
67         }
68     }
69
70     /**
71      * Creates a dedicated read thread for a socket.
72      *
73      * @param router the router for sending packets that were read.
74      * @param serverName the name of the server this socket is working for.
75      * @param socket the socket to read from.
76      * @param connection the connection being read.
77      */

78     public SocketReadThread(PacketRouter router, String JavaDoc serverName, Socket JavaDoc socket,
79             SocketConnection connection)
80     {
81         super("SRT reader");
82         this.serverName = serverName;
83         this.router = router;
84         this.connection = connection;
85         this.socket = socket;
86     }
87
88     /**
89      * A dedicated thread loop for reading the stream and sending incoming
90      * packets to the appropriate router.
91      */

92     public void run() {
93         try {
94             reader = new XPPPacketReader();
95             reader.setXPPFactory(factory);
96
97             reader.getXPPParser().setInput(new InputStreamReader JavaDoc(socket.getInputStream(),
98                     CHARSET));
99
100             // Read in the opening tag and prepare for packet stream
101
createSession();
102
103             // Read the packet stream until it ends
104
if (session != null) {
105                 readStream();
106             }
107
108         }
109         catch (EOFException JavaDoc eof) {
110             // Normal disconnect
111
}
112         catch (SocketException JavaDoc se) {
113             // The socket was closed. The server may close the connection for several
114
// reasons (e.g. user requested to remove his account). Do nothing here.
115
}
116         catch (XmlPullParserException ie) {
117             // Check if the user abruptly cut the connection without sending previously an
118
// unavailable presence
119
if (clearSignout == false) {
120                 if (session != null && session.getStatus() == Session.STATUS_AUTHENTICATED) {
121                     if (session instanceof ClientSession) {
122                         Presence presence = ((ClientSession) session).getPresence();
123                         if (presence != null) {
124                             // Simulate an unavailable presence sent by the user.
125
Presence packet = presence.createCopy();
126                             packet.setType(Presence.Type.unavailable);
127                             packet.setFrom(session.getAddress());
128                             router.route(packet);
129                             clearSignout = true;
130                         }
131                     }
132                 }
133             }
134             // It is normal for clients to abruptly cut a connection
135
// rather than closing the stream document. Since this is
136
// normal behavior, we won't log it as an error.
137
// Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie);
138
}
139         catch (Exception JavaDoc e) {
140             if (session != null) {
141                 Log.warn(LocaleUtils.getLocalizedString("admin.error.stream"), e);
142             }
143         }
144         finally {
145             if (session != null) {
146                 Log.debug("Logging off " + session.getAddress() + " on " + connection);
147                 try {
148                     // Allow everything to settle down after a disconnect
149
// e.g. presence updates to avoid sending double
150
// presence unavailable's
151
sleep(3000);
152                     session.getConnection().close();
153                 }
154                 catch (Exception JavaDoc e) {
155                     Log.warn(LocaleUtils.getLocalizedString("admin.error.connection")
156                             + "\n" + socket.toString());
157                 }
158             }
159             else {
160                 Log.error(LocaleUtils.getLocalizedString("admin.error.connection")
161                         + "\n" + socket.toString());
162             }
163         }
164     }
165
166     /**
167      * Read the incoming stream until it ends.
168      */

169     private void readStream() throws Exception JavaDoc {
170         while (true) {
171             Element doc = reader.parseDocument().getRootElement();
172
173             if (doc == null) {
174                 // Stop reading the stream since the client has sent an end of
175
// stream element and probably closed the connection.
176
return;
177             }
178
179             String JavaDoc tag = doc.getName();
180             if ("message".equals(tag)) {
181                 Message packet = null;
182                 try {
183                     packet = new Message(doc);
184                 }
185                 catch(IllegalArgumentException JavaDoc e) {
186                     // The original packet contains a malformed JID so answer with an error.
187
Message reply = new Message();
188                     reply.setID(doc.attributeValue("id"));
189                     reply.setTo(session.getAddress());
190                     reply.getElement().addAttribute("from", doc.attributeValue("to"));
191                     reply.setError(PacketError.Condition.jid_malformed);
192                     session.process(reply);
193                     continue;
194                 }
195                 // Notify the session that a new packet has been received. This could be useful
196
// for client sessions for setting the real sender of the packet and avoid spoofing
197
session.packetReceived(packet);
198                 try {
199                     // Invoke the interceptors before we process the read packet
200
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
201                             false);
202                     router.route(packet);
203                     // Invoke the interceptors after we have processed the read packet
204
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
205                             true);
206                     session.incrementClientPacketCount();
207                 }
208                 catch (PacketRejectedException e) {
209                     // An interceptor rejected this packet so answer a not_allowed error
210
Message reply = new Message();
211                     reply.setID(packet.getID());
212                     reply.setTo(session.getAddress());
213                     reply.setFrom(packet.getTo());
214                     reply.setError(PacketError.Condition.not_allowed);
215                     session.process(reply);
216                 }
217             }
218             else if ("presence".equals(tag)) {
219                 Presence packet = null;
220                 try {
221                     packet = new Presence(doc);
222                 }
223                 catch(IllegalArgumentException JavaDoc e) {
224                     // The original packet contains a malformed JID so answer an error
225
Presence reply = new Presence();
226                     reply.setID(doc.attributeValue("id"));
227                     reply.setTo(session.getAddress());
228                     reply.getElement().addAttribute("from", doc.attributeValue("to"));
229                     reply.setError(PacketError.Condition.jid_malformed);
230                     session.process(reply);
231                     continue;
232                 }
233                 // Notify the session that a new packet has been received. This could be useful
234
// for client sessions for setting the real sender of the packet and avoid spoofing
235
session.packetReceived(packet);
236                 try {
237                     // Invoke the interceptors before we process the read packet
238
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
239                             false);
240                     router.route(packet);
241                     // Invoke the interceptors after we have processed the read packet
242
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
243                             true);
244                     session.incrementClientPacketCount();
245                     // Update the flag that indicates if the user made a clean sign out
246
clearSignout = (Presence.Type.unavailable == packet.getType() ? true : false);
247                 }
248                 catch (PacketRejectedException e) {
249                     // An interceptor rejected this packet so answer a not_allowed error
250
Presence reply = new Presence();
251                     reply.setID(packet.getID());
252                     reply.setTo(session.getAddress());
253                     reply.setFrom(packet.getTo());
254                     reply.setError(PacketError.Condition.not_allowed);
255                     session.process(reply);
256                 }
257             }
258             else if ("iq".equals(tag)) {
259                 IQ packet = null;
260                 try {
261                     packet = getIQ(doc);
262                 }
263                 catch(IllegalArgumentException JavaDoc e) {
264                     // The original packet contains a malformed JID so answer an error
265
IQ reply = new IQ();
266                     if (!doc.elements().isEmpty()) {
267                         reply.setChildElement(((Element) doc.elements().get(0)).createCopy());
268                     }
269                     reply.setID(doc.attributeValue("id"));
270                     reply.setTo(session.getAddress());
271                     if (doc.attributeValue("to") != null) {
272                         reply.getElement().addAttribute("from", doc.attributeValue("to"));
273                     }
274                     reply.setError(PacketError.Condition.jid_malformed);
275                     session.process(reply);
276                     continue;
277                 }
278                 // Notify the session that a new packet has been received. This could be useful
279
// for client sessions for setting the real sender of the packet and avoid spoofing
280
session.packetReceived(packet);
281                 try {
282                     // Invoke the interceptors before we process the read packet
283
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
284                             false);
285                     router.route(packet);
286                     // Invoke the interceptors after we have processed the read packet
287
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
288                             true);
289                     session.incrementClientPacketCount();
290                 }
291                 catch (PacketRejectedException e) {
292                     // An interceptor rejected this packet so answer a not_allowed error
293
IQ reply = new IQ();
294                     reply.setChildElement(packet.getChildElement().createCopy());
295                     reply.setID(packet.getID());
296                     reply.setTo(session.getAddress());
297                     reply.setFrom(packet.getTo());
298                     reply.setError(PacketError.Condition.not_allowed);
299                     session.process(reply);
300                 }
301             }
302             else {
303                 throw new XmlPullParserException(LocaleUtils.getLocalizedString(
304                         "admin.error.packet.tag") + tag);
305             }
306         }
307     }
308
309     private IQ getIQ(Element doc) {
310         Element query = doc.element("query");
311         if (query != null && "jabber:iq:roster".equals(query.getNamespaceURI())) {
312             return new Roster(doc);
313         }
314         else {
315             return new IQ(doc);
316         }
317     }
318
319     /**
320      * Uses the XPP to grab the opening stream tag and create an active session
321      * object. The session to create will depend on the sent namespace. In all
322      * cases, the method obtains the opening stream tag, checks for errors, and
323      * either creates a session or returns an error and kills the connection.
324      * If the connection remains open, the XPP will be set to be ready for the
325      * first packet. A call to next() should result in an START_TAG state with
326      * the first packet in the stream.
327      */

328     private void createSession() throws UnauthorizedException, XmlPullParserException, IOException JavaDoc {
329         XmlPullParser xpp = reader.getXPPParser();
330         for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
331             eventType = xpp.next();
332         }
333
334         // Create the correct session based on the sent namespace
335
if ("jabber:client".equals(xpp.getNamespace(null))) {
336             // The connected client is a regular client so create a ClientSession
337
session = ClientSession.createSession(serverName, reader, connection);
338         }
339         else if ("jabber:component:accept".equals(xpp.getNamespace(null))) {
340             // The connected client is a component so create a ComponentSession
341
session = ComponentSession.createSession(serverName, reader, connection);
342         }
343         else {
344             Writer JavaDoc writer = connection.getWriter();
345             StringBuilder JavaDoc sb = new StringBuilder JavaDoc();
346             sb.append("<?xml version='1.0' encoding='");
347             sb.append(CHARSET);
348             sb.append("'?>");
349             // Include the bad-namespace-prefix in the response
350
sb.append("<stream:error>");
351             sb.append("<bad-namespace-prefix xmlns=\"urn:ietf:params:xml:ns:xmpp-streams\"/>");
352             sb.append("</stream:error>");
353             sb.append("</stream:stream>");
354             writer.write(sb.toString());
355             writer.flush();
356             // Close the underlying connection
357
connection.close();
358         }
359     }
360 }
361
Popular Tags