1 18 package org.apache.activemq.broker.jmx; 19 20 import java.util.ArrayList ; 21 import java.util.Collections ; 22 import java.util.Iterator ; 23 import java.util.List ; 24 import java.util.Map ; 25 26 import javax.jms.Connection ; 27 import javax.jms.InvalidSelectorException ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Session ; 30 import javax.management.openmbean.CompositeData ; 31 import javax.management.openmbean.CompositeDataSupport ; 32 import javax.management.openmbean.CompositeType ; 33 import javax.management.openmbean.OpenDataException ; 34 import javax.management.openmbean.TabularData ; 35 import javax.management.openmbean.TabularDataSupport ; 36 import javax.management.openmbean.TabularType ; 37 38 import org.apache.activemq.ActiveMQConnectionFactory; 39 import org.apache.activemq.broker.BrokerService; 40 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 41 import org.apache.activemq.broker.region.Destination; 42 import org.apache.activemq.command.ActiveMQDestination; 43 import org.apache.activemq.command.ActiveMQMessage; 44 import org.apache.activemq.command.ActiveMQTextMessage; 45 import org.apache.activemq.command.Message; 46 import org.apache.activemq.filter.BooleanExpression; 47 import org.apache.activemq.filter.MessageEvaluationContext; 48 import org.apache.activemq.selector.SelectorParser; 49 import org.apache.commons.logging.Log; 50 import org.apache.commons.logging.LogFactory; 51 52 public class DestinationView implements DestinationViewMBean { 53 private static final Log log = LogFactory.getLog(DestinationViewMBean.class); 54 protected final Destination destination; 55 protected final ManagedRegionBroker broker; 56 57 public DestinationView(ManagedRegionBroker broker, Destination destination){ 58 this.broker = broker; 59 this.destination=destination; 60 } 61 62 63 public void gc() { 64 destination.gc(); 65 } 66 67 public String getName() { 68 return destination.getName(); 69 } 70 71 public void resetStatistics() { 72 destination.getDestinationStatistics().reset(); 73 } 74 75 public long getEnqueueCount() { 76 return destination.getDestinationStatistics().getEnqueues().getCount(); 77 } 78 79 public long getDequeueCount() { 80 return destination.getDestinationStatistics().getDequeues().getCount(); 81 } 82 83 public long getDispatchCount() { 84 return destination.getDestinationStatistics().getDispatched().getCount(); 85 } 86 87 public long getConsumerCount() { 88 return destination.getDestinationStatistics().getConsumers().getCount(); 89 } 90 91 public long getQueueSize() { 92 return destination.getDestinationStatistics().getMessages().getCount(); 93 } 94 95 public long getMessagesCached() { 96 return destination.getDestinationStatistics().getMessagesCached().getCount(); 97 } 98 99 public int getMemoryPercentageUsed() { 100 return destination.getUsageManager().getPercentUsage(); 101 } 102 103 public long getMemoryLimit() { 104 return destination.getUsageManager().getLimit(); 105 } 106 107 public void setMemoryLimit(long limit) { 108 destination.getUsageManager().setLimit(limit); 109 } 110 111 public CompositeData [] browse() throws OpenDataException { 112 try { 113 return browse(null); 114 } catch (InvalidSelectorException e) { 115 throw new RuntimeException (e); 117 } 118 } 119 120 public CompositeData [] browse(String selector) throws OpenDataException , InvalidSelectorException { 121 Message[] messages=destination.browse(); 122 ArrayList c = new ArrayList (); 123 124 MessageEvaluationContext ctx = new MessageEvaluationContext(); 125 ctx.setDestination(destination.getActiveMQDestination()); 126 BooleanExpression selectorExpression = selector==null ? null : new SelectorParser().parse(selector); 127 128 for(int i=0;i<messages.length;i++){ 129 try{ 130 131 if( selectorExpression==null ) { 132 c.add(OpenTypeSupport.convert(messages[i])); 133 } else { 134 ctx.setMessageReference(messages[i]); 135 if ( selectorExpression.matches(ctx) ) { 136 c.add(OpenTypeSupport.convert(messages[i])); 137 } 138 } 139 140 } catch(Throwable e) { 141 log.warn("exception browsing destination",e); 142 } 143 } 144 145 CompositeData rc[]=new CompositeData [c.size()]; 146 c.toArray(rc); 147 return rc; 148 } 149 150 153 public List browseMessages() throws InvalidSelectorException { 154 return browseMessages(null); 155 } 156 157 160 public List browseMessages(String selector) throws InvalidSelectorException { 161 Message[] messages = destination.browse(); 162 ArrayList answer = new ArrayList (); 163 164 MessageEvaluationContext ctx = new MessageEvaluationContext(); 165 ctx.setDestination(destination.getActiveMQDestination()); 166 BooleanExpression selectorExpression = selector == null ? null : new SelectorParser().parse(selector); 167 168 for (int i = 0; i < messages.length; i++) { 169 try { 170 Message message = messages[i]; 171 if (selectorExpression == null) { 172 answer.add(OpenTypeSupport.convert(message)); 173 } 174 else { 175 ctx.setMessageReference(message); 176 if (selectorExpression.matches(ctx)) { 177 answer.add(message); 178 } 179 } 180 181 } 182 catch (Throwable e) { 183 log.warn("exception browsing destination",e); 184 } 185 } 186 return answer; 187 } 188 189 public TabularData browseAsTable() throws OpenDataException { 190 try { 191 return browseAsTable(null); 192 } catch (InvalidSelectorException e) { 193 throw new RuntimeException (e); 194 } 195 } 196 197 public TabularData browseAsTable(String selector) throws OpenDataException , InvalidSelectorException { 198 OpenTypeFactory factory=OpenTypeSupport.getFactory(ActiveMQMessage.class); 199 Message[] messages=destination.browse(); 200 CompositeType ct=factory.getCompositeType(); 201 TabularType tt=new TabularType ("MessageList","MessageList",ct,new String [] { "JMSMessageID" }); 202 TabularDataSupport rc=new TabularDataSupport (tt); 203 204 205 MessageEvaluationContext ctx = new MessageEvaluationContext(); 206 ctx.setDestination(destination.getActiveMQDestination()); 207 BooleanExpression selectorExpression = selector==null ? null : new SelectorParser().parse(selector); 208 209 for(int i=0;i<messages.length;i++){ 210 try { 211 if( selectorExpression==null ) { 212 rc.put(new CompositeDataSupport (ct,factory.getFields(messages[i]))); 213 } else { 214 ctx.setMessageReference(messages[i]); 215 if ( selectorExpression.matches(ctx) ) { 216 rc.put(new CompositeDataSupport (ct,factory.getFields(messages[i]))); 217 } 218 } 219 } catch(Throwable e) { 220 log.warn("exception browsing destination",e); 221 } 222 } 223 224 return rc; 225 } 226 227 public String sendTextMessage(String body) throws Exception { 228 return sendTextMessage(Collections.EMPTY_MAP, body); 229 } 230 231 public String sendTextMessage(Map headers, String body) throws Exception { 232 233 String brokerUrl = "vm://"+broker.getBrokerName(); 234 ActiveMQDestination dest = destination.getActiveMQDestination(); 235 236 ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl); 237 Connection connection = null; 238 try { 239 240 connection = cf.createConnection(); 241 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 242 MessageProducer producer = session.createProducer(dest); 243 ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body); 244 245 for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) { 246 Map.Entry entry = (Map.Entry ) iter.next(); 247 msg.setObjectProperty((String ) entry.getKey(), entry.getValue()); 248 } 249 250 producer.setDeliveryMode(msg.getJMSDeliveryMode()); 251 producer.setPriority(msg.getPriority()); 252 long ttl = msg.getExpiration() - System.currentTimeMillis(); 253 producer.setTimeToLive(ttl > 0 ? ttl : 0); 254 producer.send(msg); 255 256 return msg.getJMSMessageID(); 257 258 } finally { 259 connection.close(); 260 } 261 262 } 263 264 } 265 | Popular Tags |