KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > xmpp > XmppTransport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.xmpp;
19
20 import ietf.params.xml.ns.xmpp_sasl.Mechanisms;
21 import org.apache.activemq.command.BrokerInfo;
22 import org.apache.activemq.command.Command;
23 import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
24 import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
25 import org.apache.activemq.transport.tcp.TcpTransport;
26 import org.apache.activemq.util.IOExceptionSupport;
27 import org.apache.activemq.util.ServiceStopper;
28 import org.apache.activemq.wireformat.WireFormat;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.jabber.etherx.streams.Features;
32
33 import javax.net.SocketFactory;
34 import javax.xml.bind.JAXBContext;
35 import javax.xml.bind.JAXBException;
36 import javax.xml.bind.Marshaller;
37 import javax.xml.bind.Unmarshaller;
38 import javax.xml.namespace.QName JavaDoc;
39 import javax.xml.stream.Location;
40 import javax.xml.stream.XMLEventReader;
41 import javax.xml.stream.XMLInputFactory;
42 import javax.xml.stream.XMLOutputFactory;
43 import javax.xml.stream.XMLReporter;
44 import javax.xml.stream.XMLStreamException;
45 import javax.xml.stream.XMLStreamWriter;
46 import javax.xml.stream.events.Attribute;
47 import javax.xml.stream.events.StartElement;
48 import javax.xml.stream.events.XMLEvent;
49 import java.io.IOException JavaDoc;
50 import java.io.InputStream JavaDoc;
51 import java.io.OutputStream JavaDoc;
52 import java.net.Socket JavaDoc;
53 import java.net.URI JavaDoc;
54
55 /**
56  * @version $Revision: 468026 $
57  */

