package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Properties;
import java.util.Vector;


/**
 * Combines several outgoing messages into a single larger message.
 * <p>
 * On the way down, messages whose destination is null or a multicast address
 * are placed in a queue instead of being sent immediately. A background packer
 * thread drains the queue, collecting messages until either
 * <code>max_wait_time</code> milliseconds have elapsed or adding the next
 * message would exceed <code>max_size</code>, then sends the collected
 * messages as the serialized payload of one message tagged with a
 * {@link PiggybackHeader}.
 * <p>
 * On the way up, a message carrying a {@link PiggybackHeader} is unpacked and
 * each contained message is passed up individually; all other events are
 * passed up unchanged.
 */
public class PIGGYBACK extends Protocol {
    /** Maximum time (in milliseconds) the packer waits for further messages to add to a batch. */
    long max_wait_time=20;

    /** Upper bound on the accumulated size of a batch, compared against the sum of Message.size(). */
    long max_size=8192;

    /** Outgoing messages waiting to be batched by the packer thread. */
    final Queue msg_queue=new Queue();

    /** The active packer, or null when no packer is running. */
    Packer packer=null;

    /** When false, down() sends messages straight through instead of queueing them. */
    boolean packing=false;

    /** Local address, learned from the SET_LOCAL_ADDRESS event; used as source of queued messages. */
    Address local_addr=null;


    /**
     * Background task that removes messages from <code>msg_queue</code>,
     * groups them into batches and passes each batch down as one message.
     */
    class Packer implements Runnable {
        Thread t=null;


        /** Starts the daemon packer thread unless one has already been created. */
        public void start() {
            if(t == null) {
                t=new Thread(this, "Packer thread");
                t.setDaemon(true);
                t.start();
            }
        }

        /**
         * Drops the reference to the packer thread. The run loop itself ends when the
         * enclosing protocol's <code>packer</code> field becomes null or the queue is closed.
         */
        public void stop() {
            t=null;
        }

        /**
         * Batching loop: blocks for the first message, then keeps adding messages while
         * the wait time and size limits permit, and finally sends the batch down.
         */
        public void run() {
            long current_size=0;
            long start_time, time_to_wait=max_wait_time;
            Message m, new_msg;
            Vector msgs;

            while(packer != null) {
                try {
                    // Block until there is at least one message to start a batch with
                    m=(Message)msg_queue.remove();
                    m.setSrc(local_addr);
                    start_time=System.currentTimeMillis();
                    current_size=0;
                    new_msg=new Message();
                    msgs=new Vector();
                    msgs.addElement(m);
                    current_size+=m.size();

                    while(System.currentTimeMillis() - start_time <= max_wait_time &&
                            current_size <= max_size) {

                        time_to_wait=max_wait_time - (System.currentTimeMillis() - start_time);
                        if(time_to_wait <= 0)
                            break;

                        try {
                            // Only peek: the message stays queued if it does not fit into this batch
                            m=(Message)msg_queue.peek(time_to_wait);
                        }
                        catch(TimeoutException timeout) {
                            break;
                        }

                        // Check for null *before* touching the message; previously setSrc() was
                        // invoked first, which made this check unreachable in the null case
                        if(m == null)
                            break;

                        // Source is set before size() is evaluated, as in the original order
                        m.setSrc(local_addr);
                        if(m.size() + current_size > max_size)
                            break;

                        m=(Message)msg_queue.remove();
                        current_size+=m.size();
                        msgs.addElement(m);
                    }

                    try {
                        new_msg.putHeader(getName(), new PiggybackHeader());
                        new_msg.setBuffer(Util.objectToByteBuffer(msgs));
                        passDown(new Event(Event.MSG, new_msg));

                        if(log.isInfoEnabled()) log.info("combined " + msgs.size() +
                                " messages of a total size of " + current_size + " bytes");
                    }
                    catch(Exception e) {
                        if(log.isWarnEnabled()) log.warn("exception is " + e);
                    }
                }
                catch(QueueClosedException closed) {
                    // NOTE(review): if the queue is closed while a batch is being collected
                    // (exception thrown by peek()), the messages already in 'msgs' are not
                    // sent - confirm whether this is intended.
                    if(log.isInfoEnabled()) log.info("packer stopped as queue is closed");
                    break;
                }
            }
        }
    }


