1 18 package org.apache.activemq.transport.xmpp; 19 20 import ietf.params.xml.ns.xmpp_sasl.Auth; 21 import ietf.params.xml.ns.xmpp_sasl.Challenge; 22 import ietf.params.xml.ns.xmpp_sasl.Success; 23 import ietf.params.xml.ns.xmpp_tls.Proceed; 24 import ietf.params.xml.ns.xmpp_tls.Starttls; 25 import jabber.client.Body; 26 import jabber.client.Error; 27 import jabber.client.Iq; 28 import jabber.client.Message; 29 import jabber.client.Presence; 30 import jabber.iq.auth.Query; 31 import org.apache.activemq.advisory.AdvisorySupport; 32 import org.apache.activemq.command.*; 33 import org.apache.activemq.transport.xmpp.command.Handler; 34 import org.apache.activemq.transport.xmpp.command.HandlerRegistry; 35 import org.apache.activemq.util.IdGenerator; 36 import org.apache.activemq.util.IntSequenceGenerator; 37 import org.apache.activemq.util.LongSequenceGenerator; 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 import org.jabber.protocol.disco_info.Feature; 41 import org.jabber.protocol.disco_info.Identity; 42 import org.jabber.protocol.disco_items.Item; 43 import org.jabber.protocol.muc_user.X; 44 import org.w3c.dom.Element ; 45 46 import javax.jms.JMSException ; 47 import java.io.IOException ; 48 import java.io.PrintWriter ; 49 import java.io.StringWriter ; 50 import java.util.HashMap ; 51 import java.util.List ; 52 import java.util.Map ; 53 import java.util.concurrent.ConcurrentHashMap ; 54 import java.util.concurrent.atomic.AtomicBoolean ; 55 56 59 public class ProtocolConverter { 60 private static final transient Log log = LogFactory.getLog(ProtocolConverter.class); 61 62 private HandlerRegistry registry = new HandlerRegistry(); 63 private XmppTransport transport; 64 65 66 private static final IdGenerator connectionIdGenerator = new IdGenerator(); 67 private static final IdGenerator clientIdGenerator = new IdGenerator("xmpp"); 68 private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); 69 private final SessionId sessionId = new SessionId(connectionId, -1); 70 private final ProducerId producerId = new ProducerId(sessionId, 1); 71 72 private final ConnectionInfo connectionInfo = new ConnectionInfo(connectionId); 73 private final SessionInfo sessionInfo = new SessionInfo(sessionId); 74 private final ProducerInfo producerInfo = new ProducerInfo(producerId); 75 76 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 77 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 78 private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); 79 private final IntSequenceGenerator tempDestinationIdGenerator = new IntSequenceGenerator(); 80 81 private final Map <Integer , Handler<Response>> resposeHandlers = new ConcurrentHashMap <Integer , Handler<Response>>(); 82 private final Map <ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap <ConsumerId, Handler<MessageDispatch>>(); 83 private final Map <String , ConsumerInfo> jidToConsumerMap = new HashMap <String , ConsumerInfo>(); 84 private final Map <String , ConsumerInfo> jidToInboxConsumerMap = new HashMap <String , ConsumerInfo>(); 85 86 private final Map transactions = new ConcurrentHashMap (); 87 88 private final Object commnadIdMutex = new Object (); 89 private int lastCommandId; 90 private final AtomicBoolean connected = new AtomicBoolean (false); 91 private ActiveMQTempQueue inboxDestination; 92 93 public ProtocolConverter(XmppTransport transport) { 94 this.transport = transport; 95 initialiseRegistry(); 96 } 97 98 protected int generateCommandId() { 99 synchronized (commnadIdMutex) { 100 return lastCommandId++; 101 } 102 } 103 104 protected void initialiseRegistry() { 105 registry.registerHandler(Message.class, new Handler<Message>() { 107 public void handle(Message event) throws Exception { 108 onMessage(event); 109 } 110 }); 111 registry.registerHandler(Auth.class, new Handler<Auth>() { 112 public void handle(Auth event) throws Exception { 113 onAuth(event); 114 } 115 }); 116 registry.registerHandler(Starttls.class, new Handler<Starttls>() { 117 public void handle(Starttls event) throws Exception { 118 onStarttls(event); 119 } 120 }); 121 registry.registerHandler(Iq.class, new Handler<Iq>() { 122 public void handle(Iq event) throws Exception { 123 onIq(event); 124 } 125 }); 126 registry.registerHandler(Presence.class, new Handler<Presence>() { 127 public void handle(Presence event) throws Exception { 128 onPresence(event); 129 } 130 }); 131 } 132 133 public void onXmppCommand(Object command) throws Exception { 134 137 Handler handler = registry.getHandler(command.getClass()); 138 if (handler == null) { 139 unknownCommand(command); 140 } 141 else { 142 handler.handle(command); 143 } 144 } 145 146 public void onActiveMQCommad(Command command) throws Exception { 147 if (command.isResponse()) { 148 Response response = (Response) command; 149 Handler<Response> handler = resposeHandlers.remove(new Integer (response.getCorrelationId())); 150 if (handler != null) { 151 handler.handle(response); 152 } 153 else { 154 log.warn("No handler for response: " + response); 155 } 156 } 157 else if (command.isMessageDispatch()) { 158 MessageDispatch md = (MessageDispatch) command; 159 Handler<MessageDispatch> handler = subscriptionsByConsumerId.get(md.getConsumerId()); 160 if (handler != null) { 161 handler.handle(md); 162 } 163 else { 164 log.warn("No handler for message: " + md); 165 } 166 } 167 } 168 169 protected void unknownCommand(Object command) throws Exception { 170 log.warn("Unkown command: " + command + " of type: " + command.getClass().getName()); 171 } 172 173 protected void onIq(final Iq iq) throws Exception { 174 Object any = iq.getAny(); 175 176 if (any instanceof Query) { 177 onAuthQuery(any, iq); 178 179 } 180 else if (any instanceof jabber.iq._private.Query) { 181 jabber.iq._private.Query query = (jabber.iq._private.Query) any; 182 183 if (log.isDebugEnabled()) { 184 log.debug("Iq Private " + debugString(iq) + " any: " + query.getAny()); 185 } 186 187 Iq result = createResult(iq); 188 jabber.iq._private.Query answer = new jabber.iq._private.Query(); 189 result.setAny(answer); 190 transport.marshall(result); 191 } 192 else if (any instanceof jabber.iq.roster.Query) { 193 jabber.iq.roster.Query query = (jabber.iq.roster.Query) any; 194 195 if (log.isDebugEnabled()) { 196 log.debug("Iq Roster " + debugString(iq) + " item: " + query.getItem()); 197 } 198 199 Iq result = createResult(iq); 200 jabber.iq.roster.Query roster = new jabber.iq.roster.Query(); 201 result.setAny(roster); 202 transport.marshall(result); 203 } 204 else if (any instanceof org.jabber.protocol.disco_items.Query) { 205 onDiscoItems(iq, (org.jabber.protocol.disco_items.Query) any); 206 } 207 else if (any instanceof org.jabber.protocol.disco_info.Query) { 208 onDiscoInfo(iq, (org.jabber.protocol.disco_info.Query) any); 209 } 210 else { 211 if (any instanceof Element ) { 212 Element element = (Element ) any; 213 log.warn("Iq Unknown " + debugString(iq) + " element namespace: " + element.getNamespaceURI() + " localName: " + element.getLocalName()); 214 } 215 else { 216 log.warn("Iq Unknown " + debugString(iq) + " any: " + any + " of type: " + any.getClass().getName()); 217 } 218 Iq result = createResult(iq); 219 jabber.client.Error error = new Error (); 220 error.setUnexpectedRequest("Don't understand: " + any.toString()); 221 result.setAny(error); 222 transport.marshall(result); 223 } 224 } 225 226 protected void onAuthQuery(Object any, final Iq iq) throws IOException { 227 Query query = (Query) any; 228 if (log.isDebugEnabled()) { 229 log.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername()); 230 } 231 if (query.getPassword() == null) { 232 Iq result = createResult(iq); 233 Query required = new Query(); 234 required.setPassword(""); 235 required.setUsername(""); 236 result.setAny(required); 237 transport.marshall(result); 238 return; 239 } 240 241 connectionInfo.setUserName(query.getUsername()); 243 connectionInfo.setPassword(query.getPassword()); 244 245 247 if (connectionInfo.getClientId() == null) { 248 connectionInfo.setClientId(clientIdGenerator.generateId()); 249 } 250 251 sendToActiveMQ(connectionInfo, new Handler<Response>() { 252 public void handle(Response response) throws Exception { 253 254 Iq result = createResult(iq); 255 256 if (response instanceof ExceptionResponse) { 257 ExceptionResponse exceptionResponse = (ExceptionResponse) response; 258 Throwable exception = exceptionResponse.getException(); 259 260 log.warn("Failed to create connection: " + exception, exception); 261 262 Error error = new Error (); 263 result.setError(error); 264 265 StringWriter buffer = new StringWriter (); 266 exception.printStackTrace(new PrintWriter (buffer)); 267 error.setInternalServerError(buffer.toString()); 268 } 269 else { 270 connected.set(true); 271 } 272 transport.marshall(result); 273 274 sendToActiveMQ(sessionInfo, createErrorHandler("create sesssion")); 275 sendToActiveMQ(producerInfo, createErrorHandler("create producer")); 276 } 277 }); 278 } 279 280 protected String debugString(Iq iq) { 281 return " to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId(); 282 } 283 284 protected void onDiscoItems(Iq iq, org.jabber.protocol.disco_items.Query query) throws IOException { 285 String to = iq.getTo(); 286 287 if (log.isDebugEnabled()) { 288 log.debug("Iq Disco Items query " + debugString(iq) + " node: " + query.getNode() + " item: " + query.getItem()); 289 } 290 291 Iq result = createResult(iq); 292 org.jabber.protocol.disco_items.Query answer = new org.jabber.protocol.disco_items.Query(); 293 if (to == null || to.length() == 0) { 294 answer.getItem().add(createItem("queues", "Queues", "queues")); 295 answer.getItem().add(createItem("topics", "Topics", "topics")); 296 } 297 else { 298 } 300 301 result.setAny(answer); 302 transport.marshall(result); 303 } 304 305 protected void onDiscoInfo(Iq iq, org.jabber.protocol.disco_info.Query query) throws IOException { 306 String to = iq.getTo(); 307 308 310 if (log.isDebugEnabled()) { 311 log.debug("Iq Disco Info query " + debugString(iq) + " node: " + query.getNode() + " features: " + query.getFeature() + " identity: " + query.getIdentity()); 312 } 313 314 Iq result = createResult(iq); 315 org.jabber.protocol.disco_info.Query answer = new org.jabber.protocol.disco_info.Query(); 316 answer.setNode(to); 317 answer.getFeature().add(createFeature("http://jabber.org/protocol/disco#info")); 318 answer.getFeature().add(createFeature("http://jabber.org/protocol/disco#items")); 319 if (to == null || to.length() == 0) { 320 answer.getIdentity().add(createIdentity("directory", "chatroom", "queues")); 321 answer.getIdentity().add(createIdentity("directory", "chatroom", "topics")); 322 326 } 327 else { 328 if (to.equals("queues")) { 330 answer.getIdentity().add(createIdentity("conference", "queue.a", "text")); 331 answer.getIdentity().add(createIdentity("conference", "queue.b", "text")); 332 } 333 else if (to.equals("topics")) { 334 answer.getIdentity().add(createIdentity("conference", "topic.x", "text")); 335 answer.getIdentity().add(createIdentity("conference", "topic.y", "text")); 336 answer.getIdentity().add(createIdentity("conference", "topic.z", "text")); 337 } 338 else { 339 answer.getIdentity().add(createIdentity("conference", to, "text")); 341 answer.getFeature().add(createFeature("http://jabber.org/protocol/muc")); 342 answer.getFeature().add(createFeature("muc-open")); 343 } 344 } 345 346 result.setAny(answer); 347 transport.marshall(result); 348 } 349 350 protected void onPresence(Presence presence) throws IOException , JMSException { 351 if (log.isDebugEnabled()) { 352 log.debug("Presence: " + presence.getFrom() + " id: " + presence.getId() + " to: " + presence.getTo() + " type: " + presence.getType() 353 + " showOrStatusOrPriority: " + presence.getShowOrStatusOrPriority() + " any: " + presence.getAny()); 354 } 355 org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item(); 356 item.setAffiliation("owner"); 357 item.setRole("moderator"); 358 item.setNick("broker"); 359 sendPresence(presence, item); 360 361 367 368 final String to = presence.getTo(); 370 371 ActiveMQDestination destination = createActiveMQDestination(to); 372 if (destination == null) { 373 log.debug("No 'to' attribute specified for presence so not creating a JMS subscription"); 374 return; 375 } 376 subscribe(to, destination, jidToConsumerMap); 377 378 380 if (inboxDestination == null) { 382 inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 383 384 DestinationInfo info = new DestinationInfo(); 385 info.setConnectionId(connectionInfo.getConnectionId()); 386 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 387 info.setDestination(inboxDestination); 388 sendToActiveMQ(info, null); 389 390 subscribe(to, inboxDestination, jidToInboxConsumerMap); 391 } 392 } 393 394 protected void subscribe(final String to, ActiveMQDestination destination, Map <String , ConsumerInfo> consumerMap) { 395 boolean createConsumer = false; 396 ConsumerInfo consumerInfo = null; 397 synchronized (consumerMap) { 398 consumerInfo = consumerMap.get(to); 399 if (consumerInfo == null) { 400 consumerInfo = new ConsumerInfo(); 401 consumerMap.put(to, consumerInfo); 402 403 ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 404 consumerInfo.setConsumerId(consumerId); 405 consumerInfo.setPrefetchSize(10); 406 consumerInfo.setNoLocal(true); 407 createConsumer = true; 408 } 409 } 410 if (!createConsumer) { 411 return; 412 } 413 414 consumerInfo.setDestination(destination); 415 416 subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() { 417 public void handle(MessageDispatch messageDispatch) throws Exception { 418 if (log.isDebugEnabled()) { 420 log.debug("Receiving inbound: " + messageDispatch.getMessage()); 421 } 422 423 MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1); 425 sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId())); 426 427 Message message = createXmppMessage(to, messageDispatch); 428 if (message != null) { 429 if (log.isDebugEnabled()) { 430 log.debug("Sending message to XMPP client from: " + message.getFrom() + " to: " + message.getTo() + " type: " + message.getType() + " with body: " + message.getAny()); 431 } 432 transport.marshall(message); 433 } 434 } 435 }); 436 sendToActiveMQ(consumerInfo, createErrorHandler("subscribe to destination: " + destination)); 437 } 438 439 protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws JMSException { 440 Message answer = new Message(); 441 answer.setType("groupchat"); 442 String from = to; 443 int idx = from.indexOf('/'); 444 if (idx > 0) { 445 from = from.substring(0, idx) + "/broker"; 446 } 447 answer.setFrom(from); 448 answer.setTo(to); 449 450 org.apache.activemq.command.Message message = messageDispatch.getMessage(); 451 if (message instanceof ActiveMQTextMessage) { 453 ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message; 454 Body body = new Body(); 455 String text = activeMQTextMessage.getText(); 456 log.info("Setting the body text to be: " + text); 457 body.setValue(text); 458 answer.getAny().add(body); 459 } 460 else { 461 log.warn("Could not convert the message to a complete Jabber message: " + message); 463 } 464 return answer; 465 } 466 467 protected void sendPresence(Presence presence, org.jabber.protocol.muc_user.Item item) throws IOException { 468 Presence answer = new Presence(); 469 answer.setFrom(presence.getTo()); 470 answer.setType(presence.getType()); 471 answer.setTo(presence.getFrom()); 472 X x = new X(); 473 x.getDeclineOrDestroyOrInvite().add(item); 474 answer.getShowOrStatusOrPriority().add(x); 475 transport.marshall(answer); 476 } 477 478 479 protected Item createItem(String jid, String name, String node) { 480 Item answer = new Item(); 481 answer.setJid(jid); 482 answer.setName(name); 483 answer.setNode(node); 484 return answer; 485 } 486 487 protected Identity createIdentity(String category, String type, String name) { 488 Identity answer = new Identity(); 489 answer.setCategory(category); 490 answer.setName(name); 491 answer.setType(type); 492 return answer; 493 } 494 495 protected Feature createFeature(String var) { 496 Feature feature = new Feature(); 497 feature.setVar(var); 498 return feature; 499 } 500 501 504 protected Iq createResult(Iq iq) { 505 Iq result = new Iq(); 506 result.setId(iq.getId()); 507 result.setFrom(transport.getFrom()); 508 result.setTo(iq.getFrom()); 509 result.setLang(iq.getLang()); 510 result.setType("result"); 511 return result; 512 } 513 514 protected void sendToActiveMQ(Command command, Handler<Response> handler) { 515 command.setCommandId(generateCommandId()); 516 if (handler != null) { 517 command.setResponseRequired(true); 518 resposeHandlers.put(command.getCommandId(), handler); 519 } 520 transport.getTransportListener().onCommand(command); 521 } 522 523 524 protected void onStarttls(Starttls starttls) throws Exception { 525 log.debug("Starttls"); 526 transport.marshall(new Proceed()); 527 } 528 529 protected void onMessage(Message message) throws Exception { 530 if (log.isDebugEnabled()) { 531 log.debug("Message from: " + message.getFrom() + " to: " + message.getTo() + " subjectOrBodyOrThread: " + message.getSubjectOrBodyOrThread()); 532 } 533 534 final ActiveMQMessage activeMQMessage = createActiveMQMessage(message); 535 536 ActiveMQDestination destination = createActiveMQDestination(message.getTo()); 537 538 activeMQMessage.setMessageId(new MessageId(producerInfo, messageIdGenerator.getNextSequenceId())); 539 activeMQMessage.setDestination(destination); 540 activeMQMessage.setProducerId(producerId); 541 activeMQMessage.setTimestamp(System.currentTimeMillis()); 542 addActiveMQMessageHeaders(activeMQMessage, message); 543 544 549 550 if (log.isDebugEnabled()) { 551 log.debug("Sending ActiveMQ message: " + activeMQMessage); 552 } 553 sendToActiveMQ(activeMQMessage, createErrorHandler("send message")); 554 } 555 556 protected Handler<Response> createErrorHandler(final String text) { 557 return new Handler<Response>() { 558 public void handle(Response event) throws Exception { 559 if (event instanceof ExceptionResponse) { 560 ExceptionResponse exceptionResponse = (ExceptionResponse) event; 561 Throwable exception = exceptionResponse.getException(); 562 log.error("Failed to " + text + ". Reason: " + exception, exception); 563 } 564 else if (log.isDebugEnabled()) { 565 log.debug("Completed " + text); 566 } 567 } 568 }; 569 } 570 571 572 575 protected ActiveMQDestination createActiveMQDestination(String jabberDestination) throws JMSException { 576 if (jabberDestination == null) { 577 return null; 578 } 579 String name = jabberDestination; 580 int idx = jabberDestination.indexOf('@'); 581 if (idx > 0) { 582 name = name.substring(0, idx); 583 } 584 585 System.out.println("#### Creating ActiveMQ destination for: " + name); 586 587 if (name.equalsIgnoreCase(AdvisorySupport.AGENT_TOPIC)) { 589 name = AdvisorySupport.AGENT_TOPIC; 590 } 591 return new ActiveMQTopic(name); 592 } 593 594 protected ActiveMQMessage createActiveMQMessage(Message message) throws JMSException { 595 ActiveMQTextMessage answer = new ActiveMQTextMessage(); 596 String text = ""; 597 List <Object > list = message.getSubjectOrBodyOrThread(); 598 for (Object object : list) { 599 if (object instanceof Body) { 600 Body body = (Body) object; 601 text = body.getValue(); 602 break; 603 } 604 } 605 answer.setText(text); 606 return answer; 607 } 608 609 protected void addActiveMQMessageHeaders(ActiveMQMessage answer, Message message) throws JMSException { 610 answer.setStringProperty("XMPPFrom", message.getFrom()); 611 answer.setStringProperty("XMPPID", message.getId()); 612 answer.setStringProperty("XMPPLang", message.getLang()); 613 answer.setStringProperty("XMPPTo", message.getTo()); 614 answer.setJMSType(message.getType()); 615 ActiveMQDestination replyTo = createActiveMQDestination(message.getFrom()); 616 if (replyTo == null) { 617 replyTo = inboxDestination; 618 } 619 System.out.println("Setting reply to destination to: " + replyTo); 620 answer.setJMSReplyTo(replyTo); 621 } 622 623 protected void onAuth(Auth auth) throws Exception { 624 if (log.isDebugEnabled()) { 625 log.debug("Auth mechanism: " + auth.getMechanism() + " value: " + auth.getValue()); 626 } 627 String value = createChallengeValue(auth); 628 if (value != null) { 629 Challenge challenge = new Challenge(); 630 challenge.setValue(value); 631 transport.marshall(challenge); 632 } 633 else { 634 transport.marshall(new Success()); 635 } 636 } 637 638 protected String createChallengeValue(Auth auth) { 639 return null; 641 } 642 643 } 644 | Popular Tags |