package fr.dyade.aaa.jndi2.ha;

import java.io.*;
import java.net.*;
import java.util.*;


import org.objectweb.util.monolog.api.BasicLevel;

import fr.dyade.aaa.util.*;
import fr.dyade.aaa.agent.*;
import fr.dyade.aaa.jndi2.server.*;
import fr.dyade.aaa.jndi2.msg.*;

/**
 * TCP front-end that accepts client connections on a shared
 * {@link ServerSocket} and forwards each incoming request, as a
 * {@code TcpRequestNot}, to the agent identified by {@code serverId}.
 *
 * <p>Connections are accepted by a fixed number of {@link Monitor} daemons
 * that all call {@code accept()} on the same listen socket.
 */
public class HATcpServer {

  /**
   * Listen socket shared by every monitor. Set to {@code null} by
   * {@link #stop()}; declared volatile because it is read from the monitor
   * threads and written from the thread calling {@code stop()}.
   */
  private volatile ServerSocket listen;

  /** The accepting daemons, created once in the constructor. */
  private final Monitor[] monitors;

  /** Identifier of the agent the requests are forwarded to. */
  private final AgentId serverId;

  /**
   * Creates the server and its monitors; nothing is started until
   * {@link #start()} is called.
   *
   * @param listen   the socket on which connections are accepted
   * @param nbm      the number of monitor daemons to create
   * @param serverId the agent that receives the forwarded requests
   */
  public HATcpServer(ServerSocket listen,
                     int nbm,
                     AgentId serverId) {
    this.listen = listen;
    this.monitors = new Monitor[nbm];
    this.serverId = serverId;
    for (int i = 0; i < monitors.length; i++) {
      monitors[i] = new Monitor("JndiServer.Monitor#" + i, this);
      monitors[i].setDaemon(true);
      monitors[i].setThreadGroup(AgentServer.getThreadGroup());
    }
  }

  /** Starts every monitor daemon. */
  public final void start() {
    for (int i = 0; i < monitors.length; i++) {
      monitors[i].start();
    }
  }

  /**
   * Closes the listen socket, then stops every monitor daemon.
   *
   * <p>Calling this method more than once is harmless: the socket is only
   * closed the first time. A failure to close the socket is logged and does
   * not prevent the monitors from being stopped.
   */
  public final void stop() {
    if (Trace.logger.isLoggable(BasicLevel.DEBUG))
      Trace.logger.log(
        BasicLevel.DEBUG, "TcpServer.stop()");

    ServerSocket current = listen;
    if (current != null) {
      try {
        current.close();
      } catch (IOException exc) {
        Trace.logger.log(
          BasicLevel.WARN,
          "TcpServer.stop(), error while closing the listen socket", exc);
      } finally {
        // Always clear the reference, even when close() failed, so that the
        // monitors see a null socket and leave their accept loop.
        listen = null;
      }
    }

    for (int i = 0; i < monitors.length; i++) {
      monitors[i].stop();
    }
  }

  /**
   * Returns the listen socket.
   *
   * @return the listen socket, or {@code null} once {@link #stop()} has run
   */
  public final ServerSocket getListen() {
    return listen;
  }

  /**
   * Returns the identifier of the agent the requests are forwarded to.
   *
   * @return the target agent identifier
   */
  public final AgentId getServerId() {
    return serverId;
  }

  /**
   * Daemon that repeatedly accepts a connection on the server's listen
   * socket, reads the request identifier sent by the client and forwards the
   * request to the server agent.
   */
  public static class Monitor extends Daemon {

    /** The server whose listen socket and agent id this monitor uses. */
    private final HATcpServer tcpServer;

    /**
     * @param name      the daemon name, also used as prefix in log messages
     * @param tcpServer the owning server
     */
    protected Monitor(String name,
                      HATcpServer tcpServer) {
      super(name);
      this.tcpServer = tcpServer;
    }

