KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > lateralnz > messaging > lrmp > LRMPMessageHandler


1 /* ====================================================================
2  * The LateralNZ Software License, Version 1.0
3  *
4  * Copyright (c) 2003 LateralNZ. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  *
13  * 2. Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in
15  * the documentation and/or other materials provided with the
16  * distribution.
17  *
18  * 3. The end-user documentation included with the redistribution,
19  * if any, must include the following acknowledgment:
20  * "This product includes software developed by
21  * LateralNZ (http://www.lateralnz.org/) and other third parties."
22  * Alternately, this acknowledgment may appear in the software itself,
23  * if and wherever such third-party acknowledgments normally appear.
24  *
25  * 4. The names "LateralNZ" must not be used to endorse or promote
26  * products derived from this software without prior written
27  * permission. For written permission, please
28  * contact oss@lateralnz.org.
29  *
30  * 5. Products derived from this software may not be called "Panther",
31  * or "Lateral" or "LateralNZ", nor may "PANTHER" or "LATERAL" or
32  * "LATERALNZ" appear in their name, without prior written
33  * permission of LateralNZ.
34  *
35  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
36  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
37  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
38  * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
39  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
40  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
41  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
42  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
43  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
44  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
45  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
46  * SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of LateralNZ. For more
51  * information on Lateral, please see http://www.lateralnz.com/ or
52  * http://www.lateralnz.org
53  *
54  */

55 package org.lateralnz.messaging.lrmp;
56
57 import java.io.ByteArrayInputStream JavaDoc;
58 import java.io.IOException JavaDoc;
59 import java.io.Serializable JavaDoc;
60 import java.util.HashMap JavaDoc;
61
62 import org.apache.log4j.Logger;
63
64 import inria.net.lrmp.*;
65
66 import org.lateralnz.common.model.TimestampedObject;
67 import org.lateralnz.common.util.Constants;
68 import org.lateralnz.common.util.NumericUtils;
69 import org.lateralnz.common.util.ObjectUtils;
70 import org.lateralnz.messaging.AbstractMessageHandler;
71 import org.lateralnz.messaging.Message;
72 import org.lateralnz.messaging.MessageHandler;
73 import org.lateralnz.messaging.util.PacketUtils;
74
75 /**
76  * a messaging service that uses IP multicast to transmit messages
77  *
78  * @author J R Briggs
79  */

