1 11 12 package org.jivesoftware.messenger.net; 13 14 import org.xmlpull.v1.XmlPullParserFactory; 15 import org.xmlpull.v1.XmlPullParserException; 16 import org.xmlpull.v1.XmlPullParser; 17 import org.jivesoftware.messenger.*; 18 import org.jivesoftware.messenger.auth.UnauthorizedException; 19 import org.jivesoftware.messenger.interceptor.InterceptorManager; 20 import org.jivesoftware.messenger.interceptor.PacketRejectedException; 21 import org.jivesoftware.util.Log; 22 import org.jivesoftware.util.LocaleUtils; 23 import org.dom4j.io.XPPPacketReader; 24 import org.dom4j.Element; 25 import org.xmpp.packet.*; 26 27 import java.net.Socket ; 28 import java.net.SocketException ; 29 import java.io.InputStreamReader ; 30 import java.io.IOException ; 31 import java.io.EOFException ; 32 import java.io.Writer ; 33 34 40 public abstract class SocketReader implements Runnable { 41 42 45 private static String CHARSET = "UTF-8"; 46 49 private static XmlPullParserFactory factory = null; 50 51 private Socket socket; 52 protected Session session; 53 protected SocketConnection connection; 54 protected String serverName; 55 56 59 private PacketRouter router; 60 XPPPacketReader reader = null; 61 protected boolean open; 62 63 static { 64 try { 65 factory = XmlPullParserFactory.newInstance(); 66 } 67 catch (XmlPullParserException e) { 68 Log.error("Error creating a parser factory", e); 69 } 70 } 71 72 80 public SocketReader(PacketRouter router, String serverName, Socket socket, 81 SocketConnection connection) { 82 this.serverName = serverName; 83 this.router = router; 84 this.connection = connection; 85 this.socket = socket; 86 } 87 88 92 public void run() { 93 try { 94 reader = new XPPPacketReader(); 95 reader.setXPPFactory(factory); 96 97 reader.getXPPParser().setInput(new InputStreamReader (socket.getInputStream(), 98 CHARSET)); 99 100 try { 102 createSession(); 103 } 104 catch (IOException e) { 105 Log.debug("Error creating session", e); 106 throw e; 107 } 108 109 if (session != null) { 111 readStream(); 112 } 113 114 } 115 catch (EOFException eof) { 116 } 118 catch (SocketException se) { 119 } 122 catch (XmlPullParserException ie) { 123 } 128 catch (Exception e) { 129 if (session != null) { 130 Log.warn(LocaleUtils.getLocalizedString("admin.error.stream"), e); 131 } 132 } 133 finally { 134 if (session != null) { 135 Log.debug("Logging off " + session.getAddress() + " on " + connection); 136 try { 137 session.getConnection().close(); 138 } 139 catch (Exception e) { 140 Log.warn(LocaleUtils.getLocalizedString("admin.error.connection") 141 + "\n" + socket.toString()); 142 } 143 } 144 else { 145 Log.error(LocaleUtils.getLocalizedString("admin.error.connection") 146 + "\n" + socket.toString()); 147 } 148 shutdown(); 149 } 150 } 151 152 155 private void readStream() throws Exception { 156 open = true; 157 while (open) { 158 Element doc = reader.parseDocument().getRootElement(); 159 160 if (doc == null) { 161 return; 164 } 165 166 String tag = doc.getName(); 167 if ("message".equals(tag)) { 168 Message packet = null; 169 try { 170 packet = new Message(doc); 171 } 172 catch(IllegalArgumentException e) { 173 Message reply = new Message(); 175 reply.setID(doc.attributeValue("id")); 176 reply.setTo(session.getAddress()); 177 reply.getElement().addAttribute("from", doc.attributeValue("to")); 178 reply.setError(PacketError.Condition.jid_malformed); 179 session.process(reply); 180 continue; 181 } 182 processMessage(packet); 183 } 184 else if ("presence".equals(tag)) { 185 Presence packet = null; 186 try { 187 packet = new Presence(doc); 188 } 189 catch (IllegalArgumentException e) { 190 Presence reply = new Presence(); 192 reply.setID(doc.attributeValue("id")); 193 reply.setTo(session.getAddress()); 194 reply.getElement().addAttribute("from", doc.attributeValue("to")); 195 reply.setError(PacketError.Condition.jid_malformed); 196 session.process(reply); 197 continue; 198 } 199 try { 200 packet.getType(); 201 } 202 catch (IllegalArgumentException e) { 203 Log.warn("Invalid presence type", e); 204 packet.setType(null); 207 } 208 processPresence(packet); 209 } 210 else if ("iq".equals(tag)) { 211 IQ packet = null; 212 try { 213 packet = getIQ(doc); 214 } 215 catch(IllegalArgumentException e) { 216 IQ reply = new IQ(); 218 if (!doc.elements().isEmpty()) { 219 reply.setChildElement(((Element) doc.elements().get(0)).createCopy()); 220 } 221 reply.setID(doc.attributeValue("id")); 222 reply.setTo(session.getAddress()); 223 if (doc.attributeValue("to") != null) { 224 reply.getElement().addAttribute("from", doc.attributeValue("to")); 225 } 226 reply.setError(PacketError.Condition.jid_malformed); 227 session.process(reply); 228 continue; 229 } 230 processIQ(packet); 231 } 232 else { 233 if (!processUnknowPacket(doc)) { 234 Log.warn(LocaleUtils.getLocalizedString("admin.error.packet.tag") + 235 doc.asXML()); 236 open = false; 237 } 238 } 239 } 240 } 241 242 253 protected void processIQ(IQ packet) throws UnauthorizedException { 254 try { 255 InterceptorManager.getInstance().invokeInterceptors(packet, session, true, 257 false); 258 router.route(packet); 259 InterceptorManager.getInstance().invokeInterceptors(packet, session, true, 261 true); 262 session.incrementClientPacketCount(); 263 } 264 catch (PacketRejectedException e) { 265 IQ reply = new IQ(); 267 reply.setChildElement(packet.getChildElement().createCopy()); 268 reply.setID(packet.getID()); 269 reply.setTo(session.getAddress()); 270 reply.setFrom(packet.getTo()); 271 reply.setError(PacketError.Condition.not_allowed); 272 session.process(reply); 273 if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) { 275 Message notification = new Message(); 277 notification.setTo(session.getAddress()); 278 notification.setFrom(packet.getTo()); 279 notification.setBody(e.getRejectionMessage()); 280 session.process(notification); 281 } 282 } 283 } 284 285 296 protected void processPresence(Presence packet) throws UnauthorizedException { 297 try { 298 InterceptorManager.getInstance().invokeInterceptors(packet, session, true, 300 false); 301 router.route(packet); 302 InterceptorManager.getInstance().invokeInterceptors(packet, session, true, 304 true); 305 session.incrementClientPacketCount(); 306 } 307 catch (PacketRejectedException e) { 308 Presence reply = new Presence(); 310 reply.setID(packet.getID()); 311 reply.setTo(session.getAddress()); 312 reply.setFrom(packet.getTo()); 313 reply.setError(PacketError.Condition.not_allowed); 314 session.process(reply); 315 if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) { 317 Message notification = new Message(); 319 notification.setTo(session.getAddress()); 320 notification.setFrom(packet.getTo()); 321 notification.setBody(e.getRejectionMessage()); 322 session.process(notification); 323 } 324 } 325 } 326 327 338 protected void processMessage(Message packet) throws UnauthorizedException { 339 try { 340 InterceptorManager.getInstance().invokeInterceptors(packet, session, true, 342 false); 343 router.route(packet); 344 InterceptorManager.getInstance().invokeInterceptors(packet, session, true, 346 true); 347 session.incrementClientPacketCount(); 348 } 349 catch (PacketRejectedException e) { 350 if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) { 352 Message reply = new Message(); 354 reply.setID(packet.getID()); 355 reply.setTo(session.getAddress()); 356 reply.setFrom(packet.getTo()); 357 reply.setType(packet.getType()); 358 reply.setThread(packet.getThread()); 359 reply.setBody(e.getRejectionMessage()); 360 session.process(reply); 361 } 362 } 363 } 364 365 373 abstract boolean processUnknowPacket(Element doc); 374 375 private IQ getIQ(Element doc) { 376 Element query = doc.element("query"); 377 if (query != null && "jabber:iq:roster".equals(query.getNamespaceURI())) { 378 return new Roster(doc); 379 } 380 else { 381 return new IQ(doc); 382 } 383 } 384 385 394 private void createSession() throws UnauthorizedException, XmlPullParserException, IOException { 395 XmlPullParser xpp = reader.getXPPParser(); 396 for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) { 397 eventType = xpp.next(); 398 } 399 400 if (!createSession(xpp.getNamespace(null))) { 402 Writer writer = connection.getWriter(); 405 StringBuilder sb = new StringBuilder (); 406 sb.append("<?xml version='1.0' encoding='"); 407 sb.append(CHARSET); 408 sb.append("'?>"); 409 StreamError error = new StreamError(StreamError.Condition.bad_namespace_prefix); 411 sb.append(error.toXML()); 412 writer.write(sb.toString()); 413 writer.flush(); 414 connection.close(); 416 } 417 } 418 419 424 protected void shutdown() { 425 } 426 427 436 abstract boolean createSession(String namespace) throws UnauthorizedException, 437 XmlPullParserException, IOException ; 438 } 439 | Popular Tags |