    /**
     * Accept loop. Runs until {@code running} is false or the listen socket
     * has been cleared by {@link HATcpServer#stop()}; {@code finish()} is
     * always called on the way out.
     */
    public final void run() {
      try {
        while (running) {
          // The daemon may only be stopped while it is waiting in accept().
          canStop = true;
          Socket socket;
          try {
            ServerSocket listen = tcpServer.getListen();
            if (listen == null) {
              break;
            }
            socket = listen.accept();
            canStop = false;
          } catch (IOException exc) {
            if (!running) {
              // Expected while stopping: the listen socket was closed.
              break;
            }
            Trace.logger.log(
              BasicLevel.ERROR,
              this.getName() +
              ", error during accept", exc);
            try {
              // Back off before retrying to avoid a tight failure loop.
              Thread.sleep(1000);
            } catch (InterruptedException ignored) {
              // Deliberately ignored: the loop condition re-checks 'running'.
            }
            continue;
          }

          if (!running) {
            // Stopped right after accept(): release the connection instead
            // of leaking it.
            closeQuietly(socket);
            break;
          }

          if (Trace.logger.isLoggable(BasicLevel.DEBUG)) {
            Trace.logger.log(
              BasicLevel.DEBUG,
              this.getName() + ", connection from " +
              socket.getInetAddress() + ':' +
              socket.getPort());
          }

          handleConnection(socket);
        }
      } finally {
        finish();
      }
    }

    /**
     * Reads the request identifier from a freshly accepted connection and
     * forwards the request to the server agent.
     *
     * <ul>
     *   <li>{@code HARequestManager.IDEMPOTENT}: forwarded with that
     *       identifier;</li>
     *   <li>{@code HARequestManager.NOT_IDEMPOTENT}: a new identifier is
     *       obtained from the server agent through a {@code GetRequestIdNot},
     *       written back to the client, then used to forward the
     *       request;</li>
     *   <li>any other value: forwarded with the identifier the client
     *       sent.</li>
     * </ul>
     *
     * <p>On any failure the error is logged and the socket is closed; in that
     * case the request is presumed not to have been handed over to the agent.
     */
    private void handleConnection(Socket socket) {
      try {
        IOControl ioCtrl = new IOControl(socket);
        int rid = ioCtrl.readInt();
        if (Trace.logger.isLoggable(BasicLevel.DEBUG))
          Trace.logger.log(BasicLevel.DEBUG, " -> request id = " + rid);
        switch (rid) {
        case HARequestManager.IDEMPOTENT:
          Channel.sendTo(
            tcpServer.getServerId(),
            new TcpRequestNot(new HARequestContext(
              ioCtrl, HARequestManager.IDEMPOTENT)));
          break;
        case HARequestManager.NOT_IDEMPOTENT:
          GetRequestIdNot gri =
            new GetRequestIdNot();
          gri.invoke(tcpServer.getServerId());
          int newRid = gri.getId();
          ioCtrl.writeInt(newRid);
          Channel.sendTo(
            tcpServer.getServerId(),
            new TcpRequestNot(new HARequestContext(
              ioCtrl, newRid)));
          break;
        default:
          Channel.sendTo(
            tcpServer.getServerId(),
            new TcpRequestNot(new HARequestContext(
              ioCtrl, rid)));
        }
      } catch (Exception exc) {
        Trace.logger.log(
          BasicLevel.ERROR,
          this.getName() +
          "", exc);
        // Nobody else holds this connection when the hand-over failed, so it
        // must be closed here or it leaks.
        closeQuietly(socket);
      }
    }

    /** Closes the socket, logging (at debug level) rather than propagating a failure. */
    private static void closeQuietly(Socket socket) {
      try {
        socket.close();
      } catch (IOException exc) {
        if (Trace.logger.isLoggable(BasicLevel.DEBUG))
          Trace.logger.log(
            BasicLevel.DEBUG, "error while closing the connection", exc);
      }
    }

    /** Nothing to release: the monitor owns no resource of its own. */
    protected void close() {

    }

    /** Delegates to {@link #close()}. */
    protected void shutdown() {
      close();
    }
  }
}