1 24 25 package org.objectweb.tribe.channel.tcp; 26 27 import java.io.IOException ; 28 import java.util.HashMap ; 29 30 import org.objectweb.tribe.channel.AbstractChannelPool; 31 import org.objectweb.tribe.channel.AbstractReliableFifoChannel; 32 import org.objectweb.tribe.channel.AbstractServerChannel; 33 import org.objectweb.tribe.common.Address; 34 import org.objectweb.tribe.common.IpAddress; 35 import org.objectweb.tribe.common.log.Trace; 36 import org.objectweb.tribe.exceptions.ChannelException; 37 38 44 public class TcpChannelPool extends AbstractChannelPool 45 { 46 47 private static TcpChannelPool pool = new TcpChannelPool(); 48 private HashMap readerThreads; 49 private HashMap accepterThreads; 50 private static Trace logger = Trace 51 .getLogger("org.objectweb.tribe.channel"); 52 53 56 public TcpChannelPool() 57 { 58 super(); 59 readerThreads = new HashMap (); 60 accepterThreads = new HashMap (); 61 } 62 63 66 public AbstractReliableFifoChannel getChannel(Address destination) 67 throws ChannelException 68 { 69 synchronized (channels) 70 { 71 AbstractReliableFifoChannel channel = (AbstractReliableFifoChannel) channels 72 .get(destination); 73 if (channel == null) 74 { 75 try 76 { 77 if (logger.isDebugEnabled()) 78 logger.debug("Getting new channel for " + destination); 79 channel = new TcpChannel(); 80 channel.connect(destination); 81 channels.put(destination, channel); 82 TcpReaderThread thread = new TcpReaderThread((TcpChannel) channel, 83 keyBuffers); 84 thread.start(); 85 synchronized (readerThreads) 86 { 87 readerThreads.put(channel, thread); 88 } 89 } 90 catch (IOException e) 91 { 92 throw new ChannelException("Failed to create a new serverSocket to " 93 + destination, e); 94 } 95 } 96 return channel; 97 } 98 } 99 100 103 public AbstractServerChannel getServerChannel(Address serverAddress) 104 throws ChannelException 105 { 106 synchronized (serverChannels) 107 { 108 AbstractServerChannel channel = (AbstractServerChannel) serverChannels 109 .get(serverAddress); 110 if (channel == null) 111 { 112 try 113 { 114 if (logger.isDebugEnabled()) 115 logger.debug("Getting new server channel for " + serverAddress); 116 int port = ((IpAddress) serverAddress).getPort(); 117 if (port == 0) 118 channel = new TcpServerChannel(port); 119 else 120 { 121 channel = new TcpServerChannel(); 122 channel.bind(serverAddress); 123 } 124 serverChannels.put(serverAddress, channel); 125 TcpServerAccepterThread thread = new TcpServerAccepterThread(channel, 126 channels, readerThreads, keyBuffers); 127 thread.start(); 128 synchronized (accepterThreads) 129 { 130 accepterThreads.put(channel, thread); 131 } 132 } 133 catch (IOException e) 134 { 135 throw new ChannelException( 136 "Failed to create a new server serverSocket on " + serverAddress, 137 e); 138 } 139 } 140 return channel; 141 } 142 } 143 144 149 public static AbstractChannelPool getChannelPool() 150 { 151 return pool; 152 } 153 154 159 public boolean removeChannelFromPool(AbstractReliableFifoChannel channel) 160 { 161 TcpReaderThread thread; 162 synchronized (readerThreads) 163 { 164 thread = (TcpReaderThread) readerThreads.remove(channel); 165 } 166 if (thread != null) 167 thread.kill(); 168 return super.removeChannelFromPool(channel); 169 } 170 171 176 public boolean removeServerChannelFromPool(AbstractServerChannel channel) 177 { 178 TcpServerAccepterThread thread; 179 synchronized (accepterThreads) 180 { 181 thread = (TcpServerAccepterThread) accepterThreads.remove(channel); 182 } 183 if (thread != null) 184 thread.kill(); 185 return super.removeServerChannelFromPool(channel); 186 } 187 } | Popular Tags |