1 package org.jgroups.tests.perf.transports; 2 3 import org.jgroups.tests.perf.Receiver; 4 import org.jgroups.tests.perf.Transport; 5 6 import javax.jms.*; 7 import javax.naming.InitialContext ; 8 import java.util.Properties ; 9 10 15 public class JmsTransport implements Transport, MessageListener { 16 Receiver receiver=null; 17 Properties config=null; 18 Object local_addr=null; 19 ConnectionFactory factory; 20 InitialContext ctx; 21 TopicConnection conn; 22 TopicSession session; 23 TopicPublisher pub; 24 TopicSubscriber sub; 25 Topic topic; 26 String topic_name="topic/testTopic"; 27 28 29 public JmsTransport() { 30 } 31 32 public Object getLocalAddress() { 33 return local_addr; 34 } 35 36 public void create(Properties properties) throws Exception { 37 this.config=properties; 38 39 String tmp=config.getProperty("topic"); 40 if(tmp != null) 41 topic_name=tmp; 42 43 ctx=new InitialContext (); 44 factory=(ConnectionFactory)ctx.lookup("ConnectionFactory"); 45 46 47 System.out.println("-- local_addr is " + local_addr); 49 } 50 51 52 public void start() throws Exception { 53 this.local_addr=conn.getClientID(); 54 conn=((TopicConnectionFactory)factory).createTopicConnection(); 55 session=conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 56 topic=(Topic)ctx.lookup(topic_name); 57 pub=session.createPublisher(topic); 58 sub=session.createSubscriber(topic); 59 sub.setMessageListener(this); 60 conn.start(); 61 } 62 63 public void stop() { 64 try { 65 conn.stop(); 66 } 67 catch(JMSException e) { 68 e.printStackTrace(); 69 } 70 } 71 72 public void destroy() { 73 } 74 75 public void setReceiver(Receiver r) { 76 this.receiver=r; 77 } 78 79 public void send(Object destination, byte[] payload) throws Exception { 80 if(destination != null) 81 throw new Exception ("JmsTransport.send(): unicast destination is not supported"); 82 BytesMessage msg=session.createBytesMessage(); 83 84 86 msg.writeInt(payload.length); 87 msg.writeBytes(payload, 0, payload.length); 88 pub.publish(topic, msg); 89 } 90 91 public void onMessage(Message message) { 92 Object sender=null; 93 if(message == null || !(message instanceof BytesMessage)) { 94 System.err.println("JmsTransport.onMessage(): received a non BytesMessage (" + message + "), discarding"); 95 return; 96 } 97 BytesMessage msg=(BytesMessage)message; 98 try { 99 100 102 int len=msg.readInt(); 103 byte[] payload=new byte[len]; 104 msg.readBytes(payload, len); 105 if(receiver != null) 106 receiver.receive(sender, payload); 107 } 108 catch(JMSException e) { 109 e.printStackTrace(); 110 } 111 112 } 113 114 115 } 116 | Popular Tags |