KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > xsocket > datagram > AbstractChannelBasedEndpoint


1 //$Id: AbstractChannelBasedEndpoint.java 1546 2007-07-23 06:07:56Z grro $
2
/*
3  * Copyright (c) xsocket.org, 2006 - 2007. All rights reserved.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
20  * The latest copy of this software may be found on http://www.xsocket.org/
21  */

22 package org.xsocket.datagram;
23
24 import java.io.IOException JavaDoc;
25 import java.net.DatagramSocket JavaDoc;
26 import java.net.InetAddress JavaDoc;
27 import java.net.InetSocketAddress JavaDoc;
28 import java.net.SocketAddress JavaDoc;
29 import java.net.SocketOptions JavaDoc;
30 import java.nio.ByteBuffer JavaDoc;
31 import java.nio.ByteOrder JavaDoc;
32 import java.nio.channels.DatagramChannel JavaDoc;
33 import java.nio.channels.SelectableChannel JavaDoc;
34 import java.nio.channels.SelectionKey JavaDoc;
35 import java.util.Collections JavaDoc;
36 import java.util.HashMap JavaDoc;
37 import java.util.LinkedList JavaDoc;
38 import java.util.List JavaDoc;
39 import java.util.Map JavaDoc;
40 import java.util.Map.Entry;
41 import java.util.concurrent.Executor JavaDoc;
42 import java.util.logging.Level JavaDoc;
43 import java.util.logging.Logger JavaDoc;
44
45 import org.xsocket.Dispatcher;
46 import org.xsocket.IEventHandler;
47 import org.xsocket.IHandle;
48
49
50
51
52
53
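/**
 * Base class for datagram endpoints that are backed by a non-blocking
 * {@link java.nio.channels.DatagramChannel}. Reading incoming datagrams and
 * flushing the queue of outgoing datagrams is driven by a dispatcher that is
 * shared by all instances and runs on its own daemon thread.
 *
 * @author grro@xsocket.org
 */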

59 abstract class AbstractChannelBasedEndpoint extends AbstractEndpoint implements IEndpoint {
60     
61     private static final Logger JavaDoc LOG = Logger.getLogger(AbstractChannelBasedEndpoint.class.getName());
62
63     private static final MemoryManager memoryManager = new MemoryManager(65536, false);
64     private static Dispatcher<DispatcherHandle> dispatcher = createDispatcher();
65     
66     private static final Map JavaDoc<String JavaDoc ,Class JavaDoc> SUPPORTED_OPTIONS = new HashMap JavaDoc<String JavaDoc, Class JavaDoc>();
67     
68     static {
69         SUPPORTED_OPTIONS.put(SO_RCVBUF, Integer JavaDoc.class);
70         SUPPORTED_OPTIONS.put(SO_SNDBUF, Integer JavaDoc.class);
71         SUPPORTED_OPTIONS.put(IP_TOS, Integer JavaDoc.class);
72         SUPPORTED_OPTIONS.put(SO_REUSEADDR, Boolean JavaDoc.class);
73     }
74     
75
76     
77     // socket
78
private DatagramSocket JavaDoc socket = null;
79     private DatagramChannel JavaDoc channel = null;
80     private ByteOrder JavaDoc byteOrder = ByteOrder.BIG_ENDIAN;
81     
82     
83     // send queue
84
private final List JavaDoc<UserDatagram> sendQueue = Collections.synchronizedList(new LinkedList JavaDoc<UserDatagram>());
85
86     
87     private DispatcherHandle dispatcherHandle = null;
88     
89
90
91     /**
92      * constructor
93      *
94      * @param address the local address
95      * @param port the local port which must be between 0 and 65535 inclusive.
96      * @param options the socket options
97      * @param datagramHandler the datagram handler
98      * @param receivePacketSize the receive packet size
99      * @param workerPool the workerpool to use
100      * @throws IOException If some I/O error occurs
101      */

102     AbstractChannelBasedEndpoint(InetAddress JavaDoc address, int port, Map JavaDoc<String JavaDoc, Object JavaDoc> options, IDatagramHandler datagramHandler, int receivePacketSize, Executor JavaDoc workerPool) throws IOException JavaDoc {
103         super(datagramHandler, receivePacketSize, workerPool);
104             
105         channel = DatagramChannel.open();
106         channel.configureBlocking(false);
107             
108         socket = channel.socket();
109         
110         for (Entry<String JavaDoc, Object JavaDoc> entry : options.entrySet()) {
111             setOption(entry.getKey(), entry.getValue());
112         }
113
114         InetSocketAddress JavaDoc addr = new InetSocketAddress JavaDoc(address, port);
115         socket.bind(addr);
116         
117         dispatcherHandle = new DispatcherHandle(this);
118         dispatcher.register(dispatcherHandle, SelectionKey.OP_READ);
119                         
120         logFine("enpoint has been bound to locale port " + getLocalPort() + " (server mode)");
121     }
122     
123     
124     /**
125      * @deprecated
126      */

127     final SocketOptions JavaDoc getSocketOptions() {
128         return getSocketOptions(channel.socket());
129     }
130     
131         
132     @SuppressWarnings JavaDoc("unchecked")
133     private static Dispatcher<DispatcherHandle> createDispatcher() {
134         Dispatcher<DispatcherHandle> disp = new Dispatcher<DispatcherHandle>(new DispatcherEventHandler());
135         Thread JavaDoc t = new Thread JavaDoc(disp);
136         t.setName("DispatcherThread#" + disp.hashCode());
137         t.setDaemon(true);
138         t.start();
139         
140         return disp;
141     }
142     
143         
144     protected final DatagramChannel JavaDoc getChannel() {
145         return channel;
146     }
147         
148         
149         
150     
151     /**
152      * {@inheritDoc}
153      */

154     public final void close() {
155         if (isOpen()) {
156             try {
157                 logFine("closing " + toCompactString());
158                 channel.close();
159             } catch (IOException JavaDoc ioe) {
160                 logFine("error occured by closing connection. Reason " + ioe.toString());
161             }
162             
163             super.close();
164         }
165     }
166         
167     
168     /**
169      * {@inheritDoc}
170      */

171     public final SocketAddress JavaDoc getLocalSocketAddress() {
172         return socket.getLocalSocketAddress();
173     }
174     
175
176     /**
177      * {@inheritDoc}
178      */

179     public final InetAddress JavaDoc getLocalAddress() {
180         return socket.getLocalAddress();
181     }
182
183     
184     /**
185      * {@inheritDoc}
186      */

187     public final int getLocalPort() {
188         return socket.getLocalPort();
189     }
190     
191     
192     /**
193      * {@inheritDoc}
194      */

195     public final boolean isOpen() {
196         return channel.isOpen();
197     }
198         
199
200
201     /**
202      * log a fine msg
203      *
204      * @param msg the log message
205      */

206     private void logFine(String JavaDoc msg) {
207         if (LOG.isLoggable(Level.FINE)) {
208             LOG.fine("[" + "/:" + getLocalPort() + " " + getId() + "] " + msg);
209         }
210     }
211
212     
213     /**
214      * {@inheritDoc}
215      */

216     public void send(UserDatagram packet) throws IOException JavaDoc {
217         if (packet.getRemoteAddress() == null) {
218             throw new IOException JavaDoc("remote socket adress has to be set");
219         }
220         
221         logFine("add datagram packet (" + packet + ") to write queue");
222         
223         packet.prepareForSend();
224
225         sendQueue.add(packet);
226         logFine("update interest ops to write");
227         updateInteresSet(SelectionKey.OP_WRITE);
228     }
229     
230         
231         
232     /**
233      * write the outgoing data to socket
234      *
235      */

236     private void writePhysical() {
237         if (!sendQueue.isEmpty()) {
238             synchronized(sendQueue) {
239                 for (UserDatagram packet : sendQueue) {
240                     try {
241                         if (LOG.isLoggable(Level.FINE)) {
242                             LOG.fine("[" + "/:" + getLocalPort() + " " + getId() + "] sending datagram " + packet.toString());
243                         }
244
245                         int dataToSend = packet.getSize();
246                         int written = channel.send(packet.getData(), packet.getRemoteSocketAddress());
247                         
248                         if (LOG.isLoggable(Level.FINE)) {
249                             if (dataToSend != written) {
250                                 LOG.fine("Error occured by sending datagram. Size DataToSend=" + dataToSend + ", written=" + written);
251                             }
252                         }
253                     
254                         incNumberOfHandledOutgoingDatagram();
255                     } catch (IOException JavaDoc ioe) {
256                         LOG.warning("couldn't write datagram to " + packet.getRemoteAddress() + " .Reason: " + ioe.toString());
257                     }
258                 }
259                 
260                 sendQueue.clear();
261             }
262         }
263     }
264
265     
266     private void updateInteresSet(int intOps) throws IOException JavaDoc {
267         dispatcher.updateInterestSet(dispatcherHandle, intOps);
268     }
269     
270         
271     /**
272      * a compact string of this endpoint
273      */

274     public String JavaDoc toCompactString() {
275         return this.getClass().getSimpleName() + " " + socket.getLocalAddress().getCanonicalHostName() + ":" + getLocalPort();
276     }
277              
278     
279     
280     private final static class DispatcherHandle implements IHandle {
281             
282         private AbstractChannelBasedEndpoint endpoint = null;
283         
284         DispatcherHandle(AbstractChannelBasedEndpoint endpoint) {
285             this.endpoint = endpoint;
286         }
287             
288         public SelectableChannel JavaDoc getChannel() {
289             return endpoint.channel;
290         }
291     }
292         
293         
294
295
296     private final static class DispatcherEventHandler<T extends IEndpoint> implements IEventHandler<DispatcherHandle> {
297     
298             
299         /**
300          * {@inheritDoc}
301          */

302         public void onHandleRegisterEvent(DispatcherHandle handle) throws IOException JavaDoc {
303             
304         }
305         
306
307         /**
308          * {@inheritDoc}
309          */

310         @SuppressWarnings JavaDoc("unchecked")
311         public void onHandleReadableEvent(final DispatcherHandle handle) {
312             if (handle.endpoint.isOpen()) {
313             
314                 try {
315                     // perform non-blocking read operation
316
if (handle.endpoint.getReceiveSize() > 0) {
317                         ByteBuffer JavaDoc readBuffer = memoryManager.acquireMemory(handle.endpoint.getReceiveSize());
318                         readBuffer.order(handle.endpoint.byteOrder);
319                         SocketAddress JavaDoc address = handle.endpoint.channel.receive(readBuffer);
320                                     
321                         // datagram is not immediately available
322
if (address == null) {
323                             return;
324                                 
325                         // datagram is available
326
} else {
327                                 
328                             // nothing has been read
329
if (readBuffer.position() == 0) {
330                                 return;
331                             }
332                         
333                             readBuffer.flip();
334                             handle.endpoint.onData(address, readBuffer);
335                         }
336                     }
337                 } catch (IOException JavaDoc ioe) {
338                     handle.endpoint.logFine("error occured while receiving. Reason: " + ioe.toString());
339                 }
340             }
341         }
342         
343             
344         /**
345          * {@inheritDoc}
346          */

347         public void onHandleWriteableEvent(DispatcherHandle handle) throws IOException JavaDoc {
348             handle.endpoint.writePhysical();
349             handle.endpoint.updateInteresSet(SelectionKey.OP_READ);
350         }
351         
352         
353         /**
354          * {@inheritDoc}
355          */

356         public void onDispatcherCloseEvent(final DispatcherHandle handle) {
357             handle.endpoint.close();
358         }
359     }
360     
361
362     
363     /**
364      * {@inheritDoc}
365      */

366     protected AbstractChannelBasedEndpoint setOption(String JavaDoc name, Object JavaDoc value) throws IOException JavaDoc {
367     
368         if (name.equals(IEndpoint.SO_SNDBUF)) {
369             socket.setSendBufferSize((Integer JavaDoc) value);
370             
371         } else if (name.equals(IEndpoint.SO_REUSEADDR)) {
372             socket.setReuseAddress((Boolean JavaDoc) value);
373             
374         } else if (name.equals(IEndpoint.SO_RCVBUF)) {
375             socket.setReceiveBufferSize((Integer JavaDoc) value);
376
377         } else if (name.equals(IEndpoint.IP_TOS)) {
378             socket.setTrafficClass((Integer JavaDoc) value);
379             
380         } else {
381             LOG.warning("option " + name + " is not supproted for " + this.getClass().getName());
382         }
383         
384         return this;
385     }
386     
387     
388     /**
389      * {@inheritDoc}
390      */

391     public Object JavaDoc getOption(String JavaDoc name) throws IOException JavaDoc {
392
393         if (name.equals(IEndpoint.SO_SNDBUF)) {
394             return socket.getSendBufferSize();
395             
396         } else if (name.equals(IEndpoint.SO_REUSEADDR)) {
397             return socket.getReuseAddress();
398             
399         } else if (name.equals(IEndpoint.SO_RCVBUF)) {
400             return socket.getReceiveBufferSize();
401
402         } else if (name.equals(IEndpoint.IP_TOS)) {
403             return socket.getTrafficClass();
404             
405         } else {
406             LOG.warning("option " + name + " is not supproted for " + this.getClass().getName());
407             return null;
408         }
409     }
410     
411     public Map JavaDoc<String JavaDoc, Class JavaDoc> getOptions() {
412         return Collections.unmodifiableMap(SUPPORTED_OPTIONS);
413     }
414 }
415
Popular Tags