package org.jboss.test.timer.ejb;

import java.io.Serializable;
import javax.ejb.EJBException;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.ejb.TimedObject;
import javax.ejb.Timer;
import javax.ejb.TimerService;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

import org.jboss.logging.Logger;

/**
 * Message-driven bean that, for every incoming text message, creates an EJB
 * timer and immediately sends an "onMessage" reply to the message's reply-to
 * queue. When the timer expires a second, "ejbTimeout" reply is sent to the
 * same queue. Both replies carry the UNIQUE_ID property of the request.
 */
public class TimerMessageBean implements MessageDrivenBean, MessageListener, TimedObject
{
   private static final Logger log = Logger.getLogger(TimerMessageBean.class);

   /** Container-supplied context; used to obtain the TimerService. */
   private MessageDrivenContext messageContext = null;
   /** Connection used for all replies; opened in ejbCreate, closed in ejbRemove. */
   private QueueConnection qc = null;
   /** Naming context used to look up the connection factory. */
   private InitialContext ctx = null;
   /** Duration handed to TimerService.createTimer for each received message. */
   private long timerTimeout = 10000;

   /**
    * Payload stored in the timer so that ejbTimeout knows which message the
    * timer belongs to and where the reply has to go.
    * The declaration is deliberately left as it was so the default serialized
    * form of the class is not altered.
    */
   static class ReplyInfo implements Serializable
   {
      private int msgID;
      private Queue replyTo;

      ReplyInfo(int msgID, Queue replyTo)
      {
         this.msgID = msgID;
         this.replyTo = replyTo;
      }
   }

   /**
    * Stores the context passed in by the container.
    *
    * @param ctx the message-driven context of this bean instance
    */
   public void setMessageDrivenContext(MessageDrivenContext ctx)
      throws EJBException
   {
      messageContext = ctx;
   }

   /**
    * Looks up the queue connection factory under java:comp/env/jms/QCF and
    * opens the connection used for sending replies.
    *
    * @throws EJBException wrapping any lookup or connection failure
    */
   public void ejbCreate()
   {
      try
      {
         ctx = new InitialContext();
         QueueConnectionFactory qcf = (QueueConnectionFactory) ctx.lookup("java:comp/env/jms/QCF");
         qc = qcf.createQueueConnection();
      }
      catch (Exception e)
      {
         throw new EJBException("ejbCreate failed", e);
      }
   }

   /**
    * Timer callback: sends the "ejbTimeout" reply described by the
    * ReplyInfo stored in the timer. Send failures are logged, not rethrown.
    *
    * @param timer the expired timer
    */
   public void ejbTimeout(Timer timer)
   {
      log.info("ejbTimeout(), timer: " + timer);
      ReplyInfo info = (ReplyInfo) timer.getInfo();
      try
      {
         sendReply("ejbTimeout", info.msgID, info.replyTo);
      }
      catch (Exception e)
      {
         log.error("Failed to send timer msg", e);
      }
   }

   /**
    * Closes the reply connection. Close failures are logged, not rethrown.
    */
   public void ejbRemove() throws EJBException
   {
      // Nothing to release if the connection was never established.
      if (qc == null)
      {
         return;
      }
      try
      {
         qc.close();
         log.info("QueueConnection is closed.");
      }
      catch (JMSException e)
      {
         log.error("Failed to close connection", e);
      }
      finally
      {
         // Drop the reference so a repeated call does not close twice.
         qc = null;
      }
   }

   /**
    * Handles an incoming request: reads its UNIQUE_ID property and reply-to
    * queue, schedules the timer, then sends the immediate "onMessage" reply.
    * Any failure (including a message that is not a TextMessage) is logged.
    *
    * @param message the received JMS message
    */
   public void onMessage(Message message)
   {
      try
      {
         TextMessage msg = (TextMessage) message;
         log.info("onMessage() called, msg="+msg);
         int msgID = msg.getIntProperty("UNIQUE_ID");
         Queue replyTo = (Queue) message.getJMSReplyTo();
         initTimer(msgID, replyTo);
         sendReply("onMessage", msgID, replyTo);
      }
      catch (Exception e)
      {
         log.error("onMessage failure", e);
      }
   }

   /**
    * Creates a timer whose info object records the message id and reply
    * queue. Failures are logged and swallowed so that the caller can still
    * send its immediate reply.
    *
    * @param msgID   id of the request the timer belongs to
    * @param replyTo queue the timeout reply must be sent to
    */
   public void initTimer(int msgID, Queue replyTo)
   {
      try
      {
         TimerService ts = messageContext.getTimerService();
         ReplyInfo info = new ReplyInfo(msgID, replyTo);
         Timer timer = ts.createTimer(timerTimeout, info);
         log.info("Timer created with a timeout: " + timerTimeout
            + " and with info: " + msgID
            + ", handle: "+timer.getHandle());
      }
      catch (Exception e)
      {
         // Logged as an error, consistent with the other failure paths.
         log.error("Failed to init timer", e);
      }
   }

   /**
    * Sends a non-persistent text reply of the form "msg : msgID" to the
    * given queue, tagged with the UNIQUE_ID property.
    *
    * @param msg   text prefix identifying the origin of the reply
    * @param msgID id of the request being answered
    * @param dest  queue to send the reply to
    * @throws JMSException if the session cannot be created or the send fails
    */
   private void sendReply(String msg, int msgID, Queue dest)
      throws JMSException
   {
      // Created outside the try block: if creation fails there is no session
      // to close, and the original JMSException reaches the caller instead of
      // being masked by a NullPointerException from the cleanup code.
      QueueSession qs = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      try
      {
         QueueSender sender = qs.createSender(dest);
         TextMessage reply = qs.createTextMessage();
         reply.setText(msg + " : " + msgID);
         reply.setIntProperty("UNIQUE_ID", msgID);
         sender.send(reply, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, 180000);
         log.info("Message sent");
      }
      finally
      {
         try
         {
            qs.close();
            log.info("JBossMQ QueueSession Closed");
         }
         catch (JMSException e)
         {
            log.error("Failed to close queue session", e);
         }
      }
   }
}