1 package org.jacorb.test.notification; 2 3 import junit.framework.Assert; 4 5 import org.omg.CORBA.BooleanHolder ; 6 import org.omg.CORBA.IntHolder ; 7 import org.omg.CORBA.ORB ; 8 import org.omg.CosEventChannelAdmin.AlreadyConnected; 9 import org.omg.CosEventChannelAdmin.TypeError; 10 import org.omg.CosEventComm.Disconnected; 11 import org.omg.CosNotification.EventType; 12 import org.omg.CosNotification.StructuredEvent; 13 import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded; 14 import org.omg.CosNotifyChannelAdmin.ClientType; 15 import org.omg.CosNotifyChannelAdmin.EventChannel; 16 import org.omg.CosNotifyChannelAdmin.ProxyType; 17 import org.omg.CosNotifyChannelAdmin.SequenceProxyPullConsumer; 18 import org.omg.CosNotifyChannelAdmin.SequenceProxyPullConsumerHelper; 19 import org.omg.CosNotifyChannelAdmin.SupplierAdmin; 20 import org.omg.CosNotifyComm.InvalidEventType; 21 import org.omg.CosNotifyComm.SequencePullSupplierHelper; 22 import org.omg.CosNotifyComm.SequencePullSupplierOperations; 23 import org.omg.CosNotifyComm.SequencePullSupplierPOATie; 24 25 29 30 public class SequencePullSender extends Thread implements SequencePullSupplierOperations, 31 TestClientOperations 32 { 33 ORB orb_; 34 35 StructuredEvent[] event_; 36 37 SequenceProxyPullConsumer pullConsumer_; 38 39 boolean error_; 40 41 boolean connected_; 42 43 boolean eventHandled_; 44 45 boolean available_ = false; 46 47 public void run() 48 { 49 synchronized (this) 50 { 51 available_ = true; 52 } 53 } 54 55 public boolean isError() 56 { 57 return error_; 58 } 59 60 public boolean isEventHandled() 61 { 62 return eventHandled_; 63 } 64 65 public SequencePullSender(ORB orb, StructuredEvent[] event) 66 { 67 event_ = event; 68 orb_ = orb; 69 } 70 71 public void subscription_change(EventType[] eventType1, EventType[] eventType2) 72 throws InvalidEventType 73 { 74 } 76 77 public StructuredEvent[] pull_structured_events(int number) throws Disconnected 78 { 79 BooleanHolder _success = new BooleanHolder (); 80 StructuredEvent[] _event; 81 while (true) 82 { 83 _event = try_pull_structured_events(number, _success); 84 if (_success.value) 85 { 86 return _event; 87 } 88 Thread.yield(); 89 } 90 } 91 92 public StructuredEvent[] try_pull_structured_events(int number, BooleanHolder booleanHolder) 93 throws Disconnected 94 { 95 96 booleanHolder.value = false; 97 StructuredEvent[] _result = new StructuredEvent[] { NotificationTestUtils 98 .getInvalidStructuredEvent(orb_) }; 99 100 if (event_ != null) 101 { 102 synchronized (this) 103 { 104 if (event_ != null && available_) 105 { 106 _result = event_; 107 event_ = null; 108 booleanHolder.value = true; 109 eventHandled_ = true; 110 } 111 } 112 } 113 return _result; 114 } 115 116 public void disconnect_sequence_pull_supplier() 117 { 118 connected_ = false; 119 } 120 121 public void connect(EventChannel channel, boolean useOrSemantic) throws AdminLimitExceeded, 122 AlreadyConnected, TypeError 123 { 124 SequencePullSupplierPOATie _senderTie = new SequencePullSupplierPOATie(this); 125 SupplierAdmin _supplierAdmin = channel.default_supplier_admin(); 126 IntHolder _proxyId = new IntHolder (); 127 pullConsumer_ = SequenceProxyPullConsumerHelper.narrow(_supplierAdmin 128 .obtain_notification_pull_consumer(ClientType.SEQUENCE_EVENT, _proxyId)); 129 130 Assert.assertEquals(ProxyType._PULL_SEQUENCE, pullConsumer_.MyType().value()); 131 132 pullConsumer_.connect_sequence_pull_supplier(SequencePullSupplierHelper.narrow(_senderTie 133 ._this(orb_))); 134 connected_ = true; 135 } 136 137 public void shutdown() 138 { 139 pullConsumer_.disconnect_sequence_pull_consumer(); 140 } 141 142 public boolean isConnected() 143 { 144 return connected_; 145 } 146 147 } 148 | Popular Tags |