package org.jacorb.notification.servant;

import java.util.List;

import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.jacorb.notification.OfferManager;
import org.jacorb.notification.SubscriptionManager;
import org.jacorb.notification.engine.MessagePushOperation;
import org.jacorb.notification.engine.PushTaskExecutorFactory;
import org.jacorb.notification.engine.TaskProcessor;
import org.jacorb.notification.interfaces.Message;
import org.jacorb.notification.interfaces.MessageConsumer;
import org.jacorb.notification.util.CollectionsWrapper;
import org.omg.CORBA.ORB;
import org.omg.CosEventChannelAdmin.AlreadyConnected;
import org.omg.CosEventComm.Disconnected;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.StructuredEvent;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.ProxySupplierHelper;
import org.omg.CosNotifyChannelAdmin.ProxyType;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierOperations;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierPOATie;
import org.omg.CosNotifyComm.InvalidEventType;
import org.omg.CosNotifyComm.StructuredPushConsumer;
import org.omg.CosNotifyComm.StructuredPushConsumerOperations;
import org.omg.PortableServer.POA;
import org.omg.PortableServer.Servant;

/**
 * Proxy supplier that pushes messages to a connected
 * {@link StructuredPushConsumer}. Each {@link Message} is converted with
 * {@link Message#toStructuredEvent()} and handed to the consumer's
 * <code>push_structured_event</code> operation.
 */
