1 20 package sample.jms.queues.requestReply; 21 22 import java.util.Random ; 23 24 import javax.jms.Connection ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Queue ; 30 import javax.jms.Session ; 31 import javax.jms.TextMessage ; 32 import javax.jms.Topic ; 33 34 import org.apache.commons.logging.Log; 35 import org.apache.commons.logging.LogFactory; 36 import org.mr.api.jms.MantaConnectionFactory; 37 import org.mr.core.util.ExceptionMonitor; 38 import org.mr.core.util.SystemTime; 39 40 50 public class QueueReply implements javax.jms.MessageListener { 51 final String QUEUE = "queue"; 52 final String TOPIC = "topic"; 53 String destType, destName; 54 MantaConnectionFactory factory; 55 Queue queue; 56 Topic topic; 57 Connection prodConn, consConn; 58 Session prodSess, consSess; 59 MessageProducer producer; 60 MessageConsumer consumer; 61 private static Log log; 62 private Random random; 63 64 69 public QueueReply(String destinationName, String destinationType){ 70 factory = new MantaConnectionFactory(); 71 try { 72 if(destinationType.equalsIgnoreCase("queue")){ 73 destType=QUEUE; 74 }else{ 75 if(destinationType.equalsIgnoreCase("topic")){ 76 destType=TOPIC; 77 }else{ 78 try { 79 ExceptionMonitor.getInstance().shout(24, "There is no destination type "+destType); 80 } catch (JMSException e) { 81 e.printStackTrace(); 82 } 83 } } this.destName = destinationName; 86 prodConn = factory.createConnection(); 87 prodSess = prodConn.createSession(false, Session.AUTO_ACKNOWLEDGE); 88 consConn = factory.createConnection(); 89 consSess = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE); 90 if (destType.equals(QUEUE)){ 91 queue = prodSess.createQueue(destName); 92 producer = prodSess.createProducer(null); 93 consumer = consSess.createConsumer(queue); 94 }else{ 95 topic = prodSess.createTopic(destName); 96 producer = prodSess.createProducer(null); 97 consumer = consSess.createConsumer(topic); 98 } 99 consumer.setMessageListener(this); 100 prodConn.start(); 101 consConn.start(); 102 103 } catch (JMSException e) { 104 e.printStackTrace(); 105 } 106 random = new Random (SystemTime.currentTimeMillis()); 107 } 109 112 public void onMessage(Message message) 113 { 114 try { 115 String text; 116 int i= random.nextInt(4); 117 switch (i) { 118 case 0 : text="i agree."; 119 break; 120 case 1 : text="yes, this is true."; 121 break; 122 case 2 : text="have a nice day."; 123 break; 124 case 3 : text="interesting, "+((TextMessage )message).getText(); 125 break; 126 default : text ="yep."; 127 } 128 TextMessage newMessage; 129 newMessage = prodSess.createTextMessage(); 130 newMessage.setText(text); 131 newMessage.setJMSCorrelationID(message.getJMSCorrelationID()); 132 producer.send(message.getJMSReplyTo(),newMessage); 133 } catch (JMSException e) { 134 e.printStackTrace(); 135 } 136 } 138 public static void main(String args[]) 139 { 140 141 if( args.length != 1 ) 142 { 143 printUsage(); 144 System.exit(0); 145 } 146 147 String Qname = args[0]; 149 150 if( Qname == null ) 151 { 152 printUsage(); 153 System.exit(0); 154 } 155 156 QueueReply replier = new QueueReply(Qname, "queue"); 157 } 158 159 160 private static void printUsage() 161 { 162 System.err.println("usage: QueueReply \nSpecify the name of queue to receive requests.\n"); 163 } 164 165 } | Popular Tags |