KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > client > multicast > LrmpConnectionInfo


1 package com.ubermq.jms.client.multicast;
2
3 import com.ubermq.jms.common.datagram.impl.*;
4 import com.ubermq.kernel.*;
5 import inria.net.lrmp.*;
6 import java.io.*;
7 import java.nio.*;
8
9 /**
10  * Implements a connection processor using an LRMP reliable
11  * multicast transport layer.
12  */

13 public class LrmpConnectionInfo
14     extends AbstractConnectionInfo
15     implements
16     IConnectionInfo,
17     LrmpEventHandler
18 {
19     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(LrmpConnectionInfo.class);
20     
21     private javax.jms.Connection JavaDoc cxn;
22     private Lrmp lrmp;
23     private LrmpProfile profile;
24
25     /**
26      * Creates a connection with LRMP with some reasonable defaults. The
27      * LRMP connection is set up to be lossless, with adaptive throughput
28      * control, limited to 64 kb/s.
29      * <P>
30      * @param cxn the JMS connection object.
31      * @param group the multicast group (a class D address)
32      * @param port the port
33      * @param ttl the time-to-live, one of the LrmpClientSession.TTL_xxx values
34      */

35     public LrmpConnectionInfo(javax.jms.Connection JavaDoc cxn,
36                               String JavaDoc group,
37                               int port,
38                               int ttl,
39                               IMessageProcessor proc)
40         throws java.io.IOException JavaDoc
41     {
42         super(proc, DatagramFactory.getInstance());
43         this.cxn = cxn;
44
45         // open LRMP
46
profile = new LrmpProfile();
47         profile.setEventHandler(this);
48
49         profile.reliability = LrmpProfile.NoLoss;
50         profile.ordered = false;
51         profile.rcvReportSelection = LrmpProfile.RandomReceiverReport;
52
53         /* the flow control */
54         profile.throughput = LrmpProfile.AdaptedThroughput;
55
56         /* the data rate in kbits/sec if the throughput is set to AdaptedThroughput */
57         profile.minRate = 8;
58         profile.maxRate = 64;
59
60         /* the buffer space in kilo bytes */
61         profile.sendWindowSize = 64;
62         profile.rcvWindowSize = 64;
63
64         // create the LRMP object.
65
try {
66             lrmp = new Lrmp(group,
67                             port,
68                             ttl,
69                             profile);
70         } catch(LrmpException e) {
71             log.error("", e);
72             throw new java.io.IOException JavaDoc(e.toString());
73         }
74
75         // get an id
76
lrmp.start();
77     }
78
79     public void close()
80     {
81         lrmp.stop();
82         lrmp = null;
83     }
84
85     public String JavaDoc toString()
86     {
87         return "lrmp://" + lrmp.getAddress() + ":" + lrmp.getPort();
88     }
89
90     public void start()
91     {
92         lrmp.start();
93     }
94
95     public void stop()
96     {
97         lrmp.stop();
98     }
99
100     public int doWrite(ByteBuffer writeBuffer) throws java.io.IOException JavaDoc
101     {
102         // flush the write buffer out to the LRMP
103
LrmpPacket packet = new LrmpPacket();
104         ByteBuffer packetBuffer = ByteBuffer.wrap(packet.getDataBuffer(),
105                                                   packet.getOffset(),
106                                                   packet.getMaxDataLength());
107         packetBuffer.put(writeBuffer);
108         packet.setDataLength(packetBuffer.position() - packet.getOffset());
109
110         // send it
111
try {
112             lrmp.send(packet);
113             return packet.getDataLength();
114         } catch(LrmpException e) {
115             throw new IOException(e.getMessage());
116         }
117     }
118
119     public synchronized void processData(LrmpPacket pack)
120     {
121         ByteBuffer readBuffer = null;
122         try {
123             readBuffer = getReadBuffer();
124
125             if (readBuffer.position() != 0)
126                 log.fatal("processData() position is nonzero");
127             readBuffer.put(pack.getDataBuffer(), pack.getOffset(), pack.getDataLength());
128         }
129         catch(InterruptedException JavaDoc ie) {}
130         finally {
131             releaseReadBuffer(readBuffer);
132         }
133
134         // now we process the data
135
processData();
136     }
137
138     public void processEvent(int event, Object JavaDoc obj)
139     {
140         switch (event) {
141             case LrmpEventHandler.UNRECOVERABLE_SEQUENCE_ERROR:
142                 try
143                 {
144                     cxn.getExceptionListener().onException(
145                         new MulticastSequenceException(obj.toString()));
146                 }
147                 catch (javax.jms.JMSException JavaDoc e) {
148                     log.error("", e);
149                 }
150                 break;
151             default:
152                 break;
153         }
154     }
155
156 }
157
Popular Tags