package org.jacorb.notification.servant;

import java.util.List;

import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.jacorb.notification.OfferManager;
import org.jacorb.notification.SubscriptionManager;
import org.jacorb.notification.engine.MessagePushOperation;
import org.jacorb.notification.engine.PushTaskExecutorFactory;
import org.jacorb.notification.engine.TaskProcessor;
import org.jacorb.notification.interfaces.Message;
import org.jacorb.notification.interfaces.MessageConsumer;
import org.jacorb.notification.util.CollectionsWrapper;
import org.omg.CORBA.ORB;
import org.omg.CosEventChannelAdmin.AlreadyConnected;
import org.omg.CosEventComm.Disconnected;
import org.omg.CosEventComm.PushConsumer;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.ProxyPushSupplierHelper;
import org.omg.CosNotifyChannelAdmin.ProxyPushSupplierOperations;
import org.omg.CosNotifyChannelAdmin.ProxyPushSupplierPOATie;
import org.omg.CosNotifyChannelAdmin.ProxyType;
import org.omg.PortableServer.POA;
import org.omg.PortableServer.Servant;

/**
 * Proxy push supplier of type {@link ProxyType#PUSH_ANY}.
 *
 * <p>Messages handed to this proxy are converted with {@link Message#toAny()} and pushed
 * to the {@link PushConsumer} that was registered through
 * {@link #connect_any_push_consumer(PushConsumer)}. The total wall-clock time spent in
 * successful pushes is accumulated and exposed through {@link #getCost()}.
 */
public class ProxyPushSupplierImpl extends AbstractProxyPushSupplier implements
        ProxyPushSupplierOperations
{
    /**
     * Push operation that re-delivers a single message through the enclosing proxy.
     * An instance is created for every delivery that failed and is passed to
     * {@code handleFailedPushOperation}.
     */
    private class PushAnyOperation extends MessagePushOperation
    {
        public PushAnyOperation(Message message)
        {
            super(message);
        }

        /**
         * Delivers the message held by this operation via
         * {@link ProxyPushSupplierImpl#deliverMessageInternal(Message)}.
         */
        public void invokePush() throws Disconnected
        {
            deliverMessageInternal(message_);
        }
    }

    /** The connected consumer; set on connect and cleared again by disconnectClient(). */
    private PushConsumer pushConsumer_;

    /** Sum of the milliseconds measured around every push that returned normally. */
    private long timeSpent_;

    /**
     * Creates the proxy. All arguments are forwarded unchanged to the superclass
     * constructor.
     *
     * @throws ConfigurationException declared by the superclass constructor
     */
    public ProxyPushSupplierImpl(IAdmin admin, ORB orb, POA poa, Configuration conf,
            TaskProcessor taskProcessor, PushTaskExecutorFactory pushTaskExecutorFactory,
            OfferManager offerManager, SubscriptionManager subscriptionManager,
            ConsumerAdmin consumerAdmin) throws ConfigurationException
    {
        super(admin, orb, poa, conf, taskProcessor, pushTaskExecutorFactory, offerManager,
                subscriptionManager, consumerAdmin);
    }

    /**
     * @return always {@link ProxyType#PUSH_ANY}
     */
    public ProxyType MyType()
    {
        return ProxyType.PUSH_ANY;
    }

    /**
     * Disconnect request; handled by delegating to the inherited {@code destroy()}.
     */
    public void disconnect_push_supplier()
    {
        destroy();
    }

    /**
     * Tells the connected consumer that it is being disconnected and then drops the
     * reference to it.
     */
    protected void disconnectClient()
    {
        // NOTE(review): presumably only invoked while a consumer is connected; a null
        // pushConsumer_ would raise a NullPointerException here - confirm against caller.
        pushConsumer_.disconnect_push_consumer();

        pushConsumer_ = null;
    }

    /**
     * Attempts to deliver the message. Anything thrown by the attempt is caught and
     * handed, together with a fresh {@link PushAnyOperation} for the message, to
     * {@code handleFailedPushOperation}.
     *
     * @param message the message to deliver
     */
    private void deliverMessageWithRetry(final Message message)
    {
        try
        {
            deliverMessageInternal(message);
        }
        catch (Throwable error)
        {
            handleFailedPushOperation(new PushAnyOperation(message), error);
        }
    }

    /**
     * Pushes one message to the consumer. When the push returns normally, the elapsed
     * wall-clock time is added to the running total and the error counter is reset;
     * when it throws, neither happens.
     *
     * @param message the message to push
     * @throws Disconnected propagated from the consumer's {@code push}
     */
    void deliverMessageInternal(final Message message) throws Disconnected
    {
        final long start = System.currentTimeMillis();

        pushConsumer_.push(message.toAny());

        final long elapsed = System.currentTimeMillis() - start;
        timeSpent_ += elapsed;

        resetErrorCounter();
    }

    /**
     * Takes every message returned by {@code getAllMessages()} and delivers them in
     * array order. Each message is disposed after its delivery attempt, whether or not
     * the attempt succeeded.
     */
    public void pushPendingData()
    {
        final Message[] pending = getAllMessages();

        for (int i = 0; i < pending.length; i++)
        {
            final Message current = pending[i];

            try
            {
                deliverMessageWithRetry(current);
            }
            finally
            {
                current.dispose();
            }
        }
    }

    /**
     * Registers the consumer that will receive pushed messages.
     *
     * @param pushConsumer the consumer to connect
     * @throws AlreadyConnected presumably raised by {@code checkIsNotConnected()} when a
     *         consumer is already connected - defined in the superclass
     */
    public void connect_any_push_consumer(PushConsumer pushConsumer) throws AlreadyConnected
    {
        checkIsNotConnected();

        pushConsumer_ = pushConsumer;

        connectClient(pushConsumer);
    }

    /**
     * @return a single-element list that contains only this proxy
     */
    public List getSubsequentFilterStages()
    {
        return CollectionsWrapper.singletonList(this);
    }

    /**
     * @return this proxy, which acts as its own message consumer
     */
    public MessageConsumer getMessageConsumer()
    {
        return this;
    }

    /**
     * Callback for a resumed connection; schedules a push.
     */
    protected void connectionResumed()
    {
        schedulePush();
    }

    /**
     * Returns the servant for this proxy, creating the POA tie on the first call and
     * returning the same instance afterwards.
     *
     * @return the servant wrapping this proxy
     */
    public synchronized Servant getServant()
    {
        if (thisServant_ != null)
        {
            return thisServant_;
        }

        thisServant_ = new ProxyPushSupplierPOATie(this);
        return thisServant_;
    }

    /**
     * Obtains the object reference of this proxy's servant and narrows it with
     * {@link ProxyPushSupplierHelper}.
     *
     * @return the narrowed object reference
     */
    public org.omg.CORBA.Object activate()
    {
        final Servant servant = getServant();
        final org.omg.CORBA.Object reference = servant._this_object(getORB());

        return ProxyPushSupplierHelper.narrow(reference);
    }

    /**
     * @return the accumulated time, in milliseconds, spent in pushes that returned
     *         normally
     */
    public long getCost()
    {
        return timeSpent_;
    }
}