1 46 package org.mr.core.net; 47 48 58 59 import java.io.IOException ; 60 import java.net.InetAddress ; 61 import java.net.InetSocketAddress ; 62 import java.net.SocketAddress ; 63 import java.nio.channels.SocketChannel ; 64 import java.util.*; 65 66 import org.apache.commons.logging.Log; 67 import org.apache.commons.logging.LogFactory; 68 69 public class TransportTable { 70 private HashMap agentTable; 71 private HashMap addressTable; 72 private HashMap udpAddressTable; 73 private HashMap localTCPTable; 74 private HashMap pendingTable; 75 private HashMap indirectTable; 76 private HashMap agentAccessibleCache; 77 private Log log; 78 79 81 84 public TransportTable() { 85 this.agentTable = new HashMap(); 86 this.addressTable = new HashMap(); 87 this.localTCPTable = new HashMap(); 88 this.pendingTable = new HashMap(); 89 this.indirectTable = new HashMap(); 90 this.agentAccessibleCache = new HashMap(); 91 this.log = LogFactory.getLog("TransportTable"); 92 } 94 public synchronized void addAgent(String agent, Set transports) { 95 Iterator i = transports.iterator(); 96 while (i.hasNext()) { 97 Transport t = (Transport) i.next(); 98 TransportInfo info = t.getInfo(); 99 addressTable.put(info.getSocketAddress(), t); 100 } 101 agentTable.put(agent, transports); 102 agentAccessibleCache.remove(agent); 103 } 104 105 public synchronized void addTransport(String agent, Transport t) { 106 Set transports = (Set) agentTable.get(agent); 107 if (transports != null) { 108 transports.add(t); 109 110 if (t.isIndirect()) { 111 this.indirectTable.put(agent, t); 112 } else { 113 TransportInfo info = t.getInfo(); 114 if (!t.isPassive()) { 115 Transport old = (Transport) 116 addressTable.put(info.getSocketAddress(), t); 117 if (old != null) { 118 old.shutdown(); 121 } 122 } 123 } 124 } 125 agentAccessibleCache.remove(agent); 126 } 127 128 public synchronized Set removeAgent(String agent) { 129 Set transports = (Set) agentTable.remove(agent); 130 if (transports != null) { 131 Iterator i = transports.iterator(); 132 while (i.hasNext()) { 133 TransportInfo info = ((Transport) i.next()).getInfo(); 134 addressTable.remove(info.getSocketAddress()); 135 } 136 } 137 agentAccessibleCache.remove(agent); 138 139 return transports; 140 } 141 142 public synchronized Transport removeTransport(String agent, 143 SocketAddress addr, 144 boolean indirect) 145 { 146 Set transports = (Set) agentTable.get(agent); 147 Transport t = null; 148 149 if (indirect) { 150 t = (Transport) indirectTable.remove(agent); 151 } else { 152 t = (Transport) addressTable.remove(addr); 153 if (t == null) { 154 t = (Transport) udpAddressTable.remove(((InetSocketAddress ) addr).getAddress()); 155 } 156 } 157 if (t != null) { 158 transports.remove(t); 159 } 160 agentAccessibleCache.remove(agent); 161 162 return t; 163 } 164 165 public synchronized Collection getLocalTransports() { 166 return this.localTCPTable.values(); 167 } 168 169 public synchronized void addLocalTransport(LocalTransport t) { 170 SocketAddress addr = t.getSocketAddress(); 171 InetSocketAddress iaddr = (InetSocketAddress ) addr; 172 localTCPTable.put(addr, t); 173 agentAccessibleCache.clear(); 174 } 175 176 public synchronized LocalTransport removeLocalTransport(SocketAddress addr) 177 { 178 agentAccessibleCache.clear(); 179 return (LocalTransport) localTCPTable.remove(addr); 180 } 181 182 public synchronized LocalTransport getLocalTransport(SocketAddress addr) { 183 return (LocalTransport) localTCPTable.get(addr); 184 } 185 186 public synchronized TransportImpl 187 addPendingTransport(SocketChannel channel, NetworkListener listener) { 188 SocketAddress localAddr = channel.socket().getLocalSocketAddress(); 189 SocketAddress remoteAddr = channel.socket().getRemoteSocketAddress(); 190 LocalTCPTransport localTran = 191 (LocalTCPTransport) localTCPTable.get(localAddr); 192 TransportImpl impl = null; 193 194 if (localTran == null) { 195 try { 196 int port = ((InetSocketAddress ) localAddr).getPort(); 197 localAddr = new InetSocketAddress ("0.0.0.0", port); 198 localTran = (LocalTCPTransport) localTCPTable.get(localAddr); 199 } catch (Exception e) {} 200 } 201 if(log.isInfoEnabled()){ 202 log.info("AddPending: local = " + 203 (localAddr == null ? "null" : localAddr.toString()) + 204 "; remote = " + 205 (remoteAddr == null ? "null" : remoteAddr.toString()) + 206 "; localTran is " + (localTran == null ? "null" : 207 localTran.getInfo().toString())+"."); 208 } 209 210 if (remoteAddr != null) { 211 try { 212 impl = TransportProvider.createImpl(localTran.getInfo(). 213 getTransportInfoType(), 214 channel); 215 impl.setListener(listener); 216 pendingTable.put(remoteAddr, impl); 217 } catch (IOException e) { 218 e.printStackTrace(); 220 } 221 return impl; 222 } 223 return null; 224 } 225 226 public synchronized void addPendingTransport(TransportImpl impl) { 227 pendingTable.put(impl.getRemoteSocketAddress(), impl); 228 } 229 230 public synchronized TransportImpl 231 removePendingTransport(SocketAddress addr) { 232 return (TransportImpl) pendingTable.remove(addr); 233 } 234 235 251 public synchronized boolean associatePending(SocketAddress addr, 252 String remoteName, 253 String myName, boolean initId) 254 { 255 TransportImpl pending = removePendingTransport(addr); 256 Set transports = getTransports(remoteName); 257 258 if (transports != null) { 259 Iterator i = transports.iterator(); 260 while (i.hasNext()) { 261 Transport t = (Transport) i.next(); 262 TransportType type = t.getInfo().getTransportInfoType(); 263 if (type.equals(pending.getType())) { 264 t.mergeImpl(pending, initId); 265 return true; 266 } 267 } 268 } 269 pending.shutdown(); 270 return false; 271 } 272 273 public synchronized boolean isPending(SocketAddress addr) { 274 return pendingTable.containsKey(addr); 275 } 276 277 public synchronized Set getTransports(String agent) { 278 return (Set) agentTable.get(agent); 279 } 280 281 public synchronized Transport getTransport(SocketAddress addr) { 282 return (Transport) addressTable.get(addr); 283 } 284 285 public synchronized Transport getUdpTransport(InetAddress addr) { 286 return (Transport) udpAddressTable.get(addr); 287 } 288 289 public synchronized SocketAddress getLocalAddress(String myName, 290 Transport t) { 291 Set transports = getTransports(myName); 292 Iterator i = transports.iterator(); 293 SocketAddress addr = null; 294 295 if (i.hasNext()) { 296 addr = ((Transport) i.next()).getInfo().getSocketAddress(); 297 } 298 299 return addr; 300 } 301 302 public synchronized boolean isLocalType(TransportType type) { 303 307 if (type == TransportType.MWB) { 308 return true; 309 } 310 311 Iterator i = this.localTCPTable.values().iterator(); 312 while (i.hasNext()) { 313 if (((TransportTypeable) i.next()).getTransportType() == type) { 314 return true; 315 } 316 } 317 318 return false; 319 } 320 321 public synchronized boolean exists(String agent, TransportInfo info, 322 boolean indirect) 323 { 324 if (indirect && this.indirectTable.get(agent) != null) { 325 return true; 326 } 327 if (!indirect) { 328 Transport t = 329 (Transport) this.addressTable.get(info.getSocketAddress()); 330 if (t != null && t.getRemoteAgentName().equals(agent)) { 331 return true; 332 } 333 } 334 return false; 335 } 336 337 343 public List getConnections(String myName) { 344 List result = new LinkedList(); 345 String remoteIP, localIP; 346 HashSet copy = new HashSet(); 347 synchronized(agentTable){ 348 349 copy.addAll( this.agentTable.keySet()) ; 350 } 351 Iterator i = copy.iterator(); 352 while (i.hasNext()) { 353 String agent = (String ) i.next(); 354 Iterator ii = getTransports(agent).iterator(); 355 while (ii.hasNext()) { 356 Transport t = (Transport) ii.next(); 357 Iterator iii = t.getConnectedImpls().iterator(); 358 while (iii.hasNext()) { 359 TransportImpl impl = (TransportImpl) iii.next(); 360 InetSocketAddress local = impl.getLocalSocketAddress(); 361 InetSocketAddress remote = impl.getRemoteSocketAddress(); 362 if (local == null || remote == null) { 363 continue; 364 } 365 Link link = 366 new Link(t.getInfo().getTransportInfoType().toString(), 367 myName, local.getAddress().getHostAddress(), 368 local.getPort(), agent, 369 remote.getAddress().getHostAddress(), 370 remote.getPort(),t.getTotalMessages(), 371 t.getTotalBytes(),t.getFiveMinMessages(), 372 t.getFiveMinBytes()); 373 result.add(link); 374 } 375 } 376 } 377 378 return result; 379 } 380 381 public synchronized boolean isAccessible(String agent) { 382 Boolean cacheVal = (Boolean ) this.agentAccessibleCache.get(agent); 383 boolean retVal = false; 384 385 if (cacheVal == null) { 386 Set transports = (Set) this.agentTable.get(agent); 387 if(transports != null){ 388 Iterator i = transports.iterator(); 389 while (i.hasNext()) { 390 Transport t = (Transport) i.next(); 391 if (isLocalType(t.getInfo().getTransportInfoType())) { 392 retVal = true; 393 break; 394 } 395 } 396 this.agentAccessibleCache.put(agent, new Boolean (retVal)); 397 } 398 399 } else { 400 retVal = cacheVal.booleanValue(); 401 } 402 403 return retVal; 404 } 405 406 public synchronized InetAddress getLocalInterface(String agent) { 407 Set transports = (Set) this.agentTable.get(agent); 408 if (transports != null) { 409 Iterator i = transports.iterator(); 410 while (i.hasNext()) { 411 Transport t = (Transport) i.next(); 412 InetSocketAddress saddr = t.getLocalSocketAddress(); 413 if (saddr != null) { 414 return saddr.getAddress(); 415 } 416 } 417 } 418 return null; 419 } 420 } | Popular Tags |