1 10 11 package org.mule.providers.tcp.protocols; 12 13 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 14 15 import org.mule.providers.tcp.TcpProtocol; 16 17 import java.io.IOException ; 18 import java.io.InputStream ; 19 import java.io.OutputStream ; 20 import java.io.PushbackInputStream ; 21 import java.net.SocketException ; 22 import java.net.SocketTimeoutException ; 23 import java.util.Map ; 24 25 56 public class XmlMessageProtocol implements TcpProtocol 57 { 58 private static String XML_PATTERN = "<?xml"; 59 60 private static int READ_BUFFER_SIZE = 4096; 61 private static int PUSHBACK_BUFFER_SIZE = READ_BUFFER_SIZE * 2; 62 63 private Map pbMap = new ConcurrentHashMap(); 64 65 70 public byte[] read(InputStream is) throws IOException 71 { 72 PushbackInputStream pbis = (PushbackInputStream )pbMap.get(is); 75 if (pbis == null) 76 { 77 pbis = new PushbackInputStream (is, PUSHBACK_BUFFER_SIZE); 78 pbMap.put(is, pbis); 79 } 80 81 byte[] buffer = new byte[READ_BUFFER_SIZE]; 84 int len = 0; 85 86 try 87 { 88 while ((len = pbis.read(buffer)) == 0) 89 { 90 } 92 } 93 catch (SocketException e) 94 { 95 return null; 96 } 97 catch (SocketTimeoutException e) 98 { 99 return null; 100 } 101 finally 102 { 103 if (len <= 0) 104 { 105 pbMap.remove(is); 107 return null; 108 } 109 } 110 111 StringBuffer out = new StringBuffer (READ_BUFFER_SIZE); 112 int patternIndex = -1; 113 114 do 115 { 116 out.append(new String (buffer, 0, len)); 118 119 patternIndex = out.toString().indexOf(XML_PATTERN, 1); 122 if (patternIndex > 0 || len < buffer.length || pbis.available() == 0) 123 { 124 break; 125 } 126 } 127 while ((len = pbis.read(buffer)) > 0); 128 129 if (patternIndex > 0) 130 { 131 pbis.unread(out.substring(patternIndex, out.length()).getBytes()); 134 out.setLength(patternIndex); 135 } 136 137 return out.toString().getBytes(); 138 } 139 140 public void write(OutputStream os, byte[] data) throws IOException 142 { 143 os.write(data); 144 } 145 146 } 147 | Popular Tags |