KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jivesoftware > messenger > net > SocketReader


/**
 * $RCSfile: SocketReader.java,v $
 * $Revision: 1.5 $
 * $Date: 2005/07/20 18:33:38 $
 *
 * Copyright (C) 2004 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

11
12 package org.jivesoftware.messenger.net;
13
14 import org.xmlpull.v1.XmlPullParserFactory;
15 import org.xmlpull.v1.XmlPullParserException;
16 import org.xmlpull.v1.XmlPullParser;
17 import org.jivesoftware.messenger.*;
18 import org.jivesoftware.messenger.auth.UnauthorizedException;
19 import org.jivesoftware.messenger.interceptor.InterceptorManager;
20 import org.jivesoftware.messenger.interceptor.PacketRejectedException;
21 import org.jivesoftware.util.Log;
22 import org.jivesoftware.util.LocaleUtils;
23 import org.dom4j.io.XPPPacketReader;
24 import org.dom4j.Element;
25 import org.xmpp.packet.*;
26
27 import java.net.Socket JavaDoc;
28 import java.net.SocketException JavaDoc;
29 import java.io.InputStreamReader JavaDoc;
30 import java.io.IOException JavaDoc;
31 import java.io.EOFException JavaDoc;
32 import java.io.Writer JavaDoc;
33
34 /**
35  * A SocketReader creates the appropriate {@link Session} based on the defined namespace in the
36  * stream element and will then keep reading and routing the received packets.
37  *
38  * @author Gaston Dombiak
39  */

