1 29 30 package com.caucho.jms.amq; 31 32 import com.caucho.jms.JMSExceptionWrapper; 33 import com.caucho.jms.session.MessageProducerImpl; 34 import com.caucho.jms.session.SessionImpl; 35 import com.caucho.util.L10N; 36 37 import javax.jms.Destination ; 38 import javax.jms.JMSException ; 39 import javax.jms.Message ; 40 import javax.jms.MessageProducer ; 41 import javax.jms.TextMessage ; 42 import java.io.ByteArrayInputStream ; 43 import java.io.IOException ; 44 import java.io.InputStream ; 45 46 49 public class AmqProducer extends MessageProducerImpl 50 { 51 private static final L10N L = new L10N(MessageProducer .class); 52 53 protected AmqQueue _queue; 54 private AmqClientChannel _channel; 55 56 public AmqProducer(SessionImpl session, AmqQueue queue) 57 { 58 super(session, queue); 59 60 _queue = queue; 61 } 62 63 72 public void send(Destination destination, 73 Message message, 74 int deliveryMode, 75 int priority, 76 long timeToLive) 77 throws JMSException 78 { 79 83 84 try { 85 AmqClientChannel channel = getChannel(); 86 87 TextMessage msg = (TextMessage ) message; 88 89 byte []data = msg.getText().getBytes(); 90 91 channel.publish(data.length, new ByteArrayInputStream (data)); 92 } catch (IOException e) { 93 throw new JMSExceptionWrapper(e); 94 } 95 } 96 97 private InputStream messageToInputStream(Message message) 98 { 99 return null; 100 } 101 102 private AmqClientChannel getChannel() 103 throws IOException 104 { 105 if (_channel == null) { 106 _channel = _queue.openChannel(); 107 } 108 109 return _channel; 110 } 111 112 115 public void close() 116 throws JMSException 117 { 118 AmqChannel channel = _channel; 119 _channel = null; 120 121 if (channel != null) 122 channel.close(); 123 } 124 } 125 126 | Popular Tags |