1 24 25 package org.objectweb.dream.channel; 26 27 import java.io.IOException ; 28 import java.util.HashMap ; 29 import java.util.LinkedList ; 30 import java.util.Map ; 31 32 import org.objectweb.dream.AbstractComponent; 33 import org.objectweb.dream.control.activity.Util; 34 import org.objectweb.dream.control.activity.task.AbstractTask; 35 import org.objectweb.dream.control.activity.task.Task; 36 import org.objectweb.dream.control.activity.task.TaskController; 37 import org.objectweb.dream.control.activity.task.thread.ThreadPoolController; 38 import org.objectweb.dream.control.activity.task.thread.ThreadPoolOverflowException; 39 import org.objectweb.dream.util.Error; 40 import org.objectweb.fractal.api.Component; 41 import org.objectweb.fractal.api.NoSuchInterfaceException; 42 import org.objectweb.fractal.api.control.IllegalBindingException; 43 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 44 import org.objectweb.fractal.julia.control.lifecycle.ChainedIllegalLifeCycleException; 45 import org.objectweb.util.monolog.api.BasicLevel; 46 47 51 public class OpenedSocketManagerMultiPersistentImpl extends AbstractComponent 52 implements 53 OpenedSocket, 54 OpenedSocketManagerMultiAttributeController 55 { 56 57 protected Task inTask = new InTask(); 58 59 protected ThreadPoolController threadPoolController; 60 61 protected LinkedList availableSocketList = new LinkedList (); 62 protected int maxOpenedSocket; 63 protected int nbOpenedSocket = 0; 64 65 69 protected OpenedSocket delegateOpenedSocketItf; 70 71 75 protected class InTask extends AbstractTask 76 { 77 78 79 public InTask() 80 { 81 super("ChannelIn-reader-task"); 82 } 83 84 87 public Object execute(Object hints) throws InterruptedException 88 { 89 SocketState socketState; 90 synchronized (availableSocketList) 91 { 92 socketState = (SocketState) availableSocketList.removeFirst(); 93 } 94 logger.log(BasicLevel.DEBUG, "ChannelIn-reader-task : got a connection"); 95 if (socketState.isClosed()) 96 { 97 stopThread(socketState); 98 logger.log(BasicLevel.DEBUG, 99 "ChannelIn-reader-task : connection closed, stop thread"); 100 return STOP_EXECUTING; 101 } 102 103 try 104 { 105 delegateOpenedSocketItf.openedSocket(socketState); 107 } 108 catch (IOException e) 109 { 110 logger 111 .log( 112 BasicLevel.WARN, 113 "ChannelIn-reader-task : I/O error while receiving message, close connection", 114 e); 115 stopThread(socketState); 116 return STOP_EXECUTING; 117 } 118 119 synchronized (availableSocketList) 120 { 121 availableSocketList.add(socketState); 122 } 123 return EXECUTE_AGAIN; 124 } 125 126 protected void stopThread(SocketState socketState) 127 { 128 synchronized (availableSocketList) 129 { 130 nbOpenedSocket--; 131 availableSocketList.notify(); 133 } 134 socketState.close(); 135 } 136 } 137 138 142 145 protected void beforeFirstStart(Component componentItf) 146 throws IllegalLifeCycleException 147 { 148 try 149 { 150 Map hints = new HashMap (); 151 hints.put("thread", "pool"); 152 logger.log(BasicLevel.DEBUG, "Initial Max connection=" + maxOpenedSocket); 153 Util.addTask(componentItf, inTask, hints); 154 } 155 catch (Exception e) 156 { 157 throw new IllegalLifeCycleException("Can't add task"); 158 } 159 } 160 161 165 168 public void openedSocket(SocketState socket) throws IOException , 169 InterruptedException 170 { 171 synchronized (availableSocketList) 172 { 173 nbOpenedSocket++; 174 availableSocketList.add(socket); 175 if (threadPoolController != null) 176 { 177 try 178 { 179 logger.log(BasicLevel.DEBUG, "add a thread in thread pool"); 180 threadPoolController.addThreads(1); 181 } 182 catch (ThreadPoolOverflowException e) 183 { 184 logger.log(BasicLevel.WARN, "Unable to add reader thread", e); 185 } 186 catch (IllegalLifeCycleException e) 187 { 188 Error.bug(logger, e); 189 } 190 } 191 while (nbOpenedSocket >= maxOpenedSocket) 193 { 194 availableSocketList.wait(); 195 } 196 } 197 } 198 199 203 206 public int getMaxOpenedSocket() 207 { 208 return maxOpenedSocket; 209 } 210 211 214 public void setMaxOpenedSocket(int maxOpenedSocket) 215 { 216 this.maxOpenedSocket = maxOpenedSocket; 217 if (threadPoolController != null) 218 { 219 threadPoolController.setCapacity(maxOpenedSocket); 220 } 221 } 222 223 227 230 public void startFc() throws IllegalLifeCycleException 231 { 232 super.startFc(); 233 synchronized (availableSocketList) 234 { 235 try 236 { 237 TaskController tc = (TaskController) weaveableC 238 .getFcInterface("task-controller"); 239 threadPoolController = (ThreadPoolController) tc.getTaskControl(inTask); 240 threadPoolController.setCapacity(getMaxOpenedSocket()); 241 threadPoolController.addThreads(availableSocketList.size()); 242 } 243 catch (Exception e) 244 { 245 throw new ChainedIllegalLifeCycleException(e, null, 246 "An error occurs while retreiving task control interface"); 247 } 248 } 249 } 250 251 254 public void stopFc() throws IllegalLifeCycleException 255 { 256 super.stopFc(); 257 threadPoolController = null; 258 } 259 260 264 267 public String [] listFc() 268 { 269 return new String []{OpenedSocket.ITF_NAME}; 270 } 271 272 276 public void bindFc(String clientItfName, Object serverItf) 277 throws NoSuchInterfaceException, IllegalBindingException, 278 IllegalLifeCycleException 279 { 280 super.bindFc(clientItfName, serverItf); 281 if (clientItfName.equals(OpenedSocket.ITF_NAME)) 282 { 283 delegateOpenedSocketItf = (OpenedSocket) serverItf; 284 } 285 } 286 } | Popular Tags |