1 18 package org.apache.activemq.systest.impl; 19 20 import org.apache.activemq.systest.AgentStopper; 21 import org.apache.activemq.systest.MessageList; 22 23 import javax.jms.JMSException ; 24 import javax.jms.Message ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.ObjectMessage ; 27 import javax.jms.Session ; 28 import javax.jms.TextMessage ; 29 30 import java.io.Serializable ; 31 import java.util.ArrayList ; 32 import java.util.Date ; 33 import java.util.Iterator ; 34 import java.util.List ; 35 36 import junit.framework.Assert; 37 38 43 public class MessageListImpl extends Assert implements MessageList { 44 45 private int numberOfMessages; 46 private int charactersPerMessage; 47 private List payloads; 48 private String customHeader = "testHeader"; 49 private boolean useTextMessage = true; 50 private int sentCounter; 51 52 public MessageListImpl(int numberOfMessages, int charactersPerMessage) { 53 this.numberOfMessages = numberOfMessages; 54 this.charactersPerMessage = charactersPerMessage; 55 } 56 57 public void start() throws Exception { 58 payloads = new ArrayList (numberOfMessages); 59 for (int i = 0; i < numberOfMessages; i++) { 60 payloads.add(createTextPayload(i, charactersPerMessage)); 61 } 62 } 63 64 public void stop() throws Exception { 65 payloads = null; 66 } 67 68 public void stop(AgentStopper stopper) { 69 payloads = null; 70 } 71 72 public void assertMessagesCorrect(List actual) throws JMSException { 73 int actualSize = actual.size(); 74 75 System.out.println("Consumed so far: " + actualSize + " message(s)"); 76 77 assertTrue("Not enough messages received: " + actualSize + " when expected: " + getSize(), actualSize >= getSize()); 78 79 for (int i = 0; i < actualSize; i++) { 80 assertMessageCorrect(i, actual); 81 } 82 83 assertEquals("Number of messages received", getSize(), actualSize); 84 } 85 86 public void assertMessageCorrect(int index, List actualMessages) throws JMSException { 87 Object expected = getPayloads().get(index); 88 89 Message message = (Message ) actualMessages.get(index); 90 91 assertHeadersCorrect(message, index); 92 93 Object actual = getMessagePayload(message); 94 assertEquals("Message payload for message: " + index, expected, actual); 95 } 96 97 public void sendMessages(Session session, MessageProducer producer) throws JMSException { 98 int counter = 0; 99 for (Iterator iter = getPayloads().iterator(); iter.hasNext(); counter++) { 100 Object element = (Object ) iter.next(); 101 if (counter == sentCounter) { 102 sentCounter++; 103 Message message = createMessage(session, element, counter); 104 appendHeaders(message, counter); 105 producer.send(message); 106 } 107 } 108 } 109 110 public void sendMessages(Session session, MessageProducer producer, int percent) throws JMSException { 111 int numberOfMessages = (getPayloads().size() * percent) / 100; 112 int counter = 0; 113 int lastMessageId = sentCounter + numberOfMessages; 114 for (Iterator iter = getPayloads().iterator(); iter.hasNext() && counter < lastMessageId; counter++) { 115 Object element = (Object ) iter.next(); 116 if (counter == sentCounter) { 117 sentCounter++; 118 Message message = createMessage(session, element, counter); 119 appendHeaders(message, counter); 120 producer.send(message); 121 } 122 } 123 } 124 125 public int getSize() { 126 return getPayloads().size(); 127 } 128 129 public String getCustomHeader() { 132 return customHeader; 133 } 134 135 public void setCustomHeader(String customHeader) { 136 this.customHeader = customHeader; 137 } 138 139 public List getPayloads() { 140 return payloads; 141 } 142 143 protected void appendHeaders(Message message, int counter) throws JMSException { 146 message.setIntProperty(getCustomHeader(), counter); 147 message.setStringProperty("cheese", "Edam"); 148 } 149 150 protected void assertHeadersCorrect(Message message, int index) throws JMSException { 151 assertEquals("String property 'cheese' for message: " + index, "Edam", message.getStringProperty("cheese")); 152 assertEquals("int property '" + getCustomHeader() + "' for message: " + index, index, message.getIntProperty(getCustomHeader())); 153 } 154 155 protected Object getMessagePayload(Message message) throws JMSException { 156 if (message instanceof TextMessage ) { 157 TextMessage textMessage = (TextMessage ) message; 158 return textMessage.getText(); 159 } 160 else if (message instanceof ObjectMessage ) { 161 ObjectMessage objectMessage = (ObjectMessage ) message; 162 return objectMessage.getObject(); 163 } 164 else { 165 return null; 166 } 167 } 168 169 protected Message createMessage(Session session, Object element, int counter) throws JMSException { 170 if (element instanceof String && useTextMessage) { 171 return session.createTextMessage((String ) element); 172 } 173 else if (element == null) { 174 return session.createMessage(); 175 } 176 else { 177 return session.createObjectMessage((Serializable ) element); 178 } 179 } 180 181 protected Object createTextPayload(int messageCounter, int charactersPerMessage) { 182 return "Body of message: " + messageCounter + " sent at: " + new Date (); 183 } 184 } 185 | Popular Tags |