1 24 package dotcom; 25 26 import javax.jms.*; 27 import javax.naming.*; 28 29 43 public class BillingServer { 44 static Context ictx = null; 45 46 public static void main (String argv[]) throws Exception { 47 48 try { 49 ictx = new InitialContext(); 51 TopicConnectionFactory tcf; 53 tcf = (TopicConnectionFactory) ictx.lookup("tcf"); 54 Topic topicOrders; 56 topicOrders = (Topic) ictx.lookup("tOrders"); 57 ictx.close(); 58 59 TopicConnection tc = tcf.createTopicConnection("billing", "billing"); 61 TopicSession tsession; 63 tsession = tc.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); 64 TopicSubscriber ts = tsession.createSubscriber(topicOrders); 66 67 fr.dyade.aaa.util.Queue queue ; 69 queue = new fr.dyade.aaa.util.Queue() ; 70 71 TopicListener billingListener = new TopicListener(tsession, queue); 73 ts.setMessageListener(billingListener); 74 75 BillingTreatment billingTreatment = new BillingTreatment(queue, tc, tsession) ; 77 java.lang.Thread billingThread = new java.lang.Thread (billingTreatment) ; 78 billingThread.start() ; 79 80 tc.start(); 82 83 } catch (Exception exc) { 84 System.out.println("Exception caught in BillingServer: " + exc); 85 exc.printStackTrace(); 86 } 87 } 88 } 89 90 91 102 class BillingTreatment implements Runnable { 103 static Context ictx = null; 104 105 TopicConnection tc ; 106 107 TopicSession tsession ; 108 109 fr.dyade.aaa.util.Queue queue ; 110 111 118 BillingTreatment(fr.dyade.aaa.util.Queue queue, TopicConnection tc, TopicSession tsession) { 119 this.queue = queue ; 120 this.tc = tc ; 121 this.tsession = tsession ; 122 } 123 124 127 public void run() { 128 try { 129 ictx = new InitialContext(); 131 QueueConnectionFactory qcf ; 133 qcf = (QueueConnectionFactory) ictx.lookup("qcf"); 134 Queue queueCheck ; 136 queueCheck = (Queue) ictx.lookup("qCheck"); 137 Queue queueChecked ; 138 queueChecked = (Queue) ictx.lookup("qChecked"); 139 Queue queueBills ; 140 queueBills = (Queue) ictx.lookup("qBills"); 141 ictx.close(); 142 143 QueueConnection qc = qcf.createQueueConnection("billing", "billing"); 145 QueueSession qsession = qc.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); 147 QueueSender qsCheck = qsession.createSender(queueCheck); 149 QueueReceiver qr = qsession.createReceiver(queueChecked); 151 QueueSender qsBills = qsession.createSender(queueBills); 153 qc.start() ; 155 156 System.out.println("BillingServer is ready."); 157 158 while (true) { 159 ObjectMessage msg ; 161 162 msg = (ObjectMessage) queue.get() ; 164 165 msg = (ObjectMessage) queue.pop(); 167 168 if (msg.getObject() instanceof QuitMessage) { 170 QuitMessage quitMsg = (QuitMessage) msg.getObject() ; 172 ObjectMessage msgSent = qsession.createObjectMessage(); 174 msgSent.setObject(quitMsg) ; 175 qsCheck.send(msgSent) ; 177 qsession.commit() ; 178 179 tsession.close() ; 181 tc.close() ; 182 qsession.close() ; 183 qc.close() ; 184 185 System.out.println("Sessions and connections closed by BillingServer."); 186 System.exit(0) ; 187 } 188 189 else if (msg.getObject() instanceof OrderMessage) { 191 OrderMessage orderMsg = (OrderMessage) msg.getObject() ; 193 194 System.out.println("Message received by BillingServer from WebServer: " + orderMsg.id) ; 195 196 ObjectMessage msgSent = qsession.createObjectMessage(); 198 msgSent.setObject(orderMsg) ; 199 qsCheck.send(msgSent) ; 201 qsession.commit() ; 203 204 ObjectMessage msgRec = (ObjectMessage) qr.receive() ; 206 OkMessage okMsg = (OkMessage)(msgRec.getObject()) ; 208 209 System.out.println("Message received by BillingServer from ControlServer: " + okMsg.id) ; 210 211 msgSent = qsession.createObjectMessage() ; 213 msgSent.setObject(okMsg) ; 214 qsBills.send(msgSent); 216 217 qsession.commit() ; 219 } 220 } 221 } catch (Exception exc) { 222 System.out.println("Exception caught in BillingServer thread: " + exc) ; 223 exc.printStackTrace() ; 224 } 225 } 226 } 227 | Popular Tags |