1 package org.jacorb.test.notification; 2 3 import junit.framework.Assert; 4 5 import org.omg.CORBA.Any ; 6 import org.omg.CORBA.IntHolder ; 7 import org.omg.CORBA.ORB ; 8 import org.omg.CosEventChannelAdmin.AlreadyConnected; 9 import org.omg.CosEventChannelAdmin.TypeError; 10 import org.omg.CosEventComm.Disconnected; 11 import org.omg.CosNotification.EventType; 12 import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded; 13 import org.omg.CosNotifyChannelAdmin.AdminNotFound; 14 import org.omg.CosNotifyChannelAdmin.ClientType; 15 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin; 16 import org.omg.CosNotifyChannelAdmin.EventChannel; 17 import org.omg.CosNotifyChannelAdmin.InterFilterGroupOperator; 18 import org.omg.CosNotifyChannelAdmin.ProxyPushSupplier; 19 import org.omg.CosNotifyChannelAdmin.ProxyPushSupplierHelper; 20 import org.omg.CosNotifyChannelAdmin.ProxyType; 21 import org.omg.CosNotifyComm.PushConsumerPOA; 22 import org.omg.CosNotifyFilter.Filter; 23 import org.omg.CosNotifyFilter.FilterNotFound; 24 import org.omg.PortableServer.POA ; 25 26 import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier; 27 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 28 29 public class AnyPushReceiver extends PushConsumerPOA implements Runnable , TestClientOperations 30 { 31 Any event_ = null; 32 33 ORB orb_; 34 35 POA poa_; 36 37 long receiveTime_; 38 39 CyclicBarrier barrier_; 40 41 ProxyPushSupplier mySupplier_; 42 43 boolean connected_; 44 45 int numberOfExpectedEvents_ = 1; 46 47 final SynchronizedInt received_ = new SynchronizedInt(0); 48 49 long TIMEOUT = 4000L; 50 51 long TIMEOUT_OFF = 0; 52 53 int filterId_ = Integer.MIN_VALUE; 54 55 ConsumerAdmin myAdmin_; 56 57 private Object lock_ = new Object (); 58 59 public AnyPushReceiver(ORB orb) 60 { 61 orb_ = orb; 62 } 63 64 public void setNumberOfExpectedEvents(int number) 65 { 66 numberOfExpectedEvents_ = number; 67 } 68 69 70 public void setFilter(Filter filter) 71 { 72 filterId_ = mySupplier_.add_filter(filter); 73 } 74 75 public void addAdminFilter(Filter filter) 76 { 77 Assert.assertNotNull(myAdmin_); 78 myAdmin_.add_filter(filter); 79 } 80 81 public void addProxyFilter(Filter filter) 82 { 83 Assert.assertNotNull(mySupplier_); 84 mySupplier_.add_filter(filter); 85 } 86 87 public boolean isEventHandled() 88 { 89 System.out.println("expected: " + numberOfExpectedEvents_ + " received: " + received_); 90 91 if (numberOfExpectedEvents_ > 0) 92 { 93 return received_.get() == numberOfExpectedEvents_; 94 } 95 96 return received_.get() > 0; 97 } 98 99 public void setTimeOut(long timeout) 100 { 101 TIMEOUT = timeout; 102 } 103 104 public void setBarrier(CyclicBarrier barrier) 105 { 106 barrier_ = barrier; 107 } 108 109 public void shutdown() throws FilterNotFound 110 { 111 if (filterId_ != Integer.MIN_VALUE) 112 { 113 mySupplier_.remove_filter(filterId_); 114 } 115 mySupplier_.disconnect_push_supplier(); 116 117 myAdmin_.destroy(); 118 } 119 120 public void connect(EventChannel channel, boolean useOrSemantic) 121 122 throws AdminLimitExceeded, TypeError, AlreadyConnected, AdminNotFound 123 { 124 IntHolder _proxyId = new IntHolder (); 125 IntHolder _adminId = new IntHolder (); 126 127 if (useOrSemantic) 128 { 129 myAdmin_ = channel.new_for_consumers(InterFilterGroupOperator.OR_OP, _adminId); 130 Assert.assertEquals(InterFilterGroupOperator.OR_OP, myAdmin_.MyOperator()); 131 } 132 else 133 { 134 myAdmin_ = channel.new_for_consumers(InterFilterGroupOperator.AND_OP, _adminId); 135 Assert.assertEquals(InterFilterGroupOperator.AND_OP, myAdmin_.MyOperator()); 136 } 137 138 Assert.assertEquals(myAdmin_, channel.get_consumeradmin(_adminId.value)); 139 140 mySupplier_ = ProxyPushSupplierHelper.narrow(myAdmin_.obtain_notification_push_supplier( 141 ClientType.ANY_EVENT, _proxyId)); 142 143 Assert.assertEquals(ProxyType._PUSH_ANY, mySupplier_.MyType().value()); 144 145 mySupplier_.connect_any_push_consumer(_this(orb_)); 146 147 connected_ = true; 148 } 149 150 public int getReceived() 151 { 152 return received_.get(); 153 } 154 155 public void run() 156 { 157 158 if (!isEventHandled()) 159 { 160 try 161 { 162 synchronized (lock_) 163 { 164 165 System.err.println("will wait " + TIMEOUT); 166 lock_.wait(TIMEOUT); 167 } 168 169 } catch (InterruptedException e) 170 { 171 } 173 } 174 175 if (barrier_ != null) 176 { 177 try 178 { 179 barrier_.barrier(); 180 } catch (InterruptedException ie) 181 { 182 } 184 } 185 } 186 187 public void push(Any any) throws Disconnected 188 { 189 received_.increment(); 190 191 if (numberOfExpectedEvents_ > 0 && (received_.get() == numberOfExpectedEvents_)) 192 { 193 synchronized (lock_) 194 { 195 lock_.notifyAll(); 196 } 197 } 198 } 199 200 public long calcTotalTime(long start) 201 { 202 return (receiveTime_ - start); 203 } 204 205 public boolean isConnected() 206 { 207 return connected_; 208 } 209 210 public boolean isError() 211 { 212 return false; 213 } 214 215 public void disconnect_push_consumer() 216 { 217 connected_ = false; 218 } 219 220 public void offer_change(EventType[] e1, EventType[] e2) 221 { 222 } 224 } | Popular Tags |