KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > xsocket > datagram > AbstractEndpoint


// $Id: AbstractEndpoint.java 1540 2007-07-21 13:09:27Z grro $

/*
 * Copyright (c) xsocket.org, 2006 - 2007. All rights reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
 * The latest copy of this software may be found on http://www.xsocket.org/
 */

22 package org.xsocket.datagram;
23
24
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketOptions;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.xsocket.DataConverter;
import org.xsocket.IWorkerPool;
43
44
45
46 /**
47  * Endpoint implementation base
48  *
49  * @author grro@xsocket.org
50  */

51 abstract class AbstractEndpoint implements IEndpoint {
52     
53     private static final Logger JavaDoc LOG = Logger.getLogger(AbstractEndpoint.class.getName());
54     
55     
56     private static Executor JavaDoc GLOBAL_WORKERPOOL = Executors.newCachedThreadPool();
57     
58     private static String JavaDoc idPrefix = null;
59     
60     
61     // ids
62
private static long nextId = 0;
63     private String JavaDoc id = null;
64     
65     
66     
67     // encoding
68
private String JavaDoc defaultEncoding = "UTF-8";
69
70     
71     // receive data handling
72
private final Object JavaDoc readGuard = new Object JavaDoc();
73     private final ReceiveQueue receiveQueue = new ReceiveQueue();
74     private int receiveSize = -1;
75
76
77
78     // datagram handler
79
private IDatagramHandler datagramHandler = null;
80
81     
82     // worker pool
83
private Executor JavaDoc workerPool = null;
84
85     
86     // statistics & jmx
87
private long openTime = -1;
88     private long lastTimeReceived = System.currentTimeMillis();
89     private long receivedBytes = 0;
90
91     private long handleIncomingDatagrams = 0;
92     private long handleOutgoingDatagrams = 0;
93     
94  
95     static {
96         String JavaDoc base = null;
97         try {
98             base = InetAddress.getLocalHost().getCanonicalHostName();
99         } catch (Exception JavaDoc e) {
100             base = "locale";
101         }
102   
103         int random = 0;
104         do {
105             random = new Random JavaDoc().nextInt();
106         } while (random < 0);
107         idPrefix = Integer.toHexString(base.hashCode()) + "." + Long.toHexString(System.currentTimeMillis()) + "." + Integer.toHexString(random);
108     }
109     
110     
111     
112     
113     /**
114      * constructor
115      *
116      * @param useGlobalWorkerpool true, ifglobal worker pool should be used
117      * @param datagramHandler the datagram handler
118      * @param receiveSize the receive packet size
119      * @param workerPool the workerpool to use
120      */

121     AbstractEndpoint(IDatagramHandler datagramHandler, int receiveSize, Executor JavaDoc workerPool) {
122         this.datagramHandler = datagramHandler;
123         this.receiveSize = receiveSize;
124         this.workerPool = workerPool;
125         
126         id = idPrefix + "." + (++nextId);
127         
128         Runtime.getRuntime().addShutdownHook(new Thread JavaDoc() {
129             @Override JavaDoc
130             public void run() {
131                 close();
132             }
133         });
134         
135         openTime = System.currentTimeMillis();
136     }
137
138     
139     
140     protected static Executor JavaDoc getGlobalWorkerPool() {
141         return GLOBAL_WORKERPOOL;
142     }
143     
144     
145     public void close() {
146         
147     }
148     
149     
150     /**
151      * @deprecated
152      */

153     final SocketOptions JavaDoc getSocketOptions(final DatagramSocket JavaDoc socket) {
154         
155         return new SocketOptions JavaDoc() {
156                 
157             public Object JavaDoc getOption(int optID) throws SocketException JavaDoc {
158                 return DatagramSocketConfiguration.getOption(socket, optID);
159                 }
160                 
161                 public void setOption(int optID, Object JavaDoc value) throws SocketException JavaDoc {
162                     DatagramSocketConfiguration.setOption(socket, optID, value);
163                 }
164                 
165                 @Override JavaDoc
166                 public String JavaDoc toString() {
167                     try {
168                         return "TCP_NODELAY=" + getOption(TCP_NODELAY) + ", "
169                               + "SO_TIMEOUT=" + getOption(SO_TIMEOUT) + ", "
170                               + "SO_SNDBUF=" + getOption(SO_SNDBUF) + ", "
171                               + "SO_REUSEADDR=" + getOption(SO_REUSEADDR) + ", "
172                               + "SO_RCVBUF=" + getOption(SO_RCVBUF) + ", "
173                               + "IP_TOS=" + getOption(IP_TOS) + ", ";
174                     } catch (Exception JavaDoc e) {
175                         return super.toString();
176                     }
177                 }
178             };
179     }
180     
181     
182     
183     
184     
185     /**
186      * set the worker pool to use
187      * @deprecated
188      * @param workerPool the worker pool to use
189      */

190     public void setWorkerPool(IWorkerPool workerPool) {
191         this.workerPool = workerPool;
192     }
193     
194     
195
196     /**
197      * return the worker pool
198      *
199      * @deprecated
200      * @return the worker pool
201      */

202     public IWorkerPool getWorkerPool() {
203         return (IWorkerPool) workerPool;
204     }
205     
206
207     /**
208      * return the worker pool
209      *
210      * @return the worker pool
211      */

212     public Executor JavaDoc getWorkerpool() {
213         return workerPool;
214     }
215
216     
217     
218     /**
219      * {@inheritDoc}
220      */

221     public final void setReceiveSize(int receivePacketSize) {
222         this.receiveSize = receivePacketSize;
223     }
224     
225     /**
226      * {@inheritDoc}
227      */

228     public final int getReceiveSize() {
229         return receiveSize;
230     }
231     
232     protected final void onData(SocketAddress JavaDoc address, ByteBuffer JavaDoc data) {
233         UserDatagram packet = new UserDatagram(address, data, getDefaultEncoding());
234         receiveQueue.offer(packet);
235             
236         if (LOG.isLoggable(Level.FINE)) {
237             LOG.fine("[" + "/:" + getLocalPort() + " " + getId() + "] datagram received: " + packet.toString());
238         }
239         
240         handleIncomingDatagrams++;
241         lastTimeReceived = System.currentTimeMillis();
242         receivedBytes += data.remaining();
243
244         
245         if (datagramHandler != null) {
246             workerPool.execute(new HandlerProcessor());
247         }
248     }
249
250     
251     
252     /**
253      * {@inheritDoc}
254      */

255     public final UserDatagram receive(long timeoutMillis) throws IOException JavaDoc, SocketTimeoutException JavaDoc {
256         if (getReceiveSize() == -1) {
257             throw new IOException JavaDoc("the receive packet size hasn't been set");
258         }
259         
260         UserDatagram datagram = null;
261         
262         // no timeout set
263
if (timeoutMillis <= 0) {
264             datagram = receive();
265         
266             
267         // timeout set
268
} else {
269             long start = System.currentTimeMillis();
270             
271             synchronized (readGuard) {
272                 do {
273                     datagram = receive();
274                     if (datagram != null) {
275                         break;
276                     } else {
277                         try {
278                             readGuard.wait(timeoutMillis / 10);
279                         } catch (InterruptedException JavaDoc ignore) { }
280                     }
281                 } while (System.currentTimeMillis() < (start + timeoutMillis));
282             }
283         }
284             
285         if (datagram == null) {
286             throw new SocketTimeoutException JavaDoc("timeout " + DataConverter.toFormatedDuration(timeoutMillis) + " reached");
287         } else {
288             return datagram;
289         }
290     }
291     
292
293     public UserDatagram receive() {
294         return receiveQueue.poll();
295     }
296     
297
298     /**
299      * {@inheritDoc}
300      */

301     public final String JavaDoc getDefaultEncoding() {
302         return defaultEncoding;
303     }
304     
305     
306     /**
307      * {@inheritDoc}
308      */

309     public final void setDefaultEncoding(String JavaDoc defaultEncoding) {
310         this.defaultEncoding = defaultEncoding;
311     }
312     
313     
314     
315
316     /**
317      * increase the number of handled outgoing datagram
318      */

319     protected final void incNumberOfHandledOutgoingDatagram() {
320         handleOutgoingDatagrams++;
321     }
322     
323     /**
324      * return the id
325      *
326      * @return the id
327      */

328     public final String JavaDoc getId() {
329         return id;
330     }
331     
332     
333     /**
334      * {@inheritDoc}
335      */

336     @Override JavaDoc
337     public String JavaDoc toString() {
338         return " received=" + DataConverter.toFormatedBytesSize(receivedBytes)
339              + ", age=" + DataConverter.toFormatedDuration(System.currentTimeMillis() - openTime)
340              + ", lastReceived=" + DataConverter.toFormatedDate(lastTimeReceived)
341              + " [" + id + "]";
342     }
343     
344     
345     
346     
347     private static final class ReceiveQueue {
348         private List JavaDoc<UserDatagram> receiveQueue = new ArrayList JavaDoc<UserDatagram>();
349         private int modifyVersion = 0;
350     
351         public synchronized void offer(UserDatagram userDatagram) {
352             modifyVersion++;
353             receiveQueue.add(userDatagram);
354         }
355         
356         public synchronized UserDatagram poll() {
357             if (receiveQueue.isEmpty()) {
358                 return null;
359             } else {
360                 modifyVersion++;
361                 return receiveQueue.remove(0);
362             }
363         }
364         
365         public synchronized boolean isEmpty() {
366             modifyVersion++;
367             return receiveQueue.isEmpty();
368         }
369         
370         public int getModifyVersion() {
371             return modifyVersion;
372         }
373         
374         @Override JavaDoc
375         public String JavaDoc toString() {
376             return receiveQueue.size() + " (modifyVersion=" + modifyVersion + ")";
377         }
378     }
379     
380     
381     private final class HandlerProcessor implements Runnable JavaDoc {
382         
383         public void run() {
384             
385             try {
386                 if (!receiveQueue.isEmpty()) {
387                     datagramHandler.onDatagram(AbstractEndpoint.this);
388                 }
389                     
390                 
391             } catch (Throwable JavaDoc e) {
392                 if (LOG.isLoggable(Level.FINE)) {
393                     LOG.fine("error occured by performing onData task. Reason: " + e.toString());
394                 }
395             }
396         }
397     }
398 }
399
Popular Tags