KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > util > UDPTraceBrokerPlugin


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.util;
19
20 import java.io.DataOutputStream JavaDoc;
21 import java.io.IOException JavaDoc;
22 import java.net.DatagramPacket JavaDoc;
23 import java.net.DatagramSocket JavaDoc;
24 import java.net.InetAddress JavaDoc;
25 import java.net.InetSocketAddress JavaDoc;
26 import java.net.SocketAddress JavaDoc;
27 import java.net.URI JavaDoc;
28 import java.net.URISyntaxException JavaDoc;
29 import java.net.UnknownHostException JavaDoc;
30
31 import org.apache.activemq.broker.BrokerPluginSupport;
32 import org.apache.activemq.broker.ConnectionContext;
33 import org.apache.activemq.broker.ConsumerBrokerExchange;
34 import org.apache.activemq.broker.ProducerBrokerExchange;
35 import org.apache.activemq.broker.region.Subscription;
36 import org.apache.activemq.command.ActiveMQDestination;
37 import org.apache.activemq.command.BrokerId;
38 import org.apache.activemq.command.ConnectionInfo;
39 import org.apache.activemq.command.ConsumerInfo;
40 import org.apache.activemq.command.DataStructure;
41 import org.apache.activemq.command.DestinationInfo;
42 import org.apache.activemq.command.JournalTrace;
43 import org.apache.activemq.command.Message;
44 import org.apache.activemq.command.MessageAck;
45 import org.apache.activemq.command.MessageDispatch;
46 import org.apache.activemq.command.MessageDispatchNotification;
47 import org.apache.activemq.command.MessagePull;
48 import org.apache.activemq.command.ProducerInfo;
49 import org.apache.activemq.command.RemoveSubscriptionInfo;
50 import org.apache.activemq.command.Response;
51 import org.apache.activemq.command.SessionInfo;
52 import org.apache.activemq.command.TransactionId;
53 import org.apache.activemq.command.TransactionInfo;
54 import org.apache.activemq.openwire.OpenWireFormatFactory;
55 import org.apache.activemq.util.ByteArrayOutputStream;
56 import org.apache.activemq.util.ByteSequence;
57 import org.apache.activemq.wireformat.WireFormat;
58 import org.apache.activemq.wireformat.WireFormatFactory;
59 import org.apache.commons.logging.Log;
60 import org.apache.commons.logging.LogFactory;
61
62 /**
63  * A Broker interceptor which allows you to trace all operations to a UDP socket.
64  *
65  * @org.apache.xbean.XBean element="udpTraceBrokerPlugin"
66  *
67  * @version $Revision: 427613 $
68  */

