1 18 package org.apache.activemq.transport.xmpp; 19 20 import ietf.params.xml.ns.xmpp_sasl.Mechanisms; 21 import org.apache.activemq.command.BrokerInfo; 22 import org.apache.activemq.command.Command; 23 import org.apache.activemq.transport.tcp.TcpBufferedInputStream; 24 import org.apache.activemq.transport.tcp.TcpBufferedOutputStream; 25 import org.apache.activemq.transport.tcp.TcpTransport; 26 import org.apache.activemq.util.IOExceptionSupport; 27 import org.apache.activemq.util.ServiceStopper; 28 import org.apache.activemq.wireformat.WireFormat; 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 import org.jabber.etherx.streams.Features; 32 33 import javax.net.SocketFactory; 34 import javax.xml.bind.JAXBContext; 35 import javax.xml.bind.JAXBException; 36 import javax.xml.bind.Marshaller; 37 import javax.xml.bind.Unmarshaller; 38 import javax.xml.namespace.QName ; 39 import javax.xml.stream.Location; 40 import javax.xml.stream.XMLEventReader; 41 import javax.xml.stream.XMLInputFactory; 42 import javax.xml.stream.XMLOutputFactory; 43 import javax.xml.stream.XMLReporter; 44 import javax.xml.stream.XMLStreamException; 45 import javax.xml.stream.XMLStreamWriter; 46 import javax.xml.stream.events.Attribute; 47 import javax.xml.stream.events.StartElement; 48 import javax.xml.stream.events.XMLEvent; 49 import java.io.IOException ; 50 import java.io.InputStream ; 51 import java.io.OutputStream ; 52 import java.net.Socket ; 53 import java.net.URI ; 54 55 58 public class XmppTransport extends TcpTransport { 59 protected static final QName ATTRIBUTE_TO = new QName ("to"); 60 61 private static final transient Log log = LogFactory.getLog(XmppTransport.class); 62 63 private JAXBContext context; 64 private XMLEventReader xmlReader; 65 private Unmarshaller unmarshaller; 66 private Marshaller marshaller; 67 private XMLStreamWriter xmlWriter; 68 private String to = "client"; 69 protected OutputStream outputStream; 70 protected InputStream inputStream; 71 private ProtocolConverter converter; 72 private String from = "localhost"; 73 private String brokerId = "broker-id-1"; 74 75 public XmppTransport(WireFormat wireFormat, Socket socket) throws IOException { 76 super(wireFormat, socket); 77 init(); 78 } 79 80 public XmppTransport(WireFormat wireFormat, SocketFactory socketFactory, URI uri, URI uri1) throws IOException { 81 super(wireFormat, socketFactory, uri, uri1); 82 init(); 83 } 84 85 private void init() { 86 converter = new ProtocolConverter(this); 87 } 88 89 90 @Override 91 public void oneway(Object object) throws IOException { 92 if (object instanceof Command) { 93 Command command = (Command) object; 94 95 if (command instanceof BrokerInfo) { 96 BrokerInfo brokerInfo = (BrokerInfo) command; 97 98 brokerId = brokerInfo.getBrokerId().toString(); 99 from = brokerInfo.getBrokerName(); 100 try { 101 writeOpenStream(brokerId, from); 102 } 103 catch (XMLStreamException e) { 104 throw IOExceptionSupport.create(e); 105 } 106 } 107 else { 108 try { 109 converter.onActiveMQCommad(command); 110 } 111 catch (IOException e) { 112 throw e; 113 } 114 catch (Exception e) { 115 throw IOExceptionSupport.create(e); 116 } 117 } 118 } 119 else { 120 log.warn("Unkown command: " + object); 121 } 122 } 123 124 125 128 public void marshall(Object command) throws IOException { 129 if (isStopped() || isStopping()) { 130 log.warn("Not marshalling command as shutting down: " + command); 131 return; 132 } 133 try { 134 marshaller.marshal(command, xmlWriter); 135 xmlWriter.flush(); 136 outputStream.flush(); 137 } 138 catch (JAXBException e) { 139 throw IOExceptionSupport.create(e); 140 } 141 catch (XMLStreamException e) { 142 throw IOExceptionSupport.create(e); 143 } 144 } 145 146 @Override 147 public void run() { 148 log.debug("XMPP consumer thread starting"); 149 150 try { 151 XMLInputFactory xif = XMLInputFactory.newInstance(); 152 xif.setXMLReporter(new XMLReporter() { 153 public void report(String message, String errorType, Object relatedInformation, Location location) throws XMLStreamException { 154 log.warn(message + " errorType: " + errorType + " relatedInfo: " + relatedInformation); 155 } 156 }); 157 158 xmlReader = xif.createXMLEventReader(inputStream); 159 160 XMLEvent docStart = xmlReader.nextEvent(); 161 162 XMLEvent rootElement = xmlReader.nextTag(); 163 164 if (rootElement instanceof StartElement) { 165 StartElement startElement = (StartElement) rootElement; 166 Attribute toAttribute = startElement.getAttributeByName(ATTRIBUTE_TO); 167 if (toAttribute != null) { 168 to = toAttribute.getValue(); 169 } 170 } 171 while (true) { 172 if (isStopped()) { 173 break; 174 } 175 176 XMLEvent event = xmlReader.peek(); 177 if (event.isStartElement()) { 178 Object object = unmarshaller.unmarshal(xmlReader); 180 if (object != null) { 181 converter.onXmppCommand(object); 182 } 183 } 184 else { 185 if (event.getEventType() == XMLEvent.END_ELEMENT) { 186 break; 187 } 188 else 189 if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) { 190 break; 191 } 192 else { 193 xmlReader.nextEvent(); 194 } 195 196 } 197 } 198 } 199 catch (XMLStreamException e) { 200 log.error("XMPP Reader thread caught: " + e, e); 201 } 202 catch (Exception e) { 203 log.error("XMPP Reader thread caught: " + e, e); 204 } 205 try { 206 stop(); 207 } 208 catch (Exception e) { 209 log.error("Failed to stop XMPP transport: " + e, e); 210 } 211 } 212 213 214 public String getFrom() { 215 return from; 216 } 217 218 @Override 219 protected void doStop(ServiceStopper stopper) throws Exception { 220 if (xmlWriter != null) { 221 try { 222 xmlWriter.writeEndElement(); 223 xmlWriter.writeEndDocument(); 224 xmlWriter.close(); 225 } 226 catch (XMLStreamException e) { 227 log.info("Caught trying to close transport: " + e, e); 229 } 230 } 231 if (xmlReader != null) { 232 try { 233 xmlReader.close(); 234 } 235 catch (XMLStreamException e) { 236 log.info("Caught trying to close transport: " + e, e); 238 } 239 } 240 super.doStop(stopper); 241 } 242 243 @Override 244 protected void initializeStreams() throws Exception { 245 context = JAXBContext.newInstance("jabber.client" 247 260 + ":jabber.iq._private" 261 + ":jabber.iq.auth" 262 + ":jabber.iq.roster" 263 + ":org.jabber.etherx.streams" 264 + ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items" 265 + ":org.jabber.protocol.muc" 266 + ":org.jabber.protocol.muc_user" 267 + ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas" 268 + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls" 269 ); 270 271 inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024); 272 outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024); 273 274 unmarshaller = context.createUnmarshaller(); 275 marshaller = context.createMarshaller(); 276 marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true); 277 } 278 279 protected void writeOpenStream(String id, String from) throws IOException , XMLStreamException { 280 log.debug("Sending initial stream element"); 281 XMLOutputFactory factory = XMLOutputFactory.newInstance(); 282 xmlWriter = factory.createXMLStreamWriter(outputStream); 284 285 xmlWriter.writeStartDocument(); 287 xmlWriter.writeStartElement("stream", "stream", "http://etherx.jabber.org/streams"); 288 xmlWriter.writeDefaultNamespace("jabber:client"); 289 xmlWriter.writeNamespace("stream", "http://etherx.jabber.org/streams"); 290 xmlWriter.writeAttribute("version", "1.0"); 291 xmlWriter.writeAttribute("id", id); 292 if (to == null) { 293 to = "client"; 294 } 295 xmlWriter.writeAttribute("to", to); 296 xmlWriter.writeAttribute("from", from); 297 298 299 Features features = new Features(); 301 302 305 Mechanisms mechanisms = new Mechanisms(); 306 307 features.getAny().add(mechanisms); 311 marshall(features); 312 313 log.debug("Initial stream element sent!"); 314 } 315 316 } 317 | Popular Tags |