1 23 package com.sun.enterprise.jbi.serviceengine.comm; 24 25 import com.sun.enterprise.jbi.serviceengine.core.JavaEEServiceEngineContext; 26 import java.util.HashMap ; 27 import java.util.logging.Level ; 28 29 import javax.jbi.messaging.MessageExchange; 30 import javax.jbi.messaging.MessagingException; 31 import com.sun.enterprise.jbi.serviceengine.work.OneWork; 32 33 43 public class MessageAcceptor extends OneWork { 44 45 private HashMap <String ,MessageReceiver> receivers = new HashMap (); 46 private boolean released = false; 47 48 52 public void startAccepting() { 53 execute(); 54 } 55 56 62 public void register(MessageReceiver receiver) { 63 String id = receiver.getMessageExchange().getExchangeId(); 64 logger.log(Level.FINER, "Adding recever for " + id); 65 synchronized (receivers) { 66 receivers.put(id, receiver); 67 } 68 } 69 70 75 public void release() { 76 released = true; 77 } 78 79 88 public void doWork() { 89 while (true) { 90 try { 91 92 MessageExchange me = getDeliveryChannel().accept(); 93 if (released) { 94 break; 95 } 96 97 if(me != null) { 98 String id = me.getExchangeId(); 99 100 if (receivers.containsKey(id)) { 105 synchronized(receivers) { 106 MessageReceiver receiver = receivers.remove(id); 107 receiver.setMessageExchange(me); 108 if (logger.isLoggable(Level.FINE)) { 109 logger.log(Level.FINE, 110 "Releasing MessageReceiver:" + id + ",MEP :" + me); 111 } 112 receiver.release(); 113 } 114 } else { 115 MessageProcessor processor = 116 JavaEEServiceEngineContext.getInstance(). 117 getBridge().getMessageProcessor(me); 118 processor.setUseCurrentThread(false); 119 processor.setMessageExchange(me); 120 if (logger.isLoggable(Level.FINE)) { 121 logger.log(Level.FINE, 122 "Spawning MessageProcessorfor MEP :" + me); 123 } 124 processor.process(); 125 } 126 } 127 } catch (MessagingException ie) { 128 logger.log(Level.FINE, "Stopping the acceptor thread"); 130 break; 131 } 132 } 133 } 134 } 135 136 | Popular Tags |