1 package org.jacorb.test.notification; 2 3 import java.util.ArrayList ; 4 import java.util.List ; 5 6 import junit.framework.Assert; 7 8 import org.omg.CORBA.IntHolder ; 9 import org.omg.CORBA.ORB ; 10 import org.omg.CosEventChannelAdmin.AlreadyConnected; 11 import org.omg.CosEventChannelAdmin.TypeError; 12 import org.omg.CosEventComm.Disconnected; 13 import org.omg.CosNotification.EventType; 14 import org.omg.CosNotification.StructuredEvent; 15 import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded; 16 import org.omg.CosNotifyChannelAdmin.ClientType; 17 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin; 18 import org.omg.CosNotifyChannelAdmin.EventChannel; 19 import org.omg.CosNotifyChannelAdmin.ProxyType; 20 import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplier; 21 import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierHelper; 22 import org.omg.CosNotifyComm.InvalidEventType; 23 import org.omg.CosNotifyComm.StructuredPushConsumerHelper; 24 import org.omg.CosNotifyComm.StructuredPushConsumerOperations; 25 import org.omg.CosNotifyComm.StructuredPushConsumerPOATie; 26 import org.omg.CosNotifyFilter.Filter; 27 import org.omg.CosNotifyFilter.FilterNotFound; 28 29 import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier; 30 31 public class StructuredPushReceiver extends Thread implements StructuredPushConsumerOperations, 32 TestClientOperations 33 { 34 StructuredProxyPushSupplier pushSupplier_; 35 36 int received_ = 0; 37 38 int expected_ = 1; 39 40 int filterId_ = Integer.MIN_VALUE; 41 42 long timeout_ = 2000; 43 44 Filter filter_; 45 46 CyclicBarrier barrier_; 47 48 boolean connected_ = false; 49 50 List addedOffers = new ArrayList (); 51 52 List removedOffers = new ArrayList (); 53 54 ORB orb_; 55 56 public StructuredPushReceiver(ORB orb) 57 { 58 orb_ = orb; 59 } 60 61 public StructuredPushReceiver(ORB orb, int expected) 62 { 63 this(orb); 64 65 expected_ = expected; 66 } 67 68 public void setBarrier(CyclicBarrier barrier) 69 { 70 barrier_ = barrier; 71 } 72 73 public void setFilter(Filter filter) 74 { 75 filter_ = filter; 76 filterId_ = pushSupplier_.add_filter(filter); 77 } 78 79 public void setTimeOut(long timeout) 80 { 81 timeout_ = timeout; 82 } 83 84 public void run() 85 { 86 if (!isEventHandled()) 87 { 88 synchronized (this) 89 { 90 try 91 { 92 wait(timeout_); 93 } catch (InterruptedException e) 94 { 95 } 97 } 98 } 99 100 if (barrier_ != null) 101 { 102 try 103 { 104 barrier_.barrier(); 105 } catch (InterruptedException ie) 106 { 107 } 109 } 110 } 111 112 public void push_structured_event(StructuredEvent event) throws Disconnected 113 { 114 received_++; 115 116 if (received_ % 100 == 0) 117 { 118 System.out.println("push: " + received_); 119 } 120 121 122 if (received_ == expected_) 123 { 124 synchronized (this) 125 { 126 notifyAll(); 127 } 128 } 129 } 130 131 132 public void disconnect_structured_push_consumer() 133 { 134 connected_ = false; 135 } 136 137 public void offer_change(EventType[] added, EventType[] removed) throws InvalidEventType 138 { 139 for (int x = 0; x < added.length; ++x) 140 { 141 addedOffers.add(added[x]); 142 } 143 144 for (int x = 0; x < removed.length; ++x) 145 { 146 removedOffers.add(removed[x]); 147 } 148 } 149 150 public void connect(EventChannel channel, boolean useOrSemantic) throws AdminLimitExceeded, 151 AlreadyConnected, TypeError 152 { 153 StructuredPushConsumerPOATie receiverTie = new StructuredPushConsumerPOATie(this); 154 155 ConsumerAdmin _consumerAdmin = channel.default_consumer_admin(); 156 157 IntHolder _proxyIdHolder = new IntHolder (); 158 pushSupplier_ = StructuredProxyPushSupplierHelper.narrow(_consumerAdmin 159 .obtain_notification_push_supplier(ClientType.STRUCTURED_EVENT, _proxyIdHolder)); 160 161 Assert.assertNotNull(pushSupplier_); 162 Assert.assertNotNull(pushSupplier_.MyType()); 163 Assert.assertEquals(pushSupplier_.MyType(), ProxyType.PUSH_STRUCTURED); 164 165 Assert.assertEquals(_consumerAdmin, pushSupplier_.MyAdmin()); 166 pushSupplier_.connect_structured_push_consumer(StructuredPushConsumerHelper 167 .narrow(receiverTie._this(orb_))); 168 169 connected_ = true; 170 } 171 172 public boolean isEventHandled() 173 { 174 if (expected_ > 0) 175 { 176 return received_ == expected_; 177 } 178 179 return received_ > 0; 180 } 181 182 public boolean isConnected() 183 { 184 return connected_; 185 } 186 187 public boolean isError() 188 { 189 return false; 190 } 191 192 public void shutdown() throws FilterNotFound 193 { 194 if (filterId_ != Integer.MIN_VALUE) 195 { 196 pushSupplier_.remove_filter(filterId_); 197 } 198 199 Assert.assertTrue(!pushSupplier_._non_existent()); 200 pushSupplier_.disconnect_structured_push_supplier(); 201 202 if (filter_ != null) 203 { 204 filter_.destroy(); 205 } 206 } 207 208 public String toString() 209 { 210 return "StructuredPushReceiver received: " + received_; 211 } 212 } | Popular Tags |