1 24 package org.objectweb.joram.mom.proxies.tcp; 25 26 import java.io.IOException ; 27 28 import org.objectweb.joram.mom.proxies.CloseConnectionNot; 29 import org.objectweb.joram.mom.proxies.ConnectionManager; 30 import org.objectweb.joram.mom.proxies.FlowControl; 31 import org.objectweb.joram.mom.proxies.MultiCnxSync; 32 import org.objectweb.joram.mom.proxies.ProxyMessage; 33 import org.objectweb.joram.mom.proxies.RequestNot; 34 import org.objectweb.joram.shared.client.AbstractJmsRequest; 35 import org.objectweb.joram.shared.client.ProducerMessages; 36 37 import org.objectweb.joram.shared.JoramTracing; 38 import org.objectweb.util.monolog.api.BasicLevel; 39 40 import fr.dyade.aaa.util.Daemon; 41 import fr.dyade.aaa.agent.AgentId; 42 import fr.dyade.aaa.agent.Channel; 43 44 48 public class TcpReader extends Daemon { 49 50 53 private TcpConnection tcpConnection; 54 55 private IOControl ioctrl; 56 57 private AgentId proxyId; 58 59 private boolean closeConnection; 60 61 71 public TcpReader(IOControl ioctrl, AgentId proxyId, 72 TcpConnection tcpConnection, boolean closeConnection) throws IOException { 73 super("tcpReader"); 74 this.ioctrl = ioctrl; 75 this.proxyId = proxyId; 76 this.tcpConnection = tcpConnection; 77 this.closeConnection = closeConnection; 78 } 79 80 public void run() { 81 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 82 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "TcpReader.run()"); 83 try { 84 while (running) { 85 ProxyMessage msg = ioctrl.receive(); 86 canStop = false; 87 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 88 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "TcpReader reads msg: " 89 + msg); 90 ConnectionManager.sendToProxy( 91 proxyId, 92 tcpConnection.getKey(), 93 (AbstractJmsRequest)msg.getObject(), 94 msg); 95 canStop = true; 96 } 97 } catch (Throwable error) { 98 canStop = false; 99 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 100 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "", error); 101 } finally { 102 canStop = false; 103 if (closeConnection) { 104 Channel.sendTo(proxyId, new CloseConnectionNot(tcpConnection.getKey())); 105 } 106 new Thread (new Runnable () { 107 public void run() { 108 tcpConnection.close(); 109 } 110 }).start(); 111 112 } 113 } 114 115 protected void shutdown() { 116 close(); 117 } 118 119 protected void close() { 120 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 121 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "TcpReader.close()"); 122 if (ioctrl != null) 123 ioctrl.close(); 124 ioctrl = null; 125 } 126 } 127 | Popular Tags |