KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > PIGGYBACK


1 // $Id: PIGGYBACK.java,v 1.8 2004/09/23 16:29:42 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.Protocol;
7 import org.jgroups.util.Queue;
8 import org.jgroups.util.QueueClosedException;
9 import org.jgroups.util.Util;
10
11 import java.io.IOException JavaDoc;
12 import java.io.ObjectInput JavaDoc;
13 import java.io.ObjectOutput JavaDoc;
14 import java.util.Properties JavaDoc;
15 import java.util.Vector JavaDoc;
16
17
18 /**
19  * Combines multiple messages into a single large one. As many messages as possible are combined into
20  * one, after a max timeout or when the msg size becomes too big the message is sent. On the receiving
21  * side, the large message is spliced into the smaller ones and delivered.
22  */

23
24 public class PIGGYBACK extends Protocol {
25     long max_wait_time=20; // milliseconds: max. wait between consecutive msgs
26
long max_size=8192; // don't piggyback if created msg would exceed this size (in bytes)
27
final Queue msg_queue=new Queue();
28     Packer packer=null;
29     boolean packing=false;
30     Address local_addr=null;
31
32
33     class Packer implements Runnable JavaDoc {
34         Thread JavaDoc t=null;
35
36
37         public void start() {
38             if(t == null) {
39                 t=new Thread JavaDoc(this, "Packer thread");
40                 t.setDaemon(true);
41                 t.start();
42             }
43         }
44
45         public void stop() {
46             t=null;
47         }
48
49         public void run() {
50             long current_size=0;
51             long start_time, time_to_wait=max_wait_time;
52             Message m, new_msg;
53             Vector JavaDoc msgs;
54
55             while(packer != null) {
56                 try {
57                     m=(Message)msg_queue.remove();
58                     m.setSrc(local_addr);
59                     start_time=System.currentTimeMillis();
60                     current_size=0;
61                     new_msg=new Message();
62                     msgs=new Vector JavaDoc();
63                     msgs.addElement(m);
64                     current_size+=m.size();
65
66                     while(System.currentTimeMillis() - start_time <= max_wait_time &&
67                             current_size <= max_size) {
68
69                         time_to_wait=max_wait_time - (System.currentTimeMillis() - start_time);
70                         if(time_to_wait <= 0)
71                             break;
72
73                         try {
74                             m=(Message)msg_queue.peek(time_to_wait);
75                             m.setSrc(local_addr);
76                         }
77                         catch(TimeoutException timeout) {
78                             break;
79                         }
80                         if(m == null || m.size() + current_size > max_size)
81                             break;
82                         m=(Message)msg_queue.remove();
83                         current_size+=m.size();
84                         msgs.addElement(m);
85                     }
86
87                     try {
88                         new_msg.putHeader(getName(), new PiggybackHeader());
89                         new_msg.setBuffer(Util.objectToByteBuffer(msgs));
90                         passDown(new Event(Event.MSG, new_msg));
91
92                             if(log.isInfoEnabled()) log.info("combined " + msgs.size() +
93                                     " messages of a total size of " + current_size + " bytes");
94                     }
95                     catch(Exception JavaDoc e) {
96                         if(log.isWarnEnabled()) log.warn("exception is " + e);
97                     }
98                 }
99                 catch(QueueClosedException closed) {
100                      if(log.isInfoEnabled()) log.info("packer stopped as queue is closed");
101                     break;
102                 }
103             }
104         }
105     }
106
107
108     /**
109      * All protocol names have to be unique !
110      */

111     public String JavaDoc getName() {
112         return "PIGGYBACK";
113     }
114
115
116     public boolean setProperties(Properties JavaDoc props) {super.setProperties(props);
117         String JavaDoc str;
118
119         str=props.getProperty("max_wait_time");
120         if(str != null) {
121             max_wait_time=Long.parseLong(str);
122             props.remove("max_wait_time");
123         }
124         str=props.getProperty("max_size");
125         if(str != null) {
126             max_size=Long.parseLong(str);
127             props.remove("max_size");
128         }
129
130         if(props.size() > 0) {
131             System.err.println("PIGGYBACK.setProperties(): these properties are not recognized:");
132             props.list(System.out);
133             return false;
134         }
135         return true;
136     }
137
138
139     public void start() throws Exception JavaDoc {
140         startPacker();
141     }
142
143     public void stop() {
144         packing=false;
145         msg_queue.close(true); // flush pending messages, this should also stop the packer ...
146
stopPacker(); // ... but for safety reasons, we stop it here again
147
}
148
149
150     public void up(Event evt) {
151         Message msg;
152         Object JavaDoc obj;
153         Vector JavaDoc messages;
154
155         switch(evt.getType()) {
156
157             case Event.SET_LOCAL_ADDRESS:
158                 local_addr=(Address)evt.getArg();
159                 break;
160
161             case Event.MSG:
162                 msg=(Message)evt.getArg();
163                 obj=msg.getHeader(getName());
164                 if(obj == null || !(obj instanceof PiggybackHeader))
165                     break;
166
167                 msg.removeHeader(getName());
168                 try {
169                     messages=(Vector JavaDoc)msg.getObject();
170                      if(log.isInfoEnabled()) log.info("unpacking " + messages.size() + " messages");
171                     for(int i=0; i < messages.size(); i++)
172                         passUp(new Event(Event.MSG, messages.elementAt(i)));
173                 }
174                 catch(Exception JavaDoc e) {
175                     if(log.isWarnEnabled()) log.warn("piggyback message does not contain a vector of " +
176                             "piggybacked messages, discarding message ! Exception is " + e);
177                     return;
178                 }
179
180                 return; // don't pass up !
181
}
182
183         passUp(evt); // Pass up to the layer above us
184
}
185
186
187     public void down(Event evt) {
188         Message msg;
189
190         switch(evt.getType()) {
191
192             case Event.MSG:
193                 msg=(Message)evt.getArg();
194
195                 if(msg.getDest() != null && !msg.getDest().isMulticastAddress())
196                     break; // unicast message, handle as usual
197

198                 if(!packing)
199                     break; // pass down as usual; we haven't started yet
200

201                 try {
202                     msg_queue.add(msg);
203                 }
204                 catch(QueueClosedException closed) {
205                     break; // pass down regularly
206
}
207                 return;
208         }
209
210         passDown(evt); // Pass on to the layer below us
211
}
212
213
214     void startPacker() {
215         if(packer == null) {
216             packing=true;
217             packer=new Packer();
218             packer.start();
219         }
220     }
221
222
223     void stopPacker() {
224         if(packer != null) {
225             packer.stop();
226             packing=false;
227             msg_queue.close(false);
228             packer=null;
229         }
230     }
231
232
233     public static class PiggybackHeader extends Header {
234
235         public PiggybackHeader() {
236         }
237
238         public String JavaDoc toString() {
239             return "[PIGGYBACK: <variables> ]";
240         }
241
242         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
243         }
244
245
246         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
247         }
248
249     }
250
251
252 }
253
Popular Tags