1 18 package org.apache.activemq.console.command; 19 20 import org.apache.activemq.console.formatter.GlobalWriter; 21 import org.apache.activemq.console.util.JmxMBeansUtil; 22 23 import javax.management.ObjectInstance ; 24 import javax.management.ObjectName ; 25 import javax.management.MBeanServerConnection ; 26 import javax.management.openmbean.CompositeData ; 27 import javax.management.remote.JMXConnector ; 28 import java.util.List ; 29 import java.util.StringTokenizer ; 30 import java.util.ArrayList ; 31 import java.util.Iterator ; 32 33 public class PurgeCommand extends AbstractJmxCommand { 34 35 private final List queryAddObjects = new ArrayList (10); 36 private final List querySubObjects = new ArrayList (10); 37 38 43 protected void runTask(List tokens) throws Exception { 44 try { 45 if (tokens.isEmpty()) { 47 tokens.add("*"); 48 } 49 50 for (Iterator i=tokens.iterator(); i.hasNext();) { 52 List queueList = JmxMBeansUtil.queryMBeans(useJmxServiceUrl(), "Type=Queue,Destination=" + i.next() + ",*"); 53 54 for (Iterator j=queueList.iterator(); j.hasNext();) { 55 ObjectName queueName = ((ObjectInstance )j.next()).getObjectName(); 56 if (queryAddObjects.isEmpty()) { 57 purgeQueue(queueName); 58 } else { 59 List messages = JmxMBeansUtil.createMessageQueryFilter(useJmxServiceUrl(), queueName).query(queryAddObjects); 60 purgeMessages(queueName, messages); 61 } 62 } 63 } 64 } catch (Exception e) { 65 GlobalWriter.printException(new RuntimeException ("Failed to execute purge task. Reason: " + e)); 66 throw new Exception (e); 67 } 68 } 69 70 75 public void purgeQueue(ObjectName queue) throws Exception { 76 JMXConnector conn = createJmxConnector(); 77 MBeanServerConnection server = conn.getMBeanServerConnection(); 78 GlobalWriter.printInfo("Purging all messages in queue: " + queue.getKeyProperty("Destination")); 79 server.invoke(queue, "purge", new Object [] {}, new String [] {}); 80 conn.close(); 81 } 82 83 89 public void purgeMessages(ObjectName queue, List messages) throws Exception { 90 JMXConnector conn = createJmxConnector(); 91 MBeanServerConnection server = conn.getMBeanServerConnection(); 92 93 Object [] param = new Object [1]; 94 for (Iterator i=messages.iterator(); i.hasNext();) { 95 CompositeData msg = (CompositeData )i.next(); 96 param[0] = "" + msg.get("JMSMessageID"); 97 GlobalWriter.printInfo("Removing message: " + param[0] + " from queue: " + queue.getKeyProperty("Destination")); 98 server.invoke(queue, "removeMessage", param, new String [] {"java.lang.String"}); 99 } 100 101 conn.close(); 102 } 103 104 110 protected void handleOption(String token, List tokens) throws Exception { 111 if (token.startsWith("--msgsel")) { 113 114 if (tokens.isEmpty() || ((String )tokens.get(0)).startsWith("-")) { 116 GlobalWriter.printException(new IllegalArgumentException ("Message selector not specified")); 117 return; 118 } 119 120 StringTokenizer queryTokens = new StringTokenizer ((String )tokens.remove(0), COMMAND_OPTION_DELIMETER); 121 while (queryTokens.hasMoreTokens()) { 122 queryAddObjects.add(queryTokens.nextToken()); 123 } 124 } 125 126 else if (token.startsWith("--xmsgsel")) { 128 129 if (tokens.isEmpty() || ((String )tokens.get(0)).startsWith("-")) { 131 GlobalWriter.printException(new IllegalArgumentException ("Message selector not specified")); 132 return; 133 } 134 135 StringTokenizer queryTokens = new StringTokenizer ((String )tokens.remove(0), COMMAND_OPTION_DELIMETER); 136 while (queryTokens.hasMoreTokens()) { 137 querySubObjects.add(queryTokens.nextToken()); 138 } 139 140 } 141 142 else { 144 super.handleOption(token, tokens); 145 } 146 } 147 148 151 protected void printHelp() { 152 GlobalWriter.printHelp(helpFile); 153 } 154 155 protected String [] helpFile = new String [] { 156 "Task Usage: Main purge [browse-options] <destinations>", 157 "Description: Delete selected destination's messages that matches the message selector.", 158 "", 159 "Browse Options:", 160 " --msgsel <msgsel1,msglsel2> Add to the search list messages matched by the query similar to", 161 " the messages selector format.", 162 " --jmxurl <url> Set the JMX URL to connect to.", 163 " --version Display the version information.", 164 " -h,-?,--help Display the browse broker help information.", 165 "", 166 "Examples:", 167 " Main purge FOO.BAR", 168 " - Delete all the messages in queue FOO.BAR", 169 170 " Main purge --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.*", 171 " - Delete all the messages in the destinations that matches FOO.* and has a JMSMessageID in", 172 " the header field that matches the wildcard *:10, and has a JMSPriority field > 5 in the", 173 " queue FOO.BAR", 174 " * To use wildcard queries, the field must be a string and the query enclosed in ''", 175 "", 176 }; 177 } 178 | Popular Tags |