KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > plugin > implementation > transport > tcp > TransportImpl


1 /**
2  * This file is part of Presumo.
3  *
4  * Presumo is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * Presumo is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with Presumo; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  *
19  * Copyright 2001 Dan Greff
20  */

21  package com.presumo.jms.plugin.implementation.transport.tcp;
22
23 import java.net.Socket JavaDoc;
24 import java.net.InetAddress JavaDoc;
25 import java.io.InputStream JavaDoc;
26 import java.io.OutputStream JavaDoc;
27 import java.io.BufferedOutputStream JavaDoc;
28 import java.io.BufferedInputStream JavaDoc;
29 import java.io.DataInputStream JavaDoc;
30 import java.io.DataOutputStream JavaDoc;
31 import java.io.IOException JavaDoc;
32
33 import com.presumo.jms.message.JmsMessage;
34 import com.presumo.jms.message.MessageEncoder;
35 import com.presumo.jms.resources.Resources;
36 import com.presumo.util.log.Logger;
37 import com.presumo.util.log.LoggerFactory;
38
39 import com.presumo.jms.plugin.transport.Transport;
40
41
42 /**
43  * Implementation of Transport using a TCP client Socket.
44  */

45 public class TransportImpl implements Transport
46 {
47   private final static int CLOSE_SOCKET = -45; // old football number ;)
48
private final static int CLOSE_ACK = -46;
49   
50   private final Socket JavaDoc socket;
51   private final DataInputStream JavaDoc in;
52   private final DataOutputStream JavaDoc out;
53   
54   private final Object JavaDoc receiveLock = new String JavaDoc("TransportImpl_Receive_Lock");
55   
56   private boolean closed = false;
57   
58   private String JavaDoc host;
59   private int port;
60   
61   /**
62    * Creates a Socket connection to the given server using the specified
63    * local port.
64    */

65   public TransportImpl(String JavaDoc host, int port)
66     throws IOException JavaDoc
67   {
68     logger.entry("TransportImpl<init>", host, new Integer JavaDoc(port));
69     
70     this.host = host;
71     this.port = port;
72     InetAddress JavaDoc server = InetAddress.getByName(host);
73     this.socket = new Socket JavaDoc(server, port);
74     this.socket.setSoLinger(false, 0);
75     this.in = new DataInputStream JavaDoc(socket.getInputStream());
76     this.out = new DataOutputStream JavaDoc(new BufferedOutputStream JavaDoc(socket.getOutputStream(),
77                                                              5024));
78                                                              
79
80     logger.exit("TransportImpl<init>");
81   }
82   
83   /**
84    * Used by the ServerTransport impl to create a a Transport
85    * to handle the client connection.
86    */

87   TransportImpl(Socket JavaDoc clientSocket) throws IOException JavaDoc
88   {
89     logger.entry("TransportImple<init>(Socket)");
90     this.host = clientSocket.getInetAddress().getHostName();
91     this.port = clientSocket.getPort();
92     this.socket = clientSocket;
93     this.in = new DataInputStream JavaDoc(clientSocket.getInputStream());
94     this.out = new DataOutputStream JavaDoc(new BufferedOutputStream JavaDoc(clientSocket.getOutputStream(),
95                                                              5024));
96     logger.info("PJMSI5002", host + ":" +port);
97     logger.exit("TransportImpl<init>(Socket");
98   }
99
100   public String JavaDoc getRemoteID()
101   {
102     return host + ":" + port;
103   }
104   
105   public synchronized void sendMessages(JmsMessage [] messages)
106     throws IOException JavaDoc
107   {
108     if (closed)
109       throw new IOException JavaDoc("Socket closed");
110       
111     out.writeInt(messages.length);
112     MessageEncoder.encode(messages, out);
113     out.flush();
114   }
115   
116   public JmsMessage [] receiveMessages()
117     throws IOException JavaDoc
118   {
119     if (closed)
120       throw new IOException JavaDoc("Socket closed");
121     JmsMessage [] retval = null;
122     synchronized (receiveLock) {
123       try {
124         int size = in.readInt();
125         if (size < 0) {
126           if (size == CLOSE_SOCKET) {
127             logger.debug("remote close socket request received");
128             closeSocket();
129             throw new IOException JavaDoc("Socket closed");
130           } if (size == CLOSE_ACK) {
131             logger.debug("close ACK received");
132             closed = true;
133             socket.close();
134             throw new IOException JavaDoc("Socket closed");
135           }
136           else
137             throw new IOException JavaDoc("Input stream corrupted");
138         }
139         retval = new JmsMessage[size];
140         for (int i=0; i < size; ++i)
141           retval[i] = MessageEncoder.decode(in);
142           
143       } catch (IOException JavaDoc ioe) {
144         if (!closed) {
145           closed = true;
146           try { socket.close(); } catch (IOException JavaDoc ioe2) {}
147         }
148         throw ioe;
149       }
150     }
151     return retval;
152   }
153
154   
155   /**
156    *
157    */

158   public synchronized void close()
159   {
160     if (! closed) {
161       closed = true;
162       logger.debug("Initiating close");
163       try {
164         out.writeInt(CLOSE_SOCKET);
165         out.flush();
166         logger.debug("Waiting for close acknowledgement");
167         waitForAckAndClose();
168       } catch (IOException JavaDoc ioe) {
169         ioe.printStackTrace();
170       }
171     }
172   }
173
174   private void waitForAckAndClose()
175   {
176     try {
177       while (true)
178         receiveMessages();
179     } catch (IOException JavaDoc ioe) {}
180   }
181
182   private void closeSocket() throws IOException JavaDoc
183   {
184     logger.debug("Sending close acknowledgement");
185     out.writeInt(CLOSE_ACK);
186     out.flush();
187     int bytesread = in.read();
188     while (bytesread != -1) {
189       logger.debug("Did not reach end of stream yet");
190       bytesread = in.read();
191     }
192     closed = true;
193     logger.debug("Closing the socket after sending ACK");
194     socket.close();
195   }
196   
197   ////////////////////////////// Misc stuff ////////////////////////////////
198
private static Logger logger =
199     LoggerFactory.getLogger(TransportImpl.class, Resources.getBundle());
200   ///////////////////////////////////////////////////////////////////////////
201
}
202
Popular Tags