1 17 18 package org.apache.geronimo.console.core.jms; 19 20 import java.util.ArrayList ; 21 import java.util.List ; 22 23 import javax.jms.Message ; 24 import javax.jms.QueueSession ; 25 import javax.jms.Topic ; 26 import javax.jms.TopicConnection ; 27 import javax.jms.TopicConnectionFactory ; 28 import javax.jms.TopicSession ; 29 import javax.jms.TopicSubscriber ; 30 import javax.management.MalformedObjectNameException ; 31 import javax.management.ObjectName ; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.geronimo.connector.AdminObjectWrapper; 36 import org.apache.geronimo.gbean.GBeanInfo; 37 import org.apache.geronimo.gbean.GBeanInfoBuilder; 38 import org.apache.geronimo.gbean.GBeanLifecycle; 39 import org.apache.geronimo.gbean.WaitingException; 40 import org.apache.geronimo.kernel.Kernel; 41 import org.apache.geronimo.kernel.KernelRegistry; 42 import org.apache.geronimo.kernel.management.State; 43 44 public class TopicBrowserGBean implements GBeanLifecycle, Runnable { 45 46 private static Log log = LogFactory.getLog(TopicBrowserGBean.class); 47 48 private static Kernel kernel = KernelRegistry.getSingleKernel(); 49 50 static { 51 try { 52 ACTIVEMQ_CONTAINER_OBJNAME = ObjectName 53 .getInstance("geronimo.server:J2EEApplication=null,J2EEModule=org/apache/geronimo/ActiveMQServer,J2EEServer=geronimo,j2eeType=JMSServer,name=ActiveMQl"); 54 ACTIVEMQ_CONNECTOR_OBJNAME = ObjectName 55 .getInstance("geronimo.server:J2EEApplication=null,J2EEServer=geronimo,JCAResource=org/apache/geronimo/SystemJMS,j2eeType=JCAManagedConnectionFactory,name=DefaultActiveMQConnectionFactory"); 56 } catch (MalformedObjectNameException moe) { 57 log.warn("Could not initialize ObjectName", moe); 58 } 59 } 60 61 private static ObjectName ACTIVEMQ_CONTAINER_OBJNAME; 62 63 private static ObjectName ACTIVEMQ_CONNECTOR_OBJNAME; 64 65 String subscriberName; 66 67 TopicConnectionFactory tConFactory; 68 69 TopicConnection tConnection; 70 71 AdminObjectWrapper connectionFactoryWrapper, topicWrapper; 72 73 TopicSession tSession; 74 75 TopicSubscriber tSubscriber; 76 77 Topic topic; 78 79 Thread t; 80 81 boolean stop; 82 83 public void run() { 84 try { 85 tConFactory = (TopicConnectionFactory ) connectionFactoryWrapper 86 .$getResource(); 87 topic = (Topic ) topicWrapper.$getResource(); 88 tConnection = tConFactory.createTopicConnection(); 89 tConnection.setClientID(subscriberName); 90 tSession = tConnection.createTopicSession(false, 91 QueueSession.AUTO_ACKNOWLEDGE); 92 tSubscriber = tSession.createDurableSubscriber(topic, 93 subscriberName); 94 tConnection.start(); 95 while (!stop) { 96 Thread.yield(); 97 } 98 if (tSession != null) { 99 tSession.close(); 100 } 101 if (tConnection != null) { 102 if (((Integer ) kernel.getAttribute(ACTIVEMQ_CONTAINER_OBJNAME, 106 "state")).intValue() == State.RUNNING_INDEX 107 && ((Integer ) kernel.getAttribute( 108 ACTIVEMQ_CONNECTOR_OBJNAME, "state")) 109 .intValue() == State.RUNNING_INDEX) { 110 tConnection.close(); 111 } 112 } 113 } catch (Exception e) { 114 throw new RuntimeException (e); 115 } 116 t = null; 117 log.debug("Worker thread stopped."); 118 } 119 120 public TopicBrowserGBean(String subscriberName, 121 AdminObjectWrapper connectionFactoryWrapper, 122 AdminObjectWrapper topicWrapper) { 123 this.subscriberName = subscriberName + "@" + this.getClass().getName(); 124 this.connectionFactoryWrapper = connectionFactoryWrapper; 125 this.topicWrapper = topicWrapper; 126 } 127 128 133 public void doStart() throws WaitingException, Exception { 134 t = new Thread (this); 135 t.start(); 136 log.debug("Subscribed to topic."); 137 } 138 139 144 public void doStop() throws WaitingException, Exception { 145 stop = true; 146 log.debug("Unsubscribed to topic."); 147 } 148 149 public void doFail() { 150 stop = true; 151 log.warn("GBean failed."); 152 } 153 154 162 public List getMessages() throws Exception { 163 List ret = new ArrayList (); 164 Message m = null; 165 do { 166 m = tSubscriber.receiveNoWait(); 167 if (m != null) { 168 ret.add(m); 169 } 170 } while (m != null); 171 return ret; 172 } 173 174 177 public void unsubscribe() throws Exception { 178 if (tSubscriber != null) { 179 tSubscriber.close(); 180 if (tSession != null) { 181 tSession.unsubscribe(subscriberName); 182 log.debug(subscriberName + " unsubscribed from Topic " + topic.getTopicName() + "."); 183 } 184 } 185 } 186 187 public static final GBeanInfo GBEAN_INFO; 188 189 static { 190 GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic("Topic Browser GBean", TopicBrowserGBean.class); 191 infoFactory.addAttribute("subscriberName", String .class, true); 192 193 infoFactory.addReference("ConnectionFactoryWrapper", 194 AdminObjectWrapper.class); 195 infoFactory.addReference("TopicWrapper", AdminObjectWrapper.class); 196 197 infoFactory.addOperation("getMessages"); 198 infoFactory.addOperation("unsubscribe"); 199 200 infoFactory.setConstructor(new String [] { "subscriberName", 201 "ConnectionFactoryWrapper", "TopicWrapper" }); 202 203 GBEAN_INFO = infoFactory.getBeanInfo(); 204 } 205 206 public static GBeanInfo getGBeanInfo() { 207 return GBEAN_INFO; 208 } 209 210 } 211 | Popular Tags |