1 package org.jgroups.protocols; 2 3 import org.jgroups.Address; 4 import org.jgroups.Event; 5 import org.jgroups.Message; 6 import org.jgroups.stack.IpAddress; 7 8 import java.util.Map ; 9 import java.util.concurrent.ConcurrentHashMap ; 10 11 12 18 public class SHARED_LOOPBACK extends TP { 19 private static int next_port=10000; 20 21 22 private static final Map <String ,Map <Address,SHARED_LOOPBACK>> routing_table=new ConcurrentHashMap <String ,Map <Address,SHARED_LOOPBACK>>(); 23 24 25 public SHARED_LOOPBACK() { 26 } 27 28 29 30 public String toString() { 31 return "SHARED_LOOPBACK(local address: " + local_addr + ')'; 32 } 33 34 public void sendToAllMembers(byte[] data, int offset, int length) throws Exception { 35 Map <Address,SHARED_LOOPBACK> dests=routing_table.get(channel_name); 36 if(dests == null) { 37 if(log.isWarnEnabled()) 38 log.warn("no destination found for " + channel_name); 39 return; 40 } 41 for(Map.Entry <Address,SHARED_LOOPBACK> entry: dests.entrySet()) { 42 Address dest=entry.getKey(); 43 SHARED_LOOPBACK target=entry.getValue(); 44 try { 45 target.receive(dest, local_addr, data, offset, length); 46 } 47 catch(Throwable t) { 48 log.error("failed sending message to " + dest, t); 49 } 50 } 51 } 52 53 public void sendToSingleMember(Address dest, byte[] data, int offset, int length) throws Exception { 54 Map <Address,SHARED_LOOPBACK> dests=routing_table.get(channel_name); 55 if(dests == null) { 56 if(log.isWarnEnabled()) 57 log.warn("no destination found for " + channel_name); 58 return; 59 } 60 SHARED_LOOPBACK target=dests.get(dest); 61 if(target == null) { 62 if(log.isWarnEnabled()) 63 log.warn("destination address " + dest + " not found"); 64 return; 65 } 66 target.receive(dest, local_addr, data, offset, length); 67 } 68 69 public String getInfo() { 70 return toString(); 71 } 72 73 public void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast) { 74 } 75 76 public void postUnmarshallingList(Message msg, Address dest, boolean multicast) { 77 } 78 79 80 81 public String getName() { 82 return "SHARED_LOOPBACK"; 83 } 84 85 94 95 public void init() throws Exception { 96 local_addr=new IpAddress("127.0.0.1", next_port++); 97 super.init(); 98 } 99 100 public void start() throws Exception { 101 super.start(); 102 } 103 104 public void stop() { 105 super.stop(); 106 } 107 108 109 public Object down(Event evt) { 110 Object retval=super.down(evt); 111 112 switch(evt.getType()) { 113 case Event.CONNECT: 114 register(channel_name, local_addr, this); 115 break; 116 117 case Event.DISCONNECT: 118 unregister(channel_name, local_addr); 119 break; 120 } 121 122 return retval; 123 } 124 125 private void register(String channel_name, Address local_addr, SHARED_LOOPBACK shared_loopback) { 126 Map <Address,SHARED_LOOPBACK> map=routing_table.get(channel_name); 127 if(map == null) { 128 map=new ConcurrentHashMap <Address,SHARED_LOOPBACK>(); 129 routing_table.put(channel_name, map); 130 } 131 map.put(local_addr, shared_loopback); 132 } 133 134 private void unregister(String channel_name, Address local_addr) { 135 Map <Address,SHARED_LOOPBACK> map=routing_table.get(channel_name); 136 if(map != null) { 137 map.remove(local_addr); 138 if(map.isEmpty()) { 139 routing_table.remove(channel_name); 140 } 141 } 142 } 143 144 } 145 | Popular Tags |