package org.jgroups.stack;


import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RequestCorrelator;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;

import java.util.Vector;


/**
 * Protocol base class that pairs a {@link RequestCorrelator} with a locally
 * tracked membership list.
 * <p>
 * The correlator is created in {@link #start()} and discarded in
 * {@link #stop()}. The membership list is refreshed from every
 * <code>VIEW_CHANGE</code> event seen by {@link #up(Event)} or
 * {@link #down(Event)}. Subclasses send requests through
 * {@link #castMessage(Vector, Message, int, long)} and
 * {@link #sendMessage(Message, int, long)}, answer incoming requests by
 * overriding {@link #handle(Message)}, and intercept other events by
 * overriding {@link #handleUpEvent(Event)} / {@link #handleDownEvent(Event)}.
 */
public abstract class MessageProtocol extends Protocol implements RequestHandler {
    /** Correlator used for all requests; <code>null</code> before start() and after stop(). */
    protected RequestCorrelator _corr=null;

    /** Members taken from the most recent view; replaced wholesale by updateView(). */
    protected final Vector members=new Vector();


    /**
     * Creates the request correlator if there is none yet. Calling this
     * again while a correlator exists leaves the existing one in place.
     */
    public void start() throws Exception {
        if(_corr == null) {
            _corr=new RequestCorrelator(getName(), this, this);
        }
    }

    /**
     * Stops the request correlator, if any, and drops the reference so that
     * a later {@link #start()} creates a fresh one.
     */
    public void stop() {
        if(_corr != null) {
            _corr.stop();
            _corr=null;
        }
    }


    /**
     * Sends <code>msg</code> as a group request and returns the collected results.
     *
     * @param dests   destinations of the request; when <code>null</code> the
     *                current membership list is used instead. The vector is
     *                cloned, so the caller's instance is never modified here.
     * @param msg     the request message
     * @param mode    response mode handed unchanged to {@link GroupRequest}
     * @param timeout timeout handed unchanged to {@link GroupRequest}
     * @return whatever {@link GroupRequest#getResults()} yields after execution
     */
    public RspList castMessage(Vector dests, Message msg, int mode, long timeout) {
        // Work on a private copy: Vector.clone() is synchronized on the vector,
        // the same monitor updateView() holds while swapping the contents.
        Vector real_dests;
        if(dests != null) {
            real_dests=(Vector)dests.clone();
        }
        else {
            real_dests=(Vector)members.clone();
        }

        // NOTE(review): the trailing 0 is kept from the original call; its
        // meaning is defined by GroupRequest's constructor -- confirm there.
        GroupRequest _req=new GroupRequest(msg, _corr, real_dests, mode, timeout, 0);
        _req.execute();

        return _req.getResults();
    }


    /**
     * Sends <code>msg</code> to its own destination as a single-target group
     * request and returns the value of the first response.
     *
     * @param msg     the request; its destination must be non-null
     * @param mode    response mode handed unchanged to {@link GroupRequest}
     * @param timeout timeout handed unchanged to {@link GroupRequest}
     * @return the value of the first response, or <code>null</code> when the
     *         destination is null, when <code>mode</code> is
     *         {@link GroupRequest#GET_NONE}, or when no response was collected
     * @throws SuspectedException if the first response is flagged as suspected
     * @throws TimeoutException   if the first response is flagged as not received
     */
    public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException {
        Object dest=msg.getDest();

        if(dest == null) {
            // Reported through the logger like every other problem in this
            // method (was a bare System.out.println); message text unchanged.
            if(log.isErrorEnabled()) {
                log.error("MessageProtocol.sendMessage(): the message's destination is null ! " +
                          "Cannot send message !");
            }
            return null;
        }

        Vector mbrs=new Vector();
        mbrs.addElement(dest);

        // NOTE(review): trailing 0 kept from the original call -- see GroupRequest.
        GroupRequest _req=new GroupRequest(msg, _corr, mbrs, mode, timeout, 0);
        _req.execute();

        // The caller asked for no responses, so there is nothing to inspect.
        if(mode == GroupRequest.GET_NONE) {
            return null;
        }

        RspList rsp_list=_req.getResults();

        if(rsp_list.size() == 0) {
            if(log.isErrorEnabled()) log.error("response list is empty");
            return null;
        }
        if(rsp_list.size() > 1) {
            if(log.isErrorEnabled()) log.error("response list contains " +
                                               "more that 1 response; returning first response");
        }

        Rsp rsp=(Rsp)rsp_list.elementAt(0);
        if(rsp.wasSuspected()) {
            throw new SuspectedException(dest);
        }
        if(!rsp.wasReceived()) {
            throw new TimeoutException();
        }
        return rsp.getValue();
    }


