1 package hudson.remoting; 2 3 import java.io.EOFException ; 4 import java.io.IOException ; 5 import java.io.InputStream ; 6 import java.io.ObjectInputStream ; 7 import java.io.ObjectOutputStream ; 8 import java.io.OutputStream ; 9 import java.lang.reflect.Proxy ; 10 import java.util.Hashtable ; 11 import java.util.Map ; 12 import java.util.Vector ; 13 import java.util.concurrent.ExecutionException ; 14 import java.util.concurrent.Executor ; 15 import java.util.logging.Level ; 16 import java.util.logging.Logger ; 17 18 69 public class Channel implements VirtualChannel { 70 private final ObjectInputStream ois; 71 private final ObjectOutputStream oos; 72 private final String name; 73 final Executor executor; 74 75 79 private volatile boolean closed = false; 80 81 private volatile boolean closing = false; 82 83 final Map <Integer ,Request<?,?>> pendingCalls = new Hashtable <Integer ,Request<?,?>>(); 84 85 88 final ImportedClassLoaderTable importedClassLoaders = new ImportedClassLoaderTable(this); 89 90 93 private final ExportTable<Object > exportedObjects = new ExportTable<Object >(); 94 95 98 private final Vector <Listener> listeners = new Vector <Listener>(); 99 100 public Channel(String name, Executor exec, InputStream is, OutputStream os) throws IOException { 101 this(name,exec,is,os,null); 102 } 103 104 121 public Channel(String name, Executor exec, InputStream is, OutputStream os, OutputStream header) throws IOException { 122 this.name = name; 123 this.executor = exec; 124 125 os.write(new byte[]{0,0,0,0}); this.oos = new ObjectOutputStream (os); 133 oos.flush(); 135 { int ch; 137 int count=0; 138 139 while(true) { 140 ch = is.read(); 141 if(ch==-1) { 142 throw new EOFException ("unexpected stream termination"); 143 } 144 if(ch==0) { 145 count++; 146 if(count==4) break; 147 } else { 148 if(header!=null) 149 header.write(ch); 150 count=0; 151 } 152 } 153 } 154 155 this.ois = new ObjectInputStream (is); 156 new ReaderThread(name).start(); 157 } 158 159 162 public static abstract class Listener { 163 170 public void onClosed(Channel channel, IOException cause) {} 171 } 172 173 180 synchronized void send(Command cmd) throws IOException { 181 if(closed) 182 throw new IOException ("already closed"); 183 if(logger.isLoggable(Level.FINE)) 184 logger.fine("Send "+cmd); 185 Channel old = Channel.setCurrent(this); 186 try { 187 oos.writeObject(cmd); 188 oos.flush(); } finally { 190 Channel.setCurrent(old); 191 } 192 oos.reset(); 193 } 194 195 198 public <T> T export(Class <T> type, T instance) { 199 return export(type,instance,true); 200 } 201 202 212 <T> T export(Class <T> type, T instance, boolean userProxy) { 213 if(instance==null) 214 return null; 215 216 final int id = export(instance); 218 return type.cast(Proxy.newProxyInstance( type.getClassLoader(), new Class []{type}, 219 new RemoteInvocationHandler(id,userProxy))); 220 } 221 222 int export(Object instance) { 223 return exportedObjects.export(instance); 224 } 225 226 Object getExportedObject(int oid) { 227 return exportedObjects.get(oid); 228 } 229 230 void unexport(int id) { 231 exportedObjects.unexport(id); 232 } 233 234 237 public <V,T extends Throwable > 238 V call(Callable<V,T> callable) throws IOException , T, InterruptedException { 239 try { 240 UserResponse<V,T> r = new UserRequest<V,T>(this, callable).call(this); 241 return r.retrieve(this, UserRequest.getClassLoader(callable)); 242 243 } catch (ClassNotFoundException e) { 245 IOException x = new IOException ("Remote call failed"); 246 x.initCause(e); 247 throw x; 248 } catch (Error e) { 249 IOException x = new IOException ("Remote call failed"); 250 x.initCause(e); 251 throw x; 252 } 253 } 254 255 258 public <V,T extends Throwable > 259 Future<V> callAsync(final Callable<V,T> callable) throws IOException { 260 final Future<UserResponse<V,T>> f = new UserRequest<V,T>(this, callable).callAsync(this); 261 return new FutureAdapter<V,UserResponse<V,T>>(f) { 262 protected V adapt(UserResponse<V,T> r) throws ExecutionException { 263 try { 264 return r.retrieve(Channel.this, UserRequest.getClassLoader(callable)); 265 } catch (Throwable t) { throw new ExecutionException (t); 267 } 268 } 269 }; 270 } 271 272 275 private synchronized void terminate(IOException e) { 276 closed = true; 277 try { 278 synchronized(pendingCalls) { 279 for (Request<?,?> req : pendingCalls.values()) 280 req.abort(e); 281 pendingCalls.clear(); 282 } 283 } finally { 284 notifyAll(); 285 286 for (Listener l : listeners.toArray(new Listener[listeners.size()])) 287 l.onClosed(this,e); 288 } 289 } 290 291 296 public void addListener(Listener l) { 297 listeners.add(l); 298 } 299 300 306 public boolean removeListener(Listener l) { 307 return listeners.remove(l); 308 } 309 310 318 public synchronized void join() throws InterruptedException { 319 while(!closed) 320 wait(); 321 } 322 323 330 private static final class CloseCommand extends Command { 331 protected void execute(Channel channel) { 332 try { 333 channel.close(); 334 } catch (IOException e) { 335 logger.log(Level.SEVERE,"close command failed on "+channel.name,e); 336 logger.log(Level.INFO,"close command created at",createdAt); 337 } 338 } 339 340 public String toString() { 341 return "close"; 342 } 343 } 344 345 348 public synchronized void close() throws IOException { 349 if(closed) return; 351 if(closing) return; 352 closing=true; 353 354 send(new CloseCommand()); 355 oos.close(); 356 357 terminate(null); 359 } 360 361 private final class ReaderThread extends Thread { 362 public ReaderThread(String name) { 363 super("Channel reader thread: "+name); 364 } 365 366 public void run() { 367 try { 368 while(!closed) { 369 try { 370 Command cmd = null; 371 Channel old = Channel.setCurrent(Channel.this); 372 try { 373 cmd = (Command)ois.readObject(); 374 } finally { 375 Channel.setCurrent(old); 376 } 377 if(logger.isLoggable(Level.FINE)) 378 logger.fine("Received "+cmd); 379 cmd.execute(Channel.this); 380 } catch (ClassNotFoundException e) { 381 logger.log(Level.SEVERE, "Unable to read a command",e); 382 } 383 } 384 ois.close(); 385 } catch (IOException e) { 386 logger.log(Level.SEVERE, "I/O error in channel "+name,e); 387 terminate(e); 388 } 389 } 390 } 391 392 static Channel setCurrent(Channel channel) { 393 Channel old = CURRENT.get(); 394 CURRENT.set(channel); 395 return old; 396 } 397 398 406 public static Channel current() { 407 return CURRENT.get(); 408 } 409 410 413 private static final ThreadLocal <Channel> CURRENT = new ThreadLocal <Channel>(); 414 415 private static final Logger logger = Logger.getLogger(Channel.class.getName()); 416 } 417 | Popular Tags |