KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > dream > channel > OpenedSocketManagerMultiPersistentImpl


1 /**
2  * Dream
3  * Copyright (C) 2003-2004 INRIA Rhone-Alpes
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Contact : dream@objectweb.org
20  *
21  * Initial developer(s): Matthieu Leclercq
22  * Contributor(s): Vivien Quema
23  */

24
25 package org.objectweb.dream.channel;
26
27 import java.io.IOException JavaDoc;
28 import java.util.HashMap JavaDoc;
29 import java.util.LinkedList JavaDoc;
30 import java.util.Map JavaDoc;
31
32 import org.objectweb.dream.AbstractComponent;
33 import org.objectweb.dream.control.activity.Util;
34 import org.objectweb.dream.control.activity.task.AbstractTask;
35 import org.objectweb.dream.control.activity.task.Task;
36 import org.objectweb.dream.control.activity.task.TaskController;
37 import org.objectweb.dream.control.activity.task.thread.ThreadPoolController;
38 import org.objectweb.dream.control.activity.task.thread.ThreadPoolOverflowException;
39 import org.objectweb.dream.util.Error;
40 import org.objectweb.fractal.api.Component;
41 import org.objectweb.fractal.api.NoSuchInterfaceException;
42 import org.objectweb.fractal.api.control.IllegalBindingException;
43 import org.objectweb.fractal.api.control.IllegalLifeCycleException;
44 import org.objectweb.fractal.julia.control.lifecycle.ChainedIllegalLifeCycleException;
45 import org.objectweb.util.monolog.api.BasicLevel;
46
47 /**
48  * Opened socket manager component that lanage multiple opened sockets at a
49  * time. Each opened socket is executed by its own task.
50  */

