1 21 22 package org.jacorb.notification.servant; 23 24 import org.apache.avalon.framework.configuration.Configuration; 25 import org.apache.avalon.framework.configuration.ConfigurationException; 26 import org.jacorb.notification.OfferManager; 27 import org.jacorb.notification.SubscriptionManager; 28 import org.jacorb.notification.conf.Attributes; 29 import org.jacorb.notification.conf.Default; 30 import org.jacorb.notification.engine.AbstractRetryStrategy; 31 import org.jacorb.notification.engine.PushOperation; 32 import org.jacorb.notification.engine.PushTaskExecutor; 33 import org.jacorb.notification.engine.PushTaskExecutorFactory; 34 import org.jacorb.notification.engine.RetryException; 35 import org.jacorb.notification.engine.RetryStrategy; 36 import org.jacorb.notification.engine.RetryStrategyFactory; 37 import org.jacorb.notification.engine.TaskProcessor; 38 import org.jacorb.notification.interfaces.IProxyPushSupplier; 39 import org.jacorb.util.ObjectUtil; 40 import org.omg.CORBA.ORB ; 41 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin; 42 import org.omg.PortableServer.POA ; 43 import org.picocontainer.MutablePicoContainer; 44 import org.picocontainer.defaults.DefaultPicoContainer; 45 46 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 47 48 public abstract class AbstractProxyPushSupplier extends AbstractProxySupplier implements 49 IProxyPushSupplier 50 { 51 private final RetryStrategyFactory retryStrategyFactory_; 52 53 57 private final SynchronizedBoolean enabled_ = new SynchronizedBoolean(true); 58 59 private final PushTaskExecutor pushTaskExecutor_; 60 61 private final PushTaskExecutor.PushTask pushTask_ = new PushTaskExecutor.PushTask() 62 { 63 public void doPush() 64 { 65 pushPendingData(); 66 } 67 68 public void cancel() 69 { 70 } 72 }; 73 74 public AbstractProxyPushSupplier(IAdmin admin, ORB orb, POA poa, Configuration conf, 75 TaskProcessor taskProcessor, PushTaskExecutorFactory pushTaskExecutorFactory, 76 OfferManager offerManager, SubscriptionManager subscriptionManager, 77 ConsumerAdmin consumerAdmin) throws ConfigurationException 78 { 79 super(admin, orb, poa, conf, taskProcessor, offerManager, subscriptionManager, 80 consumerAdmin); 81 82 pushTaskExecutor_ = pushTaskExecutorFactory.newExecutor(this); 83 84 retryStrategyFactory_ = newRetryStrategyFactory(conf, taskProcessor); 85 } 86 87 protected void handleFailedPushOperation(PushOperation operation, Throwable error) 88 { 89 if (AbstractRetryStrategy.isFatalException(error)) 90 { 91 if (logger_.isErrorEnabled()) 94 { 95 logger_.error("push raised " + error + ": will destroy ProxySupplier, " 96 + "disconnect Consumer", error); 97 } 98 99 operation.dispose(); 100 destroy(); 101 102 return; 103 } 104 105 if (!isDisposed()) 106 { 107 RetryStrategy _retry = newRetryStrategy(this, operation); 108 109 try 110 { 111 _retry.retry(); 112 } catch (RetryException e) 113 { 114 logger_.error("retry failed", e); 115 116 _retry.dispose(); 117 destroy(); 118 } 119 } 120 } 121 122 private RetryStrategy newRetryStrategy(IProxyPushSupplier pushSupplier, 123 PushOperation pushOperation) 124 { 125 return retryStrategyFactory_.newRetryStrategy(pushSupplier, pushOperation); 126 } 127 128 private RetryStrategyFactory newRetryStrategyFactory(Configuration config, 129 TaskProcessor taskProcessor) throws ConfigurationException 130 { 131 String factoryName = config.getAttribute(Attributes.RETRY_STRATEGY_FACTORY, 132 Default.DEFAULT_RETRY_STRATEGY_FACTORY); 133 134 try 135 { 136 Class factoryClazz = ObjectUtil.classForName(factoryName); 137 138 MutablePicoContainer pico = new DefaultPicoContainer(); 139 140 pico.registerComponentInstance(TaskProcessor.class, taskProcessor); 141 142 pico.registerComponentImplementation(RetryStrategyFactory.class, factoryClazz); 143 144 pico.registerComponentInstance(config); 145 146 return (RetryStrategyFactory) pico.getComponentInstance(RetryStrategyFactory.class); 147 148 } catch (ClassNotFoundException e) 149 { 150 throw new ConfigurationException(Attributes.RETRY_STRATEGY_FACTORY, e); 151 } 152 } 153 154 public final void schedulePush() 155 { 156 if (!isDisposed() && !isSuspended() && isEnabled()) 157 { 158 schedulePush(pushTask_); 159 } 160 } 161 162 public final void schedulePush(PushTaskExecutor.PushTask pushTask) 163 { 164 pushTaskExecutor_.executePush(pushTask); 165 } 166 167 public final void messageDelivered() 168 { 169 if (isEnabled()) 170 { 171 schedulePush(); 172 } 173 } 174 175 public void resetErrorCounter() 176 { 177 super.resetErrorCounter(); 178 179 enableDelivery(); 180 } 181 182 public void disableDelivery() 183 { 184 logger_.debug("Disable Delivery to ProxySupplier"); 185 186 enabled_.set(false); 187 } 188 189 protected boolean isEnabled() 190 { 191 return enabled_.get(); 192 } 193 194 private void enableDelivery() 195 { 196 logger_.debug("Enable Delivery to ProxySupplier"); 197 198 enabled_.set(true); 199 } 200 } 201 | Popular Tags |