    /**
     * Returns the protocol name, which is also the key under which the
     * {@link PiggybackHeader} is stored in messages.
     */
    public String getName() {
        return "PIGGYBACK";
    }


    /**
     * Reads the <code>max_wait_time</code> and <code>max_size</code> properties
     * (both parsed as long values) and removes them from <code>props</code>.
     *
     * @param props the configuration properties for this protocol
     * @return false if unrecognized properties remain after processing, true otherwise
     */
    public boolean setProperties(Properties props) {
        super.setProperties(props);
        String str;

        str=props.getProperty("max_wait_time");
        if(str != null) {
            max_wait_time=Long.parseLong(str);
            props.remove("max_wait_time");
        }

        str=props.getProperty("max_size");
        if(str != null) {
            max_size=Long.parseLong(str);
            props.remove("max_size");
        }

        if(props.size() > 0) {
            System.err.println("PIGGYBACK.setProperties(): these properties are not recognized:");
            // List to the same stream as the header line so the diagnostic stays together
            props.list(System.err);
            return false;
        }
        return true;
    }


    /** Starts the packer thread and enables batching. */
    public void start() throws Exception {
        startPacker();
    }

    /** Disables batching, closes the message queue and stops the packer. */
    public void stop() {
        packing=false;
        msg_queue.close(true);
        stopPacker();
    }


    /**
     * Handles events travelling up the stack. Records the local address, and
     * unpacks messages carrying a {@link PiggybackHeader} into their individual
     * messages, each of which is passed up on its own. Everything else is
     * passed up unchanged.
     *
     * @param evt the event received from the protocol below
     */
    public void up(Event evt) {
        Message msg;
        Object obj;
        Vector messages;

        switch(evt.getType()) {

            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address)evt.getArg();
                break;

            case Event.MSG:
                msg=(Message)evt.getArg();
                obj=msg.getHeader(getName());
                if(obj == null || !(obj instanceof PiggybackHeader))
                    break; // not a batch: pass the event up as is

                msg.removeHeader(getName());
                try {
                    messages=(Vector)msg.getObject();
                    if(log.isInfoEnabled()) log.info("unpacking " + messages.size() + " messages");
                    for(int i=0; i < messages.size(); i++)
                        passUp(new Event(Event.MSG, messages.elementAt(i)));
                }
                catch(Exception e) {
                    if(log.isWarnEnabled()) log.warn("piggyback message does not contain a vector of " +
                            "piggybacked messages, discarding message ! Exception is " + e);
                    return;
                }

                // The batch message itself is consumed here and not passed up
                return;
        }

        passUp(evt);
    }


    /**
     * Handles events travelling down the stack. Messages with a null or
     * multicast destination are queued for batching while packing is enabled;
     * all other events, and messages that cannot be queued, are passed down
     * directly.
     *
     * @param evt the event received from the protocol above
     */
    public void down(Event evt) {
        Message msg;

        switch(evt.getType()) {

            case Event.MSG:
                msg=(Message)evt.getArg();

                if(msg.getDest() != null && !msg.getDest().isMulticastAddress())
                    break; // non-multicast destination: send directly

                if(!packing)
                    break; // batching disabled: send directly

                try {
                    msg_queue.add(msg);
                }
                catch(QueueClosedException closed) {
                    break; // queue no longer accepts messages: send directly
                }
                return; // queued; the packer thread will send it
        }

        passDown(evt);
    }


    /** Creates and starts a packer and enables batching, unless a packer already exists. */
    void startPacker() {
        if(packer == null) {
            packing=true;
            packer=new Packer();
            packer.start();
        }
    }


    /** Stops the current packer (if any), disables batching and closes the queue. */
    void stopPacker() {
        if(packer != null) {
            packer.stop();
            packing=false;
            msg_queue.close(false);
            packer=null;
        }
    }


    /**
     * Marker header identifying a message whose payload is a serialized
     * Vector of batched messages. It carries no state of its own.
     */
    public static class PiggybackHeader extends Header {

        public PiggybackHeader() {
        }

        public String toString() {
            return "[PIGGYBACK: <variables> ]";
        }

        /** Nothing to write: the header has no fields. */
        public void writeExternal(ObjectOutput out) throws IOException {
        }


        /** Nothing to read: the header has no fields. */
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        }

    }


}