KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > messaging > TransactionsTest


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.messaging;
18
19 import java.sql.Connection JavaDoc;
20
21 import javax.jbi.JBIException;
22 import javax.jbi.messaging.DeliveryChannel;
23 import javax.jbi.messaging.ExchangeStatus;
24 import javax.jbi.messaging.InOnly;
25 import javax.jbi.messaging.InOut;
26 import javax.jbi.messaging.MessageExchange;
27 import javax.jbi.messaging.MessagingException;
28 import javax.jbi.messaging.NormalizedMessage;
29 import javax.resource.spi.ConnectionManager JavaDoc;
30 import javax.resource.spi.ManagedConnectionFactory JavaDoc;
31 import javax.sql.DataSource JavaDoc;
32 import javax.sql.XADataSource JavaDoc;
33 import javax.transaction.Status JavaDoc;
34 import javax.transaction.TransactionManager JavaDoc;
35 import javax.xml.namespace.QName JavaDoc;
36
37 import junit.framework.TestCase;
38
39 import org.apache.activemq.broker.BrokerService;
40 import org.apache.derby.jdbc.EmbeddedXADataSource;
41 import org.apache.geronimo.connector.outbound.GenericConnectionManager;
42 import org.apache.geronimo.connector.outbound.connectionmanagerconfig.NoPool;
43 import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions;
44 import org.apache.geronimo.transaction.context.GeronimoTransactionManager;
45 import org.apache.geronimo.transaction.context.TransactionContextManager;
46 import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
47 import org.apache.geronimo.transaction.manager.XidFactoryImpl;
48 import org.apache.servicemix.MessageExchangeListener;
49 import org.apache.servicemix.client.DefaultServiceMixClient;
50 import org.apache.servicemix.client.ServiceMixClient;
51 import org.apache.servicemix.components.util.ComponentSupport;
52 import org.apache.servicemix.jbi.container.JBIContainer;
53 import org.apache.servicemix.jbi.jaxp.StringSource;
54 import org.apache.servicemix.jbi.nmr.flow.Flow;
55 import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow;
56 import org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow;
57 import org.apache.servicemix.store.Store;
58 import org.apache.servicemix.store.jdbc.JdbcStoreFactory;
59 import org.apache.servicemix.tck.ExchangeCompletedListener;
60 import org.tranql.connector.AllExceptionsAreFatalSorter;
61 import org.tranql.connector.jdbc.AbstractXADataSourceMCF;
62
63 public class TransactionsTest extends TestCase {
64
65     public static final long TIMEOUT = 1000;
66     
67     private JBIContainer jbi;
68     private BrokerService broker;
69     private TransactionManager JavaDoc tm;
70     private ServiceMixClient client;
71     private DataSource JavaDoc dataSource;
72     private Connection JavaDoc connection;
73     private Store store;
74     private ExchangeCompletedListener listener;
75     
76     protected void setUp() throws Exception JavaDoc {
77
78         // Create an AMQ broker
79
broker = new BrokerService();
80         broker.setUseJmx(false);
81         broker.setPersistent(false);
82         broker.addConnector("tcp://localhost:61616");
83         broker.start();
84         
85         TransactionManagerImpl exTransactionManager = new TransactionManagerImpl(600, new XidFactoryImpl(), null, null);
86         TransactionContextManager transactionContextManager = new TransactionContextManager(exTransactionManager, exTransactionManager);
87         tm = (TransactionManager JavaDoc) new GeronimoTransactionManager(transactionContextManager);
88         
89         // Create an embedded database for testing tx results when commit / rollback
90
ConnectionManager JavaDoc cm = new GenericConnectionManager(
91                         new XATransactions(true, true),
92                         new NoPool(),
93                         false,
94                         null,
95                         transactionContextManager,
96                         "connectionManager",
97                         GenericConnectionManager.class.getClassLoader());
98         ManagedConnectionFactory JavaDoc mcf = new DerbyDataSourceMCF("target/testdb");
99         dataSource = (DataSource JavaDoc) mcf.createConnectionFactory(cm);
100         
101         connection = dataSource.getConnection();
102         
103         JdbcStoreFactory storeFactory = new JdbcStoreFactory();
104         storeFactory.setDataSource(dataSource);
105         storeFactory.setTransactional(true);
106         store = storeFactory.open("store");
107         
108         JCAFlow jcaFlow = new JCAFlow();
109         jcaFlow.setTransactionContextManager(transactionContextManager);
110         
111         jbi = new JBIContainer();
112         jbi.setFlows(new Flow[] { new SedaFlow(), jcaFlow });
113         jbi.setEmbedded(true);
114         jbi.setUseMBeanServer(false);
115         jbi.setCreateMBeanServer(false);
116         jbi.setTransactionManager(tm);
117         jbi.setAutoEnlistInTransaction(true);
118         listener = new ExchangeCompletedListener();
119         jbi.addListener(listener);
120         jbi.init();
121         jbi.start();
122         
123         client = new DefaultServiceMixClient(jbi);
124     }
125     
126     protected void tearDown() throws Exception JavaDoc {
127         listener.assertExchangeCompleted();
128         jbi.shutDown();
129         Thread.sleep(100);
130         broker.stop();
131         connection.close();
132     }
133     
134     protected InOnly createInOnly() throws Exception JavaDoc {
135         InOnly me = client.createInOnlyExchange();
136         me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
137         me.setService(new QName JavaDoc("service"));
138         return me;
139     }
140     
141     protected InOut createInOut() throws Exception JavaDoc {
142         InOut me = client.createInOutExchange();
143         me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
144         me.setService(new QName JavaDoc("service"));
145         return me;
146     }
147     
148     public void testInOnlyAsyncSendAndListener() throws Exception JavaDoc {
149         jbi.activateComponent(new Listener(false, false), "target");
150         
151         MessageExchange me = createInOnly();
152         tm.begin();
153         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
154         client.send(me);
155         assertNull(client.receive(TIMEOUT));
156         tm.commit();
157         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
158         
159         me = client.receive(TIMEOUT);
160         assertNotNull(me);
161         assertEquals(ExchangeStatus.DONE, me.getStatus());
162         assertTrue(me.isTransacted());
163         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
164         
165         assertNotNull(store.load(me.getExchangeId()));
166     }
167     
168     public void testInOnlyAsyncSendAndListenerWithRollback() throws Exception JavaDoc {
169         jbi.activateComponent(new Listener(false, true), "target");
170         
171         MessageExchange me = createInOnly();
172         tm.begin();
173         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
174         client.send(me);
175         assertNull(client.receive(TIMEOUT));
176         tm.commit();
177         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
178         
179         assertNull(client.receive(TIMEOUT));
180         
181         assertNull(store.load(me.getExchangeId()));
182     }
183     
184     public void testInOnlySyncSendAndListener() throws Exception JavaDoc {
185         jbi.activateComponent(new Listener(false, false), "target");
186         
187         MessageExchange me = createInOnly();
188         tm.begin();
189         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
190         client.sendSync(me, TIMEOUT);
191         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
192         assertEquals(ExchangeStatus.DONE, me.getStatus());
193         assertTrue(me.isTransacted());
194         tm.commit();
195         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
196         
197         assertNotNull(store.load(me.getExchangeId()));
198     }
199     
200     public void testInOnlySyncSendAndListenerWithProviderRollback() throws Exception JavaDoc {
201         jbi.activateComponent(new Listener(false, true), "target");
202         
203         MessageExchange me = createInOnly();
204         tm.begin();
205         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
206         client.sendSync(me, TIMEOUT);
207         assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
208         assertEquals(ExchangeStatus.DONE, me.getStatus());
209         assertTrue(me.isTransacted());
210         tm.rollback();
211         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
212         
213         assertNull(store.load(me.getExchangeId()));
214     }
215     
216     public void testInOnlySyncSendAndListenerWithConsumerRollback() throws Exception JavaDoc {
217         jbi.activateComponent(new Listener(false, false), "target");
218         
219         MessageExchange me = createInOnly();
220         tm.begin();
221         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
222         client.sendSync(me, TIMEOUT);
223         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
224         tm.setRollbackOnly();
225         assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
226         assertEquals(ExchangeStatus.DONE, me.getStatus());
227         assertTrue(me.isTransacted());
228         tm.rollback();
229         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
230         
231         assertNull(store.load(me.getExchangeId()));
232     }
233     
234     public void testInOnlyAsyncSendAndPoll() throws Exception JavaDoc {
235         jbi.activateComponent(new Async(false, false), "target");
236         
237         MessageExchange me = createInOnly();
238         tm.begin();
239         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
240         client.send(me);
241         assertNull(client.receive(TIMEOUT));
242         tm.commit();
243         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
244         
245         me = client.receive(TIMEOUT);
246         assertNotNull(me);
247         assertEquals(ExchangeStatus.DONE, me.getStatus());
248         assertTrue(me.isTransacted());
249         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
250         
251         assertNotNull(store.load(me.getExchangeId()));
252     }
253     
254     public void testInOnlyAsyncSendAndPollWithRollback() throws Exception JavaDoc {
255         jbi.activateComponent(new Async(false, true), "target");
256         
257         MessageExchange me = createInOnly();
258         tm.begin();
259         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
260         client.send(me);
261         assertNull(client.receive(TIMEOUT));
262         tm.commit();
263         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
264         
265         assertNull(client.receive(TIMEOUT));
266         
267         assertNull(store.load(me.getExchangeId()));
268     }
269     
270     public void testInOnlySyncSendAndPoll() throws Exception JavaDoc {
271         jbi.activateComponent(new Async(false, false), "target");
272         
273         MessageExchange me = createInOnly();
274         tm.begin();
275         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
276         client.sendSync(me, TIMEOUT);
277         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
278         assertEquals(ExchangeStatus.DONE, me.getStatus());
279         assertTrue(me.isTransacted());
280         tm.commit();
281         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
282         
283         assertNotNull(store.load(me.getExchangeId()));
284     }
285     
286     public void testInOnlySyncSendAndPollWithProviderRollback() throws Exception JavaDoc {
287         jbi.activateComponent(new Async(false, true), "target");
288         
289         MessageExchange me = createInOnly();
290         tm.begin();
291         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
292         client.sendSync(me, TIMEOUT);
293         assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
294         assertEquals(ExchangeStatus.DONE, me.getStatus());
295         assertTrue(me.isTransacted());
296         tm.rollback();
297         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
298         
299         assertNull(store.load(me.getExchangeId()));
300     }
301     
302     public void testInOnlySyncSendAndPollWithConsumerRollback() throws Exception JavaDoc {
303         jbi.activateComponent(new Async(false, false), "target");
304         
305         MessageExchange me = createInOnly();
306         tm.begin();
307         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
308         client.sendSync(me, TIMEOUT);
309         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
310         tm.setRollbackOnly();
311         assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus());
312         assertEquals(ExchangeStatus.DONE, me.getStatus());
313         assertTrue(me.isTransacted());
314         tm.rollback();
315         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
316         
317         assertNull(store.load(me.getExchangeId()));
318     }
319
320     public void testInOutAsyncSendAndAsyncSendAndListener() throws Exception JavaDoc {
321         jbi.activateComponent(new Listener(false, false), "target");
322         
323         MessageExchange me = createInOut();
324         tm.begin();
325         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
326         client.send(me);
327         assertNull(client.receive(TIMEOUT));
328         tm.commit();
329         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
330         
331         me = client.receive(TIMEOUT);
332         assertNotNull(me);
333         assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
334         assertTrue(me.isTransacted());
335         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
336         client.done(me);
337         
338         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
339         assertNotNull(store.load(me.getExchangeId()));
340     }
341
342     /*
343      * NOT SUPPORTED
344      *
345     public void testInOutAsyncSendAndSyncSendAndListener() throws Exception {
346         jbi.activateComponent(new Listener(true, false), "target");
347         
348         MessageExchange me = createInOut();
349         tm.begin();
350         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
351         client.send(me);
352         assertNull(client.receive(TIMEOUT));
353         tm.commit();
354         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
355         
356         me = client.receive(TIMEOUT);
357         assertNotNull(me);
358         assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
359         assertTrue(me.isTransacted());
360         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
361         client.done(me);
362         
363         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
364         assertNotNull(store.load(me.getExchangeId()));
365     }
366     */

