1 22 package org.jboss.test.cts.ejb; 23 24 import javax.ejb.MessageDrivenBean ; 25 import javax.ejb.MessageDrivenContext ; 26 import javax.ejb.EJBException ; 27 import javax.jms.MessageListener ; 28 import javax.jms.QueueConnection ; 29 import javax.jms.QueueSession ; 30 import javax.jms.QueueSender ; 31 import javax.jms.QueueConnectionFactory ; 32 import javax.jms.Queue ; 33 import javax.jms.Message ; 34 import javax.jms.TextMessage ; 35 import javax.jms.JMSException ; 36 import javax.naming.InitialContext ; 37 import org.jboss.logging.Logger; 38 39 45 public class StrictlyPooledMDB implements MessageDrivenBean , MessageListener 46 { 47 private static Logger log = Logger.getLogger(StrictlyPooledMDB.class); 48 49 private static int maxActiveCount = 5; 50 51 private static int activeCount; 52 53 private MessageDrivenContext ctx = null; 54 private QueueConnection queConn; 55 private QueueSession session; 56 private QueueSender sender; 57 58 private static synchronized int incActiveCount() 59 { 60 return activeCount ++; 61 } 62 private static synchronized int decActiveCount() 63 { 64 return activeCount --; 65 } 66 67 public void setMessageDrivenContext(MessageDrivenContext ctx) 68 throws EJBException 69 { 70 this.ctx = ctx; 71 try 72 { 73 InitialContext iniCtx = new InitialContext (); 74 Integer i = (Integer ) iniCtx.lookup("java:comp/env/maxActiveCount"); 75 maxActiveCount = i.intValue(); 76 QueueConnectionFactory factory = (QueueConnectionFactory ) iniCtx.lookup("java:/ConnectionFactory"); 77 queConn = factory.createQueueConnection(); 78 session = queConn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); 79 Queue queue = (Queue ) iniCtx.lookup("queue/B"); 80 sender = session.createSender(queue); 81 } 82 catch(Exception e) 83 { 84 log.error("Setup failure", e); 85 throw new EJBException ("Setup failure", e); 86 } 87 } 88 89 public void ejbCreate() 90 { 91 } 92 93 public void ejbRemove() 94 { 95 try 96 { 97 if( sender != null ) 98 sender.close(); 99 if( session != null ) 100 session.close(); 101 if( queConn != null ) 102 queConn.close(); 103 } 104 catch(Exception e) 105 { 106 log.error("Failed to close JMS resources", e); 107 } 108 } 109 110 public void onMessage(Message message) 111 { 112 int count = incActiveCount(); 113 log.debug("Begin onMessage, activeCount="+count+", ctx="+ctx); 114 try 115 { 116 Message reply = null; 117 if( count > maxActiveCount ) 118 { 119 String msg = "IllegalState, activeCount > maxActiveCount, " 120 + count + " > " + maxActiveCount; 121 Exception e = new IllegalStateException (msg); 123 reply = session.createObjectMessage(e); 124 } 125 else 126 { 127 TextMessage tm = (TextMessage ) message; 128 reply = session.createTextMessage("Recevied msg="+tm.getText()); 130 } 131 Thread.currentThread().sleep(1000); 132 sender.send(reply); 133 } 134 catch(JMSException e) 135 { 136 log.error("Failed to send error message", e); 137 } 138 catch(InterruptedException e) 139 { 140 } 141 finally 142 { 143 count = decActiveCount(); 144 log.debug("End onMessage, activeCount="+count+", ctx="+ctx); 145 } 146 } 147 } 148 | Popular Tags |