1 22 package org.jboss.ejb3.test.strictpool; 23 24 import javax.annotation.Resource; 25 import javax.ejb.ActivationConfigProperty ; 26 import javax.ejb.EJBException ; 27 import javax.ejb.MessageDriven ; 28 import javax.ejb.MessageDrivenContext ; 29 import javax.jms.JMSException ; 30 import javax.jms.Message ; 31 import javax.jms.MessageListener ; 32 import javax.jms.Queue ; 33 import javax.jms.QueueConnection ; 34 import javax.jms.QueueConnectionFactory ; 35 import javax.jms.QueueSender ; 36 import javax.jms.QueueSession ; 37 import javax.jms.TextMessage ; 38 import javax.naming.InitialContext ; 39 40 import org.jboss.annotation.ejb.PoolClass; 41 import org.jboss.annotation.ejb.PoolClass; 42 43 47 @MessageDriven (activationConfig = 48 { 49 @ActivationConfigProperty (propertyName="destinationType", propertyValue="javax.jms.Queue"), 50 @ActivationConfigProperty (propertyName="destination", propertyValue="queue/overrideQueueA"), 51 @ActivationConfigProperty (propertyName="maxMessages", propertyValue="10"), 52 @ActivationConfigProperty (propertyName="minSession", propertyValue="10"), 53 @ActivationConfigProperty (propertyName="maxSession", propertyValue="10") 54 }) 55 @PoolClass (value=org.jboss.ejb3.test.strictpool.BogusPool.class, maxSize=0, timeout=0) 56 public class OverrideStrictlyPooledMDB implements MessageListener 57 { 58 59 public static final int maxActiveCount = 2; 60 61 private static int activeCount; 62 63 private MessageDrivenContext ctx = null; 64 private QueueConnection queConn; 65 private QueueSession session; 66 private QueueSender sender; 67 68 private static synchronized int incActiveCount() 69 { 70 return activeCount ++; 71 } 72 private static synchronized int decActiveCount() 73 { 74 return activeCount --; 75 } 76 77 @Resource public void setMessageDrivenContext(MessageDrivenContext ctx) 78 throws EJBException 79 { 80 System.out.println("setMessageDrivenContext()"); 81 this.ctx = ctx; 82 try 83 { 84 InitialContext iniCtx = new InitialContext (); 85 QueueConnectionFactory factory = (QueueConnectionFactory ) iniCtx.lookup("java:/ConnectionFactory"); 86 queConn = factory.createQueueConnection(); 87 session = queConn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); 88 Queue queue = (Queue ) iniCtx.lookup("queue/overrideQueueB"); 89 sender = session.createSender(queue); 90 } 91 catch(Exception e) 92 { 93 System.out.println("Setup failure"); 94 e.printStackTrace(); 95 throw new EJBException ("Setup failure", e); 96 } 97 } 98 99 public void ejbCreate() 100 { 101 } 102 103 public void ejbRemove() 104 { 105 try 106 { 107 if( sender != null ) 108 sender.close(); 109 if( session != null ) 110 session.close(); 111 if( queConn != null ) 112 queConn.close(); 113 } 114 catch(Exception e) 115 { 116 System.out.println("Failed to close JMS resources"); 117 e.printStackTrace(); 118 } 119 } 120 121 public void onMessage(Message message) 122 { 123 int count = incActiveCount(); 124 System.out.println("Begin onMessage, activeCount="+count+", ctx="+ctx); 125 try 126 { 127 Message reply = null; 128 if( count > maxActiveCount ) 129 { 130 String msg = "IllegalState, activeCount > maxActiveCount, " 131 + count + " > " + maxActiveCount; 132 Exception e = new IllegalStateException (msg); 134 reply = session.createObjectMessage(e); 135 } 136 else 137 { 138 TextMessage tm = (TextMessage ) message; 139 reply = session.createTextMessage("Recevied msg="+tm.getText()); 141 } 142 Thread.currentThread().sleep(1000); 143 sender.send(reply); 144 } 145 catch(JMSException e) 146 { 147 System.out.println("Failed to send error message"); 148 e.printStackTrace(); 149 } 150 catch(InterruptedException e) 151 { 152 } 153 finally 154 { 155 count = decActiveCount(); 156 System.out.println("End onMessage, activeCount="+count+", ctx="+ctx); 157 } 158 } 159 } 160 | Popular Tags |