1 18 package org.apache.activemq.ra; 19 20 import java.io.ByteArrayOutputStream ; 21 import java.io.DataOutputStream ; 22 import java.io.IOException ; 23 import java.lang.reflect.Method ; 24 import java.util.Timer ; 25 26 import javax.jms.Connection ; 27 import javax.jms.Message ; 28 import javax.jms.MessageListener ; 29 import javax.jms.MessageProducer ; 30 import javax.jms.Queue ; 31 import javax.jms.Session ; 32 import javax.resource.ResourceException ; 33 import javax.resource.spi.BootstrapContext ; 34 import javax.resource.spi.UnavailableException ; 35 import javax.resource.spi.XATerminator ; 36 import javax.resource.spi.endpoint.MessageEndpoint ; 37 import javax.resource.spi.endpoint.MessageEndpointFactory ; 38 import javax.resource.spi.work.ExecutionContext ; 39 import javax.resource.spi.work.Work ; 40 import javax.resource.spi.work.WorkException ; 41 import javax.resource.spi.work.WorkListener ; 42 import javax.resource.spi.work.WorkManager ; 43 import javax.transaction.xa.XAResource ; 44 import javax.transaction.xa.Xid ; 45 46 import junit.framework.TestCase; 47 48 import org.apache.activemq.ActiveMQConnectionFactory; 49 import org.apache.activemq.command.ActiveMQQueue; 50 import org.apache.activemq.ra.ActiveMQActivationSpec; 51 import org.apache.activemq.ra.ActiveMQResourceAdapter; 52 53 import java.util.concurrent.CountDownLatch ; 54 import java.util.concurrent.TimeUnit ; 55 56 public class MDBTest extends TestCase { 57 58 private final class StubBootstrapContext implements BootstrapContext { 59 public WorkManager getWorkManager() { 60 return new WorkManager () { 61 public void doWork(Work work) throws WorkException { 62 new Thread (work).start(); 63 } 64 65 public void doWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) 66 throws WorkException { 67 new Thread (work).start(); 68 } 69 70 public long startWork(Work work) throws WorkException { 71 new Thread (work).start(); 72 return 0; 73 } 74 75 public long startWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) 76 throws WorkException { 77 new Thread (work).start(); 78 return 0; 79 } 80 81 public void scheduleWork(Work work) throws WorkException { 82 new Thread (work).start(); 83 } 84 85 public void scheduleWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) 86 throws WorkException { 87 new Thread (work).start(); 88 } 89 }; 90 } 91 92 public XATerminator getXATerminator() { 93 return null; 94 } 95 96 public Timer createTimer() throws UnavailableException { 97 return null; 98 } 99 } 100 101 public class StubMessageEndpoint implements MessageEndpoint , MessageListener { 102 public int messageCount; 103 public XAResource xaresource; 104 public Xid xid=null; 105 106 public void beforeDelivery(Method method) throws NoSuchMethodException , ResourceException { 107 try { 108 if( xid==null ) 109 xid = createXid(); 110 xaresource.start(xid,0); 111 } catch (Throwable e) { 112 throw new ResourceException (e); 113 } 114 } 115 116 public void afterDelivery() throws ResourceException { 117 try { 118 xaresource.end(xid,0); 119 xaresource.prepare(xid); 120 xaresource.commit(xid,false); 121 } catch (Throwable e) { 122 throw new ResourceException (e); 123 } 124 } 125 126 public void release() { 127 } 128 129 public void onMessage(Message message) { 130 messageCount++; 131 } 132 133 } 134 135 public void testMessageDelivery() throws Exception { 136 137 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 138 Connection connection = factory.createConnection(); 139 Session session = connection.createSession(false, 0); 140 141 ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); 142 adapter.setServerUrl("vm://localhost?broker.persistent=false"); 143 adapter.start(new StubBootstrapContext()); 144 145 final CountDownLatch messageDelivered = new CountDownLatch (1); 146 147 final StubMessageEndpoint endpoint = new StubMessageEndpoint() { 148 public void onMessage(Message message) { 149 super.onMessage(message); 150 messageDelivered.countDown(); 151 }; 152 }; 153 154 ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec(); 155 activationSpec.setDestinationType(Queue .class.getName()); 156 activationSpec.setDestination("TEST"); 157 activationSpec.setResourceAdapter(adapter); 158 activationSpec.validate(); 159 160 MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory () { 161 public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException { 162 endpoint.xaresource = resource; 163 return endpoint; 164 } 165 public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { 166 return true; 167 } 168 }; 169 170 adapter.endpointActivation(messageEndpointFactory, activationSpec); 172 173 try { 175 Thread.sleep(1000); 176 } catch (Exception e) { 177 178 } 179 180 MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); 182 producer.send(session.createTextMessage("Hello!")); 183 connection.close(); 184 185 assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS)); 187 188 adapter.endpointDeactivation(messageEndpointFactory, activationSpec); 190 adapter.stop(); 191 192 } 193 194 long txGenerator = System.currentTimeMillis(); 195 196 public Xid createXid() throws IOException { 197 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 198 DataOutputStream os = new DataOutputStream (baos); 199 os.writeLong(++txGenerator); 200 os.close(); 201 final byte[] bs = baos.toByteArray(); 202 203 return new Xid () { 204 public int getFormatId() { 205 return 86; 206 } 207 public byte[] getGlobalTransactionId() { 208 return bs; 209 } 210 public byte[] getBranchQualifier() { 211 return bs; 212 } 213 }; 214 215 } 216 217 } 218 | Popular Tags |