KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > servant > AbstractProxyPushSupplier


1 /*
2  * JacORB - a free Java ORB
3  *
4  * Copyright (C) 1999-2004 Gerald Brose
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the Free
18  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19  *
20  */

21
22 package org.jacorb.notification.servant;
23
24 import org.apache.avalon.framework.configuration.Configuration;
25 import org.apache.avalon.framework.configuration.ConfigurationException;
26 import org.jacorb.notification.OfferManager;
27 import org.jacorb.notification.SubscriptionManager;
28 import org.jacorb.notification.conf.Attributes;
29 import org.jacorb.notification.conf.Default;
30 import org.jacorb.notification.engine.AbstractRetryStrategy;
31 import org.jacorb.notification.engine.PushOperation;
32 import org.jacorb.notification.engine.PushTaskExecutor;
33 import org.jacorb.notification.engine.PushTaskExecutorFactory;
34 import org.jacorb.notification.engine.RetryException;
35 import org.jacorb.notification.engine.RetryStrategy;
36 import org.jacorb.notification.engine.RetryStrategyFactory;
37 import org.jacorb.notification.engine.TaskProcessor;
38 import org.jacorb.notification.interfaces.IProxyPushSupplier;
39 import org.jacorb.util.ObjectUtil;
40 import org.omg.CORBA.ORB JavaDoc;
41 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
42 import org.omg.PortableServer.POA JavaDoc;
43 import org.picocontainer.MutablePicoContainer;
44 import org.picocontainer.defaults.DefaultPicoContainer;
45
46 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
47
48 public abstract class AbstractProxyPushSupplier extends AbstractProxySupplier implements
49         IProxyPushSupplier
50 {
51     private final RetryStrategyFactory retryStrategyFactory_;
52
53     /**
54      * flag to indicate that this ProxySupplier should invoke remote calls (push) during
55      * deliverMessage.
56      */

57     private final SynchronizedBoolean enabled_ = new SynchronizedBoolean(true);
58
59     private final PushTaskExecutor pushTaskExecutor_;
60
61     private final PushTaskExecutor.PushTask pushTask_ = new PushTaskExecutor.PushTask()
62     {
63         public void doPush()
64         {
65             pushPendingData();
66         }
67         
68         public void cancel()
69         {
70             // ignore, only depends on settings of ProxyPushSupplier
71
}
72     };
73
74     public AbstractProxyPushSupplier(IAdmin admin, ORB JavaDoc orb, POA JavaDoc poa, Configuration conf,
75             TaskProcessor taskProcessor, PushTaskExecutorFactory pushTaskExecutorFactory,
76             OfferManager offerManager, SubscriptionManager subscriptionManager,
77             ConsumerAdmin consumerAdmin) throws ConfigurationException
78     {
79         super(admin, orb, poa, conf, taskProcessor, offerManager, subscriptionManager,
80                 consumerAdmin);
81
82         pushTaskExecutor_ = pushTaskExecutorFactory.newExecutor(this);
83
84         retryStrategyFactory_ = newRetryStrategyFactory(conf, taskProcessor);
85     }
86
87     protected void handleFailedPushOperation(PushOperation operation, Throwable JavaDoc error)
88     {
89         if (AbstractRetryStrategy.isFatalException(error))
90         {
91             // push operation caused a fatal exception
92
// destroy the ProxySupplier
93
if (logger_.isErrorEnabled())
94             {
95                 logger_.error("push raised " + error + ": will destroy ProxySupplier, "
96                         + "disconnect Consumer", error);
97             }
98
99             operation.dispose();
100             destroy();
101
102             return;
103         }
104
105         if (!isDisposed())
106         {
107             RetryStrategy _retry = newRetryStrategy(this, operation);
108
109             try
110             {
111                 _retry.retry();
112             } catch (RetryException e)
113             {
114                 logger_.error("retry failed", e);
115
116                 _retry.dispose();
117                 destroy();
118             }
119         }
120     }
121
122     private RetryStrategy newRetryStrategy(IProxyPushSupplier pushSupplier,
123             PushOperation pushOperation)
124     {
125         return retryStrategyFactory_.newRetryStrategy(pushSupplier, pushOperation);
126     }
127
128     private RetryStrategyFactory newRetryStrategyFactory(Configuration config,
129             TaskProcessor taskProcessor) throws ConfigurationException
130     {
131         String JavaDoc factoryName = config.getAttribute(Attributes.RETRY_STRATEGY_FACTORY,
132                 Default.DEFAULT_RETRY_STRATEGY_FACTORY);
133
134         try
135         {
136             Class JavaDoc factoryClazz = ObjectUtil.classForName(factoryName);
137
138             MutablePicoContainer pico = new DefaultPicoContainer();
139
140             pico.registerComponentInstance(TaskProcessor.class, taskProcessor);
141
142             pico.registerComponentImplementation(RetryStrategyFactory.class, factoryClazz);
143
144             pico.registerComponentInstance(config);
145
146             return (RetryStrategyFactory) pico.getComponentInstance(RetryStrategyFactory.class);
147
148         } catch (ClassNotFoundException JavaDoc e)
149         {
150             throw new ConfigurationException(Attributes.RETRY_STRATEGY_FACTORY, e);
151         }
152     }
153
154     public final void schedulePush()
155     {
156         if (!isDisposed() && !isSuspended() && isEnabled())
157         {
158             schedulePush(pushTask_);
159         }
160     }
161
162     public final void schedulePush(PushTaskExecutor.PushTask pushTask)
163     {
164         pushTaskExecutor_.executePush(pushTask);
165     }
166
167     public final void messageDelivered()
168     {
169         if (isEnabled())
170         {
171             schedulePush();
172         }
173     }
174
175     public void resetErrorCounter()
176     {
177         super.resetErrorCounter();
178
179         enableDelivery();
180     }
181
182     public void disableDelivery()
183     {
184         logger_.debug("Disable Delivery to ProxySupplier");
185
186         enabled_.set(false);
187     }
188
189     protected boolean isEnabled()
190     {
191         return enabled_.get();
192     }
193
194     private void enableDelivery()
195     {
196         logger_.debug("Enable Delivery to ProxySupplier");
197
198         enabled_.set(true);
199     }
200 }
201
Popular Tags