40 public abstract class SocketReader implements Runnable JavaDoc {
41
42     /**
43      * The utf-8 charset for decoding and encoding Jabber packet streams.
44      */

45     private static String JavaDoc CHARSET = "UTF-8";
46     /**
47      * Reuse the same factory for all the connections.
48      */

49     private static XmlPullParserFactory factory = null;
50
51     private Socket JavaDoc socket;
52     protected Session session;
53     protected SocketConnection connection;
54     protected String JavaDoc serverName;
55
56     /**
57      * Router used to route incoming packets to the correct channels.
58      */

59     private PacketRouter router;
60     XPPPacketReader reader = null;
61     protected boolean open;
62
63     static {
64         try {
65             factory = XmlPullParserFactory.newInstance();
66         }
67         catch (XmlPullParserException e) {
68             Log.error("Error creating a parser factory", e);
69         }
70     }
71
72     /**
73      * Creates a dedicated reader for a socket.
74      *
75      * @param router the router for sending packets that were read.
76      * @param serverName the name of the server this socket is working for.
77      * @param socket the socket to read from.
78      * @param connection the connection being read.
79      */

80     public SocketReader(PacketRouter router, String JavaDoc serverName, Socket JavaDoc socket,
81             SocketConnection connection) {
82         this.serverName = serverName;
83         this.router = router;
84         this.connection = connection;
85         this.socket = socket;
86     }
87
88     /**
89      * A dedicated thread loop for reading the stream and sending incoming
90      * packets to the appropriate router.
91      */

92     public void run() {
93         try {
94             reader = new XPPPacketReader();
95             reader.setXPPFactory(factory);
96
97             reader.getXPPParser().setInput(new InputStreamReader JavaDoc(socket.getInputStream(),
98                     CHARSET));
99
100             // Read in the opening tag and prepare for packet stream
101
try {
102                 createSession();
103             }
104             catch (IOException JavaDoc e) {
105                 Log.debug("Error creating session", e);
106                 throw e;
107             }
108
109             // Read the packet stream until it ends
110
if (session != null) {
111                 readStream();
112             }
113
114         }
115         catch (EOFException JavaDoc eof) {
116             // Normal disconnect
117
}
118         catch (SocketException JavaDoc se) {
119             // The socket was closed. The server may close the connection for several
120
// reasons (e.g. user requested to remove his account). Do nothing here.
121
}
122         catch (XmlPullParserException ie) {
123             // It is normal for clients to abruptly cut a connection
124
// rather than closing the stream document. Since this is
125
// normal behavior, we won't log it as an error.
126
// Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie);
127
}
128         catch (Exception JavaDoc e) {
129             if (session != null) {
130                 Log.warn(LocaleUtils.getLocalizedString("admin.error.stream"), e);
131             }
132         }
133         finally {
134             if (session != null) {
135                 Log.debug("Logging off " + session.getAddress() + " on " + connection);
136                 try {
137                     session.getConnection().close();
138                 }
139                 catch (Exception JavaDoc e) {
140                     Log.warn(LocaleUtils.getLocalizedString("admin.error.connection")
141                             + "\n" + socket.toString());
142                 }
143             }
144             else {
145                 Log.error(LocaleUtils.getLocalizedString("admin.error.connection")
146                         + "\n" + socket.toString());
147             }
148             shutdown();
149         }
150     }
151
152     /**
153      * Read the incoming stream until it ends.
154      */

155     private void readStream() throws Exception JavaDoc {
156         open = true;
157         while (open) {
158             Element doc = reader.parseDocument().getRootElement();
159
160             if (doc == null) {
161                 // Stop reading the stream since the client has sent an end of
162
// stream element and probably closed the connection.
163
return;
164             }
165
166             String JavaDoc tag = doc.getName();
167             if ("message".equals(tag)) {
168                 Message packet = null;
169                 try {
170                     packet = new Message(doc);
171                 }
172                 catch(IllegalArgumentException JavaDoc e) {
173                     // The original packet contains a malformed JID so answer with an error.
174
Message reply = new Message();
175                     reply.setID(doc.attributeValue("id"));
176                     reply.setTo(session.getAddress());
177                     reply.getElement().addAttribute("from", doc.attributeValue("to"));
178                     reply.setError(PacketError.Condition.jid_malformed);
179                     session.process(reply);
180                     continue;
181                 }
182                 processMessage(packet);
183             }
184             else if ("presence".equals(tag)) {
185                 Presence packet = null;
186                 try {
187                     packet = new Presence(doc);
188                 }
189                 catch (IllegalArgumentException JavaDoc e) {
190                     // The original packet contains a malformed JID so answer an error
191
Presence reply = new Presence();
192                     reply.setID(doc.attributeValue("id"));
193                     reply.setTo(session.getAddress());
194                     reply.getElement().addAttribute("from", doc.attributeValue("to"));
195                     reply.setError(PacketError.Condition.jid_malformed);
196                     session.process(reply);
197                     continue;
198                 }
199                 try {
200                     packet.getType();
201                 }
202                 catch (IllegalArgumentException JavaDoc e) {
203                     Log.warn("Invalid presence type", e);
204                     // The presence packet contains an invalid presence type so replace it with
205
// an available presence type
206
packet.setType(null);
207                 }
208                 processPresence(packet);
209             }
210             else if ("iq".equals(tag)) {
211                 IQ packet = null;
212                 try {
213                     packet = getIQ(doc);
214                 }
215                 catch(IllegalArgumentException JavaDoc e) {
216                     // The original packet contains a malformed JID so answer an error
217
IQ reply = new IQ();
218                     if (!doc.elements().isEmpty()) {
219                         reply.setChildElement(((Element) doc.elements().get(0)).createCopy());
220                     }
221                     reply.setID(doc.attributeValue("id"));
222                     reply.setTo(session.getAddress());
223                     if (doc.attributeValue("to") != null) {
224                         reply.getElement().addAttribute("from", doc.attributeValue("to"));
225                     }
226                     reply.setError(PacketError.Condition.jid_malformed);
227                     session.process(reply);
228                     continue;
229                 }
230                 processIQ(packet);
231             }
232             else {
233                 if (!processUnknowPacket(doc)) {
234                     Log.warn(LocaleUtils.getLocalizedString("admin.error.packet.tag") +
235                             doc.asXML());
236                     open = false;
237                 }
238             }
239         }
240     }
241
242     /**
243      * Process the received IQ packet. Registered
244      * {@link org.jivesoftware.messenger.interceptor.PacketInterceptor} will be invoked before
245      * and after the packet was routed.<p>
246      *
247      * Subclasses may redefine this method for different reasons such as modifying the sender
248      * of the packet to avoid spoofing, rejecting the packet or even process the packet in
249      * another thread.
250      *
251      * @param packet the received packet.
252      */

253     protected void processIQ(IQ packet) throws UnauthorizedException {
254         try {
255             // Invoke the interceptors before we process the read packet
256
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
257                     false);
258             router.route(packet);
259             // Invoke the interceptors after we have processed the read packet
260
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
261                     true);
262             session.incrementClientPacketCount();
263         }
264         catch (PacketRejectedException e) {
265             // An interceptor rejected this packet so answer a not_allowed error
266
IQ reply = new IQ();
267             reply.setChildElement(packet.getChildElement().createCopy());
268             reply.setID(packet.getID());
269             reply.setTo(session.getAddress());
270             reply.setFrom(packet.getTo());
271             reply.setError(PacketError.Condition.not_allowed);
272             session.process(reply);
273             // Check if a message notifying the rejection should be sent
274
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
275                 // A message for the rejection will be sent to the sender of the rejected packet
276
Message notification = new Message();
277                 notification.setTo(session.getAddress());
278                 notification.setFrom(packet.getTo());
279                 notification.setBody(e.getRejectionMessage());
280                 session.process(notification);
281             }
282         }
283     }
284
285     /**
286      * Process the received Presence packet. Registered
287      * {@link org.jivesoftware.messenger.interceptor.PacketInterceptor} will be invoked before
288      * and after the packet was routed.<p>
289      *
290      * Subclasses may redefine this method for different reasons such as modifying the sender
291      * of the packet to avoid spoofing, rejecting the packet or even process the packet in
292      * another thread.
293      *
294      * @param packet the received packet.
295      */

