KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > tcp > protocols > XmlMessageProtocol


1 /*
2  * $Id: XmlMessageProtocol.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.tcp.protocols;
12
13 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
14
15 import org.mule.providers.tcp.TcpProtocol;
16
17 import java.io.IOException JavaDoc;
18 import java.io.InputStream JavaDoc;
19 import java.io.OutputStream JavaDoc;
20 import java.io.PushbackInputStream JavaDoc;
21 import java.net.SocketException JavaDoc;
22 import java.net.SocketTimeoutException JavaDoc;
23 import java.util.Map JavaDoc;
24
25 /**
26  * <p>
27  * The XmlMessageProtocol is an application level tcp protocol that can be used to
28  * read streaming xml documents. The only requirement is that each document include
29  * an xml declaration at the beginning of the document of the form "<?xml...". In
30  * section 2.8, the xml 1.0 standard contains "Definition: XML documents
31  * <strong>SHOULD</strong> begin with an XML declaration which specifies the version
32  * of XML being used" while the xml 1.1 standard contains "Definition: XML 1.1
33  * documents <strong>MUST</strong> begin with an XML declaration which specifies the
34  * version of XML being used". The SHOULD indicates a recommendation that, if not
35  * followed, needs to be carefully checked for unintended consequences. MUST
36  * indicates a mandatory requirement for a well-formed document. Please make sure
37  * that the xml documents being streamed begin with an xml declaration when using
38  * this class.
39  * </p>
40  * <p>
41  * Also, the default character encoding for the platform is used to decode the
42  * message bytes when looking for the XML declaration. Some caution with message
43  * character encodings is warranted.
44  * </p>
45  * <p>
46  * Finally, this class uses a PushbackInputStream to enable parsing of individual
47  * messages. The stream stores any pushed-back bytes into it's own internal buffer
48  * and not the original stream. Therefore, the read buffer size is intentionally
49  * limited to insure that unread characters remain on the stream so that all data may
50  * be read later.
51  * </p>
52  *
53  * @author <a HREF="mailto:rlucente@xecu.net">Rich Lucente</a>
54  * @version $Revision: 3798 $
55  */

56 public class XmlMessageProtocol implements TcpProtocol
57 {
58     private static String JavaDoc XML_PATTERN = "<?xml";
59
60     private static int READ_BUFFER_SIZE = 4096;
61     private static int PUSHBACK_BUFFER_SIZE = READ_BUFFER_SIZE * 2;
62
63     private Map pbMap = new ConcurrentHashMap();
64
65     /**
66      * Adapted from DefaultProtocol
67      *
68      * @see DefaultProtocol#read(java.io.InputStream)
69      */

70     public byte[] read(InputStream JavaDoc is) throws IOException JavaDoc
71     {
72         // look for existing pushback wrapper for the given stream
73
// if not found, create a wrapper
74
PushbackInputStream JavaDoc pbis = (PushbackInputStream JavaDoc)pbMap.get(is);
75         if (pbis == null)
76         {
77             pbis = new PushbackInputStream JavaDoc(is, PUSHBACK_BUFFER_SIZE);
78             pbMap.put(is, pbis);
79         }
80
81         // read until xml pattern is seen (and then pushed back) or no more data
82
// to read. return all data as message
83
byte[] buffer = new byte[READ_BUFFER_SIZE];
84         int len = 0;
85
86         try
87         {
88             while ((len = pbis.read(buffer)) == 0)
89             {
90                 // feed me!
91
}
92         }
93         catch (SocketException JavaDoc e)
94         {
95             return null;
96         }
97         catch (SocketTimeoutException JavaDoc e)
98         {
99             return null;
100         }
101         finally
102         {
103             if (len <= 0)
104             {
105                 // remove exhausted stream
106
pbMap.remove(is);
107                 return null;
108             }
109         }
110
111         StringBuffer JavaDoc out = new StringBuffer JavaDoc(READ_BUFFER_SIZE);
112         int patternIndex = -1;
113
114         do
115         {
116             // TODO take encoding into account, ideally from the incoming XML
117
out.append(new String JavaDoc(buffer, 0, len));
118
119             // start search at 2nd character in buffer (index=1) to
120
// indicate whether we have reached a new document.
121
patternIndex = out.toString().indexOf(XML_PATTERN, 1);
122             if (patternIndex > 0 || len < buffer.length || pbis.available() == 0)
123             {
124                 break;
125             }
126         }
127         while ((len = pbis.read(buffer)) > 0);
128
129         if (patternIndex > 0)
130         {
131             // push back the start of the next message and
132
// ignore the pushed-back characters in the return buffer
133
pbis.unread(out.substring(patternIndex, out.length()).getBytes());
134             out.setLength(patternIndex);
135         }
136
137         return out.toString().getBytes();
138     }
139
140     // simply write the data (which SHOULD begin with an XML declaration!)
141
public void write(OutputStream JavaDoc os, byte[] data) throws IOException JavaDoc
142     {
143         os.write(data);
144     }
145
146 }
147
Popular Tags