1 package org.apache.activemq.transport.stomp; 2 3 import org.apache.activemq.command.ActiveMQMessage; 4 import org.apache.activemq.command.ActiveMQBytesMessage; 5 import org.apache.activemq.command.ActiveMQTextMessage; 6 import org.apache.activemq.command.ActiveMQDestination; 7 8 import javax.jms.JMSException ; 9 import javax.jms.Destination ; 10 import java.util.Map ; 11 import java.util.HashMap ; 12 import java.io.IOException ; 13 14 17 public class LegacyFrameTranslator implements FrameTranslator 18 { 19 public ActiveMQMessage convertFrame(StompFrame command) throws JMSException , ProtocolException { 20 final Map headers = command.getHeaders(); 21 final ActiveMQMessage msg; 22 if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { 23 headers.remove(Stomp.Headers.CONTENT_LENGTH); 24 ActiveMQBytesMessage bm = new ActiveMQBytesMessage(); 25 bm.writeBytes(command.getContent()); 26 msg = bm; 27 } else { 28 ActiveMQTextMessage text = new ActiveMQTextMessage(); 29 try { 30 text.setText(new String (command.getContent(), "UTF-8")); 31 } 32 catch (Throwable e) { 33 throw new ProtocolException("Text could not bet set: " + e, false, e); 34 } 35 msg = text; 36 } 37 FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(command, msg, this); 38 return msg; 39 } 40 41 public StompFrame convertMessage(ActiveMQMessage message) throws IOException , JMSException { 42 StompFrame command = new StompFrame(); 43 command.setAction(Stomp.Responses.MESSAGE); 44 Map headers = new HashMap (25); 45 command.setHeaders(headers); 46 47 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(message, command, this); 48 49 if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) { 50 51 ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy(); 52 command.setContent(msg.getText().getBytes("UTF-8")); 53 54 } else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) { 55 56 ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy(); 57 msg.setReadOnlyBody(true); 58 byte[] data = new byte[(int)msg.getBodyLength()]; 59 msg.readBytes(data); 60 61 headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length); 62 command.setContent(data); 63 } 64 return command; 65 } 66 67 public String convertDestination(Destination d) { 68 if (d == null) { 69 return null; 70 } 71 ActiveMQDestination amq_d = (ActiveMQDestination) d; 72 String p_name = amq_d.getPhysicalName(); 73 74 StringBuffer buffer = new StringBuffer (); 75 if (amq_d.isQueue()) { 76 if (amq_d.isTemporary()) { 77 buffer.append("/temp-queue/"); 78 } 79 else { 80 buffer.append("/queue/"); 81 } 82 } 83 else { 84 if (amq_d.isTemporary()) { 85 buffer.append("/temp-topic/"); 86 } 87 else { 88 buffer.append("/topic/"); 89 } 90 } 91 buffer.append(p_name); 92 return buffer.toString(); 93 } 94 95 public ActiveMQDestination convertDestination(String name) throws ProtocolException { 96 if (name == null) { 97 return null; 98 } 99 else if (name.startsWith("/queue/")) { 100 String q_name = name.substring("/queue/".length(), name.length()); 101 return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE); 102 } 103 else if (name.startsWith("/topic/")) { 104 String t_name = name.substring("/topic/".length(), name.length()); 105 return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE); 106 } 107 else if (name.startsWith("/temp-queue/")) { 108 String t_name = name.substring("/temp-queue/".length(), name.length()); 109 return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TEMP_QUEUE_TYPE); 110 } 111 else if (name.startsWith("/temp-topic/")) { 112 String t_name = name.substring("/temp-topic/".length(), name.length()); 113 return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TEMP_TOPIC_TYPE); 114 } 115 else { 116 throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + 117 "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); 118 } 119 } 120 } 121 | Popular Tags |