package org.jboss.test.timer.ejb;

import java.io.Serializable;
import java.util.Date;
import javax.ejb.EJBException;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.ejb.TimedObject;
import javax.ejb.Timer;
import javax.ejb.TimerService;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

import org.jboss.logging.Logger;

/**
 * Message-driven bean that answers every incoming message on the message's
 * reply-to queue and then creates an interval timer for it. Each timer
 * expiration sends a further reply to the same queue until the timer's
 * cut-off date has passed, at which point the timer is cancelled.
 */
public class OnCreateTimerMessageBean implements MessageDrivenBean, MessageListener, TimedObject
{
   private static final Logger log = Logger.getLogger(OnCreateTimerMessageBean.class);

   private MessageDrivenContext messageContext = null;
   private QueueConnection qc = null;
   private InitialContext ctx = null;
   private TimerService ts;

   /**
    * Payload attached to each timer: which message started it, where replies
    * go, and the time window in which the timer is allowed to fire.
    *
    * No explicit serialVersionUID is declared, so the members are kept exactly
    * as they were to avoid altering the default serialized identity.
    */
   static class ReplyInfo implements Serializable
   {
      private int msgID;
      private Queue replyTo;
      private Date first;
      private Date last;

      ReplyInfo(int msgID, Queue replyTo, Date first, Date last)
      {
         this.msgID = msgID;
         this.replyTo = replyTo;
         this.first = first;
         this.last = last;
      }

      /**
       * @param next the timer's next scheduled expiration
       * @return true when {@code next} lies strictly after the cut-off date
       */
      boolean cancel(Date next)
      {
         return last.compareTo(next) < 0;
      }

      /**
       * @return milliseconds between the first scheduled expiration and now
       */
      long getElapsed()
      {
         return System.currentTimeMillis() - first.getTime();
      }
   }

   /**
    * Stores the container-supplied context for later use in {@link #ejbCreate()}.
    *
    * @param ctx the message-driven context
    */
   public void setMessageDrivenContext(MessageDrivenContext ctx)
      throws EJBException
   {
      messageContext = ctx;
   }

   /**
    * Looks up the queue connection factory under java:comp/env/jms/QCF, opens
    * the queue connection used for replies and obtains the timer service.
    *
    * @throws EJBException wrapping any failure during initialisation
    */
   public void ejbCreate()
   {
      try
      {
         ctx = new InitialContext();
         QueueConnectionFactory qcf = (QueueConnectionFactory) ctx.lookup("java:comp/env/jms/QCF");
         qc = qcf.createQueueConnection();
         ts = messageContext.getTimerService();
      }
      catch (Exception e)
      {
         log.error("Failed to init timer", e);
         throw new EJBException("ejbCreate failed", e);
      }
   }

   /**
    * Timer callback: cancels the timer once its next expiration would fall
    * after the cut-off date, then sends an "ejbTimeout" reply carrying the
    * elapsed time. A failure to send the reply is logged, not propagated.
    *
    * @param timer the timer that expired; its info object is a {@link ReplyInfo}
    */
   public void ejbTimeout(Timer timer)
   {
      log.info("ejbTimeout(), timer: " + timer);
      ReplyInfo info = (ReplyInfo) timer.getInfo();
      Date next = timer.getNextTimeout();
      if (info.cancel(next))
      {
         log.info("Cancelling timer");
         timer.cancel();
      }

      try
      {
         long elapsed = info.getElapsed();
         sendReply("ejbTimeout", info.msgID, elapsed, info.replyTo);
      }
      catch (Exception e)
      {
         log.error("Failed to send timer msg", e);
      }
   }

   /**
    * Closes the queue connection opened in {@link #ejbCreate()}, if there is one.
    * A failure to close is logged, not propagated.
    */
   public void ejbRemove() throws EJBException
   {
      // ejbCreate may have failed before the connection was assigned.
      if (qc == null)
      {
         return;
      }
      try
      {
         qc.close();
         log.info("QueueConnection is closed.");
      }
      catch (JMSException e)
      {
         log.error("Failed to close connection", e);
      }
      finally
      {
         qc = null;
      }
   }

   /**
    * Sends an immediate "onMessage" reply for the incoming message and then
    * creates the timer that produces the follow-up replies. Any failure is
    * logged, not propagated.
    *
    * @param message expected to be a {@link TextMessage} carrying an int
    *                property UNIQUE_ID and a JMSReplyTo queue
    */
   public void onMessage(Message message)
   {
      try
      {
         TextMessage msg = (TextMessage) message;
         log.info("onMessage() called, msg=" + msg);
         int msgID = msg.getIntProperty("UNIQUE_ID");
         Queue replyTo = (Queue) message.getJMSReplyTo();
         sendReply("onMessage", msgID, 0, replyTo);
         this.initTimer(msgID, replyTo);
      }
      catch (Exception e)
      {
         log.error("onMessage failure", e);
      }
   }

   /**
    * Creates an interval timer whose first expiration is one second from now,
    * repeating every second, with a cut-off date eleven seconds from now
    * recorded in the timer's {@link ReplyInfo}. A failure to create the timer
    * is logged, not propagated.
    *
    * @param msgID   id of the message that triggered the timer
    * @param replyTo queue the timer replies are sent to
    */
   public void initTimer(int msgID, Queue replyTo)
   {
      try
      {
         Date first = new Date(System.currentTimeMillis() + 1000);
         Date last = new Date(System.currentTimeMillis() + 11000);
         ReplyInfo info = new ReplyInfo(msgID, replyTo, first, last);
         Timer timer = ts.createTimer(first, 1000, info);
         log.info("Timer created with a timeout: " + first
            + " and with info: " + msgID
            + ", handle: " + timer.getHandle());
      }
      catch (Exception e)
      {
         // Logged at error level, like every other failure path in this bean.
         log.error("Failed to init timer", e);
      }
   }

   /**
    * Sends a non-persistent text reply "msg : msgID" to {@code dest} with the
    * UNIQUE_ID and Elapsed properties set and a time-to-live of 180000 ms.
    * The session used for sending is always closed afterwards.
    *
    * @param msg     text prefix identifying the sender of the reply
    * @param msgID   id copied into the text and the UNIQUE_ID property
    * @param elapsed value stored in the Elapsed property
    * @param dest    queue to send to
    * @throws JMSException if the session cannot be created or the send fails
    */
   private void sendReply(String msg, int msgID, long elapsed, Queue dest)
      throws JMSException
   {
      QueueSession qs = null;
      try
      {
         qs = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
         QueueSender sender = qs.createSender(dest);
         TextMessage reply = qs.createTextMessage();
         reply.setText(msg + " : " + msgID);
         reply.setIntProperty("UNIQUE_ID", msgID);
         reply.setLongProperty("Elapsed", elapsed);
         sender.send(reply, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, 180000);
         log.info("Message sent");
      }
      finally
      {
         // qs is still null when createQueueSession itself failed; closing it
         // unguarded would replace the real JMSException with an NPE.
         if (qs != null)
         {
            try
            {
               qs.close();
               log.info("JBossMQ QueueSession Closed");
            }
            catch (JMSException e)
            {
               log.error("Failed to close queue session", e);
            }
         }
      }
   }
}