1 package org.lateralnz.messaging.util; 2 3 import java.io.Serializable ; 4 import java.net.DatagramPacket ; 5 import java.net.InetAddress ; 6 import java.util.HashMap ; 7 import java.util.Map ; 8 import java.util.Iterator ; 9 10 import org.apache.log4j.Logger; 11 12 import org.lateralnz.common.wrapper.IntHolder; 13 import org.lateralnz.common.util.Constants; 14 import org.lateralnz.common.util.NumericUtils; 15 import org.lateralnz.common.util.ObjectUtils; 16 17 public class PacketUtils implements Constants { 18 private static final Logger log = Logger.getLogger(PacketUtils.class.getName()); 19 private static final int THIRTY_MINUTES = 1800000; 20 private static final int TEN_MINUTES = 600000; 21 private static IntHolder MSG_ID = new IntHolder(0); private static HashMap receivedPackets = new HashMap (); 24 private static long time = System.currentTimeMillis(); 25 26 private PacketUtils() { 27 } 28 29 public static final void dumpOldData() { 30 long time2 = System.currentTimeMillis() - time; 31 if (time2 > THIRTY_MINUTES) { 33 if (log.isInfoEnabled()) { 34 log.info("packet dump processing"); 35 } 36 time = System.currentTimeMillis(); 38 Iterator iter = ((HashMap )receivedPackets.clone()).keySet().iterator(); 39 PacketDataWrapper dw; 40 while (iter.hasNext()) { 41 String tmpkey = (String )iter.next(); 42 dw = (PacketDataWrapper)receivedPackets.get(tmpkey); 43 if ((System.currentTimeMillis() - dw.getTimeCreated()) > TEN_MINUTES) { 45 if (log.isInfoEnabled()) { 46 log.info("dumping packet " + tmpkey); 47 } 48 iter.remove(); 49 } 50 } 51 } 52 } 53 54 public static final byte[] getPacketID() { 55 int id; 56 synchronized (MSG_ID) { 57 if (MSG_ID.value >= Integer.MAX_VALUE) { 58 MSG_ID.value = 0; 59 } 60 MSG_ID.value++; 61 id = MSG_ID.value; 62 } 63 return NumericUtils.toByteArray(id, false); 64 } 65 66 public static Object reconstitute(DatagramPacket packet, int receiveBufSize) throws Exception { 67 String addr = packet.getAddress().getHostAddress(); 68 int id = NumericUtils.toInt(packet.getData(), 0, 4); 69 int max = NumericUtils.toInt(packet.getData(), 4, 4); 70 int ord = NumericUtils.toInt(packet.getData(), 8, 4); 71 72 String key = addr + UNDERSCORE + id; 73 PacketDataWrapper dw; 76 if (receivedPackets.containsKey(key)) { 77 dw = (PacketDataWrapper)receivedPackets.get(key); 78 } 79 else { 80 dw = new PacketDataWrapper(max); 81 receivedPackets.put(key, dw); 82 } 83 byte[] tmp = new byte[packet.getData().length - 12]; 84 System.arraycopy(packet.getData(), 12, tmp, 0, tmp.length); 85 dw.set(ord, tmp); 86 87 if (dw.count >= max) { 90 receivedPackets.remove(key); 91 92 return ObjectUtils.deserialize(dw.get()); 93 } 94 else { 95 return null; 96 } 97 } 98 99 public static final DatagramPacket [] split(Serializable obj, InetAddress addr, int port, int maximumBufferSize) throws Exception { 100 byte[] data = ObjectUtils.serialize(obj); 101 byte[] bid = getPacketID(); 103 104 105 int packNum = (int)Math.ceil((float)data.length / (float)maximumBufferSize); 107 byte[] pnum = NumericUtils.toByteArray(packNum, false); 108 109 int size = (data.length / maximumBufferSize) + (data.length % maximumBufferSize > 0 ? 1 : 0); 110 DatagramPacket [] packets = new DatagramPacket [size]; 111 for (int k = 0, j = 0; j < data.length; j+= maximumBufferSize, k++) { 113 byte[] sendbuf = new byte[maximumBufferSize + 12]; 114 System.arraycopy(bid, 0, sendbuf, 0, 4); 115 System.arraycopy(pnum, 0, sendbuf, 4, 4); 116 117 int len = maximumBufferSize; 118 if (data.length - j < maximumBufferSize) { 119 len = data.length - j; 120 } 121 System.arraycopy(data, j, sendbuf, 12, len); 123 124 System.arraycopy(NumericUtils.toByteArray(k, false), 0, sendbuf, 8, 4); 126 127 if (addr == null) { 129 packets[k] = new DatagramPacket (sendbuf, len+12); 130 } 131 else { 132 packets[k] = new DatagramPacket (sendbuf, len+12, addr, port); 133 } 134 } 135 136 return packets; 137 } 138 139 } 140
| Popular Tags
|