KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > udp > CommandDatagramSocket


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.udp;
19
20 import java.io.DataInputStream JavaDoc;
21 import java.io.DataOutputStream JavaDoc;
22 import java.io.IOException JavaDoc;
23 import java.net.DatagramPacket JavaDoc;
24 import java.net.DatagramSocket JavaDoc;
25 import java.net.SocketAddress JavaDoc;
26
27 import org.apache.activemq.command.Command;
28 import org.apache.activemq.command.Endpoint;
29 import org.apache.activemq.command.LastPartialCommand;
30 import org.apache.activemq.command.PartialCommand;
31 import org.apache.activemq.openwire.BooleanStream;
32 import org.apache.activemq.openwire.OpenWireFormat;
33 import org.apache.activemq.transport.reliable.ReplayBuffer;
34 import org.apache.activemq.util.ByteArrayInputStream;
35 import org.apache.activemq.util.ByteArrayOutputStream;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38
39 /**
40  * A strategy for reading datagrams and de-fragmenting them together.
41  *
42  * @version $Revision: 439111 $
43  */

44 public class CommandDatagramSocket extends CommandChannelSupport {
45
46     private static final Log log = LogFactory.getLog(CommandDatagramSocket.class);
47
48     private DatagramSocket JavaDoc channel;
49     private Object JavaDoc readLock = new Object JavaDoc();
50     private Object JavaDoc writeLock = new Object JavaDoc();
51
52     public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
53             SocketAddress JavaDoc targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramSocket JavaDoc channel) {
54         super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
55         this.channel = channel;
56     }
57
58     public void start() throws Exception JavaDoc {
59     }
60
61     public void stop() throws Exception JavaDoc {
62     }
63
64     public Command read() throws IOException JavaDoc {
65         Command answer = null;
66         Endpoint from = null;
67         synchronized (readLock) {
68             while (true) {
69                 DatagramPacket JavaDoc datagram = createDatagramPacket();
70                 channel.receive(datagram);
71
72                 // TODO could use a DataInput implementation that talks direct
73
// to the byte[] to avoid object allocation
74
DataInputStream JavaDoc dataIn = new DataInputStream JavaDoc(new ByteArrayInputStream(datagram.getData()));
75
76                 from = headerMarshaller.createEndpoint(datagram, dataIn);
77                 answer = (Command) wireFormat.unmarshal(dataIn);
78                 break;
79             }
80         }
81         if (answer != null) {
82             answer.setFrom(from);
83
84             if (log.isDebugEnabled()) {
85                 log.debug("Channel: " + name + " about to process: " + answer);
86             }
87         }
88         return answer;
89     }
90
91     public void write(Command command, SocketAddress JavaDoc address) throws IOException JavaDoc {
92         synchronized (writeLock) {
93
94             ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
95             DataOutputStream JavaDoc dataOut = new DataOutputStream JavaDoc(writeBuffer);
96             headerMarshaller.writeHeader(command, dataOut);
97
98             int offset = writeBuffer.size();
99
100             wireFormat.marshal(command, dataOut);
101
102             if (remaining(writeBuffer) >= 0) {
103                 sendWriteBuffer(address, writeBuffer, command.getCommandId());
104             }
105             else {
106                 // lets split the command up into chunks
107
byte[] data = writeBuffer.toByteArray();
108                 boolean lastFragment = false;
109                 for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
110                     writeBuffer = createByteArrayOutputStream();
111                     headerMarshaller.writeHeader(command, dataOut);
112
113                     int chunkSize = remaining(writeBuffer);
114
115                     // we need to remove the amount of overhead to write the
116
// partial command
117

118                     // lets write the flags in there
119
BooleanStream bs = null;
120                     if (wireFormat.isTightEncodingEnabled()) {
121                         bs = new BooleanStream();
122                         bs.writeBoolean(true); // the partial data byte[] is
123
// never null
124
}
125
126                     // lets remove the header of the partial command
127
// which is the byte for the type and an int for the size of
128
// the byte[]
129
chunkSize -= 1 // the data type
130
+ 4 // the command ID
131
+ 4; // the size of the partial data
132

133                     // the boolean flags
134
if (bs != null) {
135                         chunkSize -= bs.marshalledSize();
136                     }
137                     else {
138                         chunkSize -= 1;
139                     }
140
141                     if (!wireFormat.isSizePrefixDisabled()) {
142                         // lets write the size of the command buffer
143
dataOut.writeInt(chunkSize);
144                         chunkSize -= 4;
145                     }
146
147                     lastFragment = offset + chunkSize >= length;
148                     if (chunkSize + offset > length) {
149                         chunkSize = length - offset;
150                     }
151
152                     if (lastFragment) {
153                         dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE);
154                     }
155                     else {
156                         dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
157                     }
158
159                     if (bs != null) {
160                         bs.marshal(dataOut);
161                     }
162
163                     int commandId = command.getCommandId();
164                     if (fragment > 0) {
165                         commandId = sequenceGenerator.getNextSequenceId();
166                     }
167                     dataOut.writeInt(commandId);
168                     if (bs == null) {
169                         dataOut.write((byte) 1);
170                     }
171
172                     // size of byte array
173
dataOut.writeInt(chunkSize);
174
175                     // now the data
176
dataOut.write(data, offset, chunkSize);
177
178                     offset += chunkSize;
179                     sendWriteBuffer(address, writeBuffer, commandId);
180                 }
181             }
182         }
183     }
184
185     public int getDatagramSize() {
186         return datagramSize;
187     }
188
189     public void setDatagramSize(int datagramSize) {
190         this.datagramSize = datagramSize;
191     }
192
193     // Implementation methods
194
// -------------------------------------------------------------------------
195
protected void sendWriteBuffer(SocketAddress JavaDoc address, ByteArrayOutputStream writeBuffer, int commandId)
196             throws IOException JavaDoc {
197         byte[] data = writeBuffer.toByteArray();
198         sendWriteBuffer(commandId, address, data, false);
199     }
200
201     protected void sendWriteBuffer(int commandId, SocketAddress JavaDoc address, byte[] data, boolean redelivery)
202             throws IOException JavaDoc {
203         // lets put the datagram into the replay buffer first to prevent timing
204
// issues
205
ReplayBuffer bufferCache = getReplayBuffer();
206         if (bufferCache != null && !redelivery) {
207             bufferCache.addBuffer(commandId, data);
208         }
209
210         if (log.isDebugEnabled()) {
211             String JavaDoc text = (redelivery) ? "REDELIVERING" : "sending";
212             log.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
213         }
214         DatagramPacket JavaDoc packet = new DatagramPacket JavaDoc(data, 0, data.length, address);
215         channel.send(packet);
216     }
217
218     public void sendBuffer(int commandId, Object JavaDoc buffer) throws IOException JavaDoc {
219         if (buffer != null) {
220             byte[] data = (byte[]) buffer;
221             sendWriteBuffer(commandId, replayAddress, data, true);
222         }
223         else {
224             if (log.isWarnEnabled()) {
225                 log.warn("Request for buffer: " + commandId + " is no longer present");
226             }
227         }
228     }
229
230     protected DatagramPacket JavaDoc createDatagramPacket() {
231         return new DatagramPacket JavaDoc(new byte[datagramSize], datagramSize);
232     }
233
234     protected int remaining(ByteArrayOutputStream buffer) {
235         return datagramSize - buffer.size();
236     }
237
238     protected ByteArrayOutputStream createByteArrayOutputStream() {
239         return new ByteArrayOutputStream(datagramSize);
240     }
241 }
242
Popular Tags