1 22 package org.jboss.ejb3.test.strictpool; 23 24 import javax.annotation.Resource; 25 import javax.ejb.ActivationConfigProperty ; 26 import javax.ejb.EJBException ; 27 import javax.ejb.MessageDriven ; 28 import javax.ejb.MessageDrivenContext ; 29 import javax.jms.JMSException ; 30 import javax.jms.Message ; 31 import javax.jms.MessageListener ; 32 import javax.jms.Queue ; 33 import javax.jms.QueueConnection ; 34 import javax.jms.QueueConnectionFactory ; 35 import javax.jms.QueueSender ; 36 import javax.jms.QueueSession ; 37 import javax.jms.TextMessage ; 38 import javax.naming.InitialContext ; 39 40 import org.jboss.annotation.ejb.PoolClass; 41 import org.jboss.annotation.ejb.PoolClass; 42 43 48 @MessageDriven (activationConfig = 49 { 50 @ActivationConfigProperty (propertyName="destinationType", propertyValue="javax.jms.Queue"), 51 @ActivationConfigProperty (propertyName="destination", propertyValue="queue/queueA"), 52 @ActivationConfigProperty (propertyName="maxMessages", propertyValue="10"), 53 @ActivationConfigProperty (propertyName="minSession", propertyValue="10"), 54 @ActivationConfigProperty (propertyName="maxSession", propertyValue="10") 55 }) 56 @PoolClass (value=org.jboss.ejb3.StrictMaxPool.class, maxSize=StrictlyPooledMDB.maxActiveCount, timeout=10000) 57 public class StrictlyPooledMDB implements MessageListener 58 { 59 60 public static final int maxActiveCount = 2; 61 62 private static int activeCount; 63 64 private MessageDrivenContext ctx = null; 65 private QueueConnection queConn; 66 private QueueSession session; 67 private QueueSender sender; 68 69 private static synchronized int incActiveCount() 70 { 71 return activeCount ++; 72 } 73 private static synchronized int decActiveCount() 74 { 75 return activeCount --; 76 } 77 78 @Resource public void setMessageDrivenContext(MessageDrivenContext ctx) 79 throws EJBException 80 { 81 System.out.println("setMessageDrivenContext()"); 82 this.ctx = ctx; 83 try 84 { 85 InitialContext iniCtx = new InitialContext (); 86 QueueConnectionFactory factory = (QueueConnectionFactory ) iniCtx.lookup("java:/ConnectionFactory"); 87 queConn = factory.createQueueConnection(); 88 session = queConn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); 89 Queue queue = (Queue ) iniCtx.lookup("queue/queueB"); 90 sender = session.createSender(queue); 91 } 92 catch(Exception e) 93 { 94 System.out.println("Setup failure"); 95 e.printStackTrace(); 96 throw new EJBException ("Setup failure", e); 97 } 98 } 99 100 public void ejbCreate() 101 { 102 } 103 104 public void ejbRemove() 105 { 106 try 107 { 108 if( sender != null ) 109 sender.close(); 110 if( session != null ) 111 session.close(); 112 if( queConn != null ) 113 queConn.close(); 114 } 115 catch(Exception e) 116 { 117 System.out.println("Failed to close JMS resources"); 118 e.printStackTrace(); 119 } 120 } 121 122 public void onMessage(Message message) 123 { 124 int count = incActiveCount(); 125 System.out.println("Begin onMessage, activeCount="+count+", ctx="+ctx); 126 try 127 { 128 Message reply = null; 129 if( count > maxActiveCount ) 130 { 131 String msg = "IllegalState, activeCount > maxActiveCount, " 132 + count + " > " + maxActiveCount; 133 Exception e = new IllegalStateException (msg); 135 reply = session.createObjectMessage(e); 136 } 137 else 138 { 139 TextMessage tm = (TextMessage ) message; 140 reply = session.createTextMessage("Recevied msg="+tm.getText()); 142 } 143 Thread.currentThread().sleep(1000); 144 sender.send(reply); 145 } 146 catch(JMSException e) 147 { 148 System.out.println("Failed to send error message"); 149 e.printStackTrace(); 150 } 151 catch(InterruptedException e) 152 { 153 } 154 finally 155 { 156 count = decActiveCount(); 157 System.out.println("End onMessage, activeCount="+count+", ctx="+ctx); 158 } 159 } 160 } 161 | Popular Tags |