KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jivesoftware > messenger > net > ServerSocketReader


1 /**
2  * $RCSfile: ServerSocketReader.java,v $
3  * $Revision: 1.4 $
4  * $Date: 2005/06/17 05:43:20 $
5  *
6  * Copyright (C) 2004 Jive Software. All rights reserved.
7  *
8  * This software is published under the terms of the GNU Public License (GPL),
9  * a copy of which is included in this distribution.
10  */

11
12 package org.jivesoftware.messenger.net;
13
14 import org.dom4j.Element;
15 import org.jivesoftware.messenger.PacketRouter;
16 import org.jivesoftware.messenger.auth.UnauthorizedException;
17 import org.jivesoftware.messenger.interceptor.PacketRejectedException;
18 import org.jivesoftware.messenger.server.IncomingServerSession;
19 import org.jivesoftware.util.JiveGlobals;
20 import org.jivesoftware.util.Log;
21 import org.xmlpull.v1.XmlPullParserException;
22 import org.xmpp.packet.*;
23
24 import java.io.IOException JavaDoc;
25 import java.net.Socket JavaDoc;
26 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
27 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
28 import java.util.concurrent.TimeUnit JavaDoc;
29
30 /**
31  * A SocketReader specialized for server connections. This reader will be used when the open
32  * stream contains a jabber:server namespace. Server-to-server communication requires two
33  * TCP connections between the servers where one is used for sending packets whilst the other
34  * connection is used for receiving packets. The connection used for receiving packets will use
35  * a ServerSocketReader since the other connection will not receive packets.<p>
36  *
37  * The received packets will be routed using another thread to ensure that many received packets
38  * could be routed at the same time. To avoid creating new threads every time a packet is received
39  * each <tt>ServerSocketReader</tt> instance uses a {@link ThreadPoolExecutor}. By default the
40  * maximum number of threads that the executor may have is 50. However, this value may be modified
41  * by changing the property <b>xmpp.server.processing.threads</b>.
42  *
43  * @author Gaston Dombiak
44  */

