1 48 49 package com.caucho.hessian.mux; 50 51 import java.io.IOException ; 52 import java.io.InputStream ; 53 import java.io.OutputStream ; 54 55 58 public class MuxServer { 59 private Object READ_LOCK = new Object (); 60 private Object WRITE_LOCK = new Object (); 61 62 private InputStream is; 63 private OutputStream os; 64 private boolean isClient; 65 66 private transient boolean isClosed; 67 68 private boolean inputReady[] = new boolean[4]; 70 71 private boolean isReadLocked; 73 private boolean isWriteLocked; 75 76 79 public MuxServer() 80 { 81 } 82 83 90 public MuxServer(InputStream is, OutputStream os, boolean isClient) 91 { 92 init(is, os, isClient); 93 } 94 95 102 public void init(InputStream is, OutputStream os, boolean isClient) 103 { 104 this.is = is; 105 this.os = os; 106 this.isClient = isClient; 107 } 108 109 113 public InputStream getInputStream() 114 { 115 return is; 116 } 117 118 122 public OutputStream getOutputStream() 123 { 124 return os; 125 } 126 127 130 public boolean startCall(MuxInputStream in, MuxOutputStream out) 131 throws IOException 132 { 133 int channel = isClient ? 2 : 3; 134 135 return startCall(channel, in, out); 136 } 137 138 141 public boolean startCall(int channel, MuxInputStream in, MuxOutputStream out) 142 throws IOException 143 { 144 147 in.init(this, channel); 148 out.init(this, channel); 149 150 return true; 151 } 152 153 156 public boolean readRequest(MuxInputStream in, MuxOutputStream out) 157 throws IOException 158 { 159 int channel = isClient ? 3 : 2; 160 161 in.init(this, channel); 162 out.init(this, channel); 163 164 if (readChannel(channel) != null) { 165 in.setInputStream(is); 166 in.readToData(false); 167 return true; 168 } 169 else 170 return false; 171 } 172 173 180 OutputStream writeChannel(int channel) 181 throws IOException 182 { 183 while (os != null) { 184 boolean canWrite = false; 185 synchronized (WRITE_LOCK) { 186 if (! isWriteLocked) { 187 isWriteLocked = true; 188 canWrite = true; 189 } 190 else { 191 try { 192 WRITE_LOCK.wait(5000); 193 } catch (Exception e) { 194 } 195 } 196 } 197 198 if (canWrite) { 199 os.write('C'); 200 os.write(channel >> 8); 201 os.write(channel); 202 203 return os; 204 } 205 } 206 207 return null; 208 } 209 210 void yield(int channel) 211 throws IOException 212 { 213 os.write('Y'); 214 freeWriteLock(); 215 } 216 217 void flush(int channel) 218 throws IOException 219 { 220 os.write('Y'); 221 os.flush(); 222 freeWriteLock(); 223 } 224 225 void close(int channel) 226 throws IOException 227 { 228 if (os != null) { 229 os.write('Q'); 230 os.flush(); 231 freeWriteLock(); 232 } 233 } 234 235 238 void freeWriteLock() 239 { 240 synchronized (WRITE_LOCK) { 241 isWriteLocked = false; 242 WRITE_LOCK.notifyAll(); 243 } 244 } 245 246 253 InputStream readChannel(int channel) 254 throws IOException 255 { 256 while (! isClosed) { 257 if (inputReady[channel]) { 258 inputReady[channel] = false; 259 return is; 260 } 261 262 boolean canRead = false; 263 synchronized (READ_LOCK) { 264 if (! isReadLocked) { 265 isReadLocked = true; 266 canRead = true; 267 } 268 else { 269 try { 270 READ_LOCK.wait(5000); 271 } catch (Exception e) { 272 } 273 } 274 } 275 276 if (canRead) { 277 try { 278 readData(); 279 } catch (IOException e) { 280 close(); 281 } 282 } 283 } 284 285 return null; 286 } 287 288 boolean getReadLock() 289 { 290 synchronized (READ_LOCK) { 291 if (! isReadLocked) { 292 isReadLocked = true; 293 return true; 294 } 295 else { 296 try { 297 READ_LOCK.wait(5000); 298 } catch (Exception e) { 299 } 300 } 301 } 302 303 return false; 304 } 305 306 309 void freeReadLock() 310 { 311 synchronized (READ_LOCK) { 312 isReadLocked = false; 313 READ_LOCK.notifyAll(); 314 } 315 } 316 317 320 private void readData() 321 throws IOException 322 { 323 while (! isClosed) { 324 int code = is.read(); 325 326 switch (code) { 327 case ' ': 328 case '\t': 329 case '\n': 330 case '\r': 331 break; 332 333 case 'C': { 334 int channel = (is.read() << 8) + is.read(); 335 336 inputReady[channel] = true; 337 return; 338 } 339 340 case 'E': { 341 int channel = (is.read() << 8) + is.read(); 342 int status = (is.read() << 8) + is.read(); 343 344 inputReady[channel] = true; 345 346 return; 347 } 348 349 case -1: 350 close(); 351 return; 352 353 default: 354 close(); 356 return; 357 } 358 } 359 360 return; 361 } 362 363 366 public void close() 367 throws IOException 368 { 369 isClosed = true; 370 371 OutputStream os = this.os; 372 this.os = null; 373 374 InputStream is = this.is; 375 this.is = null; 376 377 if (os != null) 378 os.close(); 379 380 if (is != null) 381 is.close(); 382 } 383 } 384 | Popular Tags |