public class StructuredProxyPushSupplierImpl extends AbstractProxyPushSupplier implements
        StructuredProxyPushSupplierOperations
{
    /**
     * Push operation that re-runs {@link #deliverMessageInternal(Message)} for
     * a message whose first delivery attempt failed. Instances are handed to
     * <code>handleFailedPushOperation</code>.
     * NOTE(review): that method is defined outside this file; presumably it
     * schedules the retry - confirm.
     */
    private class PushStructuredOperation extends MessagePushOperation
    {
        public PushStructuredOperation(Message message)
        {
            super(message);
        }

        /**
         * Attempts the delivery of the wrapped message again.
         *
         * @throws Disconnected propagated from the delivery attempt
         */
        public void invokePush() throws Disconnected
        {
            deliverMessageInternal(message_);
        }
    }

    /**
     * Do-nothing consumer that replaces the real consumer once the client has
     * been disconnected, so later calls on {@link #pushConsumer_} are no-ops.
     */
    private static final StructuredPushConsumerOperations NULL_CONSUMER =
        new StructuredPushConsumerOperations()
    {
        public void push_structured_event(StructuredEvent event)
        {
            // intentionally empty
        }

        public void disconnect_structured_push_consumer()
        {
            // intentionally empty
        }

        public void offer_change(EventType[] added, EventType[] removed) throws InvalidEventType
        {
            // intentionally empty
        }
    };

    /**
     * The consumer events are pushed to. Remains <code>null</code> until
     * {@link #connect_structured_push_consumer(StructuredPushConsumer)} has
     * been called; set to {@link #NULL_CONSUMER} by {@link #disconnectClient()}.
     */
    private StructuredPushConsumerOperations pushConsumer_;

    /**
     * Accumulated wall-clock milliseconds spent inside successful
     * <code>push_structured_event</code> calls; reported by {@link #getCost()}.
     */
    private long timeSpent_;

    /**
     * Creates the proxy; all arguments are passed unchanged to the superclass
     * constructor.
     *
     * @throws ConfigurationException propagated from the superclass constructor
     */
    public StructuredProxyPushSupplierImpl(IAdmin admin, ORB orb, POA poa, Configuration conf,
            TaskProcessor taskProcessor, PushTaskExecutorFactory pushTaskExecutorFactory,
            OfferManager offerManager, SubscriptionManager subscriptionManager,
            ConsumerAdmin consumerAdmin) throws ConfigurationException
    {
        super(admin, orb, poa, conf, taskProcessor, pushTaskExecutorFactory, offerManager,
                subscriptionManager, consumerAdmin);
    }

    /**
     * @return always {@link ProxyType#PUSH_STRUCTURED}
     */
    public ProxyType MyType()
    {
        return ProxyType.PUSH_STRUCTURED;
    }

    /**
     * Delivers every message returned by <code>getAllMessages()</code> to the
     * consumer. Each message is disposed after its delivery attempt, whether
     * or not the attempt succeeded.
     */
    public void pushPendingData()
    {
        Message[] _mesgs = getAllMessages();

        if (_mesgs == null)
        {
            return;
        }

        for (int x = 0; x < _mesgs.length; ++x)
        {
            try
            {
                deliverMessageWithRetry(_mesgs[x]);
            }
            finally
            {
                // dispose even if the delivery attempt threw
                _mesgs[x].dispose();
            }
        }
    }

    /**
     * Tries to deliver a message. Any {@link Throwable} raised by the attempt
     * is caught and passed, together with a {@link PushStructuredOperation}
     * wrapping the message, to <code>handleFailedPushOperation</code>.
     *
     * @param message the message to deliver
     */
    private void deliverMessageWithRetry(final Message message)
    {
        try
        {
            deliverMessageInternal(message);
        }
        catch (Throwable e)
        {
            PushStructuredOperation _failedOperation = new PushStructuredOperation(message);

            handleFailedPushOperation(_failedOperation, e);
        }
    }

    /**
     * Pushes one message to the consumer as a structured event. When the push
     * returns normally the elapsed time is added to {@link #timeSpent_} and
     * <code>resetErrorCounter()</code> is called; when the push throws,
     * neither happens.
     *
     * @param message the message to deliver
     * @throws Disconnected declared for callers; not raised directly by this method
     */
    void deliverMessageInternal(final Message message) throws Disconnected
    {
        long now = System.currentTimeMillis();
        pushConsumer_.push_structured_event(message.toStructuredEvent());
        timeSpent_ += (System.currentTimeMillis() - now);
        resetErrorCounter();
    }

    /**
     * Connects a consumer to this proxy. Calls <code>checkIsNotConnected()</code>
     * first, then stores the consumer and passes it to
     * <code>connectClient</code>.
     *
     * @param consumer the consumer that will receive structured events
     * @throws AlreadyConnected presumably raised by
     *         <code>checkIsNotConnected()</code> when a consumer is already
     *         connected - confirm against the superclass
     */
    public void connect_structured_push_consumer(StructuredPushConsumer consumer)
            throws AlreadyConnected
    {
        checkIsNotConnected();

        if (logger_.isDebugEnabled())
        {
            logger_.debug("connect structured_push_consumer");
        }

        pushConsumer_ = consumer;

        connectClient(consumer);
    }

    /**
     * Disconnect request from the client side; delegates to <code>destroy()</code>.
     */
    public void disconnect_structured_push_supplier()
    {
        destroy();
    }

    /**
     * Calls <code>schedulePush()</code> when the connection is resumed.
     */
    protected void connectionResumed()
    {
        schedulePush();
    }

    /**
     * Notifies the connected consumer that it is being disconnected and
     * replaces it with {@link #NULL_CONSUMER}.
     *
     * The notification is skipped when no consumer was ever connected
     * ({@link #pushConsumer_} is still <code>null</code>). The field is reset
     * in a <code>finally</code> block so that a failing remote call cannot
     * leave a stale consumer reference behind; the exception itself still
     * propagates to the caller.
     */
    protected void disconnectClient()
    {
        try
        {
            if (pushConsumer_ != null)
            {
                pushConsumer_.disconnect_structured_push_consumer();
            }
        }
        finally
        {
            pushConsumer_ = NULL_CONSUMER;
        }
    }

    /**
     * @return a single-element list containing this proxy
     */
    public List getSubsequentFilterStages()
    {
        return CollectionsWrapper.singletonList(this);
    }

    /**
     * @return this proxy, which acts as its own message consumer
     */
    public MessageConsumer getMessageConsumer()
    {
        return this;
    }

    /**
     * Returns the servant for this proxy, creating the POA tie on the first
     * call. Synchronized so the tie is created at most once.
     *
     * @return the servant wrapping this proxy
     */
    public synchronized Servant getServant()
    {
        if (thisServant_ == null)
        {
            thisServant_ = new StructuredProxyPushSupplierPOATie(this);
        }
        return thisServant_;
    }

    /**
     * Obtains the object reference of the servant and narrows it to a
     * proxy supplier.
     *
     * @return the narrowed object reference
     */
    public org.omg.CORBA.Object activate()
    {
        return ProxySupplierHelper.narrow(getServant()._this_object(getORB()));
    }

    /**
     * @return the accumulated push time in milliseconds, see {@link #timeSpent_}
     */
    protected long getCost()
    {
        return timeSpent_;
    }
}