1 package org.jgroups.protocols; 2 3 import org.jgroups.Address; 4 import org.jgroups.Event; 5 import org.jgroups.Message; 6 import org.jgroups.stack.Protocol; 7 8 import java.util.Vector ; 9 10 11 12 13 22 public class FLOWCONTROL extends Protocol { 23 24 final Vector queuedMsgs = new Vector (); 25 int sentMsgs = 0; 26 static final int MAXSENTMSGS = 1; 27 Address myAddr; 28 29 30 public FLOWCONTROL() { 31 } 32 public String getName() { 33 return "FLOWCONTROL"; 34 } 35 36 40 public void up(Event evt) { 41 Message msg; 42 switch (evt.getType()) { 43 case Event.SET_LOCAL_ADDRESS: myAddr = (Address) evt.getArg(); 44 break; 45 46 case Event.MSG: msg = (Message) evt.getArg(); 47 if(log.isDebugEnabled()) log.debug("Message received"); 48 if (msg.getSrc().equals(myAddr)) { 49 if (queuedMsgs.size() > 0) { 50 if(log.isDebugEnabled()) log.debug("Message from me received - Queue size was " + queuedMsgs.size()); 51 passDown((Event) queuedMsgs.remove(0)); 52 } else { 53 if(log.isDebugEnabled()) log.debug("Message from me received - No messages in queue"); 54 sentMsgs--; 55 } 56 } 57 } 58 passUp(evt); 59 } 60 61 64 public void down(Event evt) { 65 Message msg; 66 if (evt.getType()==Event.MSG) { 67 msg = (Message) evt.getArg(); 68 if ((msg.getDest() == null) || (msg.getDest().equals(myAddr))) { 69 if (sentMsgs < MAXSENTMSGS) { 70 sentMsgs++; 71 if(log.isDebugEnabled()) log.debug("Message " + sentMsgs + " sent"); 72 } else { 73 queuedMsgs.add(evt); return; 75 } 76 } 77 } 78 passDown(evt); 79 } 80 81 } 82 | Popular Tags |