KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > core > net > NetworkManager


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Uri Schneider.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46
47 /*
48  * Created on Dec 16, 2003
49  *
50  * responsible for all network trafic
51  */

52
53 package org.mr.core.net;
54
55 import java.io.IOException JavaDoc;
56 import java.net.InetAddress JavaDoc;
57 import java.net.InetSocketAddress JavaDoc;
58 import java.net.SocketAddress JavaDoc;
59 import java.nio.ByteBuffer JavaDoc;
60 import java.nio.channels.SocketChannel JavaDoc;
61 import java.security.MessageDigest JavaDoc;
62 import java.util.ArrayList JavaDoc;
63 import java.util.Collection JavaDoc;
64 import java.util.HashSet JavaDoc;
65 import java.util.Iterator JavaDoc;
66 import java.util.List JavaDoc;
67 import java.util.Set JavaDoc;
68
69 import org.apache.commons.logging.Log;
70 import org.apache.commons.logging.LogFactory;
71 import org.mr.MantaAgent;
72 import org.mr.core.configuration.ConfigManager;
73 import org.mr.core.configuration.ConfigurationElement;
74 import org.mr.core.log.StartupLogger;
75 import org.mr.core.net.messages.NetworkMessage;
76 import org.mr.core.net.messages.NetworkMessageHandler;
77 import org.mr.core.net.messages.NetworkMessageID;
78 import org.mr.core.protocol.MantaBusMessage;
79 import org.mr.core.stats.StatManager;
80 import org.mr.core.util.byteable.IncomingByteBufferPool;
81 import org.mr.kernel.delivery.PostOffice;
82 import org.mr.kernel.world.WorldModeler;
83 import org.mr.kernel.world.WorldModelerNetListener;
84 import org.mr.kernel.security.MantaAuthorization;
85
86 import javax.jms.JMSSecurityException JavaDoc;
87
88 /**
89  * @author Uri Schneider
90  *
91  * NetworkManager - container class for the network layer. The
92  * network manager provides a message sending interface for the
93  * protocol layer, and also reads messages from the network using a
94  * NetworkSelector object.
95  */

