KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > lateralnz > messaging > broadcast > BroadcastMessageHandler


1 /* ====================================================================
2  * The LateralNZ Software License, Version 1.0
3  *
4  * Copyright (c) 2003 LateralNZ. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  *
13  * 2. Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in
15  * the documentation and/or other materials provided with the
16  * distribution.
17  *
18  * 3. The end-user documentation included with the redistribution,
19  * if any, must include the following acknowledgment:
20  * "This product includes software developed by
21  * LateralNZ (http://www.lateralnz.org/) and other third parties."
22  * Alternately, this acknowledgment may appear in the software itself,
23  * if and wherever such third-party acknowledgments normally appear.
24  *
25  * 4. The names "LateralNZ" must not be used to endorse or promote
26  * products derived from this software without prior written
27  * permission. For written permission, please
28  * contact oss@lateralnz.org.
29  *
30  * 5. Products derived from this software may not be called "Panther",
31  * or "Lateral" or "LateralNZ", nor may "PANTHER" or "LATERAL" or
32  * "LATERALNZ" appear in their name, without prior written
33  * permission of LateralNZ.
34  *
35  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
36  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
37  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
38  * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
39  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
40  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
41  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
42  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
43  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
44  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
45  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
46  * SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of LateralNZ. For more
51  * information on Lateral, please see http://www.lateralnz.com/ or
52  * http://www.lateralnz.org
53  *
54  */

55 package org.lateralnz.messaging.broadcast;
56
57 import java.io.IOException JavaDoc;
58 import java.io.Serializable JavaDoc;
59 import java.net.DatagramPacket JavaDoc;
60 import java.net.InetAddress JavaDoc;
61 import java.net.DatagramSocket JavaDoc;
62 import java.util.Arrays JavaDoc;
63
64 import org.apache.log4j.Logger;
65
66 import org.lateralnz.common.util.Constants;
67 import org.lateralnz.common.util.SystemUtils;
68 import org.lateralnz.common.wrapper.IntHolder;
69 import org.lateralnz.messaging.AbstractMessageHandler;
70 import org.lateralnz.messaging.Message;
71 import org.lateralnz.messaging.MessageHandler;
72 import org.lateralnz.messaging.util.PacketUtils;
73
74 /**
75  * a messaging service that uses IP multicast to transmit messages
76  *
77  * @author J R Briggs
78  */

79 public class BroadcastMessageHandler extends AbstractMessageHandler implements Runnable JavaDoc, MessageHandler, Constants, Serializable JavaDoc {
80   private static final Logger log = Logger.getLogger(BroadcastMessageHandler.class.getName());
81   private static final int HEADER_SIZE = 12; // size of the head (ID[3], MAX[3], ORDER[3])
82
private static final int SPACER_SIZE = 20; // a spacer (just in case)
83

84   private boolean running = true;
85   
86   private byte[] localhost2 = SystemUtils.getLocalhostIP();
87   private byte[] localhost1 = { (byte)127, (byte)0, (byte)0, (byte)1 };
88   protected DatagramSocket JavaDoc sock; // the socket to uses for comms
89
private int maxbuf; // calculated max buffer size
90
private int receiveBufSize; // the receive buffer size for the socket
91
private IntHolder msgID = new IntHolder(0); // ID number to use with fragmented packets
92
protected Thread JavaDoc listenerThread;
93    
94   protected BroadcastMessageHandler(String JavaDoc ipaddress, int port) throws Exception JavaDoc {
95     this(ipaddress, port, new DatagramSocket JavaDoc(port, InetAddress.getByName(ipaddress)), false);
96     sock.setBroadcast(true);
97     
98     listenerThread.start();
99   }
100   
101   protected BroadcastMessageHandler(String JavaDoc ipaddress, int port, DatagramSocket JavaDoc sock) throws Exception JavaDoc {
102     this(ipaddress, port, sock, true);
103   }
104   
105   protected BroadcastMessageHandler(String JavaDoc ipaddress, int port, DatagramSocket JavaDoc sock, boolean start) throws Exception JavaDoc {
106     setAddress(InetAddress.getByName(ipaddress), port);
107     this.sock = sock;
108     
109     maxbuf = sock.getSendBufferSize() - (HEADER_SIZE + SPACER_SIZE);
110     receiveBufSize = sock.getReceiveBufferSize();
111
112     listenerThread = new Thread JavaDoc(this);
113     
114     if (start) {
115       listenerThread.start();
116     }
117   }
118   
119   private final boolean isLocal(InetAddress JavaDoc addr) {
120     byte[] b = addr.getAddress();
121     return (Arrays.equals(b, localhost1) || Arrays.equals(b, localhost2));
122   }
123     
124  /**
125   * packets are received during the run loop
126   */

127   public void run() {
128     if (log.isDebugEnabled()) {
129       log.debug("packet listener running");
130     }
131     byte[] buf = new byte[receiveBufSize];
132     DatagramPacket JavaDoc packet = new DatagramPacket JavaDoc(buf, buf.length);
133     long time = System.currentTimeMillis();
134     
135     loop: while (running) {
136       try {
137         // get the next packet
138
sock.receive(packet);
139         
140         if (log.isDebugEnabled()) {
141           log.debug("received packet");
142         }
143
144         if (!isLocal(packet.getAddress())) {
145           Object JavaDoc obj = PacketUtils.reconstitute(packet, receiveBufSize);
146           if (obj != null) {
147             Message msg = (Message)obj;
148             if (log.isDebugEnabled()) {
149               log.debug("received event with group " + msg.getGroup());
150             }
151
152             notifyListeners(msg);
153           }
154         }
155         // reset the packet
156
packet.setLength(buf.length);
157       }
158       catch (Exception JavaDoc e) {
159         e.printStackTrace();
160       }
161       
162       PacketUtils.dumpOldData();
163       try {
164         Thread.sleep(10);
165         Thread.yield();
166       }
167       catch (InterruptedException JavaDoc ie) { }
168     }
169   }
170   
171  /**
172   * send a message (serialized, fragmented if necessary, then broadcast)
173   */

174   public void send(Message msg) throws IOException JavaDoc {
175     if (!willTransmit(msg.getGroup())) {
176       return;
177     }
178     else if (log.isDebugEnabled()) {
179       log.debug("sending message " + msg.toString());
180     }
181     
182     try {
183       DatagramPacket JavaDoc[] packets = PacketUtils.split(msg, addr, port, maxbuf);
184       for (int i = 0; i < packets.length; i++) {
185         if (log.isDebugEnabled()) {
186           log.debug("sending packet : " + (i + 1) + " of " + packets.length);
187         }
188         sock.send(packets[i]);
189       }
190     }
191     catch (Exception JavaDoc e) {
192       log.error(e);
193     }
194   }
195 }
Popular Tags