1 55 package org.lateralnz.messaging.lrmp; 56 57 import java.io.ByteArrayInputStream ; 58 import java.io.IOException ; 59 import java.io.Serializable ; 60 import java.util.HashMap ; 61 62 import org.apache.log4j.Logger; 63 64 import inria.net.lrmp.*; 65 66 import org.lateralnz.common.model.TimestampedObject; 67 import org.lateralnz.common.util.Constants; 68 import org.lateralnz.common.util.NumericUtils; 69 import org.lateralnz.common.util.ObjectUtils; 70 import org.lateralnz.messaging.AbstractMessageHandler; 71 import org.lateralnz.messaging.Message; 72 import org.lateralnz.messaging.MessageHandler; 73 import org.lateralnz.messaging.util.PacketUtils; 74 75 80 public class LRMPMessageHandler extends AbstractMessageHandler implements LrmpEventHandler, MessageHandler, Constants, Serializable { 81 private static final Logger log = Logger.getLogger(LRMPMessageHandler.class.getName()); 82 83 Lrmp lrmp; 84 LrmpEntity sender = null; 85 String outfile = null; 86 87 private HashMap receivedPackets = new HashMap (); 88 89 protected LRMPMessageHandler(String ipaddress, int port) throws Exception { 90 LrmpProfile profile = new LrmpProfile(); 91 92 profile.setEventHandler(this); 93 94 profile.reliability = LrmpProfile.NoLoss; 95 96 profile.ordered = true; 97 98 profile.throughput = LrmpProfile.AdaptedThroughput; 99 100 profile.minRate = 8; 101 profile.maxRate = 64; 102 103 profile.sendWindowSize = 64; 104 profile.rcvWindowSize = 64; 105 106 try { 107 lrmp = new Lrmp(ipaddress, port, 32, profile); 108 } 109 catch (LrmpException e) { 110 System.err.println("Failed to create Lrmp - " + e); 111 System.exit(1); 112 } 113 114 lrmp.start(); 115 116 } 117 118 public void send(Message msg) throws IOException { 119 if (!willTransmit(msg.getGroup())) { 120 if (log.isDebugEnabled()) { 121 log.debug("transmit disabled for " + msg.getGroup()); 122 } 123 return; 124 } 125 else if (log.isDebugEnabled()) { 126 log.debug("sending message " + msg.toString()); 127 } 128 129 try { 130 131 LrmpPacket pack = new LrmpPacket(); 132 int offset = pack.getOffset(); 133 byte buffer[] = pack.getDataBuffer(); 134 135 byte[] packetid = PacketUtils.getPacketID(); 138 int headerLen = 1 + packetid.length; 139 143 ByteArrayInputStream in = new ByteArrayInputStream (ObjectUtils.serialize(msg)); 144 int pos = 0; 145 while (in.available() > 0) { 146 pack = new LrmpPacket(); 147 offset = pack.getOffset(); 148 buffer = pack.getDataBuffer(); 149 150 int maxLen = pack.getMaxDataLength() - headerLen; 151 152 int len = in.read(buffer, offset + headerLen, maxLen); 153 pack.setDataLength(len + headerLen); 154 155 System.arraycopy(packetid, 0, buffer, offset+1, packetid.length); 156 if (in.available() > 0) { 157 buffer[offset] = 1; 158 } 159 else { 160 buffer[offset] = 2; 161 } 162 163 lrmp.send(pack); 164 } 165 } 166 catch (IOException ioe) { 167 throw ioe; 168 } 169 catch (Exception e) { 170 e.printStackTrace(); 171 throw new IOException (e.getMessage()); 172 } 173 } 174 175 public void processData(LrmpPacket pack) { 176 if (!pack.isReliable()) { 177 log.error("unreliable packet " + pack); 178 return; 179 } 180 181 if (sender == null) { 182 sender = pack.getSource(); 183 } 184 189 byte buffer[] = pack.getDataBuffer(); 190 int offset = pack.getOffset(); 191 int length = pack.getDataLength(); 192 int packetid = NumericUtils.toInt(buffer, offset+1, 4); 193 String key = pack.getAddress().getHostAddress() + packetid; 194 195 byte[] tmpbuf = new byte[buffer.length - (offset + 5)]; 197 System.arraycopy(buffer, offset + 5, tmpbuf, 0, tmpbuf.length); 198 199 TimestampedObject to; 200 byte[] receivedBuf; 201 if (receivedPackets.containsKey(key)) { 202 to = (TimestampedObject)receivedPackets.get(key); 203 receivedBuf = (byte[])to.obj; 204 } 205 else { 206 receivedBuf = new byte[0]; 207 to = new TimestampedObject(); 208 } 209 210 byte[] newbuf = new byte[receivedBuf.length + tmpbuf.length]; 211 System.arraycopy(receivedBuf, 0, newbuf, 0, receivedBuf.length); 212 System.arraycopy(tmpbuf, 0, newbuf, receivedBuf.length, tmpbuf.length); 213 to.obj = newbuf; 214 215 if (buffer[offset] == 0) { 216 receivedPackets.put(key, to); 217 } 218 else { 219 receivedPackets.remove(key); 220 221 try { 222 Object obj = ObjectUtils.deserialize(newbuf); 223 Message msg = (Message)obj; 224 if (log.isDebugEnabled()) { 225 log.debug("received event with group " + msg.getGroup()); 226 } 227 notifyListeners(msg); 228 } 229 catch (Exception e) { 230 e.printStackTrace(); 231 } 232 } 233 } 234 235 public void processEvent(int event, Object obj) { 236 switch (event) { 237 case LrmpEventHandler.UNRECOVERABLE_SEQUENCE_ERROR: 238 LrmpErrorEvent err = (LrmpErrorEvent) obj; 239 if (err.source == sender) { 240 log.error("reception failure"); 241 sender = null; 242 } 243 break; 244 case LrmpEventHandler.END_OF_SEQUENCE: 245 LrmpEntity s = (LrmpEntity) obj; 246 if (s == sender) { 247 log.error("sender gone"); 248 sender = null; 249 } 250 break; 251 default: 252 break; 253 } 254 } 255 } | Popular Tags |