1 18 package org.apache.activemq.transport.stomp; 19 20 import java.io.DataInput ; 21 import java.io.DataInputStream ; 22 import java.io.DataOutput ; 23 import java.io.DataOutputStream ; 24 import java.io.IOException ; 25 import java.util.HashMap ; 26 import java.util.Iterator ; 27 import java.util.Map ; 28 29 import org.apache.activemq.util.ByteArrayInputStream; 30 import org.apache.activemq.util.ByteArrayOutputStream; 31 import org.apache.activemq.util.ByteSequence; 32 import org.apache.activemq.wireformat.WireFormat; 33 34 37 public class StompWireFormat implements WireFormat { 38 39 private static final byte[] NO_DATA = new byte[]{}; 40 private static final byte[] END_OF_FRAME = new byte[]{0,'\n'}; 41 42 private static final int MAX_COMMAND_LENGTH = 1024; 43 private static final int MAX_HEADER_LENGTH = 1024*10; 44 private static final int MAX_HEADERS = 1000; 45 private static final int MAX_DATA_LENGTH = 1024*1024*100; 46 47 private int version=1; 48 49 public ByteSequence marshal(Object command) throws IOException { 50 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 51 DataOutputStream dos = new DataOutputStream (baos); 52 marshal(command, dos); 53 dos.close(); 54 return baos.toByteSequence(); 55 } 56 57 public Object unmarshal(ByteSequence packet) throws IOException { 58 ByteArrayInputStream stream = new ByteArrayInputStream(packet); 59 DataInputStream dis = new DataInputStream (stream); 60 return unmarshal(dis); 61 } 62 63 public void marshal(Object command, DataOutput os) throws IOException { 64 StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame) command; 65 66 StringBuffer buffer = new StringBuffer (); 67 buffer.append(stomp.getAction()); 68 buffer.append(Stomp.NEWLINE); 69 70 for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) { 72 Map.Entry entry = (Map.Entry ) iter.next(); 73 buffer.append(entry.getKey()); 74 buffer.append(Stomp.Headers.SEPERATOR); 75 buffer.append(entry.getValue()); 76 buffer.append(Stomp.NEWLINE); 77 } 78 79 buffer.append(Stomp.NEWLINE); 81 82 os.write(buffer.toString().getBytes("UTF-8")); 83 os.write(stomp.getContent()); 84 os.write(END_OF_FRAME); 85 } 86 87 88 public Object unmarshal(DataInput in) throws IOException { 89 90 try { 91 String action = null; 92 93 while (true) { 95 action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded"); 96 if (action == null) { 97 throw new IOException ("connection was closed"); 98 } else { 99 action = action.trim(); 100 if (action.length() > 0) { 101 break; 102 } 103 } 104 } 105 106 HashMap headers = new HashMap (25); 108 while (true) { 109 String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); 110 if (line != null && line.trim().length() > 0) { 111 112 if( headers.size() > MAX_HEADERS ) 113 throw new ProtocolException("The maximum number of headers was exceeded", true); 114 115 try { 116 int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR); 117 String name = line.substring(0, seperator_index).trim(); 118 String value = line.substring(seperator_index + 1, line.length()).trim(); 119 headers.put(name, value); 120 } 121 catch (Exception e) { 122 throw new ProtocolException("Unable to parser header line [" + line + "]", true); 123 } 124 } 125 else { 126 break; 127 } 128 } 129 130 byte[] data = NO_DATA; 132 String contentLength = (String )headers.get(Stomp.Headers.CONTENT_LENGTH); 133 if (contentLength!=null) { 134 135 int length; 137 try { 138 length = Integer.parseInt(contentLength.trim()); 139 } catch (NumberFormatException e) { 140 throw new ProtocolException("Specified content-length is not a valid integer", true); 141 } 142 143 if( length > MAX_DATA_LENGTH ) 144 throw new ProtocolException("The maximum data length was exceeded", true); 145 146 data = new byte[length]; 147 in.readFully(data); 148 149 if (in.readByte() != 0) { 150 throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte", true); 151 } 152 153 } else { 154 155 byte b; 157 ByteArrayOutputStream baos=null; 158 while ((b = in.readByte()) != 0) { 159 160 if( baos == null ) { 161 baos = new ByteArrayOutputStream(); 162 } else if( baos.size() > MAX_DATA_LENGTH ) { 163 throw new ProtocolException("The maximum data length was exceeded", true); 164 } 165 166 baos.write(b); 167 } 168 169 if( baos!=null ) { 170 baos.close(); 171 data = baos.toByteArray(); 172 } 173 174 } 175 176 return new StompFrame(action, headers, data); 177 178 } catch (ProtocolException e) { 179 return new StompFrameError(e); 180 } 181 182 } 183 184 private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException { 185 byte b; 186 ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength); 187 while ((b = in.readByte()) != '\n') { 188 if( baos.size() > maxLength ) 189 throw new ProtocolException(errorMessage, true); 190 baos.write(b); 191 } 192 ByteSequence sequence = baos.toByteSequence(); 193 return new String (sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8"); 194 } 195 196 public int getVersion() { 197 return version; 198 } 199 200 public void setVersion(int version) { 201 this.version = version; 202 } 203 204 } 205 | Popular Tags |