1 22 package org.jboss.test.jbossmq.test; 23 24 import java.util.Arrays ; 25 import java.util.List ; 26 27 import javax.jms.Destination ; 28 import javax.jms.ExceptionListener ; 29 import javax.jms.JMSException ; 30 import javax.jms.Message ; 31 import javax.jms.MessageConsumer ; 32 import javax.jms.MessageProducer ; 33 import javax.jms.Session ; 34 import javax.jms.Topic ; 35 import javax.jms.XAConnection ; 36 import javax.jms.XAConnectionFactory ; 37 import javax.jms.XASession ; 38 import javax.naming.InitialContext ; 39 import javax.transaction.xa.XAResource ; 40 import javax.transaction.xa.Xid ; 41 42 import org.jboss.test.JBossTestCase; 43 44 50 public abstract class RecoveryTest extends JBossTestCase 51 { 52 static String RECOVERY_CONFIG = "jbossmq-recovery-service.xml"; 53 54 static String SUBSCRIPTION = "testSubscription"; 55 56 static String FACTORY = "RecoveryXAConnectionFactory"; 57 58 XAConnectionFactory connectionFactory; 59 60 public RecoveryTest(String name) throws Exception 61 { 62 super(name); 63 } 64 65 class JMSClient implements ExceptionListener 66 { 67 Destination destination; 68 69 XAConnection connection; 70 71 XASession xaSession; 72 73 MessageProducer producer; 74 75 MessageConsumer consumer; 76 77 public JMSClient(Destination destination) throws Exception 78 { 79 this.destination = destination; 80 connection = connectionFactory.createXAConnection(); 81 connection.setClientID("JMSClient"); 82 connection.setExceptionListener(this); 83 } 84 85 public void onException(JMSException e) 86 { 87 log.warn("onException", e); 88 close(); 89 } 90 91 public void sendMessage(String text) throws Exception 92 { 93 Message message = getSession().createTextMessage(text); 94 getProducer().send(message); 95 } 96 97 public void sendMessage(int i) throws Exception 98 { 99 Message message = getSession().createObjectMessage(new Integer (i)); 100 getProducer().send(message); 101 } 102 103 public MessageProducer getProducer() throws Exception 104 { 105 if (producer == null) 106 producer = getSession().createProducer(destination); 107 return producer; 108 } 109 110 public Message receiveNoWait() throws Exception 111 { 112 return getConsumer().receiveNoWait(); 113 } 114 115 public MessageConsumer getConsumer() throws Exception 116 { 117 if (consumer == null) 118 { 119 connection.start(); 120 if (destination instanceof Topic ) 121 consumer = getSession().createDurableSubscriber((Topic ) destination, SUBSCRIPTION); 122 else 123 consumer = getSession().createConsumer(destination); 124 } 125 return consumer; 126 } 127 128 public void removeSubscription() throws Exception 129 { 130 getSession().unsubscribe(SUBSCRIPTION); 131 } 132 133 public void enlist(Xid xid) throws Exception 134 { 135 getXAResource().start(xid, XAResource.TMNOFLAGS); 136 } 137 138 public void delist(Xid xid) throws Exception 139 { 140 getXAResource().end(xid, XAResource.TMSUCCESS); 141 } 142 143 public int prepare(Xid xid) throws Exception 144 { 145 return getXAResource().prepare(xid); 146 } 147 148 public void commit(Xid xid) throws Exception 149 { 150 getXAResource().commit(xid, false); 151 } 152 153 public void rollback(Xid xid) throws Exception 154 { 155 getXAResource().rollback(xid); 156 } 157 158 public void forget(Xid xid) throws Exception 159 { 160 getXAResource().forget(xid); 161 } 162 163 public List recover() throws Exception 164 { 165 return Arrays.asList(getXAResource().recover(XAResource.TMSTARTRSCAN)); 166 } 167 168 public XAResource getXAResource() throws Exception 169 { 170 return getXASession().getXAResource(); 171 } 172 173 public Session getSession() throws Exception 174 { 175 return getXASession().getSession(); 176 } 177 178 public XASession getXASession() throws Exception 179 { 180 if (xaSession == null) 181 xaSession = connection.createXASession(); 182 return xaSession; 183 } 184 185 public void close() 186 { 187 try 188 { 189 if (connection != null) 190 connection.close(); 191 } 192 catch (Throwable ignored) 193 { 194 } 195 connection = null; 196 } 197 } 198 199 public static class MyXid implements Xid 200 { 201 static byte next = 0; 202 203 byte[] xid; 204 205 public MyXid() 206 { 207 xid = new byte[] { ++next }; 208 } 209 210 public int getFormatId() 211 { 212 return 314; 213 } 214 215 public byte[] getGlobalTransactionId() 216 { 217 return xid; 218 } 219 220 public byte[] getBranchQualifier() 221 { 222 return null; 223 } 224 } 225 226 protected void assertXidEquals(Xid xid1, Xid xid2) 227 { 228 if (xid1 == null) 229 fail("Null xid1"); 230 if (xid2 == null) 231 fail("Null xid2"); 232 233 assertEquals("Different format id " + xid1.getFormatId() + " " + xid2.getFormatId(), xid1.getFormatId(), xid2.getFormatId()); 234 assertTrue("Different global id " + xid1.getGlobalTransactionId() + " " + xid2.getGlobalTransactionId(), Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId())); 235 if (xid1.getBranchQualifier() == null) 236 { 237 if (xid1.getBranchQualifier() == null) 238 return; 239 else 240 fail("Different branch null " + xid2.getBranchQualifier()); 241 } 242 assertTrue("Different branch " + xid1.getBranchQualifier() + " " + xid2.getBranchQualifier(), Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())); 243 } 244 245 protected void reset(Destination destination) 246 { 247 try 248 { 249 JMSClient client = new JMSClient(destination); 250 try 251 { 252 List xids = client.recover(); 253 log.debug("Recovering: " + xids); 254 for (int i = 0; i < xids.size(); ++i) 255 client.rollback((Xid ) xids.get(i)); 256 } 257 finally 258 { 259 client.close(); 260 } 261 } 262 catch (Exception e) 263 { 264 log.warn("Error resetting", e); 265 } 266 clearDestination(destination); 267 } 268 269 protected int clearDestination(Destination destination) 270 { 271 int count = 0; 272 try 273 { 274 JMSClient client = new JMSClient(destination); 275 try 276 { 277 Message message = client.receiveNoWait(); 278 while (message != null) 279 { 280 ++count; 281 message = client.receiveNoWait(); 282 } 283 } 284 finally 285 { 286 client.close(); 287 } 288 } 289 catch (Exception e) 290 { 291 log.warn("Error clearing " + destination, e); 292 } 293 return count; 294 } 295 296 protected void setup(Destination destination, int count) throws Exception 297 { 298 JMSClient client = new JMSClient(destination); 299 try 300 { 301 for (int i = 0; i < count; ++i) 302 client.sendMessage(i); 303 } 304 finally 305 { 306 client.close(); 307 } 308 } 309 310 protected void makeSubscription(Topic topic) throws Exception 311 { 312 JMSClient client = new JMSClient(topic); 313 try 314 { 315 client.getConsumer(); 316 } 317 finally 318 { 319 client.close(); 320 } 321 } 322 323 protected void removeSubscription(Topic topic) throws Exception 324 { 325 JMSClient client = new JMSClient(topic); 326 try 327 { 328 client.removeSubscription(); 329 } 330 finally 331 { 332 client.close(); 333 } 334 } 335 336 protected void deployRecoveryService() throws Exception 337 { 338 deploy(RECOVERY_CONFIG); 339 try 340 { 341 InitialContext ctx = getInitialContext(); 342 connectionFactory = (XAConnectionFactory ) ctx.lookup(FACTORY); 343 } 344 catch (Throwable t) 345 { 346 try 347 { 348 undeploy(RECOVERY_CONFIG); 349 } 350 catch (Throwable ignored) 351 { 352 } 353 } 354 } 355 356 protected void undeployRecoveryService() throws Exception 357 { 358 undeploy(RECOVERY_CONFIG); 359 } 360 361 protected void restartRecoveryService() throws Exception 362 { 363 undeployRecoveryService(); 364 deployRecoveryService(); 365 } 366 } 367 | Popular Tags |