58 public class XmppTransport extends TcpTransport {
59     protected static final QName JavaDoc ATTRIBUTE_TO = new QName JavaDoc("to");
60
61     private static final transient Log log = LogFactory.getLog(XmppTransport.class);
62
63     private JAXBContext context;
64     private XMLEventReader xmlReader;
65     private Unmarshaller unmarshaller;
66     private Marshaller marshaller;
67     private XMLStreamWriter xmlWriter;
68     private String JavaDoc to = "client";
69     protected OutputStream outputStream;
70     protected InputStream inputStream;
71     private ProtocolConverter converter;
72     private String JavaDoc from = "localhost";
73     private String JavaDoc brokerId = "broker-id-1";
74
75     public XmppTransport(WireFormat wireFormat, Socket JavaDoc socket) throws IOException JavaDoc {
76         super(wireFormat, socket);
77         init();
78     }
79
80     public XmppTransport(WireFormat wireFormat, SocketFactory socketFactory, URI JavaDoc uri, URI JavaDoc uri1) throws IOException JavaDoc {
81         super(wireFormat, socketFactory, uri, uri1);
82         init();
83     }
84
85     private void init() {
86         converter = new ProtocolConverter(this);
87     }
88
89
90     @Override JavaDoc
91     public void oneway(Object JavaDoc object) throws IOException JavaDoc {
92         if (object instanceof Command) {
93             Command command = (Command) object;
94
95             if (command instanceof BrokerInfo) {
96                 BrokerInfo brokerInfo = (BrokerInfo) command;
97
98                 brokerId = brokerInfo.getBrokerId().toString();
99                 from = brokerInfo.getBrokerName();
100                 try {
101                     writeOpenStream(brokerId, from);
102                 }
103                 catch (XMLStreamException e) {
104                     throw IOExceptionSupport.create(e);
105                 }
106             }
107             else {
108                 try {
109                     converter.onActiveMQCommad(command);
110                 }
111                 catch (IOException JavaDoc e) {
112                     throw e;
113                 }
114                 catch (Exception JavaDoc e) {
115                     throw IOExceptionSupport.create(e);
116                 }
117             }
118         }
119         else {
120             log.warn("Unkown command: " + object);
121         }
122     }
123
124
125     /**
126      * Marshalls the given POJO to the client
127      */

128     public void marshall(Object JavaDoc command) throws IOException JavaDoc {
129         if (isStopped() || isStopping()) {
130             log.warn("Not marshalling command as shutting down: " + command);
131             return;
132         }
133         try {
134             marshaller.marshal(command, xmlWriter);
135             xmlWriter.flush();
136             outputStream.flush();
137         }
138         catch (JAXBException e) {
139             throw IOExceptionSupport.create(e);
140         }
141         catch (XMLStreamException e) {
142             throw IOExceptionSupport.create(e);
143         }
144     }
145
146     @Override JavaDoc
147     public void run() {
148         log.debug("XMPP consumer thread starting");
149
150         try {
151             XMLInputFactory xif = XMLInputFactory.newInstance();
152             xif.setXMLReporter(new XMLReporter() {
153                 public void report(String JavaDoc message, String JavaDoc errorType, Object JavaDoc relatedInformation, Location location) throws XMLStreamException {
154                     log.warn(message + " errorType: " + errorType + " relatedInfo: " + relatedInformation);
155                 }
156             });
157
158             xmlReader = xif.createXMLEventReader(inputStream);
159
160             XMLEvent docStart = xmlReader.nextEvent();
161
162             XMLEvent rootElement = xmlReader.nextTag();
163
164             if (rootElement instanceof StartElement) {
165                 StartElement startElement = (StartElement) rootElement;
166                 Attribute toAttribute = startElement.getAttributeByName(ATTRIBUTE_TO);
167                 if (toAttribute != null) {
168                     to = toAttribute.getValue();
169                 }
170             }
171             while (true) {
172                 if (isStopped()) {
173                     break;
174                 }
175
176                 XMLEvent event = xmlReader.peek();
177                 if (event.isStartElement()) {
178                     // unmarshal a new object
179
Object JavaDoc object = unmarshaller.unmarshal(xmlReader);
180                     if (object != null) {
181                         converter.onXmppCommand(object);
182                     }
183                 }
184                 else {
185                     if (event.getEventType() == XMLEvent.END_ELEMENT) {
186                         break;
187                     }
188                     else
189                     if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
190                         break;
191                     }
192                     else {
193                         xmlReader.nextEvent();
194                     }
195
196                 }
197             }
198         }
199         catch (XMLStreamException e) {
200             log.error("XMPP Reader thread caught: " + e, e);
201         }
202         catch (Exception JavaDoc e) {
203             log.error("XMPP Reader thread caught: " + e, e);
204         }
205         try {
206             stop();
207         }
208         catch (Exception JavaDoc e) {
209             log.error("Failed to stop XMPP transport: " + e, e);
210         }
211     }
212
213
214     public String JavaDoc getFrom() {
215         return from;
216     }
217
218     @Override JavaDoc
219     protected void doStop(ServiceStopper stopper) throws Exception JavaDoc {
220         if (xmlWriter != null) {
221             try {
222                 xmlWriter.writeEndElement();
223                 xmlWriter.writeEndDocument();
224                 xmlWriter.close();
225             }
226             catch (XMLStreamException e) {
227                 // the client may have closed first so ignore this
228
log.info("Caught trying to close transport: " + e, e);
229             }
230         }
231         if (xmlReader != null) {
232             try {
233                 xmlReader.close();
234             }
235             catch (XMLStreamException e) {
236                 // the client may have closed first so ignore this
237
log.info("Caught trying to close transport: " + e, e);
238             }
239         }
240         super.doStop(stopper);
241     }
242
243     @Override JavaDoc
244     protected void initializeStreams() throws Exception JavaDoc {
245         // TODO it would be preferable to use class discovery here!
246
context = JAXBContext.newInstance("jabber.client"
247                 /*
248                 + ":jabber.server"
249                 + ":jabber.iq.gateway"
250                 + ":jabber.iq.last"
251                 + ":jabber.iq.oob"
252                 + ":jabber.iq.pass"
253                 + ":jabber.iq.time"
254                 + ":jabber.iq.version"
255                 + ":org.jabber.protocol.activity" + ":org.jabber.protocol.address"
256                 + ":org.jabber.protocol.amp" + ":org.jabber.protocol.amp_errors"
257                 + ":org.jabber.protocol.muc_admin"
258                 + ":org.jabber.protocol.muc_unique"
259                 */

260                 + ":jabber.iq._private"
261                 + ":jabber.iq.auth"
262                 + ":jabber.iq.roster"
263                 + ":org.jabber.etherx.streams"
264                 + ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items"
265                 + ":org.jabber.protocol.muc"
266                 + ":org.jabber.protocol.muc_user"
267                 + ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas"
268                 + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls"
269         );
270
271         inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
272         outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
273
274         unmarshaller = context.createUnmarshaller();
275         marshaller = context.createMarshaller();
276         marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
277     }
278
279     protected void writeOpenStream(String JavaDoc id, String JavaDoc from) throws IOException JavaDoc, XMLStreamException {
280         log.debug("Sending initial stream element");
281         XMLOutputFactory factory = XMLOutputFactory.newInstance();
282         //factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
283
xmlWriter = factory.createXMLStreamWriter(outputStream);
284
285         // write the dummy start tag
286
xmlWriter.writeStartDocument();
287         xmlWriter.writeStartElement("stream", "stream", "http://etherx.jabber.org/streams");
288         xmlWriter.writeDefaultNamespace("jabber:client");
289         xmlWriter.writeNamespace("stream", "http://etherx.jabber.org/streams");
290         xmlWriter.writeAttribute("version", "1.0");
291         xmlWriter.writeAttribute("id", id);
292         if (to == null) {
293             to = "client";
294         }
295         xmlWriter.writeAttribute("to", to);
296         xmlWriter.writeAttribute("from", from);
297
298
299         // now lets write the features
300
Features features = new Features();
301
302         // TODO support TLS
303
//features.getAny().add(new Starttls());
304

305         Mechanisms mechanisms = new Mechanisms();
306
307         // TODO support SASL
308
//mechanisms.getMechanism().add("DIGEST-MD5");
309
//mechanisms.getMechanism().add("PLAIN");
310
features.getAny().add(mechanisms);
311         marshall(features);
312
313         log.debug("Initial stream element sent!");
314     }
315
316 }
317
Popular Tags