KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jfox > ioc > connector > ClusterServer


1 /* JFox, the OpenSource J2EE Application Server
2  *
3  * Distributable under GNU LGPL license by gnu.org
4  * more details please visit http://www.huihoo.org/jfox
5  */

6
7 package org.jfox.ioc.connector;
8
9 import java.rmi.MarshalledObject JavaDoc;
10 import java.util.ArrayList JavaDoc;
11 import java.util.Collections JavaDoc;
12 import java.util.HashMap JavaDoc;
13 import java.util.List JavaDoc;
14 import java.util.Map JavaDoc;
15
16 import org.jfox.ioc.common.AbstractService;
17 import org.jfox.ioc.util.Marshaller;
18 import org.jgroups.Address;
19 import org.jgroups.Channel;
20 import org.jgroups.JChannel;
21 import org.jgroups.MembershipListener;
22 import org.jgroups.Message;
23 import org.jgroups.View;
24 import org.jgroups.blocks.GroupRequest;
25 import org.jgroups.blocks.MessageDispatcher;
26 import org.jgroups.blocks.RequestHandler;
27 import org.jgroups.stack.IpAddress;
28
29 /**
30  * 负责接收JGroup消息,并派发给相应的Handler
31  * @author <a HREF="mailto:young_yy@hotmail.com">Young Yang</a>
32  */

