package org.jivesoftware.smack;

import java.util.*;
import java.io.*;

import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Packet;

/**
 * Writes packets to the connection's {@link Writer}. Packets handed to
 * {@link #sendPacket(Packet)} are placed on a queue that a dedicated writer
 * thread drains, and on a second queue that a listener thread drains in order
 * to notify the registered packet listeners.
 */
class PacketWriter {

    private Thread writerThread;
    private Writer writer;
    private XMPPConnection connection;
    private LinkedList queue;
    // Written by shutdown() and the writer thread, polled by every worker
    // loop; volatile so a change made on one thread is seen by the others.
    private volatile boolean done = false;

    // Holds ListenerWrapper instances. A removed listener leaves a null slot
    // behind until the listener thread compacts the list.
    private List listeners = new ArrayList();
    private boolean listenersDeleted = false;
    private Thread listenerThread;
    private LinkedList sentPackets = new LinkedList();

    /**
     * Creates a new packet writer for the given connection. The writer and
     * listener threads are created here but only started by
     * {@link #startup()}. If the configured keep-alive interval is positive,
     * a keep-alive thread is created and started immediately.
     *
     * @param connection the connection whose writer is used for output.
     */
    protected PacketWriter(XMPPConnection connection) {
        this.connection = connection;
        this.writer = connection.writer;
        this.queue = new LinkedList();

        writerThread = new Thread() {
            public void run() {
                writePackets();
            }
        };
        writerThread.setName("Smack Packet Writer");
        writerThread.setDaemon(true);

        listenerThread = new Thread() {
            public void run() {
                processListeners();
            }
        };
        listenerThread.setName("Smack Writer Listener Processor");
        listenerThread.setDaemon(true);

        // The interval is passed to Thread.sleep() by KeepAliveTask, so it is
        // in milliseconds. A non-positive value disables keep-alives.
        int keepAliveInterval = SmackConfiguration.getKeepAliveInterval();
        if (keepAliveInterval > 0) {
            Thread keepAliveThread = new Thread(new KeepAliveTask(keepAliveInterval));
            keepAliveThread.setDaemon(true);
            keepAliveThread.start();
        }
    }

    /**
     * Queues a packet for writing and for listener notification. The call is
     * ignored once the writer has been shut down.
     *
     * @param packet the packet to send.
     */
    public void sendPacket(Packet packet) {
        if (!done) {
            synchronized (queue) {
                // addFirst here + removeLast in nextPacket() gives FIFO order.
                queue.addFirst(packet);
                queue.notifyAll();
            }
            synchronized (sentPackets) {
                sentPackets.addFirst(packet);
                sentPackets.notifyAll();
            }
        }
    }

