1 24 25 package org.objectweb.tribe.channel; 26 27 import java.util.ArrayList ; 28 import java.util.Collection ; 29 import java.util.HashMap ; 30 31 import org.objectweb.tribe.common.Address; 32 import org.objectweb.tribe.common.Member; 33 import org.objectweb.tribe.common.log.Trace; 34 import org.objectweb.tribe.exceptions.ChannelException; 35 import org.objectweb.tribe.messages.ChannelMessage; 36 37 43 public abstract class AbstractChannelPool 44 { 45 protected HashMap channels; 47 protected HashMap serverChannels; 49 protected HashMap keyBuffers; 51 52 private static Trace logger = Trace.getLogger("org.objectweb.tribe.channel"); 53 54 57 public AbstractChannelPool() 58 { 59 channels = new HashMap (); 60 serverChannels = new HashMap (); 61 keyBuffers = new HashMap (); 62 } 63 64 74 public abstract AbstractReliableFifoChannel getChannel(Address destination) 75 throws ChannelException; 76 77 84 public boolean removeChannelFromPool(AbstractReliableFifoChannel channel) 85 { 86 synchronized (channels) 87 { 88 Collection values = channels.values(); 89 if (values == null) 90 return false; 91 return values.remove(channel); 92 } 93 } 94 95 105 public abstract AbstractServerChannel getServerChannel(Address serverAddress) 106 throws ChannelException; 107 108 115 public boolean removeServerChannelFromPool(AbstractServerChannel channel) 116 { 117 synchronized (serverChannels) 118 { 119 Collection values = serverChannels.values(); 120 if (values == null) 121 return false; 122 return values.remove(channel); 123 } 124 } 125 126 132 public void registerReceiveBuffer(ReceiveBuffer buffer) 133 { 134 Object key = buffer.getBufferKey(); 135 synchronized (keyBuffers) 136 { 137 if (logger.isDebugEnabled()) 138 logger.debug("Registering new receive buffer: " + key); 139 ArrayList buffers = (ArrayList ) keyBuffers.get(key); 141 if (buffers == null) 142 { buffers = new ArrayList (); 144 keyBuffers.put(key, buffers); 145 } 146 synchronized (buffers) 148 { 149 buffers.add(buffer); 150 } 151 } 152 } 153 154 162 public boolean unregisterReceiveBuffer(ReceiveBuffer buffer) 163 { 164 Object key = buffer.getBufferKey(); 165 synchronized (keyBuffers) 166 { 167 if (logger.isDebugEnabled()) 168 logger.debug("Unregistering receive buffer: " + key); 169 ArrayList buffers = (ArrayList ) keyBuffers.get(key); 171 if (buffers == null) 172 return false; synchronized (buffers) 175 { 176 boolean wasUnregistered = buffers.remove(buffer); 177 if (buffer.isEmpty()) 178 { Object [] channelsToClose = channels.values().toArray(); 180 for (int i = 0; i < channelsToClose.length; i++) 181 { 182 AbstractReliableFifoChannel channel = (AbstractReliableFifoChannel) channelsToClose[i]; 183 removeChannelFromPool(channel); 184 } 185 channels.clear(); 186 } 187 if (wasUnregistered) 188 buffer.addMessage(null); return wasUnregistered; 191 } 192 } 193 } 194 195 202 public ArrayList send(ChannelMessage msg, ArrayList members) 203 { 204 ArrayList failed = null; 205 Address lastAddress = null; 206 synchronized (members) 207 { 208 int size = members.size(); 209 for (int i = 0; i < size; i++) 210 { 211 Member m = (Member) members.get(i); 212 if (!m.getAddress().equals(lastAddress)) 214 { 215 AbstractReliableFifoChannel channel = null; 216 try 217 { 218 channel = getChannel(m.getAddress()); 219 channel.send(msg); 220 lastAddress = m.getAddress(); 221 } 222 catch (Exception e) 223 { 224 logger.debug("Failed to send message to member " + m, e); 225 if (failed == null) 226 failed = new ArrayList (); 227 failed.add(m); 228 removeChannelFromPool(channel); 229 lastAddress = null; 230 } 231 } 232 } 233 } 234 return failed; 235 } 236 237 } | Popular Tags |