1 package org.jgroups.debug; 2 3 import org.jgroups.Address; 4 import org.jgroups.Event; 5 import org.jgroups.Message; 6 import org.jgroups.View; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.util.Queue; 9 import org.jgroups.util.QueueClosedException; 10 11 import java.util.HashMap ; 12 import java.util.Iterator ; 13 14 19 public class Simulator { 20 private Protocol[] protStack=null; 21 private ProtocolAdapter ad=new ProtocolAdapter(); 22 private Receiver r=null; 23 private Protocol top=null, bottom=null; 24 private Queue send_queue=new Queue(); 25 private Thread send_thread; 26 private Queue recv_queue=new Queue(); 27 private Thread recv_thread; 28 29 30 31 public static HashMap addrTable=new HashMap (); 32 private Address local_addr=null; 33 private View view; 34 35 public interface Receiver { 36 void receive(Event evt); 37 } 38 39 40 public void setProtocolStack(Protocol[] stack) { 41 this.protStack=stack; 42 this.protStack[0].setUpProtocol(ad); 43 this.protStack[this.protStack.length-1].setDownProtocol(ad); 44 top=protStack[0]; 45 bottom=this.protStack[this.protStack.length-1]; 46 47 if(protStack.length > 1) { 48 for(int i=0; i < protStack.length; i++) { 49 Protocol p1=protStack[i]; 50 Protocol p2=i+1 >= protStack.length? null : protStack[i+1]; 51 if(p2 != null) { 52 p1.setDownProtocol(p2); 53 p2.setUpProtocol(p1); 54 } 55 } 56 } 57 } 58 59 public void setLocalAddress(Address addr) { 60 this.local_addr=addr; 61 } 62 63 public void setView(View v) { 64 this.view=v; 65 } 66 67 public void setReceiver(Receiver r) { 68 this.r=r; 69 } 70 71 public void send(Event evt) { 72 top.down(evt); 73 } 74 75 public void receive(Event evt) { 76 try { 77 recv_queue.add(evt); 78 } 79 catch(QueueClosedException e) { 80 } 81 } 82 83 public void start() throws Exception { 84 if(local_addr == null) 85 throw new Exception ("local_addr has to be non-null"); 86 if(protStack == null) 87 throw new Exception ("protocol stack is null"); 88 89 bottom.up(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 90 if(view != null) 91 top.down(new Event(Event.VIEW_CHANGE, view)); 92 93 94 send_thread=new Thread () { 95 public void run() { 96 Event evt; 97 while(send_thread != null) { 98 try { 99 evt=(Event)send_queue.remove(); 100 if(evt.getType() == Event.MSG) { 101 Message msg=(Message)evt.getArg(); 102 Address dst=msg.getDest(); 103 if(msg.getSrc() == null) 104 ((Message)evt.getArg()).setSrc(local_addr); 105 Simulator s; 106 if(dst == null) { 107 for(Iterator it=addrTable.values().iterator(); it.hasNext();) { 108 s=(Simulator)it.next(); 109 s.receive(evt); 110 } 111 } 112 else { 113 s=(Simulator)addrTable.get(dst); 114 if(s != null) 115 s.receive(evt); 116 } 117 } 118 } 119 catch(QueueClosedException e) { 120 send_thread=null; 121 break; 122 } 123 } 124 } 125 }; 126 send_thread.start(); 127 128 129 recv_thread=new Thread () { 130 public void run() { 131 Event evt; 132 while(recv_thread != null) { 133 try { 134 evt=(Event)recv_queue.remove(); 135 bottom.up(evt); 136 } 137 catch(QueueClosedException e) { 138 recv_thread=null; 139 break; 140 } 141 } 142 } 143 }; 144 recv_thread.start(); 145 } 146 147 public void stop() { 148 recv_thread=null; 149 recv_queue.close(false); 150 send_thread=null; 151 send_queue.close(false); 152 } 153 154 155 156 157 class ProtocolAdapter extends Protocol { 158 159 public String getName() { 160 return "ProtocolAdapter"; 161 } 162 163 public void up(Event evt) { 164 if(r != null) 165 r.receive(evt); 166 } 167 168 169 public void down(Event evt) { 170 try { 171 send_queue.add(evt); 172 } 173 catch(QueueClosedException e) { 174 } 175 } 176 } 177 178 179 180 } 181 | Popular Tags |