80 public class LRMPMessageHandler extends AbstractMessageHandler implements LrmpEventHandler, MessageHandler, Constants, Serializable JavaDoc {
81   private static final Logger log = Logger.getLogger(LRMPMessageHandler.class.getName());
82   
83   Lrmp lrmp;
84   LrmpEntity sender = null;
85   String JavaDoc outfile = null;
86   
87   private HashMap JavaDoc receivedPackets = new HashMap JavaDoc();
88   
89   protected LRMPMessageHandler(String JavaDoc ipaddress, int port) throws Exception JavaDoc {
90     LrmpProfile profile = new LrmpProfile();
91     
92     profile.setEventHandler(this);
93     
94     profile.reliability = LrmpProfile.NoLoss;
95     
96     profile.ordered = true;
97     
98     profile.throughput = LrmpProfile.AdaptedThroughput;
99     
100     profile.minRate = 8;
101     profile.maxRate = 64;
102     
103     profile.sendWindowSize = 64;
104     profile.rcvWindowSize = 64;
105     
106     try {
107       lrmp = new Lrmp(ipaddress, port, 32, profile);
108     }
109     catch (LrmpException e) {
110       System.err.println("Failed to create Lrmp - " + e);
111       System.exit(1);
112     }
113     
114     lrmp.start();
115
116   }
117   
118   public void send(Message msg) throws IOException JavaDoc {
119     if (!willTransmit(msg.getGroup())) {
120       if (log.isDebugEnabled()) {
121         log.debug("transmit disabled for " + msg.getGroup());
122       }
123       return;
124     }
125     else if (log.isDebugEnabled()) {
126       log.debug("sending message " + msg.toString());
127     }
128     
129     try {
130
131       LrmpPacket pack = new LrmpPacket();
132       int offset = pack.getOffset();
133       byte buffer[] = pack.getDataBuffer();
134
135 // buffer[offset] = 0;
136
// offset += headerLen;
137
byte[] packetid = PacketUtils.getPacketID();
138       int headerLen = 1 + packetid.length;
139 // System.arraycopy(packetid, 0, buffer, offset, packetid.length);
140
// pack.setDataLength(packetid.length + headerLen);
141
// lrmp.send(pack);
142

143       ByteArrayInputStream JavaDoc in = new ByteArrayInputStream JavaDoc(ObjectUtils.serialize(msg));
144       int pos = 0;
145       while (in.available() > 0) {
146         pack = new LrmpPacket();
147         offset = pack.getOffset();
148         buffer = pack.getDataBuffer();
149
150         int maxLen = pack.getMaxDataLength() - headerLen;
151
152         int len = in.read(buffer, offset + headerLen, maxLen);
153         pack.setDataLength(len + headerLen);
154
155         System.arraycopy(packetid, 0, buffer, offset+1, packetid.length);
156         if (in.available() > 0) {
157           buffer[offset] = 1;
158         }
159         else {
160           buffer[offset] = 2;
161         }
162
163         lrmp.send(pack);
164       }
165     }
166     catch (IOException JavaDoc ioe) {
167       throw ioe;
168     }
169     catch (Exception JavaDoc e) {
170       e.printStackTrace();
171       throw new IOException JavaDoc(e.getMessage());
172     }
173   }
174   
175   public void processData(LrmpPacket pack) {
176     if (!pack.isReliable()) {
177       log.error("unreliable packet " + pack);
178       return;
179     }
180
181     if (sender == null) {
182       sender = pack.getSource();
183     }
184 // else if (sender.getID() != pack.getSource().getID()) {
185
// log.warn("wrong sender " + pack.getSource());
186
// return;
187
// }
188

189     byte buffer[] = pack.getDataBuffer();
190     int offset = pack.getOffset();
191     int length = pack.getDataLength();
192     int packetid = NumericUtils.toInt(buffer, offset+1, 4);
193     String JavaDoc key = pack.getAddress().getHostAddress() + packetid;
194     
195     // actual data
196
byte[] tmpbuf = new byte[buffer.length - (offset + 5)];
197     System.arraycopy(buffer, offset + 5, tmpbuf, 0, tmpbuf.length);
198     
199     TimestampedObject to;
200     byte[] receivedBuf;
201     if (receivedPackets.containsKey(key)) {
202       to = (TimestampedObject)receivedPackets.get(key);
203       receivedBuf = (byte[])to.obj;
204     }
205     else {
206       receivedBuf = new byte[0];
207       to = new TimestampedObject();
208     }
209     
210     byte[] newbuf = new byte[receivedBuf.length + tmpbuf.length];
211     System.arraycopy(receivedBuf, 0, newbuf, 0, receivedBuf.length);
212     System.arraycopy(tmpbuf, 0, newbuf, receivedBuf.length, tmpbuf.length);
213     to.obj = newbuf;
214     
215     if (buffer[offset] == 0) {
216       receivedPackets.put(key, to);
217     }
218     else {
219       receivedPackets.remove(key);
220       
221       try {
222         Object JavaDoc obj = ObjectUtils.deserialize(newbuf);
223         Message msg = (Message)obj;
224         if (log.isDebugEnabled()) {
225           log.debug("received event with group " + msg.getGroup());
226         }
227         notifyListeners(msg);
228       }
229       catch (Exception JavaDoc e) {
230         e.printStackTrace();
231       }
232     }
233   }
234   
235   public void processEvent(int event, Object JavaDoc obj) {
236     switch (event) {
237       case LrmpEventHandler.UNRECOVERABLE_SEQUENCE_ERROR:
238         LrmpErrorEvent err = (LrmpErrorEvent) obj;
239         if (err.source == sender) {
240           log.error("reception failure");
241           sender = null;
242         }
243         break;
244       case LrmpEventHandler.END_OF_SEQUENCE:
245         LrmpEntity s = (LrmpEntity) obj;
246         if (s == sender) {
247           log.error("sender gone");
248           sender = null;
249         }
250         break;
251       default:
252         break;
253     }
254   }
255 }
Popular Tags