45 public class ServerSocketReader extends SocketReader {
46
47     /**
48      * Pool of threads that are available for processing the requests.
49      */

50     private ThreadPoolExecutor JavaDoc threadPool;
51
52     public ServerSocketReader(PacketRouter router, String JavaDoc serverName, Socket JavaDoc socket,
53             SocketConnection connection) {
54         super(router, serverName, socket, connection);
55         // Create a pool of threads that will process received packets. If more threads are
56
// required then the command will be executed on the SocketReader process
57
int maxThreads = JiveGlobals.getIntProperty("xmpp.server.processing.threads", 50);
58         threadPool =
59                 new ThreadPoolExecutor JavaDoc(1, maxThreads, 60, TimeUnit.SECONDS,
60                         new LinkedBlockingQueue JavaDoc<Runnable JavaDoc>(),
61                         new ThreadPoolExecutor.CallerRunsPolicy JavaDoc());
62     }
63
64     /**
65      * Processes the packet in another thread if the packet has not been rejected.
66      *
67      * @param packet the received packet.
68      */

69     protected void processIQ(final IQ packet) throws UnauthorizedException {
70         try {
71             packetReceived(packet);
72             // Process the packet in another thread
73
threadPool.execute(new Runnable JavaDoc() {
74                 public void run() {
75                     try {
76                         ServerSocketReader.super.processIQ(packet);
77                     }
78                     catch (UnauthorizedException e) {
79                         Log.error("Error processing packet", e);
80                     }
81                 }
82             });
83         }
84         catch (PacketRejectedException e) {
85             Log.debug("IQ rejected: " + packet.toXML(), e);
86         }
87     }
88
89     /**
90      * Processes the packet in another thread if the packet has not been rejected.
91      *
92      * @param packet the received packet.
93      */

94     protected void processPresence(final Presence packet) throws UnauthorizedException {
95         try {
96             packetReceived(packet);
97             // Process the packet in another thread
98
threadPool.execute(new Runnable JavaDoc() {
99                 public void run() {
100                     try {
101                         ServerSocketReader.super.processPresence(packet);
102                     }
103                     catch (UnauthorizedException e) {
104                         Log.error("Error processing packet", e);
105                     }
106                 }
107             });
108         }
109         catch (PacketRejectedException e) {
110             Log.debug("Presence rejected: " + packet.toXML(), e);
111         }
112     }
113
114     /**
115      * Processes the packet in another thread if the packet has not been rejected.
116      *
117      * @param packet the received packet.
118      */

119     protected void processMessage(final Message packet) throws UnauthorizedException {
120         try {
121             packetReceived(packet);
122             // Process the packet in another thread
123
threadPool.execute(new Runnable JavaDoc() {
124                 public void run() {
125                     try {
126                         ServerSocketReader.super.processMessage(packet);
127                     }
128                     catch (UnauthorizedException e) {
129                         Log.error("Error processing packet", e);
130                     }
131                 }
132             });
133         }
134         catch (PacketRejectedException e) {
135             Log.debug("Message rejected: " + packet.toXML(), e);
136         }
137     }
138
139     /**
140      * Remote servers may send subsequent db:result packets so we need to process them in order
141      * to validate new domains.
142      *
143      * @param doc the unknown DOM element that was received
144      * @return true if the packet is a db:result packet otherwise false.
145      */

146     protected boolean processUnknowPacket(Element doc) {
147         // Handle subsequent db:result packets
148
if ("db".equals(doc.getNamespacePrefix()) && "result".equals(doc.getName())) {
149             if (!((IncomingServerSession) session).validateSubsequentDomain(doc)) {
150                 open = false;
151             }
152             return true;
153         }
154         else if ("db".equals(doc.getNamespacePrefix()) && "verify".equals(doc.getName())) {
155             // The Receiving Server is reusing an existing connection for sending the
156
// Authoritative Server a request for verification of a key
157
((IncomingServerSession) session).verifyReceivedKey(doc);
158             return true;
159         }
160         return false;
161     }
162
163     /**
164      * Make sure that the received packet has a TO and FROM values defined and that it was sent
165      * from a previously validated domain. If the packet does not matches any of the above
166      * conditions then a PacketRejectedException will be thrown.
167      *
168      * @param packet the received packet.
169      * @throws PacketRejectedException if the packet does not include a TO or FROM or if the packet
170      * was sent from a domain that was not previously validated.
171      */

172     private void packetReceived(Packet packet) throws PacketRejectedException {
173         if (packet.getTo() == null || packet.getFrom() == null) {
174             Log.debug("Closing IncomingServerSession due to packet with no TO or FROM: " +
175                     packet.toXML());
176             // Send a stream error saying that the packet includes no TO or FROM
177
StreamError error = new StreamError(StreamError.Condition.improper_addressing);
178             connection.deliverRawText(error.toXML());
179             // Close the underlying connection
180
connection.close();
181             open = false;
182             throw new PacketRejectedException("Packet with no TO or FROM attributes");
183         }
184         else if (!((IncomingServerSession) session).isValidDomain(packet.getFrom().getDomain())) {
185             Log.debug("Closing IncomingServerSession due to packet with invalid domain: " +
186                     packet.toXML());
187             // Send a stream error saying that the packet includes an invalid FROM
188
StreamError error = new StreamError(StreamError.Condition.invalid_from);
189             connection.deliverRawText(error.toXML());
190             // Close the underlying connection
191
connection.close();
192             open = false;
193             throw new PacketRejectedException("Packet with no TO or FROM attributes");
194         }
195     }
196
197     protected void shutdown() {
198         super.shutdown();
199         // Shutdown the pool of threads that are processing packets sent by
200
// the remote server
201
threadPool.shutdown();
202     }
203
204     boolean createSession(String JavaDoc namespace) throws UnauthorizedException, XmlPullParserException,
205             IOException JavaDoc {
206         if ("jabber:server".equals(namespace)) {
207             // The connected client is a server so create an IncomingServerSession
208
session = IncomingServerSession.createSession(serverName, reader, connection);
209             return true;
210         }
211         return false;
212     }
213 }
214
Popular Tags