1 46 package org.mr.api.simple; 47 48 import org.mr.api.jms.MantaQueueConnectionFactory; 49 50 66 public class Consumer { 67 private String queueName; 68 private javax.jms.MessageConsumer consumer; 69 70 Consumer(String queueName) throws SimpleException { 71 this.queueName = queueName; 72 73 javax.jms.QueueConnectionFactory factory; 74 javax.jms.QueueConnection connection; 75 javax.jms.QueueSession session; 76 javax.jms.Queue queue; 77 78 try { 79 factory = new MantaQueueConnectionFactory(); 80 connection = factory.createQueueConnection(); 81 session = connection.createQueueSession(false, javax.jms.Session. 82 AUTO_ACKNOWLEDGE); 83 queue = session.createQueue(queueName); 84 this.consumer = session.createConsumer(queue); 85 connection.start(); 86 } catch (javax.jms.JMSException e) { 87 throw new SimpleException(e); 88 } 89 } 91 96 public Message receive() throws SimpleException { 97 if (this.consumer == null) { 98 throw new SimpleException(SimpleException.ERROR_CLOSED_ACTOR, 99 "consumer for queue " + this.queueName + 100 " is closed"); 101 } 102 javax.jms.Message jmsMessage = null; 103 Message message = null; 104 try { 105 jmsMessage = this.consumer.receive(); 106 if (jmsMessage != null) { 107 message = new Message((javax.jms.BytesMessage ) jmsMessage); 108 } 109 } catch (javax.jms.JMSException e) { 110 throw new SimpleException(e); 111 } 112 113 return message; 114 } 115 116 122 public void close() throws SimpleException { 123 if (this.consumer == null) { 124 throw new SimpleException(SimpleException.ERROR_CLOSED_ACTOR, 125 "consumer for queue " + this.queueName + 126 " is already closed"); 127 } 128 129 try { 130 this.consumer.close(); 131 this.consumer = null; 132 } catch (javax.jms.JMSException e) { 133 throw new SimpleException(e); 134 } 135 } 136 139 public String getQueueName() { 140 return queueName; 141 } 142 } | Popular Tags |