KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > axis > ime > internal > MessageExchangeProvider


1 /*
2  * Copyright 2001-2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.axis.ime.internal;
18
19 import org.apache.axis.AxisFault;
20 import org.apache.axis.Handler;
21 import org.apache.axis.components.logger.LogFactory;
22 import org.apache.axis.components.threadpool.ThreadPool;
23 import org.apache.axis.i18n.Messages;
24 import org.apache.axis.ime.MessageExchange;
25 import org.apache.axis.ime.MessageExchangeEventListener;
26 import org.apache.axis.ime.MessageExchangeFactory;
27 import org.apache.axis.ime.event.MessageSendEvent;
28 import org.apache.axis.ime.internal.util.KeyedBuffer;
29 import org.apache.axis.ime.internal.util.NonPersistentKeyedBuffer;
30 import org.apache.commons.logging.Log;
31
32 import java.util.Hashtable JavaDoc;
33
34 /**
35  * @author James M Snell (jasnell@us.ibm.com)
36  * @author Ray Chun (rchun@sonicsoftware.com)
37  */

38 public abstract class MessageExchangeProvider
39         implements MessageExchangeFactory {
40
41     protected static Log log =
42         LogFactory.getLog(MessageExchangeProvider.class.getName());
43
44     public static final long SELECT_TIMEOUT = 1000 * 30;
45     public static final long DEFAULT_THREAD_COUNT = 5;
46
47     protected final ThreadPool WORKERS = new ThreadPool();
48     protected final KeyedBuffer SEND = new NonPersistentKeyedBuffer(WORKERS);
49     protected final KeyedBuffer RECEIVE = new NonPersistentKeyedBuffer(WORKERS);
50     protected final KeyedBuffer RECEIVE_REQUESTS = new NonPersistentKeyedBuffer(WORKERS);
51     protected Handler sendHandler = null;
52     protected Handler receiveHandler = null;
53
54     protected boolean initialized = false;
55
56     public Handler getSendHandler() {
57       return sendHandler;
58     }
59     
60     public Handler getReceiveHandler() {
61       return receiveHandler;
62     }
63
64     public void setSendHandler(Handler handler) {
65       this.sendHandler = handler;
66     }
67     
68     public void setReceiveHandler(Handler handler) {
69       this.receiveHandler = handler;
70     }
71
72     protected abstract MessageExchangeEventListener getMessageExchangeEventListener();
73
74     protected abstract ReceivedMessageDispatchPolicy getReceivedMessageDispatchPolicy();
75
76     public MessageExchange createMessageExchange()
77             throws AxisFault {
78         return new MessageExchangeImpl(this);
79     }
80
81     public MessageExchange createMessageExchange(
82             Hashtable JavaDoc options)
83             throws AxisFault {
84       MessageExchange msgex = new MessageExchangeImpl(this);
85       msgex.setOptions(options);
86       return msgex;
87     }
88             
89     public void cleanup()
90             throws InterruptedException JavaDoc {
91         if (log.isDebugEnabled()) {
92             log.debug("Enter: MessageExchangeProvider::cleanup");
93         }
94         WORKERS.cleanup();
95         if (log.isDebugEnabled()) {
96             log.debug("Exit: MessageExchangeProvider::cleanup");
97         }
98     }
99
100     public void init() {
101         init(DEFAULT_THREAD_COUNT);
102     }
103
104     public void init(long THREAD_COUNT) {
105         if (log.isDebugEnabled()) {
106             log.debug("Enter: MessageExchangeProvider::init");
107         }
108         if (initialized)
109             throw new IllegalStateException JavaDoc(Messages.getMessage("illegalStateException00"));
110         for (int n = 0; n < THREAD_COUNT; n++) {
111             WORKERS.addWorker(new MessageSender(WORKERS, SEND, getMessageExchangeEventListener(), getSendHandler()));
112             WORKERS.addWorker(new MessageReceiver(WORKERS, RECEIVE, getReceivedMessageDispatchPolicy(), getReceiveHandler()));
113         }
114         initialized = true;
115         if (log.isDebugEnabled()) {
116             log.debug("Exit: MessageExchangeProvider::init");
117         }
118     }
119     
120     public void processReceive(
121             MessageExchangeReceiveContext context) {
122         if (log.isDebugEnabled()) {
123             log.debug("Enter: MessageExchangeProvider::processReceive");
124         }
125         if (context.getMessageExchangeCorrelator() != null) {
126           RECEIVE_REQUESTS.put(
127             context.getMessageExchangeCorrelator(),
128             context);
129         } else {
130           RECEIVE_REQUESTS.put(
131             SimpleMessageExchangeCorrelator.NULL_CORRELATOR, context);
132         }
133         if (log.isDebugEnabled()) {
134             log.debug("Exit: MessageExchangeProvider::processReceive");
135         }
136     }
137     
138     public void processSend(
139             MessageExchangeSendContext context) {
140         if (log.isDebugEnabled()) {
141             log.debug("Enter: MessageExchangeProvider::processSend");
142         }
143         SEND.put(
144             context.getMessageExchangeCorrelator(),
145             context);
146         if (log.isDebugEnabled()) {
147             log.debug("Exit: MessageExchangeProvider::processSend");
148         }
149     }
150
151     public void shutdown() {
152         shutdown(false);
153     }
154
155     public void shutdown(boolean force) {
156         if (log.isDebugEnabled()) {
157             log.debug("Enter: MessageExchangeProvider::shutdown");
158         }
159         if (!force) {
160             WORKERS.safeShutdown();
161         } else {
162             WORKERS.shutdown();
163         }
164         if (log.isDebugEnabled()) {
165             log.debug("Exit: MessageExchangeProvider::shutdown");
166         }
167     }
168
169     public void awaitShutdown()
170             throws InterruptedException JavaDoc {
171         if (log.isDebugEnabled()) {
172             log.debug("Enter: MessageExchangeProvider::awaitShutdown");
173         }
174         WORKERS.awaitShutdown();
175         if (log.isDebugEnabled()) {
176             log.debug("Exit: MessageExchangeProvider::awaitShutdown");
177         }
178     }
179
180     public void awaitShutdown(long shutdown)
181             throws InterruptedException JavaDoc {
182         if (log.isDebugEnabled()) {
183             log.debug("Enter: MessageExchangeProvider::awaitShutdown");
184         }
185         WORKERS.awaitShutdown(shutdown);
186         if (log.isDebugEnabled()) {
187             log.debug("Exit: MessageExchangeProvider::awaitShutdown");
188         }
189     }
190
191     /**
192      * Unsupported for now
193      * @see org.apache.axis.ime.MessageExchange@setProperty(String,Object)
194      */

195     public void setOption(
196             String JavaDoc propertyId,
197             Object JavaDoc propertyValue) {
198         throw new UnsupportedOperationException JavaDoc(Messages.getMessage("unsupportedOperationException00"));
199     }
200
201     /**
202      * Unsupported for now
203      * @see org.apache.axis.ime.MessageExchange@getProperty(String)
204      */

205     public Object JavaDoc getOption(
206             String JavaDoc propertyId) {
207         throw new UnsupportedOperationException JavaDoc(Messages.getMessage("unsupportedOperationException00"));
208     }
209
210     /**
211      * Unsupported for now
212      * @see org.apache.axis.ime.MessageExchange@getProperty(String,Object)
213      */

214     public Object JavaDoc getOption(
215             String JavaDoc propertyId,
216             Object JavaDoc defaultValue) {
217         throw new UnsupportedOperationException JavaDoc(Messages.getMessage("unsupportedOperationException00"));
218     }
219
220     /**
221      * Unsupported for now
222      * @see org.apache.axis.ime.MessageExchange@getProperties()
223      */

224     public Hashtable JavaDoc getOptions() {
225         throw new UnsupportedOperationException JavaDoc(Messages.getMessage("unsupportedOperationException00"));
226     }
227
228     /**
229      * Unsupported for now
230      * @see org.apache.axis.ime.MessageExchange@setProperties(java.langHashtable)
231      */

232     public void setOptions(Hashtable JavaDoc properties) {
233         throw new UnsupportedOperationException JavaDoc(Messages.getMessage("unsupportedOperationException00"));
234     }
235
236     /**
237      * Unsupported for now
238      * @see org.apache.axis.ime.MessageExchange@clearProperties()
239      */

240     public void clearOptions() {
241         throw new UnsupportedOperationException JavaDoc(Messages.getMessage("unsupportedOperationException00"));
242     }
243
244   // -- Worker Classes --- //
245
public static class MessageReceiver
246             implements Runnable JavaDoc {
247         
248         protected static Log log =
249             LogFactory.getLog(MessageReceiver.class.getName());
250         
251         protected ThreadPool pool;
252         protected KeyedBuffer channel;
253         protected ReceivedMessageDispatchPolicy policy;
254         protected Handler handler;
255     
256         protected MessageReceiver(
257                 ThreadPool pool,
258                 KeyedBuffer channel,
259                 ReceivedMessageDispatchPolicy policy,
260                 Handler handler) {
261             this.pool = pool;
262             this.channel = channel;
263             this.policy = policy;
264             this.handler = handler;
265         }
266     
267         /**
268          * @see java.lang.Runnable#run()
269          */

270         public void run() {
271             if (log.isDebugEnabled()) {
272                 log.debug("Enter: MessageExchangeProvider.MessageReceiver::run");
273             }
274             try {
275                 while (!pool.isShuttingDown()) {
276                     MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT);
277                     if (context != null) {
278                       if (handler != null)
279                         handler.invoke(context.getMessageContext());
280                       policy.dispatch(context);
281                     }
282                 }
283             } catch (Throwable JavaDoc t) {
284                 log.error(Messages.getMessage("fault00"), t);
285             } finally {
286                 pool.workerDone(this,true);
287                 if (log.isDebugEnabled()) {
288                     log.debug("Exit: MessageExchangeProvider.MesageReceiver::run");
289                 }
290             }
291         }
292     
293     }
294
295
296
297     public static class MessageSender
298             implements Runnable JavaDoc {
299
300         protected static Log log =
301             LogFactory.getLog(MessageReceiver.class.getName());
302     
303         protected ThreadPool pool;
304         protected KeyedBuffer channel;
305         protected MessageExchangeEventListener listener;
306         protected Handler handler;
307     
308         protected MessageSender(
309                 ThreadPool pool,
310                 KeyedBuffer channel,
311                 MessageExchangeEventListener listener,
312                 Handler handler) {
313             this.pool = pool;
314             this.channel = channel;
315             this.listener = listener;
316             this.handler = handler;
317         }
318         
319         /**
320          * @see java.lang.Runnable#run()
321          */

322         public void run() {
323             if (log.isDebugEnabled()) {
324                 log.debug("Enter: MessageExchangeProvider.MessageSender::run");
325             }
326             try {
327                 while (!pool.isShuttingDown()) {
328                     MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT);
329                     if (context != null) {
330                       if (handler != null)
331                         handler.invoke(context.getMessageContext());
332                       
333                       MessageSendEvent sendEvent = new MessageSendEvent(
334                             context.getMessageExchangeCorrelator(),
335                             context,
336                             context.getMessageContext());
337                       listener.onEvent(sendEvent);
338                     }
339                 }
340             } catch (Throwable JavaDoc t) {
341                 log.error(Messages.getMessage("fault00"), t);
342             } finally {
343                 pool.workerDone(this,true);
344                 if (log.isDebugEnabled()) {
345                     log.debug("Exit: MessageExchangeProvider.MessageSender::run");
346                 }
347             }
348         }
349     
350     }
351
352 }
353
Popular Tags