    /**
     * Registers a packet listener together with a filter. The listener is
     * notified of every queued packet that the filter accepts; a
     * <tt>null</tt> filter accepts every packet.
     *
     * @param packetListener the listener to notify.
     * @param packetFilter the filter deciding which packets are passed on,
     *      or <tt>null</tt> for all packets.
     */
    public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) {
        synchronized (listeners) {
            listeners.add(new ListenerWrapper(packetListener, packetFilter));
        }
    }

    /**
     * Removes every registration of the given packet listener.
     *
     * @param packetListener the listener to remove.
     */
    public void removePacketListener(PacketListener packetListener) {
        synchronized (listeners) {
            for (int i = 0; i < listeners.size(); i++) {
                ListenerWrapper wrapper = (ListenerWrapper) listeners.get(i);
                if (wrapper != null && wrapper.packetListener.equals(packetListener)) {
                    // Only blank the slot so indexes stay stable while the
                    // listener thread may be iterating; that thread compacts
                    // the list before its next dispatch.
                    listeners.set(i, null);
                    listenersDeleted = true;
                }
            }
        }
    }

    /**
     * Returns the number of currently registered packet listeners. Slots
     * that were blanked by {@link #removePacketListener(PacketListener)} but
     * not yet compacted are not counted.
     *
     * @return the number of registered packet listeners.
     */
    public int getPacketListenerCount() {
        synchronized (listeners) {
            int count = 0;
            for (int i = 0; i < listeners.size(); i++) {
                if (listeners.get(i) != null) {
                    count++;
                }
            }
            return count;
        }
    }

    /**
     * Starts the writer thread and the listener thread.
     */
    public void startup() {
        writerThread.start();
        listenerThread.start();
    }

    /**
     * Replaces the writer that packets are written to.
     *
     * @param writer the new writer.
     */
    void setWriter(Writer writer) {
        this.writer = writer;
    }

    /**
     * Marks the writer as done. The worker loops poll this flag and time out
     * of their waits every two seconds, so they stop shortly after this call
     * rather than immediately.
     */
    public void shutdown() {
        done = true;
    }

    /**
     * Returns the next packet to write, waiting until one is queued or the
     * writer is shut down.
     *
     * @return the oldest queued packet, or <tt>null</tt> if the queue is
     *      empty because the writer was shut down.
     */
    private Packet nextPacket() {
        synchronized (queue) {
            while (!done && queue.size() == 0) {
                try {
                    queue.wait(2000);
                }
                catch (InterruptedException ie) {
                    // Deliberately ignored: the loop condition is re-checked.
                    // Re-asserting the interrupt flag here would make the
                    // next wait() throw immediately and spin.
                }
            }
            if (queue.size() > 0) {
                return (Packet) queue.removeLast();
            }
            else {
                return null;
            }
        }
    }

    /**
     * Body of the writer thread: opens the stream, writes queued packets
     * until shut down, then writes the closing stream tag and closes the
     * writer. An I/O failure before shutdown marks the writer as done and is
     * reported to the connection's packet reader.
     */
    private void writePackets() {
        try {
            openStream();
            while (!done) {
                Packet packet = nextPacket();
                if (packet != null) {
                    // Same monitor as KeepAliveTask, so a keep-alive space is
                    // never interleaved with a packet.
                    synchronized (writer) {
                        writer.write(packet.toXML());
                        writer.flush();
                    }
                }
            }
            try {
                writer.write("</stream:stream>");
                writer.flush();
            }
            catch (Exception e) {
                // Best effort: we are shutting down anyway.
            }
            finally {
                try {
                    writer.close();
                }
                catch (Exception e) {
                    // Best effort: nothing useful can be done on close failure.
                }
            }
        }
        catch (IOException ioe) {
            if (!done) {
                done = true;
                connection.packetReader.notifyConnectionError(ioe);
            }
        }
    }

    /**
     * Body of the listener thread: takes packets off the sent-packets queue
     * and passes each one to the registered listeners.
     */
    private void processListeners() {
        while (!done) {
            Packet sentPacket;
            synchronized (sentPackets) {
                while (!done && sentPackets.size() == 0) {
                    try {
                        sentPackets.wait(2000);
                    }
                    catch (InterruptedException ie) {
                        // Deliberately ignored: the loop condition is
                        // re-checked, see nextPacket().
                    }
                }
                if (sentPackets.size() > 0) {
                    sentPacket = (Packet) sentPackets.removeLast();
                }
                else {
                    sentPacket = null;
                }
            }
            if (sentPacket != null) {
                int size;
                synchronized (listeners) {
                    // Compact the list if listeners were removed since the
                    // last dispatch. Only this thread ever shrinks the list.
                    if (listenersDeleted) {
                        for (int i = listeners.size() - 1; i >= 0; i--) {
                            if (listeners.get(i) == null) {
                                listeners.remove(i);
                            }
                        }
                        listenersDeleted = false;
                    }
                    size = listeners.size();
                }
                // Listeners added during this dispatch are picked up on the
                // next packet; listeners removed during it show up as null.
                for (int i = 0; i < size; i++) {
                    ListenerWrapper listenerWrapper = listenerAt(i);
                    if (listenerWrapper != null) {
                        // Invoked outside the lock so a listener may add or
                        // remove listeners without blocking or deadlocking.
                        listenerWrapper.notifyListener(sentPacket);
                    }
                }
            }
        }
    }

    /**
     * Reads one slot of the listener list while holding the list's monitor,
     * because the list is an unsynchronized collection mutated by other
     * threads.
     *
     * @param index the slot to read.
     * @return the wrapper in that slot, or <tt>null</tt> if the slot was
     *      blanked or the index is out of range.
     */
    private ListenerWrapper listenerAt(int index) {
        synchronized (listeners) {
            if (index >= listeners.size()) {
                return null;
            }
            return (ListenerWrapper) listeners.get(index);
        }
    }

    /**
     * Writes the opening stream element, addressed to the connection's
     * service name, and flushes the writer.
     *
     * @throws IOException if writing to the underlying writer fails.
     */
    void openStream() throws IOException {
        StringBuffer stream = new StringBuffer();
        stream.append("<stream:stream");
        stream.append(" to=\"").append(connection.serviceName).append("\"");
        stream.append(" xmlns=\"jabber:client\"");
        stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
        stream.append(" version=\"1.0\">");
        writer.write(stream.toString());
        writer.flush();
    }

    /**
     * Pairs a packet listener with the filter that decides which packets it
     * receives.
     */
    private static class ListenerWrapper {

        private PacketListener packetListener;
        private PacketFilter packetFilter;

        public ListenerWrapper(PacketListener packetListener,
                               PacketFilter packetFilter)
        {
            this.packetListener = packetListener;
            this.packetFilter = packetFilter;
        }

        /**
         * Compares by the wrapped listener. Another wrapper is equal when its
         * listener equals this one's; a bare {@link PacketListener} is equal
         * when it equals the wrapped listener.
         */
        public boolean equals(Object object) {
            if (object == null) {
                return false;
            }
            if (object instanceof ListenerWrapper) {
                return ((ListenerWrapper) object).packetListener.equals(this.packetListener);
            }
            else if (object instanceof PacketListener) {
                return object.equals(this.packetListener);
            }
            return false;
        }

        /**
         * Hash code of the wrapped listener, so that wrappers that compare
         * equal also hash equally.
         */
        public int hashCode() {
            return packetListener.hashCode();
        }

        /**
         * Passes the packet to the listener if there is no filter or the
         * filter accepts the packet.
         *
         * @param packet the packet that was queued for sending.
         */
        public void notifyListener(Packet packet) {
            if (packetFilter == null || packetFilter.accept(packet)) {
                packetListener.processPacket(packet);
            }
        }
    }

    /**
     * Writes a single space to the writer at a fixed interval until the
     * packet writer is shut down.
     */
    private class KeepAliveTask implements Runnable {

        // Pause between keep-alive writes, in milliseconds (Thread.sleep).
        private int delay;

        public KeepAliveTask(int delay) {
            this.delay = delay;
        }

        public void run() {
            while (!done) {
                // Same monitor as the packet write in writePackets().
                synchronized (writer) {
                    try {
                        writer.write(" ");
                        writer.flush();
                    }
                    catch (Exception e) {
                        // Best effort: a failed keep-alive is not reported
                        // from this thread.
                    }
                }
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException ie) {
                    // Deliberately ignored: the loop re-checks the done flag.
                }
            }
        }
    }
}