1 package org.jgroups.tests.perf.transports; 2 3 import org.jgroups.*; 4 import org.jgroups.tests.perf.Receiver; 5 import org.jgroups.tests.perf.Transport; 6 7 import java.util.Properties ; 8 9 14 public class JGroupsTransport implements Transport, Runnable { 15 Properties config=null; 16 JChannel channel=null; 17 Thread t=null; 18 String props=null; 19 String group_name="PerfGroup"; 20 Receiver receiver=null; 21 22 public JGroupsTransport() { 23 24 } 25 26 public Object getLocalAddress() { 27 return channel != null? channel.getLocalAddress() : null; 28 } 29 30 public void create(Properties properties) throws Exception { 31 this.config=properties; 32 props=config.getProperty("props"); 33 channel=new JChannel(props); 34 } 35 36 public void start() throws Exception { 37 channel.connect(group_name); 38 t=new Thread (this, "JGroupsTransport receiver thread"); 39 t.start(); 40 } 41 42 public void stop() { 43 if(channel != null) { 44 channel.disconnect(); } 46 t=null; 47 } 48 49 public void destroy() { 50 if(channel != null) { 51 channel.close(); 52 channel=null; 53 } 54 } 55 56 public void setReceiver(Receiver r) { 57 this.receiver=r; 58 } 59 60 public void send(Object destination, byte[] payload) throws Exception { 61 Message msg=new Message((Address)destination, null, payload); 62 channel.send(msg); 63 } 64 65 public void run() { 66 Object obj; 67 Message msg; 68 Object sender; 69 byte[] payload; 70 71 while(t != null) { 72 try { 73 obj=channel.receive(0); 74 if(obj instanceof Message) { 75 msg=(Message)obj; 76 sender=msg.getSrc(); 77 payload=msg.getBuffer(); 78 if(receiver != null) { 79 try { 80 receiver.receive(sender, payload); 81 } 82 catch(Throwable t) { 83 t.printStackTrace(); 84 } 85 } 86 } 87 } 88 catch(ChannelNotConnectedException e) { 89 t=null; 90 } 91 catch(ChannelClosedException e) { 92 t=null; 93 } 94 catch(TimeoutException e) { 95 e.printStackTrace(); 96 } 97 } 98 } 99 } 100 | Popular Tags |