1 17 package org.apache.servicemix.wsn.component; 18 19 import java.io.File ; 20 import java.io.StringReader ; 21 import java.net.URI ; 22 import java.net.URL ; 23 import java.util.List ; 24 25 import javax.jbi.JBIException; 26 import javax.jbi.messaging.NormalizedMessage; 27 import javax.jbi.servicedesc.ServiceEndpoint; 28 import javax.xml.namespace.QName ; 29 import javax.xml.parsers.DocumentBuilder ; 30 31 import junit.framework.TestCase; 32 33 import org.apache.activemq.ActiveMQConnectionFactory; 34 import org.apache.activemq.broker.BrokerService; 35 import org.apache.servicemix.jbi.container.ActivationSpec; 36 import org.apache.servicemix.jbi.container.JBIContainer; 37 import org.apache.servicemix.jbi.jaxp.SourceTransformer; 38 import org.apache.servicemix.tck.Receiver; 39 import org.apache.servicemix.tck.ReceiverComponent; 40 import org.apache.servicemix.wsn.client.AbstractWSAClient; 41 import org.apache.servicemix.wsn.client.CreatePullPoint; 42 import org.apache.servicemix.wsn.client.NotificationBroker; 43 import org.apache.servicemix.wsn.client.PullPoint; 44 import org.apache.servicemix.wsn.client.Subscription; 45 import org.apache.servicemix.wsn.spring.PublisherComponent; 46 import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType; 47 import org.oasis_open.docs.wsn.b_2.Notify; 48 import org.w3._2005._08.addressing.AttributedURIType; 49 import org.w3._2005._08.addressing.EndpointReferenceType; 50 import org.w3c.dom.Document ; 51 import org.w3c.dom.Element ; 52 import org.w3c.dom.Node ; 53 import org.xml.sax.InputSource ; 54 55 public class WSNComponentTest extends TestCase { 56 57 public static QName NOTIFICATION_BROKER = new QName ("http://servicemix.org/wsnotification", "NotificationBroker"); 58 59 private JBIContainer jbi; 60 private BrokerService jmsBroker; 61 private NotificationBroker wsnBroker; 62 private CreatePullPoint wsnCreatePullPoint; 63 private WSNComponent wsnComponent; 64 65 protected void setUp() throws Exception { 66 jmsBroker = new BrokerService(); 67 jmsBroker.setPersistent(false); 68 jmsBroker.addConnector("vm://localhost"); 69 jmsBroker.start(); 70 71 jbi = new JBIContainer(); 72 jbi.setEmbedded(true); 73 jbi.init(); 74 jbi.start(); 75 76 wsnComponent = new WSNComponent(); 77 wsnComponent.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost")); 78 ActivationSpec as = new ActivationSpec(); 79 as.setComponentName("servicemix-wsn2005"); 80 as.setComponent(wsnComponent); 81 jbi.activateComponent(as); 82 83 wsnBroker = new NotificationBroker(jbi); 84 wsnCreatePullPoint = new CreatePullPoint(jbi); 85 } 86 87 protected void tearDown() throws Exception { 88 if (jbi != null) { 89 jbi.shutDown(); 90 } 91 if (jmsBroker != null) { 92 jmsBroker.stop(); 93 } 94 } 95 96 public void testWSDL() throws Exception { 97 ServiceEndpoint[] ses = jbi.getRegistry().getEndpointsForInterface( 98 new QName ("http://docs.oasis-open.org/wsn/brw-2", "NotificationBroker")); 99 assertNotNull(ses); 100 assertEquals(1, ses.length); 101 } 102 103 public void testInvalidSubscribription() throws Exception { 104 try { 105 wsnBroker.subscribe(null, null, null); 106 fail("Expected an exception"); 107 } catch (JBIException e) { 108 } 110 } 111 112 public void testNotify() throws Exception { 113 ReceiverComponent receiver = new ReceiverComponent(); 114 jbi.activateComponent(receiver, "receiver"); 115 116 EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT); 117 wsnBroker.subscribe(consumer, "myTopic", null); 118 119 wsnBroker.notify("myTopic", parse("<hello>world</hello>")); 120 Thread.sleep(50); 122 123 receiver.getMessageList().assertMessagesReceived(1); 124 NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0); 125 Node node = new SourceTransformer().toDOMNode(msg); 126 assertEquals("Notify", node.getLocalName()); 127 128 Thread.sleep(50); 130 } 131 132 public void testRawNotify() throws Exception { 133 ReceiverComponent receiver = new ReceiverComponent(); 134 jbi.activateComponent(receiver, "receiver"); 135 136 EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT); 138 wsnBroker.subscribe(consumer, "myTopic", null, true); 139 140 Element body = parse("<hello>world</hello>"); 141 wsnBroker.notify("myTopic", body); 142 144 Thread.sleep(50); 146 147 receiver.getMessageList().assertMessagesReceived(1); 148 NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0); 149 Node node = new SourceTransformer().toDOMNode(msg); 150 assertEquals("hello", node.getLocalName()); 151 152 Thread.sleep(50); 154 } 155 156 public void testUnsubscribe() throws Exception { 157 PullPoint pullPoint = wsnCreatePullPoint.createPullPoint(); 159 Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null); 160 162 wsnBroker.notify("myTopic", new Notify()); 163 Thread.sleep(50); 165 166 assertEquals(1, pullPoint.getMessages(0).size()); 167 168 subscription.unsubscribe(); 169 170 wsnBroker.notify("myTopic", new Notify()); 171 Thread.sleep(50); 173 174 assertEquals(0, pullPoint.getMessages(0).size()); 175 176 Thread.sleep(50); 178 } 179 180 public void testPauseResume() throws Exception { 181 PullPoint pullPoint = wsnCreatePullPoint.createPullPoint(); 182 Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null); 183 184 wsnBroker.notify("myTopic", new Notify()); 185 Thread.sleep(50); 187 188 assertEquals(1, pullPoint.getMessages(0).size()); 189 190 subscription.pause(); 191 192 wsnBroker.notify("myTopic", new Notify()); 193 Thread.sleep(50); 195 196 assertEquals(0, pullPoint.getMessages(0).size()); 197 198 subscription.resume(); 199 200 wsnBroker.notify("myTopic", new Notify()); 201 Thread.sleep(50); 203 204 assertEquals(1, pullPoint.getMessages(0).size()); 205 206 Thread.sleep(50); 208 } 209 210 public void testPull() throws Exception { 211 PullPoint pullPoint = wsnCreatePullPoint.createPullPoint(); 212 wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null); 213 214 wsnBroker.notify("myTopic", new Notify()); 215 Thread.sleep(50); 217 218 List <NotificationMessageHolderType> msgs = pullPoint.getMessages(0); 219 assertNotNull(msgs); 220 assertEquals(1, msgs.size()); 221 222 Thread.sleep(50); 224 } 225 226 public void testPullWithFilter() throws Exception { 227 PullPoint pullPoint1 = wsnCreatePullPoint.createPullPoint(); 228 PullPoint pullPoint2 = wsnCreatePullPoint.createPullPoint(); 229 wsnBroker.subscribe(pullPoint1.getEndpoint(), "myTopic", "@type = 'a'"); 230 wsnBroker.subscribe(pullPoint2.getEndpoint(), "myTopic", "@type = 'b'"); 231 232 wsnBroker.notify("myTopic", parse("<msg type='a'/>")); 233 Thread.sleep(500); 235 236 assertEquals(1, pullPoint1.getMessages(0).size()); 237 assertEquals(0, pullPoint2.getMessages(0).size()); 238 239 wsnBroker.notify("myTopic", parse("<msg type='b'/>")); 240 Thread.sleep(500); 242 243 assertEquals(0, pullPoint1.getMessages(0).size()); 244 assertEquals(1, pullPoint2.getMessages(0).size()); 245 246 wsnBroker.notify("myTopic", parse("<msg type='c'/>")); 247 Thread.sleep(500); 249 250 assertEquals(0, pullPoint1.getMessages(0).size()); 251 assertEquals(0, pullPoint2.getMessages(0).size()); 252 } 253 254 public void testDemandBasedPublisher() throws Exception { 255 PublisherComponent publisherComponent = new PublisherComponent(); 256 publisherComponent.setService(new QName ("http://servicemix.org/example", "publisher")); 257 publisherComponent.setEndpoint("publisher"); 258 publisherComponent.setTopic("myTopic"); 259 publisherComponent.setDemand(true); 260 jbi.activateComponent(publisherComponent, "publisher"); 261 262 Thread.sleep(50); 263 assertNull(publisherComponent.getSubscription()); 264 265 PullPoint pullPoint = wsnCreatePullPoint.createPullPoint(); 266 Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null); 267 268 Thread.sleep(500); 269 assertNotNull(publisherComponent.getSubscription()); 270 271 subscription.unsubscribe(); 272 273 Thread.sleep(500); 274 assertNull(publisherComponent.getSubscription()); 275 276 Thread.sleep(50); 277 } 278 279 public void testDeployPullPoint() throws Exception { 280 URL url = getClass().getClassLoader().getResource("pullpoint/pullpoint.xml"); 281 File path = new File (new URI (url.toString())); 282 path = path.getParentFile(); 283 wsnComponent.getServiceUnitManager().deploy("pullpoint", path.getAbsolutePath()); 284 285 wsnComponent.getServiceUnitManager().start("pullpoint"); 286 287 wsnBroker.notify("myTopic", parse("<hello>world</hello>")); 288 PullPoint pullPoint = new PullPoint( 289 AbstractWSAClient.createWSA("http://www.consumer.org/service/endpoint"), 290 jbi); 291 Thread.sleep(50); 292 assertEquals(1, pullPoint.getMessages(0).size()); 293 } 294 295 public void testDeploySubscription() throws Exception { 296 URL url = getClass().getClassLoader().getResource("subscription/subscribe.xml"); 297 File path = new File (new URI (url.toString())); 298 path = path.getParentFile(); 299 wsnComponent.getServiceUnitManager().deploy("subscription", path.getAbsolutePath()); 300 301 ActivationSpec consumer = new ActivationSpec(); 302 consumer.setService(new QName ("http://www.consumer.org", "service")); 303 consumer.setEndpoint("endpoint"); 304 Receiver receiver = new ReceiverComponent(); 305 consumer.setComponent(receiver); 306 jbi.activateComponent(consumer); 307 308 wsnComponent.getServiceUnitManager().start("subscription"); 309 310 wsnBroker.notify("myTopic", parse("<hello>world</hello>")); 311 Thread.sleep(50); 313 receiver.getMessageList().assertMessagesReceived(1); 314 receiver.getMessageList().flushMessages(); 315 316 wsnComponent.getServiceUnitManager().stop("subscription"); 317 318 wsnBroker.notify("myTopic", parse("<hello>world</hello>")); 319 Thread.sleep(50); 321 assertEquals(0, receiver.getMessageList().flushMessages().size()); 322 323 wsnComponent.getServiceUnitManager().start("subscription"); 324 325 wsnBroker.notify("myTopic", parse("<hello>world</hello>")); 326 Thread.sleep(50); 328 receiver.getMessageList().assertMessagesReceived(1); 329 receiver.getMessageList().flushMessages(); 330 } 331 332 protected Element parse(String txt) throws Exception { 333 DocumentBuilder builder = new SourceTransformer().createDocumentBuilder(); 334 InputSource is = new InputSource (new StringReader (txt)); 335 Document doc = builder.parse(is); 336 return doc.getDocumentElement(); 337 } 338 339 protected EndpointReferenceType createEPR(QName service, String endpoint) { 340 EndpointReferenceType epr = new EndpointReferenceType(); 341 epr.setAddress(new AttributedURIType()); 342 epr.getAddress().setValue(service.getNamespaceURI() + "/" + service.getLocalPart() + "/" + endpoint); 343 return epr; 344 } 345 346 } 347 | Popular Tags |