    /**
     * Default request handler: logs that it was not overridden and returns
     * <code>null</code>. Subclasses are expected to override this method.
     *
     * @param req the incoming request
     * @return always <code>null</code> in this default implementation
     */
    public Object handle(Message req) {
        // Routed through the logger for consistency with the rest of the class
        // (was a bare System.out.println); message text unchanged.
        if(log.isWarnEnabled()) {
            log.warn("MessageProtocol.handle(): this method should be overridden !");
        }
        return null;
    }


    /**
     * Processes an event travelling up the stack.
     * <ul>
     * <li><code>VIEW_CHANGE</code>: the membership list is refreshed and the
     *     event is passed up; {@link #handleUpEvent(Event)} is not consulted.</li>
     * <li>Any other event is first offered to {@link #handleUpEvent(Event)};
     *     a <code>false</code> result discards it.</li>
     * <li>A <code>MSG</code> whose header for this protocol is not a
     *     {@link RequestCorrelator.Header} is passed up untouched.</li>
     * <li>Everything else is handed to the correlator and not passed up;
     *     if there is no correlator a warning is logged and the event is
     *     passed up instead.</li>
     * </ul>
     */
    public final void up(Event evt) {
        switch(evt.getType()) {
            case Event.VIEW_CHANGE:
                updateView((View)evt.getArg());
                break;

            default:
                if(!handleUpEvent(evt)) {
                    return;
                }

                if(evt.getType() == Event.MSG) {
                    Message msg=(Message)evt.getArg();
                    Object hdr=msg.getHeader(getName());
                    // Not one of ours: leave the switch and pass the message up.
                    if(!(hdr instanceof RequestCorrelator.Header)) {
                        break;
                    }
                }

                if(_corr != null) {
                    // The correlator takes over; the event is not passed up here.
                    _corr.receive(evt);
                    return;
                }

                if(log.isWarnEnabled()) log.warn("Request correlator is null, evt=" + Util.printEvent(evt));
                break;
        }

        passUp(evt);
    }


    /**
     * Processes an event travelling down the stack. A
     * <code>VIEW_CHANGE</code> first refreshes the membership list. Every
     * event is then offered to {@link #handleDownEvent(Event)} and passed
     * down unless that method returns <code>false</code>.
     */
    public final void down(Event evt) {
        // The original switch had three arms that differed only in this call.
        if(evt.getType() == Event.VIEW_CHANGE) {
            updateView((View)evt.getArg());
        }

        if(!handleDownEvent(evt)) {
            return;
        }

        passDown(evt);
    }


    /**
     * Replaces the tracked membership with the members of <code>new_view</code>.
     * A <code>null</code> view, or a view whose member list is
     * <code>null</code>, leaves the current membership untouched.
     *
     * @param new_view the view delivered with a <code>VIEW_CHANGE</code> event
     */
    protected void updateView(View new_view) {
        // A VIEW_CHANGE without a view is ignored rather than throwing a NullPointerException.
        if(new_view == null) {
            return;
        }

        Vector new_mbrs=new_view.getMembers();
        if(new_mbrs == null) {
            return;
        }

        // Clear-and-refill under one lock so readers never see a half-built list.
        synchronized(members) {
            members.removeAllElements();
            members.addAll(new_mbrs);
        }
    }


    /**
     * Hook for subclasses to intercept events travelling up (all types
     * except <code>VIEW_CHANGE</code>, see {@link #up(Event)}).
     *
     * @return <code>true</code> to let processing continue, <code>false</code>
     *         to discard the event; this default always returns <code>true</code>
     */
    protected boolean handleUpEvent(Event evt) {
        return true;
    }


    /**
     * Hook for subclasses to intercept events travelling down.
     *
     * @return <code>true</code> to let the event be passed down,
     *         <code>false</code> to discard it; this default always returns
     *         <code>true</code>
     */
    protected boolean handleDownEvent(Event evt) {
        return true;
    }


}