package org.jgroups.tests.perf.transports;

import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.stack.IpAddress;
import org.jgroups.tests.perf.Transport;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;

/**
 * Transport that addresses a statically configured list of cluster members.
 * <p>
 * The member list is read from the <code>cluster</code> property, a comma-separated
 * list of <code>host:port</code> entries (e.g. <code>hostA:7800, hostB:7800</code>).
 * When {@link #send(Object, byte[], boolean)} is called without a destination, one
 * separate message is sent to every configured member instead of a single message
 * with a null destination.
 */
public class JGroupsClusterTransport extends JGroupsTransport implements Transport {

    /** Addresses parsed from the 'cluster' property; assigned in {@link #create(Properties)}. */
    List members;

    public JGroupsClusterTransport() {
        super();
    }

    /**
     * Initializes the parent transport and then parses the mandatory 'cluster' property
     * into the list of member addresses.
     *
     * @param properties configuration handed to the parent's <code>create</code>
     * @throws Exception if the parent fails, if the 'cluster' property is not defined,
     *                   or if one of its entries is not of the form host:port
     */
    public void create(Properties properties) throws Exception {
        super.create(properties);
        // NOTE(review): 'config' is not declared in this class; presumably it is populated
        // by the superclass' create() - confirm against JGroupsTransport.
        String cluster_def=config.getProperty("cluster");
        if(cluster_def == null) {
            throw new Exception("property 'cluster' is not defined");
        }
        members=parseCommaDelimitedList(cluster_def);
    }

    /**
     * Sends the payload either to a single destination or to every configured member.
     *
     * @param destination an {@link Address} to send to, or <code>null</code> to send one
     *                    message to each member listed in the 'cluster' property
     * @param payload     the message body
     * @param oob         if true, the {@link Message#OOB} flag is set on every message sent
     * @throws Exception if sending on the channel fails
     */
    public void send(Object destination, byte[] payload, boolean oob) throws Exception {
        if(destination != null) {
            sendTo((Address)destination, payload, oob);
            return;
        }

        // No destination given: send an individual message to each configured member.
        // 'members' is only assigned in create(), so create() must have been called first.
        for(int i=0; i < members.size(); i++) {
            sendTo((Address)members.get(i), payload, oob);
        }
    }

    /** Builds one message for the given destination, applies the OOB flag if requested, and sends it. */
    private void sendTo(Address dest, byte[] payload, boolean oob) throws Exception {
        Message msg=new Message(dest, null, payload);
        if(oob) {
            msg.setFlag(Message.OOB);
        }
        channel.send(msg);
    }

    /**
     * Parses a comma-separated list of <code>host:port</code> entries into a list of
     * {@link IpAddress} instances. Whitespace around an entry, and around its host and
     * port parts, is ignored, so <code>"a:7800, b:7800"</code> is accepted.
     *
     * @param s the list to parse; may be <code>null</code>
     * @return the parsed addresses in the order given, or <code>null</code> if
     *         <code>s</code> is <code>null</code>
     * @throws Exception if an entry contains no ':' separator
     * @throws NumberFormatException if the port part of an entry is not an integer
     */
    private List parseCommaDelimitedList(String s) throws Exception {
        if(s == null) {
            return null;
        }

        List retval=new ArrayList();
        StringTokenizer tok=new StringTokenizer(s, ",");
        while(tok.hasMoreTokens()) {
            // Trim so that the usual "host:port, host:port" spelling does not leak
            // blanks into the host name or break the port conversion.
            String entry=tok.nextToken().trim();
            int index=entry.indexOf(':');
            if(index == -1) {
                throw new Exception("host must be in format <host:port>, was " + entry);
            }
            String hostname=entry.substring(0, index).trim();
            int port=Integer.parseInt(entry.substring(index + 1).trim());
            retval.add(new IpAddress(hostname, port));
        }
        return retval;
    }
}