KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > tribe > channel > AbstractChannelPool


/**
 * Tribe: Group communication library.
 * Copyright (C) 2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Contact: tribe@objectweb.org
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by the
 * Free Software Foundation; either version 2.1 of the License, or any later
 * version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s): ______________________.
 */

24
25 package org.objectweb.tribe.channel;
26
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;

import org.objectweb.tribe.common.Address;
import org.objectweb.tribe.common.Member;
import org.objectweb.tribe.common.log.Trace;
import org.objectweb.tribe.exceptions.ChannelException;
import org.objectweb.tribe.messages.ChannelMessage;
36
37 /**
38  * This class defines a AbstractChannelPool
39  *
40  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
41  * @version 1.0
42  */

43 public abstract class AbstractChannelPool
44 {
45   // HashMap: IpAddress -> AbstractReliableFifoChannel
46
protected HashMap JavaDoc channels;
47   // HashMap: IpAddress -> AbstractServerChannel
48
protected HashMap JavaDoc serverChannels;
49   // HashMap: Object(keys) -> ArrayList<ReceiveBuffer>
50
protected HashMap JavaDoc keyBuffers;
51
52   private static Trace logger = Trace.getLogger("org.objectweb.tribe.channel");
53
54   /**
55    * Creates a new <code>AbstractChannelPool</code> object
56    */

57   public AbstractChannelPool()
58   {
59     channels = new HashMap JavaDoc();
60     serverChannels = new HashMap JavaDoc();
61     keyBuffers = new HashMap JavaDoc();
62   }
63
64   /**
65    * Get a reliable fifo serverSocket to the given destination.
66    * <p>
67    * If no serverSocket exist, a new one is created else an existing one may be
68    * returned.
69    *
70    * @param destination destination address to reach
71    * @return a reliable fifo serverSocket
72    * @throws ChannelException if an error occurs
73    */

74   public abstract AbstractReliableFifoChannel getChannel(Address destination)
75       throws ChannelException;
76
77   /**
78    * Removed the specified channel from the pool.
79    *
80    * @param channel the channel to remove
81    * @return true if the channel was removed from the pool, false if the channel
82    * wasn't found in the pool.
83    */

84   public boolean removeChannelFromPool(AbstractReliableFifoChannel channel)
85   {
86     synchronized (channels)
87     {
88       Collection JavaDoc values = channels.values();
89       if (values == null)
90         return false;
91       return values.remove(channel);
92     }
93   }
94
95   /**
96    * Get a server socket on the given address.
97    * <p>
98    * If no server Socket exist, a new one is created else an existing one may be
99    * returned.
100    *
101    * @param serverAddress server address to bind
102    * @return a server serverSocket
103    * @throws ChannelException if an error occurs
104    */

105   public abstract AbstractServerChannel getServerChannel(Address serverAddress)
106       throws ChannelException;
107
108   /**
109    * Removed the specified channel from the pool.
110    *
111    * @param channel the channel to remove
112    * @return true if the channel was removed from the pool, false if the channel
113    * wasn't found in the pool.
114    */

115   public boolean removeServerChannelFromPool(AbstractServerChannel channel)
116   {
117     synchronized (serverChannels)
118     {
119       Collection JavaDoc values = serverChannels.values();
120       if (values == null)
121         return false;
122       return values.remove(channel);
123     }
124   }
125
126   /**
127    * Register a new receive buffer (which includes the group identifier the
128    * client is interested in).
129    *
130    * @param buffer ReceiveBuffer to register
131    */

132   public void registerReceiveBuffer(ReceiveBuffer buffer)
133   {
134     Object JavaDoc key = buffer.getBufferKey();
135     synchronized (keyBuffers)
136     {
137       if (logger.isDebugEnabled())
138         logger.debug("Registering new receive buffer: " + key);
139       // Get buffers for this gid
140
ArrayList JavaDoc buffers = (ArrayList JavaDoc) keyBuffers.get(key);
141       if (buffers == null)
142       { // None, create a new list
143
buffers = new ArrayList JavaDoc();
144         keyBuffers.put(key, buffers);
145       }
146       // Add the given buffer
147
synchronized (buffers)
148       {
149         buffers.add(buffer);
150       }
151     }
152   }
153
154   /**
155    * Unregister a receive buffer. When the last receive buffer is unregistered,
156    * all channels are closed.
157    *
158    * @param buffer ReceiveBuffer to unregister
159    * @return true if the ReceiveBuffer was successfully unregistered, false if
160    * it was not found in this pool.
161    */

162   public boolean unregisterReceiveBuffer(ReceiveBuffer buffer)
163   {
164     Object JavaDoc key = buffer.getBufferKey();
165     synchronized (keyBuffers)
166     {
167       if (logger.isDebugEnabled())
168         logger.debug("Unregistering receive buffer: " + key);
169       // Get buffers for this gid
170
ArrayList JavaDoc buffers = (ArrayList JavaDoc) keyBuffers.get(key);
171       if (buffers == null)
172         return false; // No buffer for this group id
173
// Remove the given buffer
174
synchronized (buffers)
175       {
176         boolean wasUnregistered = buffers.remove(buffer);
177         if (buffer.isEmpty())
178         { // Close all channels, no subscription to anything anymore
179
Object JavaDoc[] channelsToClose = channels.values().toArray();
180           for (int i = 0; i < channelsToClose.length; i++)
181           {
182             AbstractReliableFifoChannel channel = (AbstractReliableFifoChannel) channelsToClose[i];
183             removeChannelFromPool(channel);
184           }
185           channels.clear();
186         }
187         if (wasUnregistered)
188           buffer.addMessage(null); // Notify the queue to wake up blocking
189
// threads
190
return wasUnregistered;
191       }
192     }
193   }
194
195   /**
196    * Sends a message to the given list of recipients.
197    *
198    * @param msg message to send
199    * @param members list of destination members
200    * @return list of members who failed, null if everything was successful.
201    */

202   public ArrayList JavaDoc send(ChannelMessage msg, ArrayList JavaDoc members)
203   {
204     ArrayList JavaDoc failed = null;
205     Address lastAddress = null;
206     synchronized (members)
207     {
208       int size = members.size();
209       for (int i = 0; i < size; i++)
210       {
211         Member m = (Member) members.get(i);
212         // Check that we do not send multiple times to co-located members
213
if (!m.getAddress().equals(lastAddress))
214         {
215           AbstractReliableFifoChannel channel = null;
216           try
217           {
218             channel = getChannel(m.getAddress());
219             channel.send(msg);
220             lastAddress = m.getAddress();
221           }
222           catch (Exception JavaDoc e)
223           {
224             logger.debug("Failed to send message to member " + m, e);
225             if (failed == null)
226               failed = new ArrayList JavaDoc();
227             failed.add(m);
228             removeChannelFromPool(channel);
229             lastAddress = null;
230           }
231         }
232       }
233     }
234     return failed;
235   }
236
237 }
Popular Tags