KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ra > MDBTest


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.activemq.ra;
19
20 import java.io.ByteArrayOutputStream JavaDoc;
21 import java.io.DataOutputStream JavaDoc;
22 import java.io.IOException JavaDoc;
23 import java.lang.reflect.Method JavaDoc;
24 import java.util.Timer JavaDoc;
25
26 import javax.jms.Connection JavaDoc;
27 import javax.jms.Message JavaDoc;
28 import javax.jms.MessageListener JavaDoc;
29 import javax.jms.MessageProducer JavaDoc;
30 import javax.jms.Queue JavaDoc;
31 import javax.jms.Session JavaDoc;
32 import javax.resource.ResourceException JavaDoc;
33 import javax.resource.spi.BootstrapContext JavaDoc;
34 import javax.resource.spi.UnavailableException JavaDoc;
35 import javax.resource.spi.XATerminator JavaDoc;
36 import javax.resource.spi.endpoint.MessageEndpoint JavaDoc;
37 import javax.resource.spi.endpoint.MessageEndpointFactory JavaDoc;
38 import javax.resource.spi.work.ExecutionContext JavaDoc;
39 import javax.resource.spi.work.Work JavaDoc;
40 import javax.resource.spi.work.WorkException JavaDoc;
41 import javax.resource.spi.work.WorkListener JavaDoc;
42 import javax.resource.spi.work.WorkManager JavaDoc;
43 import javax.transaction.xa.XAResource JavaDoc;
44 import javax.transaction.xa.Xid JavaDoc;
45
46 import junit.framework.TestCase;
47
48 import org.apache.activemq.ActiveMQConnectionFactory;
49 import org.apache.activemq.command.ActiveMQQueue;
50 import org.apache.activemq.ra.ActiveMQActivationSpec;
51 import org.apache.activemq.ra.ActiveMQResourceAdapter;
52
53 import java.util.concurrent.CountDownLatch JavaDoc;
54 import java.util.concurrent.TimeUnit JavaDoc;
55
56 public class MDBTest extends TestCase {
57
58     private final class StubBootstrapContext implements BootstrapContext JavaDoc {
59         public WorkManager JavaDoc getWorkManager() {
60             return new WorkManager JavaDoc() {
61                 public void doWork(Work JavaDoc work) throws WorkException JavaDoc {
62                     new Thread JavaDoc(work).start();
63                 }
64         
65                 public void doWork(Work JavaDoc work, long arg1, ExecutionContext JavaDoc arg2, WorkListener JavaDoc arg3)
66                         throws WorkException JavaDoc {
67                     new Thread JavaDoc(work).start();
68                 }
69         
70                 public long startWork(Work JavaDoc work) throws WorkException JavaDoc {
71                     new Thread JavaDoc(work).start();
72                     return 0;
73                 }
74         
75                 public long startWork(Work JavaDoc work, long arg1, ExecutionContext JavaDoc arg2, WorkListener JavaDoc arg3)
76                         throws WorkException JavaDoc {
77                     new Thread JavaDoc(work).start();
78                     return 0;
79                 }
80         
81                 public void scheduleWork(Work JavaDoc work) throws WorkException JavaDoc {
82                     new Thread JavaDoc(work).start();
83                 }
84         
85                 public void scheduleWork(Work JavaDoc work, long arg1, ExecutionContext JavaDoc arg2, WorkListener JavaDoc arg3)
86                         throws WorkException JavaDoc {
87                     new Thread JavaDoc(work).start();
88                 }
89             };
90         }
91
92         public XATerminator JavaDoc getXATerminator() {
93             return null;
94         }
95
96         public Timer JavaDoc createTimer() throws UnavailableException JavaDoc {
97             return null;
98         }
99     }
100
101     public class StubMessageEndpoint implements MessageEndpoint JavaDoc, MessageListener JavaDoc {
102         public int messageCount;
103         public XAResource JavaDoc xaresource;
104         public Xid JavaDoc xid=null;
105         
106         public void beforeDelivery(Method JavaDoc method) throws NoSuchMethodException JavaDoc, ResourceException JavaDoc {
107             try {
108                 if( xid==null )
109                     xid = createXid();
110                 xaresource.start(xid,0);
111             } catch (Throwable JavaDoc e) {
112                 throw new ResourceException JavaDoc(e);
113             }
114         }
115
116         public void afterDelivery() throws ResourceException JavaDoc {
117             try {
118                 xaresource.end(xid,0);
119                 xaresource.prepare(xid);
120                 xaresource.commit(xid,false);
121             } catch (Throwable JavaDoc e) {
122                 throw new ResourceException JavaDoc(e);
123             }
124         }
125
126         public void release() {
127         }
128
129         public void onMessage(Message JavaDoc message) {
130             messageCount++;
131         }
132
133     }
134     
135     public void testMessageDelivery() throws Exception JavaDoc {
136
137         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
138         Connection JavaDoc connection = factory.createConnection();
139         Session JavaDoc session = connection.createSession(false, 0);
140
141         ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
142         adapter.setServerUrl("vm://localhost?broker.persistent=false");
143         adapter.start(new StubBootstrapContext());
144
145         final CountDownLatch JavaDoc messageDelivered = new CountDownLatch JavaDoc(1);
146         
147         final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
148             public void onMessage(Message JavaDoc message) {
149                 super.onMessage(message);
150                 messageDelivered.countDown();
151             };
152         };
153         
154         ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
155         activationSpec.setDestinationType(Queue JavaDoc.class.getName());
156         activationSpec.setDestination("TEST");
157         activationSpec.setResourceAdapter(adapter);
158         activationSpec.validate();
159         
160         MessageEndpointFactory JavaDoc messageEndpointFactory = new MessageEndpointFactory JavaDoc() {
161             public MessageEndpoint JavaDoc createEndpoint(XAResource JavaDoc resource) throws UnavailableException JavaDoc {
162                 endpoint.xaresource = resource;
163                 return endpoint;
164             }
165             public boolean isDeliveryTransacted(Method JavaDoc method) throws NoSuchMethodException JavaDoc {
166                 return true;
167             }
168         };
169
170         // Activate an Endpoint
171
adapter.endpointActivation(messageEndpointFactory, activationSpec);
172         
173         // Give endpoint a chance to setup and register its listeners
174
try {
175             Thread.sleep(1000);
176         } catch (Exception JavaDoc e) {
177
178         }
179         
180         // Send the broker a message to that endpoint
181
MessageProducer JavaDoc producer = session.createProducer(new ActiveMQQueue("TEST"));
182         producer.send(session.createTextMessage("Hello!"));
183         connection.close();
184         
185         // Wait for the message to be delivered.
186
assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
187         
188         // Shut the Endpoint down.
189
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
190         adapter.stop();
191         
192     }
193     
194     long txGenerator = System.currentTimeMillis();
195     
196     public Xid JavaDoc createXid() throws IOException JavaDoc {
197         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
198         DataOutputStream JavaDoc os = new DataOutputStream JavaDoc(baos);
199         os.writeLong(++txGenerator);
200         os.close();
201         final byte[] bs = baos.toByteArray();
202
203         return new Xid JavaDoc() {
204             public int getFormatId() {
205                 return 86;
206             }
207             public byte[] getGlobalTransactionId() {
208                 return bs;
209             }
210             public byte[] getBranchQualifier() {
211                 return bs;
212             }
213         };
214         
215     }
216
217 }
218
Popular Tags