1 24 25 package org.objectweb.tribe.channel.tcp; 26 27 import java.io.ByteArrayInputStream ; 28 import java.io.DataInputStream ; 29 import java.io.IOException ; 30 import java.io.ObjectInputStream ; 31 import java.util.HashMap ; 32 33 import org.objectweb.tribe.common.log.Trace; 34 import org.objectweb.tribe.exceptions.ChannelException; 35 import org.objectweb.tribe.exceptions.NoReceiverException; 36 import org.objectweb.tribe.messages.ChannelMessage; 37 38 47 public class TcpReaderThread extends Thread 48 { 49 boolean isKilled = false; 50 private TcpChannel channel; 51 private HashMap keyBuffers; 52 53 private static Trace logger = Trace 54 .getLogger("org.objectweb.tribe.channel"); 55 56 63 public TcpReaderThread(TcpChannel channel, HashMap keyBuffers) 64 { 65 super("TcpReaderThread"); 66 this.channel = channel; 67 this.keyBuffers = keyBuffers; 68 } 69 70 73 public void run() 74 { 75 if (logger.isDebugEnabled()) 76 logger.debug("TcpReaderThread started on " + channel.getSourceAddress() 77 + "->" + channel.getDestinationAddress()); 78 79 DataInputStream inStream = channel.getInStream(); 80 81 while (!isKilled) 82 { 83 try 84 { 85 int size = inStream.readInt(); int totalRead = 0; byte[] buf = new byte[size]; 92 do 93 { 94 int read = inStream.read(buf, totalRead, size); 95 totalRead += read; 96 size -= read; 97 } 98 while (size > 0); 99 100 ChannelMessage msg = (ChannelMessage) new ObjectInputStream ( 101 new ByteArrayInputStream (buf)).readObject(); 102 if (logger.isDebugEnabled()) 103 logger.debug("TcpReaderThread received message: " + msg); 104 msg.deliverMessage(keyBuffers); 105 } 106 catch (RuntimeException e) 107 { 108 if (logger.isDebugEnabled()) 109 logger 110 .debug("TcpReaderThread: Error while receiving message, terminating thread and channel (" 111 + e + ")"); 112 try 113 { 114 channel.close(); 115 } 116 catch (ChannelException ignore) 117 { 118 } 119 isKilled = true; 120 } 121 catch (IOException e) 122 { 123 if (logger.isDebugEnabled()) 124 logger 125 .debug("TcpReaderThread: Error while receiving message, terminating thread and channel (" 126 + e + ")"); 127 try 128 { 129 channel.close(); 130 } 131 catch (ChannelException ignore) 132 { 133 } 134 isKilled = true; 135 } 136 catch (ClassNotFoundException e) 137 { 138 logger.error("TcpReaderThread: Error while unmarshalling message", e); 139 } 140 catch (NoReceiverException e) 141 { 142 logger.info("TcpReaderThread: Error while delivering message", e); 143 } 144 } 145 146 if (logger.isDebugEnabled()) 147 logger.debug("TcpReaderThread terminated."); 148 } 149 150 153 public void kill() 154 { 155 isKilled = true; 156 try 158 { 159 channel.getInStream().close(); 160 } 161 catch (IOException ignore) 162 { 163 } 164 try 165 { 166 channel.close(); 167 } 168 catch (ChannelException ignore) 169 { 170 } 171 this.interrupt(); 172 } 173 174 } | Popular Tags |