1 package org.jacorb.test.notification; 2 3 import junit.framework.Test; 4 5 import org.jacorb.test.notification.common.NotifyServerTestCase; 6 import org.jacorb.test.notification.common.NotifyServerTestSetup; 7 import org.omg.CORBA.Any ; 8 import org.omg.CORBA.IntHolder ; 9 import org.omg.CORBA.NO_IMPLEMENT ; 10 import org.omg.CORBA.TRANSIENT ; 11 import org.omg.CosNotification.EventHeader; 12 import org.omg.CosNotification.EventType; 13 import org.omg.CosNotification.FixedEventHeader; 14 import org.omg.CosNotification.Property; 15 import org.omg.CosNotification.StructuredEvent; 16 import org.omg.CosNotifyChannelAdmin.EventChannel; 17 import org.omg.CosNotifyChannelAdmin.ObtainInfoMode; 18 import org.omg.CosNotifyFilter.ConstraintExp; 19 import org.omg.CosNotifyFilter.Filter; 20 21 import EDU.oswego.cs.dl.util.concurrent.Latch; 22 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 23 24 27 28 public class StructuredEventChannelTest extends NotifyServerTestCase 29 { 30 private static final EventType[] EMPTY_EVENT_TYPE = new EventType[0]; 31 32 EventChannel channel_; 33 34 StructuredEvent testEvent_; 35 36 Filter trueFilter_; 37 38 Filter falseFilter_; 39 40 NotificationTestUtils testUtils_; 41 42 44 public StructuredEventChannelTest(String name, NotifyServerTestSetup setup) 45 { 46 super(name, setup); 47 } 48 49 51 public void setUpTest() throws Exception 52 { 53 channel_ = getDefaultChannel(); 54 55 testEvent_ = new StructuredEvent(); 57 EventType _type = new EventType("testDomain", "testType"); 58 FixedEventHeader _fixed = new FixedEventHeader(_type, "testing"); 59 60 Property[] _variable = new Property[0]; 62 testEvent_.header = new EventHeader(_fixed, _variable); 63 64 testEvent_.filterable_data = new Property[1]; 66 67 Any _personAny = getClientORB().create_any(); 68 69 Person _p = new NotificationTestUtils(getClientORB()).getTestPerson(); 71 Address _a = new Address(); 72 73 _p.first_name = "firstname"; 74 _p.last_name = "lastname"; 75 _p.age = 5; 76 _p.phone_numbers = new String [2]; 77 _p.phone_numbers[0] = "12345678"; 78 _p.phone_numbers[1] = ""; 79 _p.nv = new NamedValue[2]; 80 _p.nv[0] = new NamedValue(); 81 _p.nv[1] = new NamedValue(); 82 _p.person_profession = Profession.STUDENT; 83 _a.street = "Takustr."; 84 _a.number = 9; 85 _a.city = "Berlin"; 86 _p.home_address = _a; 87 88 PersonHelper.insert(_personAny, _p); 89 testEvent_.filterable_data[0] = new Property("person", _personAny); 90 91 testEvent_.remainder_of_body = getClientORB().create_any(); 92 93 trueFilter_ = createFilter(); 94 95 ConstraintExp[] _constraintExp = new ConstraintExp[1]; 96 EventType[] _eventType = new EventType[1]; 97 _eventType[0] = new EventType("*", "*"); 98 99 _constraintExp[0] = new ConstraintExp(_eventType, "true"); 100 trueFilter_.add_constraints(_constraintExp); 101 102 falseFilter_ = createFilter(); 103 104 _constraintExp = new ConstraintExp[1]; 105 _eventType = new EventType[1]; 106 _eventType[0] = new EventType("*", "*"); 107 108 _constraintExp[0] = new ConstraintExp(_eventType, "false"); 109 falseFilter_.add_constraints(_constraintExp); 110 } 111 112 private StructuredPushSender getPushSender() 113 { 114 StructuredPushSender sender = new StructuredPushSender(getClientORB()); 115 sender.setStructuredEvent(testEvent_); 116 117 return sender; 118 } 119 120 public void testDestroyChannelDisconnectsClients() throws Exception 121 { 122 Property[] _p = new Property[0]; 123 124 EventChannel _channel = getEventChannelFactory().create_channel(_p, _p, new IntHolder ()); 125 126 StructuredPushSender _pushSender = getPushSender(); 127 128 StructuredPullSender _pullSender = new StructuredPullSender(getClientORB(), testEvent_); 129 StructuredPushReceiver _pushReceiver = new StructuredPushReceiver(getClientORB()); 130 StructuredPullReceiver _pullReceiver = new StructuredPullReceiver(getClientORB()); 131 132 _pushSender.connect(_channel, false); 133 _pullSender.connect(_channel, false); 134 _pushReceiver.connect(_channel, false); 135 _pullReceiver.connect(_channel, false); 136 137 assertTrue(_pushSender.isConnected()); 138 assertTrue(_pullSender.isConnected()); 139 assertTrue(_pushReceiver.isConnected()); 140 assertTrue(_pullReceiver.isConnected()); 141 142 _channel.destroy(); 143 144 assertTrue(!_pushSender.isConnected()); 145 assertTrue(!_pullSender.isConnected()); 146 assertTrue(!_pushReceiver.isConnected()); 147 assertTrue(!_pullReceiver.isConnected()); 148 } 149 150 public void testObtainSubscriptionTypes_sender_throws_NO_IMPLEMENT() throws Exception 151 { 152 final SynchronizedInt subscriptionChangeCounter = new SynchronizedInt(0); 153 154 StructuredPushSender _sender = new StructuredPushSender(getClientORB()) 155 { 156 public void subscription_change(EventType[] added, EventType[] removed) 157 { 158 subscriptionChangeCounter.increment(); 159 160 throw new NO_IMPLEMENT (); 161 } 162 }; 163 164 _sender.setStructuredEvent(testEvent_); 165 166 StructuredPushReceiver _receiver = new StructuredPushReceiver(getClientORB()); 167 168 _sender.connect(channel_, false); 169 170 _receiver.connect(channel_, false); 171 172 _sender.pushConsumer_.obtain_subscription_types(ObtainInfoMode.NONE_NOW_UPDATES_ON); 173 174 EventType[] offers = new EventType[] { new EventType("domain1", "type1") }; 175 176 _receiver.pushSupplier_.subscription_change(offers, EMPTY_EVENT_TYPE); 177 178 offers = new EventType[] { new EventType("domain2", "type2") }; 179 180 _receiver.pushSupplier_.subscription_change(offers, EMPTY_EVENT_TYPE); 181 182 Thread.sleep(1000); 183 184 assertEquals(1, subscriptionChangeCounter.get()); 185 } 186 187 public void testObtainSubscriptionTypes_NONE_NOW_UPDATE_ON() throws Exception 188 { 189 StructuredPushSender _sender = getPushSender(); 190 191 StructuredPushReceiver _receiver = new StructuredPushReceiver(getClientORB()); 192 193 _sender.connect(channel_, false); 194 _receiver.connect(channel_, false); 195 196 _sender.pushConsumer_.obtain_subscription_types(ObtainInfoMode.NONE_NOW_UPDATES_ON); 197 198 EventType[] offers = new EventType[] { new EventType("domain1", "type1") }; 199 200 _receiver.pushSupplier_.subscription_change(offers, EMPTY_EVENT_TYPE); 201 202 Thread.sleep(1000); 203 204 assertEquals(1, _sender.addedSubscriptions_.size()); 205 assertEquals("domain1", ((EventType) _sender.addedSubscriptions_.get(0)).domain_name); 206 assertEquals("type1", ((EventType) _sender.addedSubscriptions_.get(0)).type_name); 207 } 208 209 public void testObtainSubscriptionTypes_ALL_NOW_UPDATE_OFF() throws Exception 210 { 211 StructuredPushSender _sender = getPushSender(); 212 StructuredPushReceiver _receiver = new StructuredPushReceiver(getClientORB()); 213 214 _sender.connect(channel_, false); 215 _receiver.connect(channel_, false); 216 217 EventType[] offers = new EventType[] { new EventType("domain1", "type1") }; 218 219 _receiver.pushSupplier_.subscription_change(offers, EMPTY_EVENT_TYPE); 220 221 EventType[] _subscriptionTypes = _sender.pushConsumer_ 222 .obtain_subscription_types(ObtainInfoMode.ALL_NOW_UPDATES_ON); 223 224 Thread.sleep(1000); 225 226 assertEquals(1, _subscriptionTypes.length); 227 assertEquals("domain1", _subscriptionTypes[0].domain_name); 228 assertEquals("type1", _subscriptionTypes[0].type_name); 229 } 230 231 public void testObtainOfferedTypes_NONE_NOW_UPDATES_ON() throws Exception 232 { 233 StructuredPushSender _sender = getPushSender(); 234 StructuredPushReceiver _receiver = new StructuredPushReceiver(getClientORB()); 235 236 _sender.connect(channel_, false); 237 _receiver.connect(channel_, false); 238 239 _receiver.pushSupplier_.obtain_offered_types(ObtainInfoMode.NONE_NOW_UPDATES_ON); 240 241 EventType[] offers = new EventType[] { new EventType("domain1", "type1") }; 242 243 _sender.pushConsumer_.offer_change(offers, EMPTY_EVENT_TYPE); 244 245 Thread.sleep(1000); 246 247 assertEquals(1, _receiver.addedOffers.size()); 248 assertEquals("domain1", ((EventType) _receiver.addedOffers.get(0)).domain_name); 249 assertEquals("type1", ((EventType) _receiver.addedOffers.get(0)).type_name); 250 } 251 252 public void testObtainOfferedTypes_ALL_NOW_UPDATES_ON() throws Exception 253 { 254 StructuredPushSender _sender = getPushSender(); 255 StructuredPushReceiver _receiver = new StructuredPushReceiver(getClientORB()); 256 257 _sender.connect(channel_, false); 258 _receiver.connect(channel_, false); 259 260 EventType[] offers = new EventType[] { new EventType("domain1", "type1"), 261 new EventType("domain2", "type2") }; 262 263 _sender.pushConsumer_.offer_change(offers, EMPTY_EVENT_TYPE); 264 265 EventType[] _offeredTypes = _receiver.pushSupplier_ 266 .obtain_offered_types(ObtainInfoMode.ALL_NOW_UPDATES_ON); 267 268 assertEquals(2, _offeredTypes.length); 269 } 270 271 public void testObtainOfferedTypes_receiver_throws_NO_IMPLEMENT() throws Exception 272 { 273 final SynchronizedInt offerChangeCalled = new SynchronizedInt(0); 274 275 StructuredPushSender _sender = getPushSender(); 276 277 StructuredPushReceiver _receiver = new StructuredPushReceiver(getClientORB()) 278 { 279 public void offer_change(org.omg.CosNotification.EventType[] added,org.omg.CosNotification.EventType[] removed) 280 { 281 offerChangeCalled.increment(); 282 283 throw new NO_IMPLEMENT (); 284 } 285 }; 286 287 _sender.connect(channel_, false); 288 _receiver.connect(channel_, false); 289 290 _receiver.pushSupplier_.obtain_offered_types(ObtainInfoMode.NONE_NOW_UPDATES_ON); 291 292 EventType[] offers = new EventType[] { new EventType("domain1", "type1"), 293 new EventType("domain2", "type2") }; 294 295 _sender.pushConsumer_.offer_change(offers, EMPTY_EVENT_TYPE); 296 297 offers = new EventType[] { new EventType("domain3", "type3"), 298 new EventType("domain4", "type4") }; 299 300 _sender.pushConsumer_.offer_change(offers, EMPTY_EVENT_TYPE); 301 302 assertEquals(1, offerChangeCalled.get()); 303 } 304 305 public void testSendPushPush() throws Exception 306 { 307 StructuredPushSender _sender = getPushSender(); 308 StructuredPushReceiver _receiver = new StructuredPushReceiver(getClientORB()); 309 310 _sender.connect(channel_, false); 311 _receiver.connect(channel_, false); 312 313 _receiver.start(); 314 _sender.start(); 315 316 _sender.join(); 317 _receiver.join(); 318 319 assertTrue("Error while sending", !_sender.error_); 320 assertTrue("Should have received something", _receiver.isEventHandled()); 321 } 322 323 public void testSendPushPush_MisbehavingConsumer() throws Exception 324 { 325 StructuredPushSender _sender = getPushSender(); 326 327 final Latch disconnectedLatch = new Latch(); 328 329 StructuredPushReceiver _receiver = new StructuredPushReceiver(getClientORB()) 330 { 331 public void push_structured_event(StructuredEvent event) 332 { 333 throw new TRANSIENT (); 334 } 335 336 public void disconnect_structured_push_consumer() 337 { 338 super.disconnect_structured_push_consumer(); 339 340 disconnectedLatch.release(); 341 } 342 }; 343 344 _sender.connect(channel_, false); 345 _receiver.connect(channel_, false); 346 347 _receiver.start(); 348 _sender.start(); 349 350 _sender.join(); 351 _receiver.join(); 352 353 assertTrue(disconnectedLatch.attempt(20000)); 354 355 assertFalse(_receiver.isConnected()); 356 } 357 358 public void testSendPushPull() throws Exception 359 { 360 StructuredPushSender _sender = getPushSender(); 361 StructuredPullReceiver _receiver = new StructuredPullReceiver(getClientORB()); 362 363 _sender.connect(channel_, false); 364 _receiver.connect(channel_, false); 365 366 _receiver.start(); 367 _sender.start(); 368 369 _sender.join(); 370 _receiver.join(); 371 372 assertTrue("Error while sending", !_sender.error_); 373 assertTrue("Should have received something", _receiver.isEventHandled()); 374 } 375 376 public void testSendPullPush() throws Exception 377 { 378 StructuredPullSender _sender = new StructuredPullSender(getClientORB(), testEvent_); 379 StructuredPushReceiver _receiver = new StructuredPushReceiver(getClientORB()); 380 _receiver.setTimeOut(2000); 381 382 _sender.connect(channel_, false); 383 _receiver.connect(channel_, false); 384 385 _receiver.start(); 386 _sender.start(); 387 388 _sender.join(); 389 _receiver.join(); 390 391 assertTrue("Error while sending", !_sender.isError()); 392 assertTrue("Should have received something", _receiver.isEventHandled()); 393 } 394 395 public void testSendPullPull() throws Exception 396 { 397 StructuredPullSender _sender = new StructuredPullSender(getClientORB(), testEvent_); 398 StructuredPullReceiver _receiver = new StructuredPullReceiver(getClientORB()); 399 _sender.connect(channel_, false); 400 401 _receiver.connect(channel_, false); 402 403 _receiver.start(); 404 _sender.start(); 405 406 _sender.join(); 407 _receiver.join(); 408 409 boolean _senderError = ((TestClientOperations) _sender).isError(); 410 assertTrue("Error while sending", !_senderError); 411 assertTrue("Should have received something", _receiver.isEventHandled()); 412 } 413 414 public static Test suite() throws Exception 415 { 416 return NotifyServerTestCase.suite("Test of Structured EventChannel", 417 StructuredEventChannelTest.class); 418 } 419 } 420 | Popular Tags |