1 package org.jacorb.notification.servant; 2 3 23 24 import org.apache.avalon.framework.configuration.Configuration; 25 import org.jacorb.notification.MessageFactory; 26 import org.jacorb.notification.OfferManager; 27 import org.jacorb.notification.SubscriptionManager; 28 import org.jacorb.notification.engine.TaskProcessor; 29 import org.jacorb.notification.interfaces.Message; 30 import org.omg.CORBA.BooleanHolder ; 31 import org.omg.CORBA.ORB ; 32 import org.omg.CosEventChannelAdmin.AlreadyConnected; 33 import org.omg.CosEventComm.Disconnected; 34 import org.omg.CosNotification.StructuredEvent; 35 import org.omg.CosNotifyChannelAdmin.ProxyType; 36 import org.omg.CosNotifyChannelAdmin.SequenceProxyPullConsumerOperations; 37 import org.omg.CosNotifyChannelAdmin.SequenceProxyPullConsumerPOATie; 38 import org.omg.CosNotifyComm.SequencePullSupplier; 39 import org.omg.PortableServer.POA ; 40 import org.omg.PortableServer.Servant ; 41 42 46 47 public class SequenceProxyPullConsumerImpl extends StructuredProxyPullConsumerImpl implements 48 SequenceProxyPullConsumerOperations 49 { 50 private SequencePullSupplier sequencePullSupplier_; 51 52 54 public SequenceProxyPullConsumerImpl(IAdmin admin, ORB orb, POA poa, Configuration conf, 55 TaskProcessor taskProcessor, MessageFactory mf, OfferManager offerManager, SubscriptionManager subscriptionManager) 56 { 57 super(admin, orb, poa, conf, taskProcessor, mf, offerManager, subscriptionManager); 58 } 59 60 public ProxyType MyType() 61 { 62 return ProxyType.PULL_SEQUENCE; 63 } 64 65 public void disconnect_sequence_pull_consumer() 66 { 67 destroy(); 68 } 69 70 public synchronized void connect_sequence_pull_supplier( 71 SequencePullSupplier sequencePullSupplier) throws AlreadyConnected 72 { 73 checkIsNotConnected(); 74 75 sequencePullSupplier_ = sequencePullSupplier; 76 77 connectClient(sequencePullSupplier); 78 79 startTask(); 80 } 81 82 85 protected void runPullEventInternal() throws InterruptedException , Disconnected 86 { 87 BooleanHolder _hasEvent = new BooleanHolder (); 88 _hasEvent.value = false; 89 StructuredEvent[] _events = null; 90 91 try 92 { 93 pullSync_.acquire(); 94 95 _events = sequencePullSupplier_.try_pull_structured_events(1, _hasEvent); 96 } finally 97 { 98 pullSync_.release(); 99 } 100 101 if (_hasEvent.value) 102 { 103 for (int x = 0; x < _events.length; ++x) 104 { 105 Message msg = getMessageFactory().newMessage(_events[x], this); 106 107 processMessage(msg); 108 } 109 } 110 } 111 112 protected void disconnectClient() 113 { 114 stopTask(); 115 sequencePullSupplier_.disconnect_sequence_pull_supplier(); 116 sequencePullSupplier_ = null; 117 } 118 119 public synchronized Servant getServant() 120 { 121 if (thisServant_ == null) 122 { 123 thisServant_ = new SequenceProxyPullConsumerPOATie(this); 124 } 125 126 return thisServant_; 127 } 128 } | Popular Tags |