1 18 package org.apache.activemq.broker.util; 19 20 import java.io.DataOutputStream ; 21 import java.io.IOException ; 22 import java.net.DatagramPacket ; 23 import java.net.DatagramSocket ; 24 import java.net.InetAddress ; 25 import java.net.InetSocketAddress ; 26 import java.net.SocketAddress ; 27 import java.net.URI ; 28 import java.net.URISyntaxException ; 29 import java.net.UnknownHostException ; 30 31 import org.apache.activemq.broker.BrokerPluginSupport; 32 import org.apache.activemq.broker.ConnectionContext; 33 import org.apache.activemq.broker.ConsumerBrokerExchange; 34 import org.apache.activemq.broker.ProducerBrokerExchange; 35 import org.apache.activemq.broker.region.Subscription; 36 import org.apache.activemq.command.ActiveMQDestination; 37 import org.apache.activemq.command.BrokerId; 38 import org.apache.activemq.command.ConnectionInfo; 39 import org.apache.activemq.command.ConsumerInfo; 40 import org.apache.activemq.command.DataStructure; 41 import org.apache.activemq.command.DestinationInfo; 42 import org.apache.activemq.command.JournalTrace; 43 import org.apache.activemq.command.Message; 44 import org.apache.activemq.command.MessageAck; 45 import org.apache.activemq.command.MessageDispatch; 46 import org.apache.activemq.command.MessageDispatchNotification; 47 import org.apache.activemq.command.MessagePull; 48 import org.apache.activemq.command.ProducerInfo; 49 import org.apache.activemq.command.RemoveSubscriptionInfo; 50 import org.apache.activemq.command.Response; 51 import org.apache.activemq.command.SessionInfo; 52 import org.apache.activemq.command.TransactionId; 53 import org.apache.activemq.command.TransactionInfo; 54 import org.apache.activemq.openwire.OpenWireFormatFactory; 55 import org.apache.activemq.util.ByteArrayOutputStream; 56 import org.apache.activemq.util.ByteSequence; 57 import org.apache.activemq.wireformat.WireFormat; 58 import org.apache.activemq.wireformat.WireFormatFactory; 59 import org.apache.commons.logging.Log; 60 import org.apache.commons.logging.LogFactory; 61 62 69 public class UDPTraceBrokerPlugin extends BrokerPluginSupport { 70 71 static final private Log log = LogFactory.getLog(UDPTraceBrokerPlugin.class); 72 protected WireFormat wireFormat; 73 protected WireFormatFactory wireFormatFactory; 74 protected int maxTraceDatagramSize = 1024*4; 75 protected URI destination; 76 protected DatagramSocket socket; 77 78 protected BrokerId brokerId; 79 protected SocketAddress address; 80 protected boolean broadcast; 81 82 public UDPTraceBrokerPlugin() { 83 try { 84 destination = new URI ("udp://127.0.0.1:61616"); 85 } catch (URISyntaxException wontHappen) { 86 } 87 } 88 89 public void start() throws Exception { 90 super.start(); 91 if( getWireFormat() == null ) 92 throw new IllegalArgumentException ("Wireformat must be specifed."); 93 if( address == null ) { 94 address = createSocketAddress(destination); 95 } 96 socket = createSocket(); 97 98 brokerId = super.getBrokerId(); 99 trace(new JournalTrace("START")); 100 } 101 102 protected DatagramSocket createSocket() throws IOException { 103 DatagramSocket s = new DatagramSocket (); 104 s.setSendBufferSize(maxTraceDatagramSize); 105 s.setBroadcast(broadcast); 106 return s; 107 } 108 109 public void stop() throws Exception { 110 trace(new JournalTrace("STOP")); 111 socket.close(); 112 super.stop(); 113 } 114 115 private void trace(DataStructure command) { 116 try { 117 118 ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize); 119 DataOutputStream out = new DataOutputStream (baos); 120 wireFormat.marshal(brokerId, out); 121 wireFormat.marshal(command, out); 122 out.close(); 123 ByteSequence sequence = baos.toByteSequence(); 124 DatagramPacket datagram = new DatagramPacket ( sequence.getData(), sequence.getOffset(), sequence.getLength(), address); 125 socket.send(datagram); 126 127 } catch ( Throwable e) { 128 log.debug("Failed to trace: "+command, e); 129 } 130 } 131 132 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 133 trace(messageSend); 134 super.send(producerExchange, messageSend); 135 } 136 137 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 138 trace(ack); 139 super.acknowledge(consumerExchange, ack); 140 } 141 142 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 143 trace(info); 144 super.addConnection(context, info); 145 } 146 147 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 148 trace(info); 149 return super.addConsumer(context, info); 150 } 151 152 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 153 trace(info); 154 super.addDestinationInfo(context, info); 155 } 156 157 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 158 trace(info); 159 super.addProducer(context, info); 160 } 161 162 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 163 trace(info); 164 super.addSession(context, info); 165 } 166 167 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 168 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN)); 169 super.beginTransaction(context, xid); 170 } 171 172 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 173 trace(new TransactionInfo(context.getConnectionId(), xid, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE)); 174 super.commitTransaction(context, xid, onePhase); 175 } 176 177 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception { 178 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET)); 179 super.forgetTransaction(context, xid); 180 } 181 182 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 183 trace(pull); 184 return super.messagePull(context, pull); 185 } 186 187 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 188 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE)); 189 return super.prepareTransaction(context, xid); 190 } 191 192 public void processDispatch(MessageDispatch messageDispatch) { 193 trace(messageDispatch); 194 super.processDispatch(messageDispatch); 195 } 196 197 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 198 trace(messageDispatchNotification); 199 super.processDispatchNotification(messageDispatchNotification); 200 } 201 202 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 203 trace(info.createRemoveCommand()); 204 super.removeConnection(context, info, error); 205 } 206 207 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 208 trace(info.createRemoveCommand()); 209 super.removeConsumer(context, info); 210 } 211 212 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 213 super.removeDestination(context, destination, timeout); 214 } 215 216 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 217 trace(info); 218 super.removeDestinationInfo(context, info); 219 } 220 221 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 222 trace(info.createRemoveCommand()); 223 super.removeProducer(context, info); 224 } 225 226 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 227 trace(info.createRemoveCommand()); 228 super.removeSession(context, info); 229 } 230 231 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 232 trace(info); 233 super.removeSubscription(context, info); 234 } 235 236 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 237 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK)); 238 super.rollbackTransaction(context, xid); 239 } 240 241 public WireFormat getWireFormat() { 242 if( wireFormat == null ) { 243 wireFormat = createWireFormat(); 244 } 245 return wireFormat; 246 } 247 248 protected WireFormat createWireFormat() { 249 return getWireFormatFactory().createWireFormat(); 250 } 251 252 public void setWireFormat(WireFormat wireFormat) { 253 this.wireFormat = wireFormat; 254 } 255 256 public WireFormatFactory getWireFormatFactory() { 257 if( wireFormatFactory == null ) { 258 wireFormatFactory = createWireFormatFactory(); 259 } 260 return wireFormatFactory; 261 } 262 263 protected OpenWireFormatFactory createWireFormatFactory() { 264 OpenWireFormatFactory wf = new OpenWireFormatFactory(); 265 wf.setCacheEnabled(false); 266 wf.setVersion(1); 267 wf.setTightEncodingEnabled(true); 268 wf.setSizePrefixDisabled(true); 269 return wf; 270 } 271 272 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { 273 this.wireFormatFactory = wireFormatFactory; 274 } 275 276 277 protected SocketAddress createSocketAddress(URI location) throws UnknownHostException { 278 InetAddress a = InetAddress.getByName(location.getHost()); 279 int port = location.getPort(); 280 return new InetSocketAddress (a, port); 281 } 282 283 public URI getDestination() { 284 return destination; 285 } 286 287 public void setDestination(URI destination) { 288 this.destination = destination; 289 } 290 291 public int getMaxTraceDatagramSize() { 292 return maxTraceDatagramSize; 293 } 294 295 public void setMaxTraceDatagramSize(int maxTraceDatagramSize) { 296 this.maxTraceDatagramSize = maxTraceDatagramSize; 297 } 298 299 public boolean isBroadcast() { 300 return broadcast; 301 } 302 303 public void setBroadcast(boolean broadcast) { 304 this.broadcast = broadcast; 305 } 306 307 public SocketAddress getAddress() { 308 return address; 309 } 310 311 public void setAddress(SocketAddress address) { 312 this.address = address; 313 } 314 315 316 } 317 | Popular Tags |