1 package com.ubermq.jms.client.multicast; 2 3 import com.ubermq.jms.common.datagram.impl.*; 4 import com.ubermq.kernel.*; 5 import inria.net.lrmp.*; 6 import java.io.*; 7 import java.nio.*; 8 9 13 public class LrmpConnectionInfo 14 extends AbstractConnectionInfo 15 implements 16 IConnectionInfo, 17 LrmpEventHandler 18 { 19 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(LrmpConnectionInfo.class); 20 21 private javax.jms.Connection cxn; 22 private Lrmp lrmp; 23 private LrmpProfile profile; 24 25 35 public LrmpConnectionInfo(javax.jms.Connection cxn, 36 String group, 37 int port, 38 int ttl, 39 IMessageProcessor proc) 40 throws java.io.IOException 41 { 42 super(proc, DatagramFactory.getInstance()); 43 this.cxn = cxn; 44 45 profile = new LrmpProfile(); 47 profile.setEventHandler(this); 48 49 profile.reliability = LrmpProfile.NoLoss; 50 profile.ordered = false; 51 profile.rcvReportSelection = LrmpProfile.RandomReceiverReport; 52 53 54 profile.throughput = LrmpProfile.AdaptedThroughput; 55 56 57 profile.minRate = 8; 58 profile.maxRate = 64; 59 60 61 profile.sendWindowSize = 64; 62 profile.rcvWindowSize = 64; 63 64 try { 66 lrmp = new Lrmp(group, 67 port, 68 ttl, 69 profile); 70 } catch(LrmpException e) { 71 log.error("", e); 72 throw new java.io.IOException (e.toString()); 73 } 74 75 lrmp.start(); 77 } 78 79 public void close() 80 { 81 lrmp.stop(); 82 lrmp = null; 83 } 84 85 public String toString() 86 { 87 return "lrmp://" + lrmp.getAddress() + ":" + lrmp.getPort(); 88 } 89 90 public void start() 91 { 92 lrmp.start(); 93 } 94 95 public void stop() 96 { 97 lrmp.stop(); 98 } 99 100 public int doWrite(ByteBuffer writeBuffer) throws java.io.IOException 101 { 102 LrmpPacket packet = new LrmpPacket(); 104 ByteBuffer packetBuffer = ByteBuffer.wrap(packet.getDataBuffer(), 105 packet.getOffset(), 106 packet.getMaxDataLength()); 107 packetBuffer.put(writeBuffer); 108 packet.setDataLength(packetBuffer.position() - packet.getOffset()); 109 110 try { 112 lrmp.send(packet); 113 return packet.getDataLength(); 114 } catch(LrmpException e) { 115 throw new IOException(e.getMessage()); 116 } 117 } 118 119 public synchronized void processData(LrmpPacket pack) 120 { 121 ByteBuffer readBuffer = null; 122 try { 123 readBuffer = getReadBuffer(); 124 125 if (readBuffer.position() != 0) 126 log.fatal("processData() position is nonzero"); 127 readBuffer.put(pack.getDataBuffer(), pack.getOffset(), pack.getDataLength()); 128 } 129 catch(InterruptedException ie) {} 130 finally { 131 releaseReadBuffer(readBuffer); 132 } 133 134 processData(); 136 } 137 138 public void processEvent(int event, Object obj) 139 { 140 switch (event) { 141 case LrmpEventHandler.UNRECOVERABLE_SEQUENCE_ERROR: 142 try 143 { 144 cxn.getExceptionListener().onException( 145 new MulticastSequenceException(obj.toString())); 146 } 147 catch (javax.jms.JMSException e) { 148 log.error("", e); 149 } 150 break; 151 default: 152 break; 153 } 154 } 155 156 } 157 | Popular Tags |