33
34 public class ClusterServer extends AbstractService implements RequestHandler,MembershipListener {
35
36     public final static String JavaDoc CHANNEL_NAME = "__JFOX_CLUSTER__";
37
38     /**
39      * JGroup Properties
40      */

41     protected String JavaDoc jgroupProps =
42             "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" +
43             "PING(timeout=3000;num_initial_members=6):" +
44             "FD(timeout=3000):" +
45             "VERIFY_SUSPECT(timeout=1500):" +
46             "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):" +
47             "UNICAST(timeout=600,1200,2400,4800):" +
48             "pbcast.STABLE(desired_avg_gossip=10000):" +
49             "FRAG:" +
50             "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
51             "shun=true;print_local_addr=true)";
52
53     /**
54      * 该节点的信息
55      */

56     private ServerNode theNode = ServerNode.THE_NODE;
57
58     /**
59      * 集群中的其它节点
60      * key=ip:udp_port
61      */

62     private Map JavaDoc<String JavaDoc, ServerNode> clusterNodes = new HashMap JavaDoc<String JavaDoc, ServerNode>();
63
64     // channel
65
private JChannel channel = null;
66
67     // message dispatcher
68
private MessageDispatcher disp = null;
69
70     public ClusterServer() {
71     }
72
73     public String JavaDoc getChannelName() {
74         return CHANNEL_NAME;
75     }
76
77     public String JavaDoc getJGroupProps() {
78         return jgroupProps;
79     }
80
81     public void setJGroupProps(String JavaDoc jgroupProps) {
82         this.jgroupProps = jgroupProps;
83     }
84
85     /**
86      * 取得当前集群系统中所有的节点
87      */

88     public synchronized List JavaDoc<ServerNode> getClusterNodes() {
89         return Collections.unmodifiableList(new ArrayList JavaDoc<ServerNode>(clusterNodes.values()));
90     }
91     
92     public Object JavaDoc handle(Message msg) {
93         // 正常情况下,wrappedObject 是 MarshalledObject,见 cast
94
Object JavaDoc wrappedObject = msg.getObject();
95         String JavaDoc debugMsg = "handle message " + wrappedObject;
96         try {
97             if(wrappedObject instanceof MarshalledObject JavaDoc) {
98                 wrappedObject = ((MarshalledObject JavaDoc) wrappedObject).get();
99                 debugMsg += wrappedObject.toString();
100             }
101         }
102         catch(Exception JavaDoc e) {
103             e.printStackTrace();
104         }
105         logger.debug(debugMsg);
106
107         if(wrappedObject instanceof ServerNode) { // 不用同步,JGroups会自动同步View
108
// 收集集群节点,增加一个新的节点
109
ServerNode node = (ServerNode) wrappedObject;
110             collectServerNode(node);
111         }
112         else if(wrappedObject instanceof ClusterInvocation) {
113             // 同步Bean
114
ClusterInvocation invocation = (ClusterInvocation) wrappedObject;
115             //从 ClusterInvocation 中提取 ServerNode,如果是新节点,收集该节点
116
ServerNode sourceNode = invocation.getClusterNode();
117             collectServerNode(sourceNode);
118             try {
119                 HandlerManager.getInstance().execute(invocation);
120             }
121             catch(Throwable JavaDoc e) {
122                 logger.warn("cluster synchronize error!", e);
123             }
124         }
125         return null;
126     }
127
128     public void viewAccepted(View view) {
129         try {
130             logger.debug("viewAccepted: " + view);
131             //对 View 的Members 和 ClusterNode 进行同步
132
Map JavaDoc<String JavaDoc, ServerNode> newClusterNodes = new HashMap JavaDoc<String JavaDoc, ServerNode>();
133             List JavaDoc list = view.getMembers();
134             for(Object JavaDoc addr : list) {
135                 IpAddress ipAddress = (IpAddress) addr;
136                 String JavaDoc key = ipAddress.getIpAddress().getHostAddress() + ":" + ipAddress.getPort();
137                 if(ipAddress.getIpAddress().getHostAddress().equals(theNode.getIp())
138                    && (ipAddress.getPort() == theNode.getJgport())) {
139                     continue;
140                 }
141                 // 复制已经含有并活着的 node
142
if(clusterNodes.containsKey(key)) {
143                     newClusterNodes.put(key, clusterNodes.get(key));
144                 }
145             }
146             clusterNodes = newClusterNodes;
147             //有可能来了一个新的节点,所以立即把自己的ServerNode发布出去,以便能够注册到新的节点中
148
Message myNodeMessage = new Message(null, null, theNode);
149             //此处不能使用 GET_ALL,否则会死锁
150
disp.castMessage(null,myNodeMessage,GroupRequest.GET_NONE,0);
151         }
152         catch(Exception JavaDoc e) {
153             logger.warn(e.getMessage(), e);
154         }
155
156     }
157
158     public void suspect(Address suspected_mbr) {
159         logger.debug("suspect: " + suspected_mbr);
160     }
161
162     public void block() {
163         logger.debug("block invoked.");
164     }
165
166     /**
167      * 往集群中发布需要同步的Invocation信息
168      *
169      * @param invocation
170      * @throws Exception
171      */

172     public void cast(Invocation invocation) throws Exception JavaDoc {
173         //只有Clusteable的Invocation才发布
174
if(invocation != null && (invocation instanceof ClusterInvocation)) {
175 // ((ClusterInvocation)invocation).getInvocation().setMethod(null);
176
MarshalledObject JavaDoc mobj = Marshaller.marshall(invocation);
177             Message msg = new Message(null, null, mobj);
178 // channel.send(msg);
179
//GroupRequest.GET_NONE 异步模式,不等待返回ReponseList
180
disp.castMessage(null,msg, GroupRequest.GET_ALL,0);
181         }
182     }
183
184     protected void doInit() throws Exception JavaDoc {
185         this.channel = new JChannel(jgroupProps);
186         channel.setOpt(Channel.GET_STATE_EVENTS, new Boolean JavaDoc(true));
187         channel.setOpt(Channel.AUTO_RECONNECT, new Boolean JavaDoc(true));
188         channel.setOpt(Channel.AUTO_GETSTATE, new Boolean JavaDoc(true));
189         channel.setOpt(Channel.LOCAL, new Boolean JavaDoc(false));
190
191         logger.debug("Channel " + CHANNEL_NAME + " created.");
192     }
193
194     protected void doStart() throws Exception JavaDoc {
195         channel.connect(CHANNEL_NAME);
196         logger.info("Channel " + CHANNEL_NAME + " connected.");
197         theNode.setJgport(((IpAddress) channel.getLocalAddress()).getPort());
198         disp = new MessageDispatcher(channel, null, this, this, true);
199         logger.error("run here impossible.");
200     }
201
202     protected void doStop() throws Exception JavaDoc {
203         disp.stop();
204         channel.close();
205     }
206
207     protected void doDestroy() throws Exception JavaDoc {
208         channel = null;
209         disp = null;
210     }
211
212     public void run() {
213         try {
214         }
215         catch(Exception JavaDoc e){
216             logger.error(e.getMessage(),e);
217         }
218     }
219
220     /**
221      * 收集节点信息
222      * @param node
223      */

224     private void collectServerNode(ServerNode node){
225         String JavaDoc key = node.getIp() + ":" + node.getJgport();
226         if(!node.equals(theNode) && !clusterNodes.containsKey(key)) {
227             logger.info("collect new ServerNode: " + key);
228             clusterNodes.put(key, node);
229         }
230     }
231
232     public static void main(String JavaDoc[] args) {
233
234     }
235 }
236
Popular Tags