1 21 22 package com.rift.coad.daemon.messageservice; 23 24 import javax.naming.InitialContext ; 26 import javax.naming.Context ; 27 import java.sql.PreparedStatement ; 28 import java.sql.ResultSet ; 29 import java.sql.Statement ; 30 import javax.sql.DataSource ; 31 import java.util.Set ; 32 import java.util.HashSet ; 33 import javax.transaction.UserTransaction ; 34 import java.sql.Timestamp ; 35 import java.util.ArrayList ; 36 import java.util.Date ; 37 import java.util.Enumeration ; 38 import java.util.HashSet ; 39 import java.util.Iterator ; 40 import java.util.List ; 41 import java.util.Set ; 42 import javax.transaction.xa.XAException ; 43 import javax.transaction.xa.XAResource ; 44 import javax.transaction.xa.Xid ; 45 import org.hibernate.*; 46 import org.hibernate.cfg.*; 47 import org.apache.log4j.Logger; 48 import org.apache.log4j.BasicConfigurator; 49 50 import junit.framework.*; 52 53 import org.objectweb.jotm.Jotm; 55 56 import com.rift.coad.lib.naming.NamingDirector; 58 import com.rift.coad.lib.naming.ContextManager; 59 import com.rift.coad.lib.db.DBSourceManager; 60 import com.rift.coad.lib.common.ObjectSerializer; 61 import com.rift.coad.lib.interceptor.InterceptorFactory; 62 import com.rift.coad.lib.security.RoleManager; 63 import com.rift.coad.lib.security.ThreadsPermissionContainer; 64 import com.rift.coad.lib.security.ThreadPermissionSession; 65 import com.rift.coad.lib.security.UserSession; 66 import com.rift.coad.lib.security.user.UserSessionManager; 67 import com.rift.coad.lib.security.user.UserStoreManager; 68 import com.rift.coad.lib.security.SessionManager; 69 import com.rift.coad.lib.security.login.LoginManager; 70 import com.rift.coad.lib.thread.CoadunationThreadGroup; 71 import com.rift.coad.lib.transaction.TransactionDirector; 72 import com.rift.coad.daemon.messageservice.Message; 73 import com.rift.coad.daemon.messageservice.RPCMessage; 74 import com.rift.coad.daemon.messageservice.TextMessage; 75 import com.rift.coad.daemon.messageservice.MessageManager; 76 import com.rift.coad.daemon.messageservice.MessageServiceException; 77 import com.rift.coad.daemon.messageservice.MessageServiceImpl; 78 import com.rift.coad.daemon.messageservice.db.*; 79 import com.rift.coad.daemon.messageservice.message.MessageImpl; 80 import com.rift.coad.daemon.messageservice.message.RPCMessageImpl; 81 import com.rift.coad.daemon.messageservice.message.TextMessageImpl; 82 import com.rift.coad.hibernate.util.HibernateUtil; 83 import com.rift.coad.util.lock.ObjectLockFactory; 84 import com.rift.coad.util.transaction.TransactionManager; 85 86 91 public class MessageQueueManagerTest extends TestCase { 92 93 96 public class TestMessageManager implements MessageManager { 97 public String id = null; 99 public Date nextProcessTime = new Date (); 100 public int priority = 0; 101 104 public TestMessageManager (String id, int priority) { 105 this.id = id; 106 this.priority = priority; 107 } 108 109 110 113 public String getID() { 114 return id; 115 } 116 117 120 public Message getMessage() throws MessageServiceException { 121 return null; 122 } 123 124 127 public void updateMessage(Message updatedMessage) throws MessageServiceException { 128 129 } 130 131 134 public Date nextProcessTime() { 135 return nextProcessTime; 136 } 137 138 141 public void setNextProcessTime(Date nextProcessTime) { 142 this.nextProcessTime = nextProcessTime; 143 } 144 145 148 public int getPriority() { 149 return priority; 150 } 151 152 153 159 public int compareTo(Object o) { 160 TestMessageManager tmsg =(TestMessageManager)o; 161 if (tmsg.nextProcessTime().getTime() > nextProcessTime().getTime()) { 162 return -1; 163 } else if (nextProcessTime().getTime() > tmsg.nextProcessTime().getTime()) { 164 return 1; 165 } else if (tmsg.getPriority() > getPriority()) { 166 return -1; 167 } else if (getPriority() > tmsg.getPriority()) { 168 return 1; 169 } 170 return 0; 171 } 172 173 174 public void commit(Xid xid, boolean b) throws XAException { 175 } 176 177 public void end(Xid xid, int i) throws XAException { 178 } 179 180 public void forget(Xid xid) throws XAException { 181 } 182 183 public int getTransactionTimeout() throws XAException { 184 return -1; 185 } 186 187 public boolean isSameRM(XAResource xAResource) throws XAException { 188 return this == xAResource; 189 } 190 191 public int prepare(Xid xid) throws XAException { 192 return 0; 193 } 194 195 public Xid [] recover(int i) throws XAException { 196 return null; 197 } 198 199 public void rollback(Xid xid) throws XAException { 200 } 201 202 public boolean setTransactionTimeout(int i) throws XAException { 203 return true; 204 } 205 206 public void start(Xid xid, int i) throws XAException { 207 } 208 209 public String getMessageQueueName() { 210 return "test"; 211 } 212 213 public void remove() throws MessageServiceException { 214 } 215 216 } 217 218 boolean gotRef = false; 219 220 public MessageQueueManagerTest(String testName) { 221 super(testName); 222 } 224 225 protected void setUp() throws Exception { 226 } 227 228 protected void tearDown() throws Exception { 229 } 230 231 232 235 public void testMessageQueueManager() throws Exception { 236 System.out.println("testMessageQueueManager"); 237 238 ThreadsPermissionContainer permissions = new ThreadsPermissionContainer(); 240 SessionManager.init(permissions); 241 UserStoreManager userStoreManager = new UserStoreManager(); 242 UserSessionManager sessionManager = new UserSessionManager(permissions, 243 userStoreManager); 244 LoginManager.init(sessionManager,userStoreManager); 245 CoadunationThreadGroup threadGroup = new CoadunationThreadGroup(sessionManager, 247 userStoreManager); 248 249 RoleManager.getInstance(); 251 252 InterceptorFactory.init(permissions,sessionManager,userStoreManager); 253 254 Set set = new HashSet (); 256 set.add("test"); 257 UserSession user = new UserSession("test1", set); 258 permissions.putSession(new Long (Thread.currentThread().getId()), 259 new ThreadPermissionSession( 260 new Long (Thread.currentThread().getId()),user)); 261 262 NamingDirector.init(threadGroup); 264 265 TransactionDirector transactionDirector = TransactionDirector.init(); 267 268 DBSourceManager.init(); 270 Context context = new InitialContext (); 271 ObjectLockFactory.init(); 272 TransactionManager.init(); 273 274 MessageQueueManager expResult = MessageQueueManager.getInstance(); 275 MessageQueueManager result = MessageQueueManager.getInstance(); 276 assertEquals(expResult, result); 277 278 279 UserTransaction ut = 280 (UserTransaction )context.lookup("java:comp/UserTransaction"); 281 282 ut.begin(); 283 284 MessageQueue queue = result.getQueue("test"); 285 286 ut.commit(); 287 288 ut.begin(); 289 assertEquals(queue, result.getQueue("test")); 290 ut.commit(); 291 292 293 ut.begin(); 294 Session session = HibernateUtil.getInstance(MessageServiceImpl.class). 295 getSession(); 296 com.rift.coad.daemon.messageservice.db.MessageQueue messageQueue = 297 new com.rift.coad.daemon.messageservice.db.MessageQueue("test2"); 298 session.persist(messageQueue); 299 ut.commit(); 300 301 302 ut.begin(); 303 304 MessageQueue queue2 = result.getQueue("test2"); 305 306 ut.commit(); 307 308 ut.begin(); 309 assertEquals(queue2, result.getQueue("test2")); 310 ut.commit(); 311 312 ut.begin(); 313 result.getQueue("test3"); 314 Thread testThread = new Thread (new Runnable () { 315 public void run() { 316 try { 317 MessageQueueManager.getInstance().getQueue("test3"); 318 gotRef = true; 319 } catch (Exception ex) { 320 System.out.println("Failed to get the queue reference : " + 321 ex.getMessage()); 322 ex.printStackTrace(System.out); 323 } 324 } 325 }); 326 testThread.start(); 327 Thread.sleep(500); 328 if (gotRef) { 329 fail("Managed aquire the queue reference"); 330 } 331 ut.commit(); 332 Thread.sleep(500); 333 if (gotRef == false) { 334 fail("Failed aquire the queue reference"); 335 } 336 337 ut.begin(); 338 TestMessageManager message1 = new TestMessageManager("test1",1); 339 queue.addMessage(message1); 340 TestMessageManager message2 = new TestMessageManager("test2",1); 341 queue2.addMessage(message2); 342 ut.commit(); 343 344 Date nextDate = new Date (); 345 System.out.println("Get message"); 346 MessageProcessInfo messageProcessInfo = result.getNextMessage(nextDate); 347 System.out.println("After retrieving the message"); 348 if ((messageProcessInfo == null) || 349 ((messageProcessInfo.getMessageManager() != message1) && 350 (messageProcessInfo.getMessageManager() != message2))) { 351 fail("Failed to retrieve the next message"); 352 } 353 354 System.out.println("Get message"); 355 messageProcessInfo = result.getNextMessage(nextDate); 356 System.out.println("After retrieving the message"); 357 if ((messageProcessInfo == null) || 358 ((messageProcessInfo.getMessageManager() != message1) && 359 (messageProcessInfo.getMessageManager() != message2))) { 360 fail("Failed to retrieve the next message"); 361 } 362 363 System.out.println("Get message"); 364 messageProcessInfo = result.getNextMessage(nextDate); 365 System.out.println("After retrieving the message"); 366 if (messageProcessInfo != null) { 367 fail("Succeeded in retrieving a message"); 368 } 369 370 queue.pushBackMessage(message1); 371 372 messageProcessInfo = result.getNextMessage(nextDate); 373 if ((messageProcessInfo == null) || 374 ((messageProcessInfo.getMessageManager() != message1) && 375 (messageProcessInfo.getMessageManager() != message2))) { 376 fail("Failed to retrieve the next message"); 377 } 378 379 380 ut.begin(); 381 session = HibernateUtil.getInstance(MessageServiceImpl.class). 382 getSession(); 383 com.rift.coad.daemon.messageservice.db.MessageQueue messageQueue2 = 384 new com.rift.coad.daemon.messageservice.db.MessageQueue("test5"); 385 messageQueue2.setNamed(new Integer (1)); 386 session.persist(messageQueue2); 387 ut.commit(); 388 389 390 ut.begin(); 391 392 try { 393 result.getQueue("test5"); 394 fail("Was able to retrieve the named queue"); 395 } catch (MessageServiceException ex) { 396 System.out.println(ex.getMessage()); 397 } 398 399 ut.commit(); 400 } 401 402 403 } 404 | Popular Tags |