96 public class NetworkManager
97     implements NetworkListener, WorldModelerNetListener,
98                NetworkMessageHandler
99 {
100     // members
101
// -------
102
private TransportTable transportTable;
103     private NetworkSelector selector;
104     private String JavaDoc myAgentName;
105
106     private Log log;
107     private StatManager statManager;
108     private AgentMonitorManager monitorManager;
109
110     private boolean transportChanged;
111
112     private static boolean tcpNoDelay = true;
113
114     /**
115      * Default constructor
116      */

117     public NetworkManager(WorldModeler modeler, StatManager statManager)
118     {
119         this.log = LogFactory.getLog("NetworkManager");
120         tcpNoDelay = MantaAgent.getInstance().getSingletonRepository()
121             .getConfigManager().getBooleanProperty("network.setTcpNoDelay", true);
122         this.transportTable = new TransportTable();
123         this.selector = new NetworkSelector();
124         this.statManager = statManager;
125         this.monitorManager = new AgentMonitorManager();
126         this.transportChanged = false;
127
128         selector.setListener(this);
129         selector.start();
130
131         modeler.getNetworkListener().addListener(this);
132
133
134     }
135
136
137
138     /**
139      * Enqueues a request to send the buffer to the addresses specified
140      * in the message.
141      * @param msg - the message to be sent. this is used for its address list.
142      */

143 // public void sendBuffer(MantaBusMessage msg)
144
// {
145
// doSendBuffer(msg);
146
// }
147

148     /**
149      * whether or not there is a connection to the specified agent
150      *
151      * @param agent a <code>String</code> value
152      * @return a <code>boolean</code> value
153      */

154     public boolean hasConnection(String JavaDoc agent) {
155         Set JavaDoc transports = transportTable.getTransports(agent);
156         if (transports != null && ! transports.isEmpty()) {
157             Iterator JavaDoc i = transports.iterator();
158             Transport t = (Transport) i.next();
159
160             return t.isConnected();
161         }
162         return false;
163     }
164
165     // implementation of interface NetworkSelectorListener
166

167     /* (non-Javadoc)
168      * @see org.mr.core.net.NetworkSelectorListener#acceptedChannel(java.nio.channels.SocketChannel)
169      */

170     public void acceptedChannel(SocketChannel JavaDoc channel) {
171         try {
172             InetAddress JavaDoc remote = channel.socket().getInetAddress();
173             MantaAuthorization auth =
174                 MantaAgent.getInstance().getSingletonRepository().
175                 getMantaAuthorization();
176             if (auth != null) {
177                 auth.authorize(remote);
178             }
179             TransportImpl impl = transportTable.addPendingTransport(channel,
180                                                                     this);
181             if (impl != null) {
182                 this.selector.addTransportImpl(impl, null);
183             }
184         } catch (SecurityException JavaDoc e) {
185             if (this.log.isWarnEnabled()) {
186                 this.log.warn("Accepted channel was not authorized: " +
187                               channel + ". Discarding.");
188             }
189             try {
190                 channel.close();
191             } catch (IOException JavaDoc e1) {}
192         }
193     }
194
195     public void acceptedImpl(TransportImpl impl) {
196         impl.setListener(this);
197         transportTable.addPendingTransport(impl);
198     }
199
200     /* (non-Javadoc)
201       * @see org.mr.core.net.NetworkListener#messageReady(java.nio.channels.SocketChannel, org.mr.core.net.CNLMessage)
202       */

203     public void messageReady(CNLMessage message) {
204         if (message.getType() == CNLMessage.CNL_NETWORK) {
205             networkMessageReady(message);
206         } else if (message.getType() == CNLMessage.CNL_MANTA) {
207             mantaMessageReady(message);
208         }
209     }
210
211     // implementation of interface NetworkMessageHandler
212
public void handleNetMessageID(NetworkMessageID msg) {
213         SocketAddress JavaDoc remote = msg.getSource();
214         InetAddress JavaDoc local = ((InetSocketAddress JavaDoc) msg.getDest()).getAddress();
215         boolean isTCP = msg.isTCP();
216         boolean initId = msg.getInitId();
217
218         if(log.isInfoEnabled()){
219             log.info("HandleID: received ID (" + msg.getName() + ") from " +
220                         remote.toString()+".");
221         }
222
223         if (isTCP && this.transportTable.isPending(remote)) {
224             boolean success =
225                 transportTable.associatePending(remote, msg.getName(),
226                                                 this.myAgentName, initId);
227             if (! success) {
228                 if(log.isWarnEnabled()) {
229                     log.warn("A peer named " + msg.getName() + " tried to " +
230                              "connect, but does not appear in the world map." +
231                              " Closing connection (" + remote.toString() +
232                              ")");
233                 }
234             }
235         } else {
236             Transport t = null;
237             if (isTCP) {
238                 t = this.transportTable.getTransport(remote);
239             } else {
240                 t = this.transportTable.getUdpTransport(((InetSocketAddress JavaDoc) remote).getAddress());
241             }
242             if (t != null) {
243                 t.setInitialized(local, initId);
244             }
245         }
246     }
247
248     /**
249      * return a Set containing agent names (Strings) to which this
250      * agent is connected to. this is used by the CMC command
251      * get_connections.
252      *
253      * @return a <code>List</code> value
254      */

255     public List JavaDoc getConnections() {
256         return this.transportTable.getConnections(this.myAgentName);
257     }
258
259     public void addAgentStateListener(String JavaDoc agent,
260                                       AgentStateListener listener) {
261         Set JavaDoc transports = this.transportTable.getTransports(agent);
262
263         if (transports == null) {
264             if (this.log.isFatalEnabled()) {
265                 this.log.fatal("Cannot add state listener for " + agent +
266                                ": no transports defined.");
267             }
268             throw new IllegalArgumentException JavaDoc("No transports for peer " +
269                                                agent);
270         }
271
272         Set JavaDoc accessibleTransports = new HashSet JavaDoc();
273         Iterator JavaDoc i = transports.iterator();
274         while (i.hasNext()) {
275             Transport t = (Transport) i.next();
276             TransportType type = t.getInfo().getTransportInfoType();
277             if (this.transportTable.isLocalType(type)) {
278                 accessibleTransports.add(t);
279             }
280         }
281         this.monitorManager.addMonitor(agent, accessibleTransports, listener);
282     }
283
284     public void removeAgentStateListener(String JavaDoc agent,
285                                          AgentStateListener listener) {
286         Set JavaDoc transports = this.transportTable.getTransports(agent);
287         this.monitorManager.removeMonitor(agent, transports, listener);
288     }
289
290     public int getAgentState(String JavaDoc agent) {
291         return this.monitorManager.getAgentState(agent);
292     }
293
294     public boolean isAccessible(String JavaDoc agent) {
295         if (agent.equals(this.myAgentName)) {
296             return true;
297         }
298         return this.transportTable.isAccessible(agent);
299     }
300
301     /**
302      * Return the local interface address used for communicating with
303      * an agent.
304      */

305     public InetAddress JavaDoc getLocalInterface(String JavaDoc agent) {
306         return this.transportTable.getLocalInterface(agent);
307     }
308
309     /**
310      * create a LocalTransport object and bind it to an address
311      * @param addr the local interface on which to bind the server
312      * @param portMin the minimum port number
313      * @param portMax the maximum port number
314      * @param type the transport type @see TransportType
315      * @return the address to which the server was bound
316      * @throws IOException if bind failed
317      */

318     public InetSocketAddress JavaDoc createServer(InetAddress JavaDoc addr, int portMin,
319                                           int portMax, TransportType type)
320         throws IOException JavaDoc
321     {
322         try {
323             LocalTransport local =
324                 TransportProvider.createLocalTransport(type, addr, portMin,
325                                                        portMax, this,
326                                                        this.selector);
327             this.transportTable.addLocalTransport(local);
328             this.transportChanged = true;
329             return (InetSocketAddress JavaDoc) local.getSocketAddress();
330         } catch (IllegalArgumentException JavaDoc e) {
331             // logger is not initialized yet at this stage
332
StartupLogger.log.error("Could not create server" +
333                                    " socket on " + addr.getHostName() + ":" +
334                                    portMin +
335                                    (portMin == portMax ? "" : ("-" + portMax)) +
336                                    ": " + e.getMessage(), "NetworkManager");
337 // System.err.println("NetworkManager ERROR Could not create server" +
338
// " socket on " + addr.getHostName() + ":" +
339
// portMin +
340
// (portMin == portMax ? "" : ("-" + portMax)) +
341
// ": " + e.getMessage());
342
return null;
343         }
344     }
345
346     /**
347      * close a server
348      * @param bindAddr the address to which the server was bound
349      */

350     public void destroyServer(InetSocketAddress JavaDoc bindAddr) {
351         LocalTransport local =
352             this.transportTable.removeLocalTransport(bindAddr);
353         this.transportChanged = true;
354         local.close();
355     }
356
357     /**
358      * make all servers start accepting
359      */

360     public void startServers() {
361         Iterator JavaDoc i = this.transportTable.getLocalTransports().iterator();
362         while (i.hasNext()) {
363             LocalTransport local = (LocalTransport) i.next();
364             local.open();
365         }
366     }
367
368     /**
369      * @return a collection of LocalTransport objects
370      */

371     public Collection JavaDoc getLocalTransports(){
372         return this.transportTable.getLocalTransports();
373     }
374
375     /**
376      * make a server start accepting
377      * @param addr the address to which the server was bound
378      */

379     public void startServer(InetSocketAddress JavaDoc addr) {
380         LocalTransport local =
381             this.transportTable.getLocalTransport(addr);
382         local.open();
383     }
384
385     public void sendBuffer(MantaBusMessage message) {
386         Transport out = decideTransport(message.getRealNetAddress());
387         if(log.isDebugEnabled()){
388             log.debug("Sending buffer: Message ID=" + message.getMessageId() +
389                       " using transport " +
390                       (out != null ? out.toString() : "null"));
391         }
392         if (out == null) {
393             if(log.isWarnEnabled() && this.transportChanged){
394                 log.warn("Cannot send message to " +
395                          message.getRealNetAddress().getAgentName()+
396                          ": no transports defined for the remote peer, or " +
397                          "no local transports exist which match remote " +
398                          "peer's transport type. Please review your " +
399                          "configuration.");
400             }
401             message.release(false);
402             this.transportChanged = false;
403             return;
404         }
405
406         CNLMessage cnlMessage = null;
407         try {
408             ByteBuffer JavaDoc[] buffers = message.getNetBuffers();
409             cnlMessage = new CNLMessage(CNLMessage.CNL_MANTA, buffers);
410             cnlMessage.setBusMessage(message);
411             cnlMessage.use();
412         } catch (IOException JavaDoc e) {
413             if(log.isErrorEnabled()){
414                 log.error("An error occured during message serialization.", e);
415                 return;
416             }
417         }
418
419         try {
420             out.createImpls();
421         } catch (IOException JavaDoc e) {
422             if(log.isWarnEnabled()){
423                 log.warn("Cannot create connection to " +
424                             out.getInfo().getSocketAddress().toString() +
425                             ": " + e.toString()+".");
426             }
427             cnlMessage.unuse();
428             return;
429         }
430         out.sendMantaMessage(cnlMessage);
431         cnlMessage.unuse();
432     }
433
434     // implementation of interface WorldModelerNetListener
435

436     public void handleAgentAdded(String JavaDoc agent) {
437         if(log.isDebugEnabled()){
438             log.debug("New peer discovered: "+agent+".");
439         }
440
441         if (! agent.equals(myAgentName)) {
442             transportTable.addAgent(agent, new HashSet JavaDoc());
443         }
444     }
445
446     public void handleAgentRemoved(String JavaDoc agent) {
447         if(log.isDebugEnabled()){
448             log.debug("Peer disconnected: "+agent+".");
449         }
450
451         Set JavaDoc transports = transportTable.removeAgent(agent);
452         if (transports != null) {
453             Iterator JavaDoc i = transports.iterator();
454             while (i.hasNext()) {
455                 Transport t = (Transport) i.next();
456                 t.shutdown();
457             }
458         }
459         this.monitorManager.removeMonitor(agent, transports);
460     }
461
462     public void handleAgentsTransportAdded(String JavaDoc agent, TransportInfo info) {
463         if(log.isInfoEnabled()){
464             log.info("Peer transport info resolved: peer="+agent+
465                      ", info="+info.toString()+".");
466         }
467         String JavaDoc mwbName = "mwb";
468         boolean indirect =
469             (info.getTransportInfoType() == TransportType.MWB &&
470              !agent.equals(mwbName) && !myAgentName.equals(mwbName));
471         boolean passive =
472             (info.getTransportInfoType() == TransportType.MWB &&
473              myAgentName.equals(mwbName));
474         if (!this.transportTable.exists(agent, info, indirect)) {
475             Transport t = null;
476             if (indirect) {
477                 Set JavaDoc set = this.transportTable.getTransports(mwbName);
478                 Transport master = null;
479                 if (set != null && !set.isEmpty()) {
480                     master = (Transport) set.toArray()[0];
481                 }
482                 t = new IndirectTransport(info, this.myAgentName, agent,
483                                           this, this.statManager,
484                                           this.selector, master);
485             } else {
486                 t = new Transport(info, this.myAgentName, agent,
487                                   this, this.statManager,
488                                   this.selector, passive);
489             }
490             this.transportTable.addTransport(agent, t);
491             if (indirect ||
492                 transportTable.isLocalType(info.getTransportInfoType())) {
493 // if (transportTable.isLocalType(info.getTransportInfoType())) {
494
this.monitorManager.addTransport(agent, t);
495             }
496             this.transportChanged = true;
497         } else {
498             if(log.isInfoEnabled()) {
499                 log.info("Peer already exists. Doing nothing.");
500             }
501         }
502     }
503
504     public void handleAgentsTransportsAdded(String JavaDoc agent, List JavaDoc infos) {
505         Iterator JavaDoc i = infos.iterator();
506         while (i.hasNext()) {
507             handleAgentsTransportAdded(agent, (TransportInfo) i.next());
508         }
509     }
510
511     public void handleAgentsTransportRemoved(String JavaDoc agent, TransportInfo info)
512     {
513         if(log.isInfoEnabled()){
514             log.info("doHandleAgentsTransportRemoved(): peer="+agent +
515                      ", info="+info.toString()+".");
516
517         }
518
519         boolean indirect =
520             (info.getTransportInfoType() == TransportType.MWB);
521
522         Transport t =
523             this.transportTable.removeTransport(agent,
524                                                 info.getSocketAddress(),
525                                                 indirect);
526
527         if (t != null) {
528             this.monitorManager.removeTransport(agent, t);
529             t.shutdown();
530         }
531
532         this.transportChanged = true;
533     }
534
535     private void mantaMessageReady(CNLMessage message) {
536         SocketAddress JavaDoc remote = null;
537         if (message.isTCP()) {
538             remote = message.getSourceAddress();
539         }
540         if (remote != null && this.transportTable.isPending(remote)) {
541             if(log.isWarnEnabled()){
542                 log.warn("Discarding a message from unidentifed channel (remote = " + remote + ").");
543             }
544         } else {
545             try {
546                 // messageMD5 is the signature optionall created by
547
// the sender. if there is such a signature, pass
548
// also a partial message digest to the post office.
549
byte[] messageMD5 = message.getMessageMD5();
550                 MessageDigest JavaDoc partialMD5 = null;
551                 if (messageMD5 != null) {
552                     partialMD5 = message.getPartialMD5();
553                 }
554
555                 PostOffice post = MantaAgent.getInstance().
556                     getSingletonRepository().getPostOffice();
557                 post.messageArrived(message.buffers()[0], messageMD5,
558                                     partialMD5);
559             } catch (Throwable JavaDoc t) {
560                 if(log.isErrorEnabled()){
561                     log.error("Cannot pass buffer to protocol handler.", t);
562                 }
563             }
564         }
565     }
566
567     private void networkMessageReady(CNLMessage message)
568     {
569         ByteBuffer JavaDoc buf = message.valueAsBuffers()[0];
570         NetworkMessage.create(buf, message.isTCP(), message.getSourceAddress(), message.getDestAddress(), this);
571         IncomingByteBufferPool.getInstance().release(message.buffers()[0]);
572     }
573
574     private Transport decideTransport(MantaAddress address) {
575         String JavaDoc agent = address.getAgentName();
576         Set JavaDoc transports = this.transportTable.getTransports(agent);
577         Transport best;
578
579         // find the best connected transport, otherwise return the
580
// best overall transport. also, if the best transport is an
581
// MWB transport (indirect), try to connect the best
582
// unconnected transport, so that next time, the message will
583
// pass through it.
584
if (transports == null) {
585             return null;
586         }
587         best = findBestTransport(transports, true);
588         if (best == null) {
589             best = findBestTransport(transports, false);
590         } else if (best.getInfo().getTransportInfoType() ==
591                    TransportType.MWB) {
592             Transport secondBest = findBestTransport(transports, false);
593             if (secondBest != null) {
594                 secondBest.createImplsMaybe();
595             }
596         }
597
598         return best;
599     }
600
601     private Transport findBestTransport(Set JavaDoc transports, boolean connectedOnly)
602     {
603         Iterator JavaDoc i = transports.iterator();
604         Transport best = null;
605         TransportType bestType = TransportType.UNKNOWN;
606
607         while (i.hasNext()) {
608             Transport t = (Transport) i.next();
609             TransportType type = t.getInfo().getTransportInfoType();
610             if (connectedOnly) {
611                 if (t.isInitialized() &&
612                     type.getPriority() < bestType.getPriority() &&
613                     this.transportTable.isLocalType(type)) {
614                     best = t;
615                     bestType = type;
616                 }
617             } else {
618                 if (type.getPriority() < bestType.getPriority() &&
619                     this.transportTable.isLocalType(type)) {
620                     best = t;
621                     bestType = type;
622                 }
623             }
624         }
625
626         return best;
627     }
628
629     /* (non-Javadoc)
630       * @see org.mr.core.net.NetworkListener#activityDetected()
631       */

632     public void activityDetected() {
633         // nothing to be done here.
634
}
635
636     public void implShutdown() {
637         // nothing to do here.
638
}
639
640
641     /**
642      * Creates the server sockets for this peer
643      */

644     public void createServerSockets() throws IOException JavaDoc {
645         ConfigManager config = MantaAgent.getInstance().
646             getSingletonRepository().getConfigManager();
647
648         ArrayList JavaDoc transports =
649             config.getConfigurationElements("network.transports.transport");
650
651         int size = transports.size();
652         for (int i = 0; i < size; i++) {
653             ConfigurationElement transProp =
654                 (ConfigurationElement) transports.get(i);
655
656             String JavaDoc ip =
657                 transProp.getSubConfigurationElementByName("ip").getValue();
658             String JavaDoc port =
659                 transProp.getSubConfigurationElementByName("port").getValue();
660             String JavaDoc type =
661                 transProp.getSubConfigurationElementByName("type").getValue();
662             InetAddress JavaDoc add = InetAddress.getByName(ip);
663             TransportType transType =
664                 TransportType.getTransportTypeFromString(type);
665
666             if (transType == TransportType.UNKNOWN) {
667                 // logger is not initialized yet here
668
// System.err.println("NetworkManager ERROR While creating " +
669
// "server socket: Unknown transport type: "
670
// + type);
671
StartupLogger.log.error("Error while creating " +
672                                    "server socket: Unknown transport type: "
673                                    + type, "NetworkManager");
674                 continue;
675             }
676             int minPort = 0;
677             int maxPort = 0;
678             if(port.indexOf("-") != -1) {
679                 minPort =
680                     Integer.parseInt(port.substring(0, port.indexOf("-")));
681                 maxPort =
682                     Integer.parseInt(port.substring(port.indexOf("-") + 1));
683             } else {
684                 minPort = maxPort = Integer.parseInt(port);
685             }
686             try {
687                 createServer(add, minPort, maxPort, transType);
688             } catch (IOException JavaDoc e) {
689 // System.err.println("NetworkManager ERROR Could not create " +
690
// type + " server socket on " + ip + ":" +
691
// port + ": " + e.getMessage());
692
StartupLogger.log.error("NetworkManager ERROR Could not " +
693                                         "create " + type +
694                                         " server socket on " + ip + ":" +
695                                         port + ": " + e.getMessage(),
696                                         "NetworkManager");
697             }
698         }
699     }
700
701     public void setMyAgentName(String JavaDoc myAgentName) {
702         this.myAgentName = myAgentName;
703         if(log.isInfoEnabled()){
704             log.info("Local MantaRay peer name is '" + this.myAgentName + "'"+".");
705         }
706     }
707     protected static boolean isTcpNoDelay() {
708         return tcpNoDelay;
709     }
710 }
711
Popular Tags