1 17 package org.apache.servicemix.jbi.messaging; 18 19 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 20 21 import org.apache.servicemix.jbi.RuntimeJBIException; 22 import org.apache.servicemix.jbi.container.ActivationSpec; 23 import org.apache.servicemix.jbi.container.JBIContainer; 24 import org.apache.servicemix.jbi.resolver.ServiceNameEndpointResolver; 25 import org.apache.servicemix.tck.AsyncReceiverPojo; 26 import org.apache.servicemix.tck.Receiver; 27 import org.apache.servicemix.tck.ReceiverComponent; 28 import org.apache.servicemix.tck.SenderComponent; 29 import org.springframework.transaction.TransactionStatus; 30 import org.springframework.transaction.support.TransactionCallback; 31 32 import javax.jbi.JBIException; 33 import javax.jbi.messaging.ExchangeStatus; 34 import javax.jbi.messaging.MessageExchange; 35 import javax.jbi.messaging.MessagingException; 36 37 import java.util.Map ; 38 39 42 public abstract class AbstractPersistenceTest extends AbstractTransactionTest { 43 44 protected JBIContainer createJbiContainer(String name) throws Exception { 45 JBIContainer container = new JBIContainer(); 46 container.setTransactionManager(tm); 47 container.setName(name); 48 container.setFlow(createFlow()); 49 container.setPersistent(true); 50 container.setMonitorInstallationDirectory(false); 51 container.init(); 52 container.start(); 53 return container; 54 } 55 56 protected void runSimpleTest(final boolean syncSend, final boolean syncReceive) throws Exception { 57 final int numMessages = NUM_MESSAGES; 59 final SenderComponent sender = new SenderComponent(); 60 sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE)); 61 final Receiver receiver; 62 final Map delivered = new ConcurrentHashMap(); 63 if (syncReceive) { 64 receiver = new ReceiverComponent() { 65 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 66 try { 67 if (delivered.get(exchange.getExchangeId()) == null) { 68 System.err.println("Message delivery rolled back: " + exchange.getExchangeId()); 69 delivered.put(exchange.getExchangeId(), Boolean.TRUE); 70 tm.setRollbackOnly(); 71 } else { 72 System.err.println("Message delivery accepted: " + exchange.getExchangeId()); 73 super.onMessageExchange(exchange); 74 } 75 } catch (Exception e) { 76 throw new MessagingException(e); 77 } 78 } 79 }; 80 } else { 81 receiver = new AsyncReceiverPojo() { 82 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 83 try { 84 if (delivered.get(exchange.getExchangeId()) == null) { 85 System.err.println("Message delivery rolled back: " + exchange.getExchangeId()); 86 delivered.put(exchange.getExchangeId(), Boolean.TRUE); 87 tm.setRollbackOnly(); 88 exchange.setStatus(ExchangeStatus.DONE); 89 getContext().getDeliveryChannel().send(exchange); 90 } else { 91 System.err.println("Message delivery accepted: " + exchange.getExchangeId()); 92 super.onMessageExchange(exchange); 93 } 94 } catch (Exception e) { 95 throw new MessagingException(e); 96 } 97 } 98 }; 99 } 100 101 senderContainer.activateComponent(new ActivationSpec("sender", sender)); 102 senderContainer.activateComponent(new ActivationSpec("receiver", receiver)); 103 104 tt.execute(new TransactionCallback() { 105 public Object doInTransaction(TransactionStatus status) { 106 try { 107 sender.sendMessages(numMessages, syncSend); 108 } catch (JBIException e) { 109 throw new RuntimeJBIException(e); 110 } 111 return null; 112 } 113 }); 114 receiver.getMessageList().assertMessagesReceived(numMessages); 116 } 117 118 } 119 | Popular Tags |