1 57 58 package org.apache.wsif.util.jms; 59 60 import java.io.Serializable ; 61 62 import javax.jms.JMSException ; 63 import javax.jms.Message ; 64 import javax.jms.Queue ; 65 import javax.jms.QueueReceiver ; 66 67 import org.apache.wsif.WSIFCorrelationId; 68 import org.apache.wsif.WSIFCorrelationService; 69 import org.apache.wsif.WSIFException; 70 import org.apache.wsif.WSIFOperation; 71 import org.apache.wsif.logging.Trc; 72 import org.apache.wsif.util.WSIFCorrelationServiceLocator; 73 import util.TestUtilities; 74 78 public class JMSAsyncListener extends JMS2HTTPBridgeDestination { 79 80 static final String startType = JMS2HTTPBridgeDestination.COLDSTART; 81 static final boolean VERBOSE = TestUtilities.isJmsVerbose(); 82 83 private Thread listenerThread; 84 85 private WSIFJMSListener list = new WSIFJMSListener() { 86 public void onException(JMSException arg1) { 87 Trc.entry(this, arg1); 88 arg1.printStackTrace(); 89 Trc.exit(); 90 } 91 92 public void onMessage(Message message) { 93 Trc.entry(this, message); 94 processResponse(message); 95 Trc.exit(); 96 } 97 }; 98 99 public JMSAsyncListener(String msgQ) throws WSIFException { 100 super( 101 new WSIFJMSFinderForJndi( 102 null, 103 TestUtilities.getWsifProperty( 104 "wsif.jms2httpbridge.initialcontextfactory"), 105 TestUtilities.getWsifProperty( 106 "wsif.jms2httpbridge.jndiproviderurl"), 107 WSIFJMSFinder.STYLE_QUEUE, 108 TestUtilities.getWsifProperty( 109 "wsif.jms2httpbridge.jndiconnectionfactoryname"), 110 msgQ, 111 null), 112 null, 113 WSIFJMSConstants.WAIT_FOREVER, 114 startType, 115 VERBOSE); 116 117 listenerThread = new Thread () { 118 public void run() { 119 try { 120 listen( list ); 121 } catch (WSIFException ex) { 122 ex.printStackTrace(); 123 } 124 } 125 }; 126 listenerThread.start(); 127 } 128 134 public void listen(WSIFJMSListener listener, Queue queue) 135 throws WSIFException { 136 Trc.entry(this, listener, queue); 137 areWeClosed(); 138 139 try { 140 QueueReceiver qr = session.createReceiver(queue); 141 qr.setMessageListener(listener); 142 connection.setExceptionListener(listener); 143 144 connection.start(); 145 146 for (int i = 1; !Thread.interrupted(); i++) { 147 Thread.yield(); 148 Thread.sleep(5000); 149 if (VERBOSE) 150 System.out.println("JMSAsyncListener waiting... " + i); 151 } 152 } catch (JMSException je) { 153 throw WSIFJMSConstants.ToWsifException(je); 154 } catch (InterruptedException ignored) { 155 if ( VERBOSE ) System.out.println("JMSAsyncListener Exitting"); 156 } 157 Trc.exit(); 158 } 159 160 private void processResponse(Message msg) { 161 Serializable so; 162 try { 163 if (VERBOSE) 164 System.out.println( 165 "WSIFJmsAsyncListener.processResponse called"); 166 167 WSIFCorrelationService cs = 168 WSIFCorrelationServiceLocator.getCorrelationService(); 169 WSIFCorrelationId cid = 170 new WSIFJMSCorrelationId(msg.getJMSCorrelationID()); 171 synchronized (cs) { 172 so = cs.get(cid); 173 } 174 if (so != null && so instanceof WSIFOperation) { 175 cs.remove(cid); 176 ((WSIFOperation) so).fireAsyncResponse(msg); 177 } 178 } catch (Exception ex) { 179 ex.printStackTrace(); 180 } 181 } 182 183 public void stop() { 184 listenerThread.interrupt(); 185 } 186 187 } | Popular Tags |