69 public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
70
71     static final private Log log = LogFactory.getLog(UDPTraceBrokerPlugin.class);
72     protected WireFormat wireFormat;
73     protected WireFormatFactory wireFormatFactory;
74     protected int maxTraceDatagramSize = 1024*4;
75     protected URI JavaDoc destination;
76     protected DatagramSocket JavaDoc socket;
77         
78     protected BrokerId brokerId;
79     protected SocketAddress JavaDoc address;
80     protected boolean broadcast;
81     
82     public UDPTraceBrokerPlugin() {
83         try {
84             destination = new URI JavaDoc("udp://127.0.0.1:61616");
85         } catch (URISyntaxException JavaDoc wontHappen) {
86         }
87     }
88
89     public void start() throws Exception JavaDoc {
90         super.start();
91         if( getWireFormat() == null )
92             throw new IllegalArgumentException JavaDoc("Wireformat must be specifed.");
93         if( address == null ) {
94             address = createSocketAddress(destination);
95         }
96         socket = createSocket();
97         
98         brokerId = super.getBrokerId();
99         trace(new JournalTrace("START"));
100     }
101
102     protected DatagramSocket JavaDoc createSocket() throws IOException JavaDoc {
103         DatagramSocket JavaDoc s = new DatagramSocket JavaDoc();
104         s.setSendBufferSize(maxTraceDatagramSize);
105         s.setBroadcast(broadcast);
106         return s;
107     }
108
109     public void stop() throws Exception JavaDoc {
110         trace(new JournalTrace("STOP"));
111         socket.close();
112         super.stop();
113     }
114     
115     private void trace(DataStructure command) {
116         try {
117             
118             ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize);
119             DataOutputStream JavaDoc out = new DataOutputStream JavaDoc(baos);
120             wireFormat.marshal(brokerId, out);
121             wireFormat.marshal(command, out);
122             out.close();
123             ByteSequence sequence = baos.toByteSequence();
124             DatagramPacket JavaDoc datagram = new DatagramPacket JavaDoc( sequence.getData(), sequence.getOffset(), sequence.getLength(), address);
125             socket.send(datagram);
126             
127         } catch ( Throwable JavaDoc e) {
128             log.debug("Failed to trace: "+command, e);
129         }
130     }
131     
132     public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception JavaDoc {
133         trace(messageSend);
134         super.send(producerExchange, messageSend);
135     }
136
137     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception JavaDoc {
138         trace(ack);
139         super.acknowledge(consumerExchange, ack);
140     }
141   
142     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception JavaDoc {
143         trace(info);
144         super.addConnection(context, info);
145     }
146
147     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception JavaDoc {
148         trace(info);
149         return super.addConsumer(context, info);
150     }
151
152     public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception JavaDoc {
153         trace(info);
154         super.addDestinationInfo(context, info);
155     }
156
157     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception JavaDoc {
158         trace(info);
159         super.addProducer(context, info);
160     }
161
162     public void addSession(ConnectionContext context, SessionInfo info) throws Exception JavaDoc {
163         trace(info);
164         super.addSession(context, info);
165     }
166
167     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception JavaDoc {
168         trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN));
169         super.beginTransaction(context, xid);
170     }
171
172     public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception JavaDoc {
173         trace(new TransactionInfo(context.getConnectionId(), xid, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE));
174         super.commitTransaction(context, xid, onePhase);
175     }
176
177     public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception JavaDoc {
178         trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET));
179         super.forgetTransaction(context, xid);
180     }
181
182     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception JavaDoc {
183         trace(pull);
184         return super.messagePull(context, pull);
185     }
186
187     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception JavaDoc {
188         trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE));
189         return super.prepareTransaction(context, xid);
190     }
191
192     public void processDispatch(MessageDispatch messageDispatch) {
193         trace(messageDispatch);
194         super.processDispatch(messageDispatch);
195     }
196
197     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception JavaDoc {
198         trace(messageDispatchNotification);
199         super.processDispatchNotification(messageDispatchNotification);
200     }
201
202     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable JavaDoc error) throws Exception JavaDoc {
203         trace(info.createRemoveCommand());
204         super.removeConnection(context, info, error);
205     }
206
207     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception JavaDoc {
208         trace(info.createRemoveCommand());
209         super.removeConsumer(context, info);
210     }
211
212     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception JavaDoc {
213         super.removeDestination(context, destination, timeout);
214     }
215
216     public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception JavaDoc {
217         trace(info);
218         super.removeDestinationInfo(context, info);
219     }
220
221     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception JavaDoc {
222         trace(info.createRemoveCommand());
223         super.removeProducer(context, info);
224     }
225
226     public void removeSession(ConnectionContext context, SessionInfo info) throws Exception JavaDoc {
227         trace(info.createRemoveCommand());
228         super.removeSession(context, info);
229     }
230
231     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception JavaDoc {
232         trace(info);
233         super.removeSubscription(context, info);
234     }
235
236     public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception JavaDoc {
237         trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK));
238         super.rollbackTransaction(context, xid);
239     }
240
241     public WireFormat getWireFormat() {
242         if( wireFormat == null ) {
243             wireFormat = createWireFormat();
244         }
245         return wireFormat;
246     }
247
248     protected WireFormat createWireFormat() {
249         return getWireFormatFactory().createWireFormat();
250     }
251
252     public void setWireFormat(WireFormat wireFormat) {
253         this.wireFormat = wireFormat;
254     }
255
256     public WireFormatFactory getWireFormatFactory() {
257         if( wireFormatFactory == null ) {
258             wireFormatFactory = createWireFormatFactory();
259         }
260         return wireFormatFactory;
261     }
262
263     protected OpenWireFormatFactory createWireFormatFactory() {
264         OpenWireFormatFactory wf = new OpenWireFormatFactory();
265         wf.setCacheEnabled(false);
266         wf.setVersion(1);
267         wf.setTightEncodingEnabled(true);
268         wf.setSizePrefixDisabled(true);
269         return wf;
270     }
271
272     public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
273         this.wireFormatFactory = wireFormatFactory;
274     }
275
276
277     protected SocketAddress JavaDoc createSocketAddress(URI JavaDoc location) throws UnknownHostException JavaDoc {
278         InetAddress JavaDoc a = InetAddress.getByName(location.getHost());
279         int port = location.getPort();
280         return new InetSocketAddress JavaDoc(a, port);
281     }
282
283     public URI JavaDoc getDestination() {
284         return destination;
285     }
286
287     public void setDestination(URI JavaDoc destination) {
288         this.destination = destination;
289     }
290
291     public int getMaxTraceDatagramSize() {
292         return maxTraceDatagramSize;
293     }
294
295     public void setMaxTraceDatagramSize(int maxTraceDatagramSize) {
296         this.maxTraceDatagramSize = maxTraceDatagramSize;
297     }
298
299     public boolean isBroadcast() {
300         return broadcast;
301     }
302
303     public void setBroadcast(boolean broadcast) {
304         this.broadcast = broadcast;
305     }
306
307     public SocketAddress JavaDoc getAddress() {
308         return address;
309     }
310
311     public void setAddress(SocketAddress JavaDoc address) {
312         this.address = address;
313     }
314
315
316 }
317
Popular Tags