KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > perf > transports > JmsTransport


1 package org.jgroups.tests.perf.transports;
2
3 import org.jgroups.tests.perf.Receiver;
4 import org.jgroups.tests.perf.Transport;
5
6 import javax.jms.*;
7 import javax.naming.InitialContext JavaDoc;
8 import java.util.Properties JavaDoc;
9
10 /**
11  * @author Bela Ban Jan 22
12  * @author 2004
13  * @version $Id: JmsTransport.java,v 1.1 2004/01/24 16:54:27 belaban Exp $
14  */

15 public class JmsTransport implements Transport, MessageListener {
16     Receiver receiver=null;
17     Properties JavaDoc config=null;
18     Object JavaDoc local_addr=null;
19     ConnectionFactory factory;
20     InitialContext JavaDoc ctx;
21     TopicConnection conn;
22     TopicSession session;
23     TopicPublisher pub;
24     TopicSubscriber sub;
25     Topic topic;
26     String JavaDoc topic_name="topic/testTopic";
27
28
29     public JmsTransport() {
30     }
31
32     public Object JavaDoc getLocalAddress() {
33         return local_addr;
34     }
35
36     public void create(Properties JavaDoc properties) throws Exception JavaDoc {
37         this.config=properties;
38
39         String JavaDoc tmp=config.getProperty("topic");
40         if(tmp != null)
41             topic_name=tmp;
42
43         ctx=new InitialContext JavaDoc();
44         factory=(ConnectionFactory)ctx.lookup("ConnectionFactory");
45
46
47         // local_addr=new IpAddress(ucast_sock.getLocalAddress(), ucast_sock.getLocalPort());
48
System.out.println("-- local_addr is " + local_addr);
49     }
50
51
52     public void start() throws Exception JavaDoc {
53         this.local_addr=conn.getClientID();
54         conn=((TopicConnectionFactory)factory).createTopicConnection();
55         session=conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
56         topic=(Topic)ctx.lookup(topic_name);
57         pub=session.createPublisher(topic);
58         sub=session.createSubscriber(topic);
59         sub.setMessageListener(this);
60         conn.start();
61     }
62
63     public void stop() {
64         try {
65             conn.stop();
66         }
67         catch(JMSException e) {
68             e.printStackTrace();
69         }
70     }
71
72     public void destroy() {
73     }
74
75     public void setReceiver(Receiver r) {
76         this.receiver=r;
77     }
78
79     public void send(Object JavaDoc destination, byte[] payload) throws Exception JavaDoc {
80         if(destination != null)
81             throw new Exception JavaDoc("JmsTransport.send(): unicast destination is not supported");
82         BytesMessage msg=session.createBytesMessage();
83
84         //todo: write the sender (maybe use ObjectMessage instead of BytesMessage)
85

86         msg.writeInt(payload.length);
87         msg.writeBytes(payload, 0, payload.length);
88         pub.publish(topic, msg);
89     }
90
91     public void onMessage(Message message) {
92         Object JavaDoc sender=null;
93         if(message == null || !(message instanceof BytesMessage)) {
94             System.err.println("JmsTransport.onMessage(): received a non BytesMessage (" + message + "), discarding");
95             return;
96         }
97         BytesMessage msg=(BytesMessage)message;
98         try {
99
100           // todo: read the sender
101

102             int len=msg.readInt();
103             byte[] payload=new byte[len];
104             msg.readBytes(payload, len);
105             if(receiver != null)
106                 receiver.receive(sender, payload);
107         }
108         catch(JMSException e) {
109             e.printStackTrace();
110         }
111
112     }
113
114
115 }
116
Popular Tags