// KickJava - Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > org > jivesoftware > smack > PacketWriter


/**
 * $RCSfile$
 * $Revision: 2732 $
 * $Date: 2005-08-26 23:29:04 -0300 (Fri, 26 Aug 2005) $
 *
 * Copyright 2003-2004 Jive Software.
 *
 * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.jivesoftware.smack;

import java.io.*;
import java.util.*;

import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Packet;

29 /**
30  * Writes packets to a XMPP server.
31  *
32  * @author Matt Tucker
33  */

34 class PacketWriter {
35
36     private Thread JavaDoc writerThread;
37     private Writer writer;
38     private XMPPConnection connection;
39     private LinkedList queue;
40     private boolean done = false;
41     
42     private List listeners = new ArrayList();
43     private boolean listenersDeleted = false;
44     private Thread JavaDoc listenerThread;
45     private LinkedList sentPackets = new LinkedList();
46
47     /**
48      * Creates a new packet writer with the specified connection.
49      *
50      * @param connection the connection.
51      */

52     protected PacketWriter(XMPPConnection connection) {
53         this.connection = connection;
54         this.writer = connection.writer;
55         this.queue = new LinkedList();
56
57         writerThread = new Thread JavaDoc() {
58             public void run() {
59                 writePackets();
60             }
61         };
62         writerThread.setName("Smack Packet Writer");
63         writerThread.setDaemon(true);
64
65         listenerThread = new Thread JavaDoc() {
66             public void run() {
67                 processListeners();
68             }
69         };
70         listenerThread.setName("Smack Writer Listener Processor");
71         listenerThread.setDaemon(true);
72
73         // Schedule a keep-alive task to run if the feature is enabled. will write
74
// out a space character each time it runs to keep the TCP/IP connection open.
75
int keepAliveInterval = SmackConfiguration.getKeepAliveInterval();
76         if (keepAliveInterval > 0) {
77             Thread JavaDoc keepAliveThread = new Thread JavaDoc(new KeepAliveTask(keepAliveInterval));
78             keepAliveThread.setDaemon(true);
79             keepAliveThread.start();
80         }
81     }
82
83     /**
84      * Sends the specified packet to the server.
85      *
86      * @param packet the packet to send.
87      */

88     public void sendPacket(Packet packet) {
89         if (!done) {
90             synchronized(queue) {
91                 queue.addFirst(packet);
92                 queue.notifyAll();
93             }
94             // Add the sent packet to the list of sent packets. The
95
// PacketWriterListeners will be notified of the new packet.
96
synchronized(sentPackets) {
97                 sentPackets.addFirst(packet);
98                 sentPackets.notifyAll();
99             }
100         }
101     }
102
103     /**
104      * Registers a packet listener with this writer. The listener will be
105      * notified of every packet that this writer sends. A packet filter determines
106      * which packets will be delivered to the listener.
107      *
108      * @param packetListener the packet listener to notify of sent packets.
109      * @param packetFilter the packet filter to use.
110      */

111     public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) {
112         synchronized (listeners) {
113             listeners.add(new ListenerWrapper(packetListener, packetFilter));
114         }
115     }
116
117     /**
118      * Removes a packet listener.
119      *
120      * @param packetListener the packet listener to remove.
121      */

122     public void removePacketListener(PacketListener packetListener) {
123         synchronized (listeners) {
124             for (int i=0; i<listeners.size(); i++) {
125                 ListenerWrapper wrapper = (ListenerWrapper)listeners.get(i);
126                 if (wrapper != null && wrapper.packetListener.equals(packetListener)) {
127                     listeners.set(i, null);
128                     // Set the flag to indicate that the listener list needs
129
// to be cleaned up.
130
listenersDeleted = true;
131                 }
132             }
133         }
134     }
135
136     /**
137      * Returns the number of registered packet listeners.
138      *
139      * @return the count of packet listeners.
140      */

141     public int getPacketListenerCount() {
142         synchronized (listeners) {
143             return listeners.size();
144         }
145     }
146
147     /**
148      * Starts the packet writer thread and opens a connection to the server. The
149      * packet writer will continue writing packets until {@link #shutdown} or an
150      * error occurs.
151      */

152     public void startup() {
153         writerThread.start();
154         listenerThread.start();
155     }
156
157     void setWriter(Writer writer) {
158         this.writer = writer;
159     }
160
161     /**
162      * Shuts down the packet writer. Once this method has been called, no further
163      * packets will be written to the server.
164      */

165     public void shutdown() {
166         done = true;
167     }
168
169     /**
170      * Returns the next available packet from the queue for writing.
171      *
172      * @return the next packet for writing.
173      */

