KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > udp > UdpTransportServer


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.udp;
19
20 import java.io.IOException JavaDoc;
21 import java.net.InetSocketAddress JavaDoc;
22 import java.net.SocketAddress JavaDoc;
23 import java.net.URI JavaDoc;
24 import java.util.HashMap JavaDoc;
25 import java.util.Map JavaDoc;
26
27 import org.apache.activemq.command.BrokerInfo;
28 import org.apache.activemq.command.Command;
29 import org.apache.activemq.openwire.OpenWireFormat;
30 import org.apache.activemq.transport.CommandJoiner;
31 import org.apache.activemq.transport.InactivityMonitor;
32 import org.apache.activemq.transport.Transport;
33 import org.apache.activemq.transport.TransportListener;
34 import org.apache.activemq.transport.TransportServer;
35 import org.apache.activemq.transport.TransportServerSupport;
36 import org.apache.activemq.transport.reliable.ReliableTransport;
37 import org.apache.activemq.transport.reliable.ReplayStrategy;
38 import org.apache.activemq.transport.reliable.Replayer;
39 import org.apache.activemq.util.ServiceStopper;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42
43 /**
44  * A UDP based implementation of {@link TransportServer}
45  *
46  * @version $Revision: 464110 $
47  */

48
49 public class UdpTransportServer extends TransportServerSupport {
50     private static final Log log = LogFactory.getLog(UdpTransportServer.class);
51
52     private UdpTransport serverTransport;
53     private ReplayStrategy replayStrategy;
54     private Transport configuredTransport;
55     private boolean usingWireFormatNegotiation;
56     private Map JavaDoc transports = new HashMap JavaDoc();
57
58
59     public UdpTransportServer(URI JavaDoc connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) {
60         super(connectURI);
61         this.serverTransport = serverTransport;
62         this.configuredTransport = configuredTransport;
63         this.replayStrategy = replayStrategy;
64     }
65
66     public String JavaDoc toString() {
67         return "UdpTransportServer@" + serverTransport;
68     }
69
70     public void run() {
71     }
72
73     public UdpTransport getServerTransport() {
74         return serverTransport;
75     }
76
77     public void setBrokerInfo(BrokerInfo brokerInfo) {
78     }
79
80     protected void doStart() throws Exception JavaDoc {
81         log.info("Starting " + this);
82
83         configuredTransport.setTransportListener(new TransportListener() {
84             public void onCommand(Object JavaDoc o) {
85                 final Command command = (Command) o;
86                 processInboundConnection(command);
87             }
88
89             public void onException(IOException JavaDoc error) {
90                 log.error("Caught: " + error, error);
91             }
92
93             public void transportInterupted() {
94             }
95
96             public void transportResumed() {
97             }
98         });
99         configuredTransport.start();
100     }
101
102     protected void doStop(ServiceStopper stopper) throws Exception JavaDoc {
103         configuredTransport.stop();
104     }
105
106     protected void processInboundConnection(Command command) {
107         DatagramEndpoint endpoint = (DatagramEndpoint) command.getFrom();
108         if (log.isDebugEnabled()) {
109             log.debug("Received command on: " + this + " from address: " + endpoint + " command: " + command);
110         }
111         Transport transport = null;
112         synchronized (transports) {
113             transport = (Transport) transports.get(endpoint);
114             if (transport == null) {
115                 if (usingWireFormatNegotiation && !command.isWireFormatInfo()) {
116                     log.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command);
117                 }
118                 else {
119                     if (log.isDebugEnabled()) {
120                         log.debug("Creating a new UDP server connection");
121                     }
122                     try {
123                         transport = createTransport(command, endpoint);
124                         transport = configureTransport(transport);
125                         transports.put(endpoint, transport);
126                     }
127                     catch (IOException JavaDoc e) {
128                         log.error("Caught: " + e, e);
129                         getAcceptListener().onAcceptError(e);
130                     }
131                 }
132             }
133             else {
134                 log.warn("Discarding duplicate command to server from: " + endpoint + " command: " + command);
135             }
136         }
137     }
138
139     protected Transport configureTransport(Transport transport) {
140         transport = new InactivityMonitor(transport);
141         getAcceptListener().onAccept(transport);
142         return transport;
143     }
144
145     protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException JavaDoc {
146         if (endpoint == null) {
147             throw new IOException JavaDoc("No endpoint available for command: " + command);
148         }
149         final SocketAddress JavaDoc address = endpoint.getAddress();
150         final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
151         final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
152
153         final ReliableTransport reliableTransport = new ReliableTransport(transport, transport);
154         Replayer replayer = reliableTransport.getReplayer();
155         reliableTransport.setReplayStrategy(replayStrategy);
156         
157         // Joiner must be on outside as the inbound messages must be processed by the reliable transport first
158
return new CommandJoiner(reliableTransport, connectionWireFormat) {
159             public void start() throws Exception JavaDoc {
160                 super.start();
161                 reliableTransport.onCommand(command);
162             }
163         };
164
165
166         
167         /**
168         final WireFormatNegotiator wireFormatNegotiator = new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport
169                 .getMinmumWireFormatVersion()) {
170
171             public void start() throws Exception {
172                 super.start();
173                 log.debug("Starting a new server transport: " + this + " with command: " + command);
174                 onCommand(command);
175             }
176
177             // lets use the specific addressing of wire format
178             protected void sendWireFormat(WireFormatInfo info) throws IOException {
179                 log.debug("#### we have negotiated the wireformat; sending a wireformat to: " + address);
180                 transport.oneway(info, address);
181             }
182         };
183         return wireFormatNegotiator;
184         */

185     }
186     
187     public InetSocketAddress JavaDoc getSocketAddress() {
188         return serverTransport.getLocalSocketAddress();
189     }
190 }
191
Popular Tags