KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jivesoftware > messenger > Channel


1 /**
2  * $RCSfile: Channel.java,v $
3  * $Revision: 1.5 $
4  * $Date: 2004/12/03 08:59:50 $
5  *
6  * Copyright (C) 2004 Jive Software. All rights reserved.
7  *
8  * This software is published under the terms of the GNU Public License (GPL),
9  * a copy of which is included in this distribution.
10  */

11
12 package org.jivesoftware.messenger;
13
14 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
15 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
16 import java.util.concurrent.TimeUnit JavaDoc;
17 import org.jivesoftware.util.LocaleUtils;
18 import org.jivesoftware.util.Log;
19 import org.xmpp.packet.Packet;
20
21 /**
22  * A channel provides a mechanism to queue work units for processing. Each work unit is
23  * encapsulated as a ChannelMessage, and processing of each message is performed by a
24  * ChannelHandler.<p>
25  *
26  * As a request is handled by the system, it will travel through a sequence of channels.
27  * This architecture has a number of advantages:
28  * <ul>
29  * <li> Each request doesn't need to correspond to a thread. Instead, a thread pool
30  * in each channel processes requests from a queue.
31  * <li> Due to the queue at each channel, the system is much better able to respond
32  * to load spikes.
33  * </ul><p>
34  *
35  * Channels are modeled after SEDA stages. For much much more in-depth architecture information,
36  * refer to the <a HREF="http://www.cs.berkeley.edu/~mdw/proj/sandstorm/">SEDA website</a>.
37  *
38  * @author Matt Tucker
39  */

40 public class Channel<T extends Packet> {
41
42     private String JavaDoc name;
43     private ChannelHandler channelHandler;
44
45     ThreadPoolExecutor JavaDoc executor;
46
47     /**
48      * Creates a new channel. The channel should be registered after it's created.
49      *
50      * @param name the name of the channel.
51      * @param channelHandler the handler for this channel.
52      */

53     public Channel(String JavaDoc name, ChannelHandler<T> channelHandler) {
54         this.name = name;
55         this.channelHandler = channelHandler;
56
57         executor = new ThreadPoolExecutor JavaDoc(1, 8, 15, TimeUnit.SECONDS, new LinkedBlockingQueue JavaDoc());
58     }
59
60     /**
61      * Returns the name of the channel.
62      *
63      * @return the name of the channel.
64      */

65     public String JavaDoc getName() {
66         return name;
67     }
68
69     /**
70      * Enqueus a message to be handled by this channel. After the ChannelHandler is done
71      * processing the message, it will be sent to the next channel. Messages with a higher
72      * priority will be handled first.
73      *
74      * @param packet an XMPP packet to add to the channel for processing.
75      */

76     public void add(final T packet) {
77         Runnable JavaDoc r = new Runnable JavaDoc() {
78             public void run() {
79                 try {
80                     channelHandler.process(packet);
81                 }
82                 catch (Exception JavaDoc e) {
83                     Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
84                    
85                         try {
86                             Session session = SessionManager.getInstance().getSession(packet.getFrom());
87                             session.getConnection().close();
88                         }
89                         catch (Exception JavaDoc e1) {
90                            Log.error(e1);
91                         }
92                 }
93             }
94         };
95         executor.execute(r);
96     }
97
98     /**
99      * Returns true if the channel is currently running. The channel can be started and
100      * stopped by calling the start() and stop() methods.
101      *
102      * @return true if the channel is running.
103      */

104     public boolean isRunning() {
105         return !executor.isShutdown();
106     }
107
108     /**
109      * Starts the channel, which means that worker threads will start processing messages
110      * from the queue. If the server isn't running, messages can still be enqueued.
111      */

112     public void start() {
113
114     }
115
116     /**
117      * Stops the channel, which means that worker threads will stop processing messages from
118      * the queue. If the server isn't running, messages can still be enqueued.
119      */

120     public synchronized void stop() {
121         executor.shutdown();
122     }
123
124     /**
125      * Returns the number of currently active worker threads in the channel. This value
126      * will always fall in between the min a max thread count.
127      *
128      * @return the current number of worker threads.
129      */

130     public int getThreadCount() {
131         return executor.getPoolSize();
132     }
133
134     /**
135      * Returns the min number of threads the channel will use for processing messages.
136      * The channel will automatically de-allocate worker threads as the queue load shrinks,
137      * down to the defined minimum. This lets the channel consume fewer resources when load
138      * is low.
139      *
140      * @return the min number of threads that can be used by the channel.
141      */

142     public int getMinThreadCount() {
143         return executor.getCorePoolSize();
144     }
145
146     /**
147      * Sets the min number of threads the channel will use for processing messages.
148      * The channel will automatically de-allocate worker threads as the queue load shrinks,
149      * down to the defined minimum. This lets the channel consume fewer resources when load
150      * is low.
151      *
152      * @param minThreadCount the min number of threads that can be used by the channel.
153      */

154     public void setMinThreadCount(int minThreadCount) {
155         executor.setCorePoolSize(minThreadCount);
156     }
157
158     /**
159      * Returns the max number of threads the channel will use for processing messages. The
160      * channel will automatically allocate new worker threads as the queue load grows, up to the
161      * defined maximum. This lets the channel meet higher concurrency needs, but prevents too
162      * many threads from being allocated, which decreases overall system performance.
163      *
164      * @return the max number of threads that can be used by the channel.
165      */

166     public int getMaxThreadCount() {
167         return executor.getMaximumPoolSize();
168     }
169
170     /**
171      * Sets the max number of threads the channel will use for processing messages. The channel
172      * will automatically allocate new worker threads as the queue size grows, up to the defined
173      * maximum. This lets the channel meet higher concurrency needs, but prevents too many threads
174      * from being allocated, which decreases overall system performance.
175      *
176      * @param maxThreadCount the max number of threads that can be used by the channel.
177      */

178     public void setMaxThreadCount(int maxThreadCount) {
179         executor.setMaximumPoolSize(maxThreadCount);
180     }
181
182     /**
183      * Returns the current number of ChannelMessage objects waiting to be processed by
184      * the channel.
185      *
186      * @return the current number of elements in the processing queue.
187      */

188     public int getQueueSize() {
189         return executor.getQueue().size();
190     }
191 }
Popular Tags