1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.Address; 7 import org.jgroups.Event; 8 import org.jgroups.Header; 9 import org.jgroups.Message; 10 import org.jgroups.stack.Protocol; 11 12 import java.io.IOException ; 13 import java.io.ObjectInput ; 14 import java.io.ObjectOutput ; 15 import java.util.Hashtable ; 16 import java.util.Properties ; 17 import java.util.Vector ; 18 19 20 21 28 29 public class PARTITIONER extends Protocol { 30 final Vector members=new Vector (); 31 Address local_addr=null; 32 int my_partition=1; 33 34 35 public String getName() {return "PARTITIONER";} 36 37 38 public boolean setProperties(Properties props) { 39 String str; 40 41 super.setProperties(props); 42 if(props.size() > 0) { 43 System.err.println("EXAMPLE.setProperties(): these properties are not recognized:"); 44 props.list(System.out); 45 return false; 46 } 47 return true; 48 } 49 50 51 52 public void reset() {} 53 54 55 59 60 public void up(Event evt) { 61 Message msg; 62 Integer num; 63 PartitionerHeader partHead=null; 64 65 switch(evt.getType()) { 66 67 case Event.SET_LOCAL_ADDRESS: 68 local_addr=(Address) evt.getArg(); 69 if(log.isInfoEnabled()) log.info("local address is " + local_addr); 70 break; 71 72 case Event.MSG: 73 msg=(Message)evt.getArg(); 74 partHead=(PartitionerHeader) msg.removeHeader(getName()); 75 if (partHead.type == PartitionerHeader.COMMAND) { 76 num = (Integer ) partHead.Destinations.get(local_addr); 77 if (num == null) return; 78 if(log.isInfoEnabled()) log.info("new partition = " + num); 79 my_partition =num.intValue(); 80 return; 81 } 82 if (partHead.type == PartitionerHeader.NORMAL && partHead.partition != my_partition ) return; 83 break; 84 } 85 86 passUp(evt); } 88 89 90 91 92 96 97 public void down(Event evt) { 98 Message msg; 99 Event newEvent; 100 PartitionerHeader partHeader; 101 102 switch(evt.getType()) { 103 104 case Event.SET_PARTITIONS: 105 if(log.isInfoEnabled()) log.info("SET_PARTITIONS received, argument " + evt.getArg().toString()); 107 msg = new Message(null,null,null); 108 partHeader = new PartitionerHeader(PartitionerHeader.COMMAND); 109 partHeader.Destinations = (Hashtable ) evt.getArg(); 110 msg.putHeader(getName(), partHeader); 111 passDown(new Event(Event.MSG,msg)); 112 break; 113 114 case Event.MSG: 115 msg=(Message)evt.getArg(); 116 msg.putHeader(getName(), new PartitionerHeader(PartitionerHeader.NORMAL,my_partition)); 117 break; 120 } 121 122 passDown(evt); } 124 125 126 127 128 129 135 136 public static class PartitionerHeader extends Header { 137 static final int NORMAL=0; static final int COMMAND=1; int type=0,partition=1; 141 Hashtable Destinations=null; 142 143 public PartitionerHeader () {} public PartitionerHeader (int type) { this.type= type; } 145 public PartitionerHeader (int type,int partition) { this.type= type; this.partition = partition; } 146 147 public String toString() { 148 switch (type) { 149 case NORMAL: return "NORMAL ->partition :" + partition; 150 case COMMAND: return "COMMAND ->hashtable :" + Destinations; 151 default: return "<unknown>"; 152 153 } 154 } 155 156 public void writeExternal(ObjectOutput out) throws IOException { 157 out.writeInt(type); 158 out.writeInt(partition); 159 out.writeObject(Destinations); 160 } 161 162 163 164 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 165 type=in.readInt(); 166 partition=in.readInt(); 167 Destinations=(Hashtable )in.readObject(); 168 } 169 } 170 171 172 173 174 } 175 | Popular Tags |