174     private Packet nextPacket() {
175         synchronized(queue) {
176             while (!done && queue.size() == 0) {
177                 try {
178                     queue.wait(2000);
179                 }
180                 catch (InterruptedException JavaDoc ie) { }
181             }
182             if (queue.size() > 0) {
183                 return (Packet)queue.removeLast();
184             }
185             else {
186                 return null;
187             }
188         }
189     }
190
191     private void writePackets() {
192         try {
193             // Open the stream.
194
openStream();
195             // Write out packets from the queue.
196
while (!done) {
197                 Packet packet = nextPacket();
198                 if (packet != null) {
199                     synchronized (writer) {
200                         writer.write(packet.toXML());
201                         writer.flush();
202                     }
203                 }
204             }
205             // Close the stream.
206
try {
207                 writer.write("</stream:stream>");
208                 writer.flush();
209             }
210             catch (Exception JavaDoc e) { }
211             finally {
212                 try {
213                     writer.close();
214                 }
215                 catch (Exception JavaDoc e) { }
216             }
217         }
218         catch (IOException ioe){
219             if (!done) {
220                 done = true;
221                 connection.packetReader.notifyConnectionError(ioe);
222             }
223         }
224     }
225
226     /**
227      * Process listeners.
228      */

229     private void processListeners() {
230         while (!done) {
231             Packet sentPacket;
232             // Wait until a new packet has been sent
233
synchronized (sentPackets) {
234                 while (!done && sentPackets.size() == 0) {
235                     try {
236                         sentPackets.wait(2000);
237                     }
238                     catch (InterruptedException JavaDoc ie) { }
239                 }
240                 if (sentPackets.size() > 0) {
241                     sentPacket = (Packet)sentPackets.removeLast();
242                 }
243                 else {
244                     sentPacket = null;
245                 }
246             }
247             if (sentPacket != null) {
248                 // Clean up null entries in the listeners list if the flag is set. List
249
// removes are done seperately so that the main notification process doesn't
250
// need to synchronize on the list.
251
synchronized (listeners) {
252                     if (listenersDeleted) {
253                         for (int i=listeners.size()-1; i>=0; i--) {
254                             if (listeners.get(i) == null) {
255                                 listeners.remove(i);
256                             }
257                         }
258                         listenersDeleted = false;
259                     }
260                 }
261                 // Notify the listeners of the new sent packet
262
int size = listeners.size();
263                 for (int i=0; i<size; i++) {
264                     ListenerWrapper listenerWrapper = (ListenerWrapper)listeners.get(i);
265                     if (listenerWrapper != null) {
266                         listenerWrapper.notifyListener(sentPacket);
267                     }
268                 }
269             }
270         }
271     }
272
273     /**
274      * Sends to the server a new stream element. This operation may be requested several times
275      * so we need to encapsulate the logic in one place. This message will be sent while doing
276      * TLS, SASL and resource binding.
277      *
278      * @throws IOException If an error occurs while sending the stanza to the server.
279      */

280     void openStream() throws IOException {
281         StringBuffer JavaDoc stream = new StringBuffer JavaDoc();
282         stream.append("<stream:stream");
283         stream.append(" to=\"").append(connection.serviceName).append("\"");
284         stream.append(" xmlns=\"jabber:client\"");
285         stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
286         stream.append(" version=\"1.0\">");
287         writer.write(stream.toString());
288         writer.flush();
289     }
290
291     /**
292      * A wrapper class to associate a packet filter with a listener.
293      */

294     private static class ListenerWrapper {
295
296         private PacketListener packetListener;
297         private PacketFilter packetFilter;
298
299         public ListenerWrapper(PacketListener packetListener,
300                                PacketFilter packetFilter)
301         {
302             this.packetListener = packetListener;
303             this.packetFilter = packetFilter;
304         }
305
306         public boolean equals(Object JavaDoc object) {
307             if (object == null) {
308                 return false;
309             }
310             if (object instanceof ListenerWrapper) {
311                 return ((ListenerWrapper)object).packetListener.equals(this.packetListener);
312             }
313             else if (object instanceof PacketListener) {
314                 return object.equals(this.packetListener);
315             }
316             return false;
317         }
318
319         public void notifyListener(Packet packet) {
320             if (packetFilter == null || packetFilter.accept(packet)) {
321                 packetListener.processPacket(packet);
322             }
323         }
324     }
325
326     /**
327      * A TimerTask that keeps connections to the server alive by sending a space
328      * character on an interval.
329      */

330     private class KeepAliveTask implements Runnable JavaDoc {
331
332         private int delay;
333
334         public KeepAliveTask(int delay) {
335             this.delay = delay;
336         }
337
338         public void run() {
339             while (!done) {
340                 synchronized (writer) {
341                     try {
342                         writer.write(" ");
343                         writer.flush();
344                     }
345                     catch (Exception JavaDoc e) { }
346                 }
347                 try {
348                     // Sleep until we should write the next keep-alive.
349
Thread.sleep(delay);
350                 }
351                 catch (InterruptedException JavaDoc ie) { }
352             }
353         }
354     }
355 }
// Popular Tags