367     
368     /*
369      * NOT SUPPORTED
370      *
371     public void testInOutSyncSendAndAsyncSendAndListener() throws Exception {
372         jbi.activateComponent(new Listener(false, false), "target");
373         
374         MessageExchange me = createInOut();
375         tm.begin();
376         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
377         client.sendSync(me, TIMEOUT);
378         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
379         assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
380         assertTrue(me.isTransacted());
381         client.done(me);
382         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
383         tm.commit();
384         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
385         
386         assertNotNull(store.load(me.getExchangeId()));
387     }
388     */

389     
390     public void testInOutSyncSendAndSyncSendAndListener() throws Exception JavaDoc {
391         jbi.activateComponent(new Listener(true, false), "target");
392         
393         MessageExchange me = createInOut();
394         tm.begin();
395         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
396         client.sendSync(me, TIMEOUT);
397         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
398         assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
399         assertTrue(me.isTransacted());
400         client.done(me);
401         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
402         tm.commit();
403         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
404         
405         assertNotNull(store.load(me.getExchangeId()));
406     }
407     
408     public void testInOutAsyncSendAndAsyncSendAndPoll() throws Exception JavaDoc {
409         jbi.activateComponent(new Async(false, false), "target");
410         
411         MessageExchange me = createInOut();
412         tm.begin();
413         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
414         client.send(me);
415         assertNull(client.receive(TIMEOUT));
416         tm.commit();
417         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
418         
419         me = client.receive(TIMEOUT);
420         assertNotNull(me);
421         assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
422         assertTrue(me.isTransacted());
423         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
424         client.done(me);
425         
426         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
427         assertNotNull(store.load(me.getExchangeId()));
428     }
429     
430     public void testInOutSyncSendAndSyncSendAndPoll() throws Exception JavaDoc {
431         jbi.activateComponent(new Async(true, false), "target");
432         
433         MessageExchange me = createInOut();
434         tm.begin();
435         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
436         client.sendSync(me, TIMEOUT);
437         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
438         assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
439         assertTrue(me.isTransacted());
440         client.done(me);
441         assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
442         tm.commit();
443         assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus());
444         
445         assertNotNull(store.load(me.getExchangeId()));
446    }
447     
448     protected class Async extends ComponentSupport implements Runnable JavaDoc {
449         private boolean sync;
450         private boolean rollback;
451         private Thread JavaDoc runner;
452         private boolean running;
453         public Async(boolean sync, boolean rollback) {
454             this.sync = sync;
455             this.rollback = rollback;
456             setService(new QName JavaDoc("service"));
457             setEndpoint("endpoint");
458         }
459         public synchronized void start() throws JBIException {
460             if (!running) {
461                 running = true;
462                 runner = new Thread JavaDoc(this);
463                 runner.start();
464             }
465         }
466         public void run() {
467             while (running) {
468                 try {
469                     DeliveryChannel deliveryChannel = getContext().getDeliveryChannel();
470                     MessageExchange messageExchange = deliveryChannel.accept();
471                     process(messageExchange);
472                 }
473                 catch (Exception JavaDoc e) {
474                     e.printStackTrace();
475                 }
476             }
477         }
478         public synchronized void stop() throws JBIException {
479             running = false;
480         }
481         protected void process(MessageExchange exchange) throws Exception JavaDoc {
482             if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
483                 return;
484             }
485             try {
486                 store.store(exchange.getExchangeId(), exchange);
487             } catch (Exception JavaDoc e) {
488                 throw new MessagingException(e);
489             }
490             if (rollback) {
491                 try {
492                     tm.setRollbackOnly();
493                 } catch (Exception JavaDoc e) {
494                     throw new MessagingException(e);
495                 }
496             }
497             if (exchange instanceof InOnly) {
498                 exchange.setStatus(ExchangeStatus.DONE);
499                 getDeliveryChannel().send(exchange);
500             } else {
501                 NormalizedMessage msg = exchange.createMessage();
502                 msg.setContent(exchange.getMessage("in").getContent());
503                 exchange.setMessage(msg, "out");
504                 if (sync) {
505                     getDeliveryChannel().sendSync(exchange);
506                 } else {
507                     getDeliveryChannel().send(exchange);
508                 }
509             }
510         }
511     }
512     
513     protected class Listener extends ComponentSupport implements MessageExchangeListener {
514         private boolean sync;
515         private boolean rollback;
516         public Listener(boolean sync, boolean rollback) {
517             this.sync = sync;
518             this.rollback = rollback;
519             setService(new QName JavaDoc("service"));
520             setEndpoint("endpoint");
521         }
522         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
523             if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
524                 return;
525             }
526             try {
527                 store.store(exchange.getExchangeId(), exchange);
528             } catch (Exception JavaDoc e) {
529                 throw new MessagingException(e);
530             }
531             if (rollback) {
532                 try {
533                     tm.setRollbackOnly();
534                 } catch (Exception JavaDoc e) {
535                     throw new MessagingException(e);
536                 }
537             }
538             if (exchange instanceof InOnly) {
539                 exchange.setStatus(ExchangeStatus.DONE);
540                 getDeliveryChannel().send(exchange);
541             } else {
542                 NormalizedMessage msg = exchange.createMessage();
543                 msg.setContent(exchange.getMessage("in").getContent());
544                 exchange.setMessage(msg, "out");
545                 if (sync) {
546                     getDeliveryChannel().sendSync(exchange, TIMEOUT);
547                 } else {
548                     getDeliveryChannel().send(exchange);
549                 }
550             }
551         }
552         
553     }
554     
555     public static class DerbyDataSourceMCF extends AbstractXADataSourceMCF {
556         private static final long serialVersionUID = 7971682207810098396L;
557         protected DerbyDataSourceMCF(String JavaDoc dbName) {
558             super(createXADS(dbName), new AllExceptionsAreFatalSorter());
559         }
560         public String JavaDoc getPassword() {
561             return null;
562         }
563         public String JavaDoc getUserName() {
564             return null;
565         }
566         protected static XADataSource JavaDoc createXADS(String JavaDoc dbName) {
567             EmbeddedXADataSource xads = new EmbeddedXADataSource();
568             xads.setDatabaseName(dbName);
569             xads.setCreateDatabase("create");
570             return xads;
571         }
572     }
573     
574 }
575
Popular Tags