1 6 7 package org.jfox.ioc.connector; 8 9 import java.rmi.MarshalledObject ; 10 import java.util.ArrayList ; 11 import java.util.Collections ; 12 import java.util.HashMap ; 13 import java.util.List ; 14 import java.util.Map ; 15 16 import org.jfox.ioc.common.AbstractService; 17 import org.jfox.ioc.util.Marshaller; 18 import org.jgroups.Address; 19 import org.jgroups.Channel; 20 import org.jgroups.JChannel; 21 import org.jgroups.MembershipListener; 22 import org.jgroups.Message; 23 import org.jgroups.View; 24 import org.jgroups.blocks.GroupRequest; 25 import org.jgroups.blocks.MessageDispatcher; 26 import org.jgroups.blocks.RequestHandler; 27 import org.jgroups.stack.IpAddress; 28 29 33 34 public class ClusterServer extends AbstractService implements RequestHandler,MembershipListener { 35 36 public final static String CHANNEL_NAME = "__JFOX_CLUSTER__"; 37 38 41 protected String jgroupProps = 42 "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" + 43 "PING(timeout=3000;num_initial_members=6):" + 44 "FD(timeout=3000):" + 45 "VERIFY_SUSPECT(timeout=1500):" + 46 "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):" + 47 "UNICAST(timeout=600,1200,2400,4800):" + 48 "pbcast.STABLE(desired_avg_gossip=10000):" + 49 "FRAG:" + 50 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + 51 "shun=true;print_local_addr=true)"; 52 53 56 private ServerNode theNode = ServerNode.THE_NODE; 57 58 62 private Map <String , ServerNode> clusterNodes = new HashMap <String , ServerNode>(); 63 64 private JChannel channel = null; 66 67 private MessageDispatcher disp = null; 69 70 public ClusterServer() { 71 } 72 73 public String getChannelName() { 74 return CHANNEL_NAME; 75 } 76 77 public String getJGroupProps() { 78 return jgroupProps; 79 } 80 81 public void setJGroupProps(String jgroupProps) { 82 this.jgroupProps = jgroupProps; 83 } 84 85 88 public synchronized List <ServerNode> getClusterNodes() { 89 return Collections.unmodifiableList(new ArrayList <ServerNode>(clusterNodes.values())); 90 } 91 92 public Object handle(Message msg) { 93 Object wrappedObject = msg.getObject(); 95 String debugMsg = "handle message " + wrappedObject; 96 try { 97 if(wrappedObject instanceof MarshalledObject ) { 98 wrappedObject = ((MarshalledObject ) wrappedObject).get(); 99 debugMsg += wrappedObject.toString(); 100 } 101 } 102 catch(Exception e) { 103 e.printStackTrace(); 104 } 105 logger.debug(debugMsg); 106 107 if(wrappedObject instanceof ServerNode) { ServerNode node = (ServerNode) wrappedObject; 110 collectServerNode(node); 111 } 112 else if(wrappedObject instanceof ClusterInvocation) { 113 ClusterInvocation invocation = (ClusterInvocation) wrappedObject; 115 ServerNode sourceNode = invocation.getClusterNode(); 117 collectServerNode(sourceNode); 118 try { 119 HandlerManager.getInstance().execute(invocation); 120 } 121 catch(Throwable e) { 122 logger.warn("cluster synchronize error!", e); 123 } 124 } 125 return null; 126 } 127 128 public void viewAccepted(View view) { 129 try { 130 logger.debug("viewAccepted: " + view); 131 Map <String , ServerNode> newClusterNodes = new HashMap <String , ServerNode>(); 133 List list = view.getMembers(); 134 for(Object addr : list) { 135 IpAddress ipAddress = (IpAddress) addr; 136 String key = ipAddress.getIpAddress().getHostAddress() + ":" + ipAddress.getPort(); 137 if(ipAddress.getIpAddress().getHostAddress().equals(theNode.getIp()) 138 && (ipAddress.getPort() == theNode.getJgport())) { 139 continue; 140 } 141 if(clusterNodes.containsKey(key)) { 143 newClusterNodes.put(key, clusterNodes.get(key)); 144 } 145 } 146 clusterNodes = newClusterNodes; 147 Message myNodeMessage = new Message(null, null, theNode); 149 disp.castMessage(null,myNodeMessage,GroupRequest.GET_NONE,0); 151 } 152 catch(Exception e) { 153 logger.warn(e.getMessage(), e); 154 } 155 156 } 157 158 public void suspect(Address suspected_mbr) { 159 logger.debug("suspect: " + suspected_mbr); 160 } 161 162 public void block() { 163 logger.debug("block invoked."); 164 } 165 166 172 public void cast(Invocation invocation) throws Exception { 173 if(invocation != null && (invocation instanceof ClusterInvocation)) { 175 MarshalledObject mobj = Marshaller.marshall(invocation); 177 Message msg = new Message(null, null, mobj); 178 disp.castMessage(null,msg, GroupRequest.GET_ALL,0); 181 } 182 } 183 184 protected void doInit() throws Exception { 185 this.channel = new JChannel(jgroupProps); 186 channel.setOpt(Channel.GET_STATE_EVENTS, new Boolean (true)); 187 channel.setOpt(Channel.AUTO_RECONNECT, new Boolean (true)); 188 channel.setOpt(Channel.AUTO_GETSTATE, new Boolean (true)); 189 channel.setOpt(Channel.LOCAL, new Boolean (false)); 190 191 logger.debug("Channel " + CHANNEL_NAME + " created."); 192 } 193 194 protected void doStart() throws Exception { 195 channel.connect(CHANNEL_NAME); 196 logger.info("Channel " + CHANNEL_NAME + " connected."); 197 theNode.setJgport(((IpAddress) channel.getLocalAddress()).getPort()); 198 disp = new MessageDispatcher(channel, null, this, this, true); 199 logger.error("run here impossible."); 200 } 201 202 protected void doStop() throws Exception { 203 disp.stop(); 204 channel.close(); 205 } 206 207 protected void doDestroy() throws Exception { 208 channel = null; 209 disp = null; 210 } 211 212 public void run() { 213 try { 214 } 215 catch(Exception e){ 216 logger.error(e.getMessage(),e); 217 } 218 } 219 220 224 private void collectServerNode(ServerNode node){ 225 String key = node.getIp() + ":" + node.getJgport(); 226 if(!node.equals(theNode) && !clusterNodes.containsKey(key)) { 227 logger.info("collect new ServerNode: " + key); 228 clusterNodes.put(key, node); 229 } 230 } 231 232 public static void main(String [] args) { 233 234 } 235 } 236 | Popular Tags |