296     protected void processPresence(Presence packet) throws UnauthorizedException {
297         try {
298             // Invoke the interceptors before we process the read packet
299
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
300                     false);
301             router.route(packet);
302             // Invoke the interceptors after we have processed the read packet
303
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
304                     true);
305             session.incrementClientPacketCount();
306         }
307         catch (PacketRejectedException e) {
308             // An interceptor rejected this packet so answer a not_allowed error
309
Presence reply = new Presence();
310             reply.setID(packet.getID());
311             reply.setTo(session.getAddress());
312             reply.setFrom(packet.getTo());
313             reply.setError(PacketError.Condition.not_allowed);
314             session.process(reply);
315             // Check if a message notifying the rejection should be sent
316
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
317                 // A message for the rejection will be sent to the sender of the rejected packet
318
Message notification = new Message();
319                 notification.setTo(session.getAddress());
320                 notification.setFrom(packet.getTo());
321                 notification.setBody(e.getRejectionMessage());
322                 session.process(notification);
323             }
324         }
325     }
326
327     /**
328      * Process the received Message packet. Registered
329      * {@link org.jivesoftware.messenger.interceptor.PacketInterceptor} will be invoked before
330      * and after the packet was routed.<p>
331      *
332      * Subclasses may redefine this method for different reasons such as modifying the sender
333      * of the packet to avoid spoofing, rejecting the packet or even process the packet in
334      * another thread.
335      *
336      * @param packet the received packet.
337      */

338     protected void processMessage(Message packet) throws UnauthorizedException {
339         try {
340             // Invoke the interceptors before we process the read packet
341
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
342                     false);
343             router.route(packet);
344             // Invoke the interceptors after we have processed the read packet
345
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
346                     true);
347             session.incrementClientPacketCount();
348         }
349         catch (PacketRejectedException e) {
350             // An interceptor rejected this packet
351
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
352                 // A message for the rejection will be sent to the sender of the rejected packet
353
Message reply = new Message();
354                 reply.setID(packet.getID());
355                 reply.setTo(session.getAddress());
356                 reply.setFrom(packet.getTo());
357                 reply.setType(packet.getType());
358                 reply.setThread(packet.getThread());
359                 reply.setBody(e.getRejectionMessage());
360                 session.process(reply);
361             }
362         }
363     }
364
365     /**
366      * Returns true if a received packet of an unkown type (i.e. not a Message, Presence
367      * or IQ) has been processed. If the packet was not processed then an exception will
368      * be thrown which will make the thread to stop processing further packets.
369      *
370      * @param doc the DOM element of an unkown type.
371      * @return true if a received packet has been processed.
372      */

373     abstract boolean processUnknowPacket(Element doc);
374
375     private IQ getIQ(Element doc) {
376         Element query = doc.element("query");
377         if (query != null && "jabber:iq:roster".equals(query.getNamespaceURI())) {
378             return new Roster(doc);
379         }
380         else {
381             return new IQ(doc);
382         }
383     }
384
385     /**
386      * Uses the XPP to grab the opening stream tag and create an active session
387      * object. The session to create will depend on the sent namespace. In all
388      * cases, the method obtains the opening stream tag, checks for errors, and
389      * either creates a session or returns an error and kills the connection.
390      * If the connection remains open, the XPP will be set to be ready for the
391      * first packet. A call to next() should result in an START_TAG state with
392      * the first packet in the stream.
393      */

394     private void createSession() throws UnauthorizedException, XmlPullParserException, IOException JavaDoc {
395         XmlPullParser xpp = reader.getXPPParser();
396         for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
397             eventType = xpp.next();
398         }
399
400         // Create the correct session based on the sent namespace
401
if (!createSession(xpp.getNamespace(null))) {
402             // No session was created because of an invalid namespace prefix so answer a stream
403
// error and close the underlying connection
404
Writer JavaDoc writer = connection.getWriter();
405             StringBuilder JavaDoc sb = new StringBuilder JavaDoc();
406             sb.append("<?xml version='1.0' encoding='");
407             sb.append(CHARSET);
408             sb.append("'?>");
409             // Include the bad-namespace-prefix in the response
410
StreamError error = new StreamError(StreamError.Condition.bad_namespace_prefix);
411             sb.append(error.toXML());
412             writer.write(sb.toString());
413             writer.flush();
414             // Close the underlying connection
415
connection.close();
416         }
417     }
418
419     /**
420      * Notification message indicating that the SocketReader is shutting down. The thread will
421      * stop reading and processing new requests. Subclasses may want to redefine this message
422      * for releasing any resource they might need.
423      */

424     protected void shutdown() {
425     }
426
427     /**
428      * Creates the appropriate {@link Session} subclass based on the specified namespace.
429      *
430      * @param namespace the namespace sent in the stream element. eg. jabber:client.
431      * @return the created session or null.
432      * @throws UnauthorizedException
433      * @throws XmlPullParserException
434      * @throws IOException
435      */

436     abstract boolean createSession(String JavaDoc namespace) throws UnauthorizedException,
437             XmlPullParserException, IOException JavaDoc;
438 }
439
Popular Tags