51 public class OpenedSocketManagerMultiPersistentImpl extends AbstractComponent
52     implements
53       OpenedSocket,
54       OpenedSocketManagerMultiAttributeController
55 {
56
57   protected Task inTask = new InTask();
58
59   protected ThreadPoolController threadPoolController;
60
61   protected LinkedList JavaDoc availableSocketList = new LinkedList JavaDoc();
62   protected int maxOpenedSocket;
63   protected int nbOpenedSocket = 0;
64
65   // ---------------------------------------------------------------------------
66
// Client interfaces
67
// ---------------------------------------------------------------------------
68

69   protected OpenedSocket delegateOpenedSocketItf;
70
71   // ---------------------------------------------------------------------------
72
// Inner task class implementation
73
// ---------------------------------------------------------------------------
74

75   protected class InTask extends AbstractTask
76   {
77
78     /** Constructor */
79     public InTask()
80     {
81       super("ChannelIn-reader-task");
82     }
83
84     /**
85      * @see Task#execute(Object)
86      */

87     public Object JavaDoc execute(Object JavaDoc hints) throws InterruptedException JavaDoc
88     {
89       SocketState socketState;
90       synchronized (availableSocketList)
91       {
92         socketState = (SocketState) availableSocketList.removeFirst();
93       }
94       logger.log(BasicLevel.DEBUG, "ChannelIn-reader-task : got a connection");
95       if (socketState.isClosed())
96       {
97         stopThread(socketState);
98         logger.log(BasicLevel.DEBUG,
99             "ChannelIn-reader-task : connection closed, stop thread");
100         return STOP_EXECUTING;
101       }
102
103       try
104       {
105         // call delegate
106
delegateOpenedSocketItf.openedSocket(socketState);
107       }
108       catch (IOException JavaDoc e)
109       {
110         logger
111             .log(
112                 BasicLevel.WARN,
113                 "ChannelIn-reader-task : I/O error while receiving message, close connection",
114                 e);
115         stopThread(socketState);
116         return STOP_EXECUTING;
117       }
118
119       synchronized (availableSocketList)
120       {
121         availableSocketList.add(socketState);
122       }
123       return EXECUTE_AGAIN;
124     }
125
126     protected void stopThread(SocketState socketState)
127     {
128       synchronized (availableSocketList)
129       {
130         nbOpenedSocket--;
131         // notify connection task.
132
availableSocketList.notify();
133       }
134       socketState.close();
135     }
136   }
137
138   // -------------------------------------------------------------------------
139
// Overriden methods
140
// -------------------------------------------------------------------------
141

142   /**
143    * @see org.objectweb.dream.AbstractComponent#beforeFirstStart(org.objectweb.fractal.api.Component)
144    */

145   protected void beforeFirstStart(Component componentItf)
146       throws IllegalLifeCycleException
147   {
148     try
149     {
150       Map JavaDoc hints = new HashMap JavaDoc();
151       hints.put("thread", "pool");
152       logger.log(BasicLevel.DEBUG, "Initial Max connection=" + maxOpenedSocket);
153       Util.addTask(componentItf, inTask, hints);
154     }
155     catch (Exception JavaDoc e)
156     {
157       throw new IllegalLifeCycleException("Can't add task");
158     }
159   }
160
161   // ---------------------------------------------------------------------------
162
// Implementation of OpenedSocket interface
163
// ---------------------------------------------------------------------------
164

165   /**
166    * @see OpenedSocket#openedSocket(SocketState)
167    */

168   public void openedSocket(SocketState socket) throws IOException JavaDoc,
169       InterruptedException JavaDoc
170   {
171     synchronized (availableSocketList)
172     {
173       nbOpenedSocket++;
174       availableSocketList.add(socket);
175       if (threadPoolController != null)
176       {
177         try
178         {
179           logger.log(BasicLevel.DEBUG, "add a thread in thread pool");
180           threadPoolController.addThreads(1);
181         }
182         catch (ThreadPoolOverflowException e)
183         {
184           logger.log(BasicLevel.WARN, "Unable to add reader thread", e);
185         }
186         catch (IllegalLifeCycleException e)
187         {
188           Error.bug(logger, e);
189         }
190       }
191       // if maxConnection reach, wait.
192
while (nbOpenedSocket >= maxOpenedSocket)
193       {
194         availableSocketList.wait();
195       }
196     }
197   }
198
199   // ---------------------------------------------------------------------------
200
// Implementation of AttributeController interface
201
// ---------------------------------------------------------------------------
202

203   /**
204    * @see OpenedSocketManagerMultiAttributeController#getMaxOpenedSocket()
205    */

206   public int getMaxOpenedSocket()
207   {
208     return maxOpenedSocket;
209   }
210
211   /**
212    * @see OpenedSocketManagerMultiAttributeController#setMaxOpenedSocket(int)
213    */

214   public void setMaxOpenedSocket(int maxOpenedSocket)
215   {
216     this.maxOpenedSocket = maxOpenedSocket;
217     if (threadPoolController != null)
218     {
219       threadPoolController.setCapacity(maxOpenedSocket);
220     }
221   }
222
223   // ---------------------------------------------------------------------------
224
// Implementation of LifeCycleController interface
225
// ---------------------------------------------------------------------------
226

227   /**
228    * @see org.objectweb.dream.AbstractComponent#startFc()
229    */

230   public void startFc() throws IllegalLifeCycleException
231   {
232     super.startFc();
233     synchronized (availableSocketList)
234     {
235       try
236       {
237         TaskController tc = (TaskController) weaveableC
238             .getFcInterface("task-controller");
239         threadPoolController = (ThreadPoolController) tc.getTaskControl(inTask);
240         threadPoolController.setCapacity(getMaxOpenedSocket());
241         threadPoolController.addThreads(availableSocketList.size());
242       }
243       catch (Exception JavaDoc e)
244       {
245         throw new ChainedIllegalLifeCycleException(e, null,
246             "An error occurs while retreiving task control interface");
247       }
248     }
249   }
250
251   /**
252    * @see org.objectweb.fractal.api.control.LifeCycleController#stopFc()
253    */

254   public void stopFc() throws IllegalLifeCycleException
255   {
256     super.stopFc();
257     threadPoolController = null;
258   }
259
260   // ---------------------------------------------------------------------------
261
// Implementation of BindingController interface
262
// ---------------------------------------------------------------------------
263

264   /**
265    * @see org.objectweb.fractal.api.control.BindingController#listFc()
266    */

267   public String JavaDoc[] listFc()
268   {
269     return new String JavaDoc[]{OpenedSocket.ITF_NAME};
270   }
271
272   /**
273    * @see org.objectweb.fractal.api.control.BindingController#bindFc(String,
274    * Object)
275    */

276   public void bindFc(String JavaDoc clientItfName, Object JavaDoc serverItf)
277       throws NoSuchInterfaceException, IllegalBindingException,
278       IllegalLifeCycleException
279   {
280     super.bindFc(clientItfName, serverItf);
281     if (clientItfName.equals(OpenedSocket.ITF_NAME))
282     {
283       delegateOpenedSocketItf = (OpenedSocket) serverItf;
284     }
285   }
286 }
Popular Tags