KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > tribe > channel > tcp > TcpReaderThread


1 /**
2  * Tribe: Group communication library.
3  * Copyright (C) 2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: tribe@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): ______________________.
23  */

24
25 package org.objectweb.tribe.channel.tcp;
26
27 import java.io.ByteArrayInputStream JavaDoc;
28 import java.io.DataInputStream JavaDoc;
29 import java.io.IOException JavaDoc;
30 import java.io.ObjectInputStream JavaDoc;
31 import java.util.HashMap JavaDoc;
32
33 import org.objectweb.tribe.common.log.Trace;
34 import org.objectweb.tribe.exceptions.ChannelException;
35 import org.objectweb.tribe.exceptions.NoReceiverException;
36 import org.objectweb.tribe.messages.ChannelMessage;
37
38 /**
39  * This class defines a TcpReaderThread.
40  * <p>
41  * This thread reads messages from a TCP connection and stores them in the
42  * appropriate receive buffers.
43  *
44  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
45  * @version 1.0
46  */

47 public class TcpReaderThread extends Thread JavaDoc
48 {
49   boolean isKilled = false;
50   private TcpChannel channel;
51   private HashMap JavaDoc keyBuffers;
52
53   private static Trace logger = Trace
54                                     .getLogger("org.objectweb.tribe.channel");
55
56   /**
57    * Creates a new <code>TcpReaderThread</code> object
58    *
59    * @param logger logger for debug
60    * @param objectInputStream socket input stream to read from
61    * @param keyBuffers list of pool receive buffers
62    */

63   public TcpReaderThread(TcpChannel channel, HashMap JavaDoc keyBuffers)
64   {
65     super("TcpReaderThread");
66     this.channel = channel;
67     this.keyBuffers = keyBuffers;
68   }
69
70   /**
71    * @see java.lang.Runnable#run()
72    */

73   public void run()
74   {
75     if (logger.isDebugEnabled())
76       logger.debug("TcpReaderThread started on " + channel.getSourceAddress()
77           + "->" + channel.getDestinationAddress());
78
79     DataInputStream JavaDoc inStream = channel.getInStream();
80
81     while (!isKilled)
82     {
83       try
84       {
85         // Protocol is:
86
// 1. message size (as an int)
87
// 2. read an array of bytes of the given size and convert it to an
88
// object.
89
int size = inStream.readInt(); // number of bytes to read
90
int totalRead = 0; // number of bytes read
91
byte[] buf = new byte[size];
92         do
93         {
94           int read = inStream.read(buf, totalRead, size);
95           totalRead += read;
96           size -= read;
97         }
98         while (size > 0);
99
100         ChannelMessage msg = (ChannelMessage) new ObjectInputStream JavaDoc(
101             new ByteArrayInputStream JavaDoc(buf)).readObject();
102         if (logger.isDebugEnabled())
103           logger.debug("TcpReaderThread received message: " + msg);
104         msg.deliverMessage(keyBuffers);
105       }
106       catch (RuntimeException JavaDoc e)
107       {
108         if (logger.isDebugEnabled())
109           logger
110               .debug("TcpReaderThread: Error while receiving message, terminating thread and channel ("
111                   + e + ")");
112         try
113         {
114           channel.close();
115         }
116         catch (ChannelException ignore)
117         {
118         }
119         isKilled = true;
120       }
121       catch (IOException JavaDoc e)
122       {
123         if (logger.isDebugEnabled())
124           logger
125               .debug("TcpReaderThread: Error while receiving message, terminating thread and channel ("
126                   + e + ")");
127         try
128         {
129           channel.close();
130         }
131         catch (ChannelException ignore)
132         {
133         }
134         isKilled = true;
135       }
136       catch (ClassNotFoundException JavaDoc e)
137       {
138         logger.error("TcpReaderThread: Error while unmarshalling message", e);
139       }
140       catch (NoReceiverException e)
141       {
142         logger.info("TcpReaderThread: Error while delivering message", e);
143       }
144     }
145
146     if (logger.isDebugEnabled())
147       logger.debug("TcpReaderThread terminated.");
148   }
149
150   /**
151    * Terminates this thread
152    */

153   public void kill()
154   {
155     isKilled = true;
156     // Close the stream to wake up the thread
157
try
158     {
159       channel.getInStream().close();
160     }
161     catch (IOException JavaDoc ignore)
162     {
163     }
164     try
165     {
166       channel.close();
167     }
168     catch (ChannelException ignore)
169     {
170     }
171     this.interrupt();
172   }
173
174 }
Popular Tags