KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > test > notification > AnyPushReceiver


1 package org.jacorb.test.notification;
2
3 import junit.framework.Assert;
4
5 import org.omg.CORBA.Any JavaDoc;
6 import org.omg.CORBA.IntHolder JavaDoc;
7 import org.omg.CORBA.ORB JavaDoc;
8 import org.omg.CosEventChannelAdmin.AlreadyConnected;
9 import org.omg.CosEventChannelAdmin.TypeError;
10 import org.omg.CosEventComm.Disconnected;
11 import org.omg.CosNotification.EventType;
12 import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded;
13 import org.omg.CosNotifyChannelAdmin.AdminNotFound;
14 import org.omg.CosNotifyChannelAdmin.ClientType;
15 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
16 import org.omg.CosNotifyChannelAdmin.EventChannel;
17 import org.omg.CosNotifyChannelAdmin.InterFilterGroupOperator;
18 import org.omg.CosNotifyChannelAdmin.ProxyPushSupplier;
19 import org.omg.CosNotifyChannelAdmin.ProxyPushSupplierHelper;
20 import org.omg.CosNotifyChannelAdmin.ProxyType;
21 import org.omg.CosNotifyComm.PushConsumerPOA;
22 import org.omg.CosNotifyFilter.Filter;
23 import org.omg.CosNotifyFilter.FilterNotFound;
24 import org.omg.PortableServer.POA JavaDoc;
25
26 import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
27 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
28
29 public class AnyPushReceiver extends PushConsumerPOA implements Runnable JavaDoc, TestClientOperations
30 {
31     Any JavaDoc event_ = null;
32
33     ORB JavaDoc orb_;
34
35     POA poa_;
36
37     long receiveTime_;
38
39     CyclicBarrier barrier_;
40
41     ProxyPushSupplier mySupplier_;
42
43     boolean connected_;
44
45     int numberOfExpectedEvents_ = 1;
46
47     final SynchronizedInt received_ = new SynchronizedInt(0);
48
49     long TIMEOUT = 4000L;
50
51     long TIMEOUT_OFF = 0;
52
53     int filterId_ = Integer.MIN_VALUE;
54
55     ConsumerAdmin myAdmin_;
56
57     private Object JavaDoc lock_ = new Object JavaDoc();
58
59     public AnyPushReceiver(ORB JavaDoc orb)
60     {
61         orb_ = orb;
62     }
63
64     public void setNumberOfExpectedEvents(int number)
65     {
66         numberOfExpectedEvents_ = number;
67     }
68
69
70     public void setFilter(Filter filter)
71     {
72         filterId_ = mySupplier_.add_filter(filter);
73     }
74
75     public void addAdminFilter(Filter filter)
76     {
77         Assert.assertNotNull(myAdmin_);
78         myAdmin_.add_filter(filter);
79     }
80
81     public void addProxyFilter(Filter filter)
82     {
83         Assert.assertNotNull(mySupplier_);
84         mySupplier_.add_filter(filter);
85     }
86
87     public boolean isEventHandled()
88     {
89         System.out.println("expected: " + numberOfExpectedEvents_ + " received: " + received_);
90
91         if (numberOfExpectedEvents_ > 0)
92         {
93             return received_.get() == numberOfExpectedEvents_;
94         }
95
96         return received_.get() > 0;
97     }
98
99     public void setTimeOut(long timeout)
100     {
101         TIMEOUT = timeout;
102     }
103
104     public void setBarrier(CyclicBarrier barrier)
105     {
106         barrier_ = barrier;
107     }
108
109     public void shutdown() throws FilterNotFound
110     {
111         if (filterId_ != Integer.MIN_VALUE)
112         {
113             mySupplier_.remove_filter(filterId_);
114         }
115         mySupplier_.disconnect_push_supplier();
116
117         myAdmin_.destroy();
118     }
119
120     public void connect(EventChannel channel, boolean useOrSemantic)
121
122     throws AdminLimitExceeded, TypeError, AlreadyConnected, AdminNotFound
123     {
124         IntHolder JavaDoc _proxyId = new IntHolder JavaDoc();
125         IntHolder JavaDoc _adminId = new IntHolder JavaDoc();
126
127         if (useOrSemantic)
128         {
129             myAdmin_ = channel.new_for_consumers(InterFilterGroupOperator.OR_OP, _adminId);
130             Assert.assertEquals(InterFilterGroupOperator.OR_OP, myAdmin_.MyOperator());
131         }
132         else
133         {
134             myAdmin_ = channel.new_for_consumers(InterFilterGroupOperator.AND_OP, _adminId);
135             Assert.assertEquals(InterFilterGroupOperator.AND_OP, myAdmin_.MyOperator());
136         }
137
138         Assert.assertEquals(myAdmin_, channel.get_consumeradmin(_adminId.value));
139
140         mySupplier_ = ProxyPushSupplierHelper.narrow(myAdmin_.obtain_notification_push_supplier(
141                 ClientType.ANY_EVENT, _proxyId));
142
143         Assert.assertEquals(ProxyType._PUSH_ANY, mySupplier_.MyType().value());
144
145         mySupplier_.connect_any_push_consumer(_this(orb_));
146
147         connected_ = true;
148     }
149
150     public int getReceived()
151     {
152         return received_.get();
153     }
154
155     public void run()
156     {
157
158         if (!isEventHandled())
159         {
160             try
161             {
162                 synchronized (lock_)
163                 {
164                     
165                     System.err.println("will wait " + TIMEOUT);
166                     lock_.wait(TIMEOUT);
167                 }
168
169             } catch (InterruptedException JavaDoc e)
170             {
171                 // ignored
172
}
173         }
174
175         if (barrier_ != null)
176         {
177             try
178             {
179                 barrier_.barrier();
180             } catch (InterruptedException JavaDoc ie)
181             {
182                 // ignored
183
}
184         }
185     }
186
187     public void push(Any JavaDoc any) throws Disconnected
188     {
189         received_.increment();
190
191         if (numberOfExpectedEvents_ > 0 && (received_.get() == numberOfExpectedEvents_))
192         {
193             synchronized (lock_)
194             {
195                 lock_.notifyAll();
196             }
197         }
198     }
199
200     public long calcTotalTime(long start)
201     {
202         return (receiveTime_ - start);
203     }
204
205     public boolean isConnected()
206     {
207         return connected_;
208     }
209
210     public boolean isError()
211     {
212         return false;
213     }
214
215     public void disconnect_push_consumer()
216     {
217         connected_ = false;
218     }
219
220     public void offer_change(EventType[] e1, EventType[] e2)
221     {
222         // ignored
223
}
224 }
Popular Tags