1 7 package org.jboss.remoting.stream; 8 9 import java.io.IOException ; 10 import java.io.InputStream ; 11 import org.jboss.logging.Logger; 12 import org.jboss.remoting.Client; 13 import org.jboss.remoting.InvokerLocator; 14 15 30 public class StreamHandler extends InputStream { 32 private InvokerLocator streamServerLocator = null; 33 private Client streamClient = null; 34 35 private static final Logger log = Logger.getLogger(StreamHandler.class); 36 37 public static final String READ = "read()"; 39 public static final String AVAILABLE = "available()"; 40 public static final String CLOSE = "close()"; 41 public static final String RESET = "reset()"; 42 public static final String MARKSUPPORTED = "markSupported()"; 43 public static final String MARKREADLIMIT = "mark(int readlimit)"; 44 public static final String SKIP = "skip(long n)"; 45 public static final String READBYTEARRAY = "read(byte b[])"; 46 public static final String READOFFSET = "read(byte b[], int off, int len)"; 47 48 55 public StreamHandler(String locatorURL) throws Exception 57 { 58 streamServerLocator = new InvokerLocator(locatorURL); 59 streamClient = new Client(streamServerLocator); 60 } 61 62 77 public int available() throws IOException 78 { 79 int readInt = 0; 80 81 try 82 { 83 Integer retInt = (Integer ) streamClient.invoke(new StreamCallPayload(AVAILABLE)); 84 if(retInt != null) 85 { 86 readInt = retInt.intValue(); 87 } 88 } 89 catch(Throwable throwable) 90 { 91 log.error("Error getting available from client stream.", throwable); 92 throw new IOException (throwable.getMessage()); 93 } 94 return readInt; 95 } 96 97 106 public void close() throws IOException 107 { 108 try 109 { 110 streamClient.invoke(new StreamCallPayload(CLOSE)); 111 } 112 catch(Throwable throwable) 113 { 114 log.error("Error closing client stream.", throwable); 115 throw new IOException (throwable.getMessage()); 116 } 117 } 118 119 164 public synchronized void reset() throws IOException 165 { 166 try 167 { 168 streamClient.invoke(new StreamCallPayload(RESET)); 169 } 170 catch(Throwable throwable) 171 { 172 log.error("Error reseting client stream.", throwable); 173 throw new IOException (throwable.getMessage()); 174 } 175 } 176 177 189 public boolean markSupported() 190 { 191 boolean supported = false; 192 193 try 194 { 195 Boolean bSupported = (Boolean ) streamClient.invoke(new StreamCallPayload(MARKSUPPORTED)); 196 if(bSupported != null) 197 { 198 supported = bSupported.booleanValue(); 199 } 200 } 201 catch(Throwable throwable) 202 { 203 log.error("Error getting markSupported from client stream.", throwable); 204 throw new RuntimeException (throwable.getMessage(), throwable); 205 } 206 return supported; 207 } 208 209 233 public synchronized void mark(int readlimit) 234 { 235 try 236 { 237 StreamCallPayload payload = new StreamCallPayload(MARKREADLIMIT); 238 payload.setParams(new Object []{new Integer (readlimit)}); 239 streamClient.invoke(payload); 240 } 241 catch(Throwable throwable) 242 { 243 log.error("Error marking with read limit on client stream.", throwable); 244 throw new RuntimeException (throwable.getMessage(), throwable); 245 } 246 } 247 248 266 public long skip(long n) throws IOException 267 { 268 long numSkipped = -1; 269 270 try 271 { 272 StreamCallPayload payload = new StreamCallPayload(SKIP); 273 payload.setParams(new Object []{new Long (n)}); 274 Long ret = (Long ) streamClient.invoke(payload); 275 if(ret != null) 276 { 277 numSkipped = ret.longValue(); 278 } 279 } 280 catch(Throwable throwable) 281 { 282 log.error("Error skipping on client stream.", throwable); 283 throw new IOException (throwable.getMessage()); 284 } 285 286 return numSkipped; 287 } 288 289 326 public int read(byte b[]) throws IOException 327 { 328 if(b == null) 329 { 330 throw new NullPointerException ("can not read for a null byte array."); 331 } 332 else 333 { 334 if(b.length == 0) 335 { 336 return 0; 337 } 338 } 339 340 int retByte = -1; 341 342 try 343 { 344 StreamCallPayload payload = new StreamCallPayload(READBYTEARRAY); 345 payload.setParams(new Object []{b}); 346 StreamCallPayload ret = (StreamCallPayload) streamClient.invoke(payload); 347 if(ret != null) 348 { 349 Object [] retVals = ret.getParams(); 350 byte[] retBytes = (byte[]) retVals[0]; 351 Integer retInt = (Integer ) retVals[1]; 352 353 retByte = retInt.intValue(); 354 355 if(retByte != -1) 356 { 357 System.arraycopy(retBytes, 0, b, 0, retByte); 358 } 359 } 360 } 361 catch(Throwable throwable) 362 { 363 log.error("Error reading from client stream.", throwable); 364 throw new IOException (throwable.getMessage()); 365 } 366 367 return retByte; 368 } 369 370 432 public int read(byte b[], int off, int len) throws IOException 433 { 434 if(b == null) 435 { 436 throw new NullPointerException ("can not read for a null byte array."); 437 } 438 else 439 { 440 if(b.length == 0) 441 { 442 return 0; 443 } 444 else 445 { 446 if(off < 0 || len < 0 || off + len > b.length) 447 { 448 throw new IndexOutOfBoundsException ("Either off or len is negative or off+len is greater than length of b."); 449 } 450 if(len == 0) 451 { 452 return 0; 453 } 454 } 455 } 456 457 int retByte = -1; 458 459 try 460 { 461 byte[] payloadArray = new byte[len]; 462 StreamCallPayload payload = new StreamCallPayload(READBYTEARRAY); 463 payload.setParams(new Object []{payloadArray}); 464 StreamCallPayload ret = (StreamCallPayload) streamClient.invoke(payload); 465 if(ret != null) 466 { 467 Object [] retVals = ret.getParams(); 468 byte[] retBytes = (byte[]) retVals[0]; 469 Integer retInt = (Integer ) retVals[1]; 470 471 retByte = retInt.intValue(); 472 473 System.arraycopy(retBytes, 0, b, off, retByte); 474 } 475 } 476 catch(Throwable throwable) 477 { 478 log.error("Error reading with offset from client stream.", throwable); 479 throw new IOException (throwable.getMessage()); 480 } 481 482 return retByte; 483 } 484 485 499 public int read() throws IOException 500 { 501 int readInt = -1; 502 503 try 504 { 505 Integer retInt = (Integer ) streamClient.invoke(new StreamCallPayload(READ)); 506 if(retInt != null) 507 { 508 readInt = retInt.intValue(); 509 } 510 } 511 catch(Throwable throwable) 512 { 513 log.error("Error reading from client stream.", throwable); 514 throw new IOException (throwable.getMessage()); 515 } 516 return readInt; 517 } 518 519 } | Popular Tags |