1 3 package org.jgroups.protocols; 4 5 import org.jgroups.*; 6 import org.jgroups.stack.Protocol; 7 8 import java.io.IOException ; 9 import java.io.ObjectInput ; 10 import java.io.ObjectOutput ; 11 import java.util.LinkedList ; 12 import java.util.ListIterator ; 13 import java.util.Vector ; 14 15 16 17 18 68 69 70 public class CAUSAL extends Protocol 71 { 72 73 public static class CausalHeader extends Header 74 { 75 78 private TransportedVectorTime t; 79 80 83 public CausalHeader() 84 { 85 } 86 87 public CausalHeader(TransportedVectorTime timeVector) 88 { 89 t = timeVector; 90 } 91 92 96 public TransportedVectorTime getVectorTime() 97 { 98 return t; 99 } 100 101 105 public long size() 106 { 107 108 110 return 231 + (t.size() * 4); 111 } 112 113 118 public void writeExternal(ObjectOutput out) throws IOException 119 { 120 out.writeObject(t); 121 } 122 123 127 public void readExternal(ObjectInput in) throws IOException , 128 ClassNotFoundException 129 { 130 t = (TransportedVectorTime) in.readObject(); 131 } 132 133 public String toString() 134 { 135 return "[CAUSALHEADER:" + t + ']'; 136 } 137 } 138 139 140 143 private VectorTime localVector; 144 145 148 private LinkedList delayQueue; 149 150 153 private Address localAddress; 154 155 156 159 public CAUSAL() 160 { 161 } 162 163 167 private void addToDelayQueue(TransportedVectorTime tvt) 168 { 169 ListIterator i = delayQueue.listIterator(0); 170 TransportedVectorTime current = null; 171 while (i.hasNext()) 172 { 173 current = (TransportedVectorTime) i.next(); 174 if (tvt.lessThanOrEqual(current)) 175 { 176 delayQueue.add(i.previousIndex(), tvt); 177 return; 178 } 179 } 180 delayQueue.add(tvt); 181 } 182 183 187 public void down(Event evt) 188 { 189 switch (evt.getType()) 190 { 191 case Event.MSG: 192 Message msg = (Message) evt.getArg(); 193 194 if (msg.getDest() != null && !msg.getDest().isMulticastAddress()) 196 break; 197 198 Message causalMsg=new Message(msg.getDest(), msg.getSrc(), msg); 199 synchronized (this) 200 { 201 localVector.increment(); 202 causalMsg.putHeader(getName(), new CausalHeader(localVector.getTransportedVectorTime())); 203 } 204 passDown(new Event(Event.MSG, causalMsg)); 205 return; 206 } 207 passDown(evt); 208 } 209 210 214 public void up(Event evt) 215 { 216 switch (evt.getType()) 217 { 218 case Event.SET_LOCAL_ADDRESS: 219 localAddress = (Address) evt.getArg(); 220 localVector = new VectorTime(localAddress); 221 delayQueue = new LinkedList (); 222 break; 223 224 case Event.VIEW_CHANGE: 225 Vector newViewMembers = ((View) evt.getArg()).getMembers(); 226 localVector.merge((Vector ) newViewMembers.clone()); 227 localVector.reset(); 228 break; 229 230 case Event.MSG: 231 Object obj = null; 232 Message msg = (Message) evt.getArg(); 233 234 if (!((obj = msg.getHeader(getName())) instanceof CausalHeader)) 235 { 236 if(log.isErrorEnabled()) log.error("NO CAUSAL.Header found"); 237 passUp(evt); 238 return; 239 } 240 241 CausalHeader header = (CausalHeader) obj; 242 TransportedVectorTime messageVector = header.getVectorTime(); 243 244 synchronized (this) 245 { 246 if (localVector.isCausallyNext(messageVector)) 247 { 248 Object tmp=msg.getObject(); 249 passUp(new Event(Event.MSG, tmp)); 250 localVector.max(messageVector); 251 } 252 else 253 { 254 messageVector.setAssociatedMessage(msg); 255 addToDelayQueue(messageVector); 256 } 257 TransportedVectorTime queuedVector = null; 258 while ((delayQueue.size() > 0) && 259 localVector.isCausallyNext((queuedVector = (TransportedVectorTime) delayQueue.getFirst()))) 260 { 261 delayQueue.remove(queuedVector); 262 Object tmp=queuedVector.getAssociatedMessage().getObject(); 263 passUp(new Event(Event.MSG, tmp)); 264 localVector.max(queuedVector); 265 } 266 return; 267 } 268 269 } 270 passUp(evt); 271 } 272 273 277 public String getName() 278 { 279 return "CAUSAL"; 280 } 281 282 283 } 284 | Popular Tags |