package org.objectweb.jonathan.libs.protocols.tcpip;

import java.io.IOException;
import java.util.Properties;

import org.objectweb.jonathan.apis.kernel.Context;
import org.objectweb.jonathan.apis.kernel.ContextFactory;
import org.objectweb.jonathan.apis.kernel.InternalException;
import org.objectweb.jonathan.apis.kernel.JonathanException;
import org.objectweb.jonathan.apis.presentation.EndOfMessageException;
import org.objectweb.jonathan.apis.presentation.Marshaller;
import org.objectweb.jonathan.apis.presentation.MarshallerFactory;
import org.objectweb.jonathan.apis.protocols.CommunicationException;
import org.objectweb.jonathan.apis.protocols.Protocol;
import org.objectweb.jonathan.apis.protocols.ProtocolGraph;
import org.objectweb.jonathan.apis.protocols.ReplyInterface;
import org.objectweb.jonathan.apis.protocols.SessionIdentifier;
import org.objectweb.jonathan.apis.protocols.Session_High;
import org.objectweb.jonathan.apis.protocols.Session_Low;
import org.objectweb.jonathan.apis.protocols.ip.IpConnection;
import org.objectweb.jonathan.apis.protocols.ip.IpSession;
import org.objectweb.jonathan.apis.protocols.ip.IpSessionIdentifier;
import org.objectweb.jonathan.apis.protocols.ip.TcpIpConnectionMgr;
import org.objectweb.jonathan.apis.protocols.ip.TcpIpSrvConnectionFactory;
import org.objectweb.jonathan.apis.resources.Chunk;
import org.objectweb.jonathan.apis.resources.ChunkFactory;
import org.objectweb.jonathan.apis.resources.ChunkProvider;
import org.objectweb.jonathan.apis.resources.Scheduler;

import org.objectweb.util.monolog.api.BasicLevel;

/**
 * TCP/IP implementation of {@link Protocol}.
 * <p>
 * The protocol creates client-side session identifiers ({@link CltSessionIdentifier}),
 * exports server-side ones ({@link SrvSessionId}) through {@link TcpIpProtocolGraph},
 * and moves marshalled messages over {@link IpConnection}s obtained from the
 * {@link TcpIpConnectionMgr} given at construction time.
 */
public class TcpIpProtocol implements Protocol {
   /** Scheduler used to start reader jobs and server accept loops. */
   protected Scheduler scheduler;
   /** Factory used to allocate the chunks that hold outgoing and incoming bytes. */
   protected ChunkFactory chunk_factory;
   /** Factory used to wrap incoming chunk providers into unmarshallers. */
   protected MarshallerFactory marshaller_factory;
   /** Factory used to build the contexts returned by the session identifiers' getInfo(). */
   protected ContextFactory context_factory;

   /**
    * When true, a server accept loop that stops on an exception prints a message
    * and the stack trace to System.err (see {@link SrvSessionFactory#run()}).
    */
   public boolean verbose;

   /** Registered server session factories; only the first num_srv_sessions slots are used. */
   SrvSessionFactory[] srv_session_factories = new SrvSessionFactory[4];
   /** Number of live entries in srv_session_factories. */
   int num_srv_sessions = 0;

   /**
    * Guards srv_session_factories and num_srv_sessions. A dedicated lock is used
    * because the array reference itself is replaced when the table grows.
    */
   private final Object srv_lock = new Object();

   /** Source of client and server connections. */
   protected TcpIpConnectionMgr connection_mgr;

   /**
    * Builds a new TCP/IP protocol instance. Verbose mode is initially off.
    *
    * @param connection_mgr  manager used to create client and server connections
    * @param scheduler       scheduler used to run reader and accept jobs
    * @param chunk_factory   factory for data chunks
    * @param mf              factory for (un)marshallers
    * @param context_factory factory for contexts
    */
   public TcpIpProtocol(TcpIpConnectionMgr connection_mgr, Scheduler scheduler,
                        ChunkFactory chunk_factory, MarshallerFactory mf,
                        ContextFactory context_factory) {
      this.scheduler = scheduler;
      this.chunk_factory = chunk_factory;
      this.connection_mgr = connection_mgr;
      this.marshaller_factory = mf;
      this.context_factory = context_factory;
      verbose = false;
   }

   /**
    * Sets the verbose flag.
    *
    * @param verbose the new value of {@link #verbose}
    */
   public void setVerbose(boolean verbose) {
      this.verbose = verbose;
   }

   /**
    * Tells whether this protocol is an invocation protocol.
    *
    * @return always false
    */
   public boolean isAnInvocationProtocol() {
      return false;
   }

   /**
    * Creates a protocol graph for this protocol.
    * <p>
    * If {@code hints} holds an Integer under the name "port", the graph is bound
    * to that port; otherwise (no hints, no value, or a value of another type) a
    * graph with port 0 is returned.
    *
    * @param lower not used by this implementation
    * @param hints optional context that may carry a "port" Integer; may be null
    * @return a new protocol graph
    */
   public ProtocolGraph createProtocolGraph(ProtocolGraph[] lower, Context hints) {
      if (hints != null) {
         Object port = hints.getValue("port", (char) 0);
         if (port instanceof Integer) {
            return new TcpIpProtocolGraph(((Integer) port).intValue());
         }
      }
      return new TcpIpProtocolGraph();
   }

   /**
    * Creates a client-side session identifier for the given endpoint.
    *
    * @param host host name of the endpoint
    * @param port port of the endpoint
    * @return a new client session identifier
    */
   public IpSessionIdentifier newSessionIdentifier(String host,
                                                   int port) {
      return new CltSessionIdentifier(host, port);
   }

   /**
    * Creates a client-side session identifier from a property set.
    * <p>
    * The properties must hold a String under "hostname" and an Integer under
    * "port"; the port is masked to its 16 low-order bits.
    *
    * @param info properties describing the endpoint
    * @param next not used by this implementation
    * @return a new client session identifier
    * @throws JonathanException if the host name or the port is missing or has the wrong type
    */
   public SessionIdentifier createSessionIdentifier(Properties info, SessionIdentifier[] next) throws JonathanException {
      Object host = (info == null) ? null : info.get("hostname");
      Object port = (info == null) ? null : info.get("port");
      // instanceof also rejects null, so absent entries are reported as a
      // JonathanException rather than surfacing as a NullPointerException.
      if (!(host instanceof String) || !(port instanceof Integer)) {
         throw new JonathanException("Unable to find relevant TCP/IP information (host, port)");
      }
      return new CltSessionIdentifier((String) host, ((Integer) port).intValue() & 0xFFFF);
   }

   /**
    * Tells whether the given identifier designates a server session exported by
    * this protocol instance.
    * <p>
    * Side effect: the identifier's hostname is replaced by the canonical host name
    * returned by the connection manager.
    *
    * @param tcp_session_id the identifier to test
    * @return true if a registered server session factory has the same host and port
    */
   public boolean isLocal(IpSessionIdentifier tcp_session_id) {
      String hostname =
         connection_mgr.getCanonicalHostName(tcp_session_id.hostname);
      tcp_session_id.hostname = hostname;
      return isLocal(hostname, tcp_session_id.port);
   }

   /**
    * Tells whether a registered server session factory matches the given endpoint.
    *
    * @param host host name to look for
    * @param port port to look for
    * @return true if a match is found
    */
   boolean isLocal(String host, int port) {
      synchronized (srv_lock) {
         for (int i = 0; i < num_srv_sessions; i++) {
            SrvSessionId id = srv_session_factories[i].session_id;
            if (id.port == port && id.hostname.equals(host)) {
               return true;
            }
         }
         return false;
      }
   }

   /**
    * Removes a server session factory from the table, if present, keeping the
    * remaining entries contiguous.
    *
    * @param factory the factory to remove
    */
   void remove(SrvSessionFactory factory) {
      synchronized (srv_lock) {
         int i = 0;
         while (i < num_srv_sessions && srv_session_factories[i] != factory) {
            i++;
         }
         if (i < num_srv_sessions) {
            num_srv_sessions--;
            System.arraycopy(srv_session_factories, i + 1,
                             srv_session_factories, i, num_srv_sessions - i);
            srv_session_factories[num_srv_sessions] = null;
         }
      }
   }

   /**
    * Emits the content of a message on a connection and closes the message.
    * <p>
    * If at most one chunk of the message is non-empty it is emitted directly;
    * otherwise all non-empty chunks are copied into a single freshly allocated
    * chunk which is emitted and then released. The message is closed only when
    * emission succeeded, so that callers may retry with the same message.
    *
    * @param message    the message to send
    * @param connection the connection to send it on
    * @throws IOException if the emission fails
    */
   void send(Marshaller message, IpConnection connection)
      throws IOException {
      Chunk first = message.getState();
      Chunk portion = null;
      int size = 0;
      boolean onechunk = true;

      // Measure the message and remember the first non-empty chunk.
      for (Chunk c = first; c != null; c = c.next) {
         int len = c.top - c.offset;
         if (len != 0) {
            if (size != 0) {
               onechunk = false;
            } else {
               portion = c;
            }
            size += len;
         }
      }

      if (onechunk) {
         if (portion != null) {
            logOutgoing("TCP a: ", portion);
            connection.emit(portion);
         }
         message.close();
      } else {
         // Allocate before entering the try block: if allocation fails there is
         // nothing of ours to release, and the message's own chunks must not be.
         Chunk merged = chunk_factory.newChunk(size);
         try {
            int off = merged.offset;
            for (Chunk c = first; c != null; c = c.next) {
               int len = c.top - c.offset;
               if (len > 0) {
                  System.arraycopy(c.data, c.offset, merged.data, off, len);
                  off += len;
               }
            }
            merged.top = off;
            logOutgoing("TCP b: ", merged);
            connection.emit(merged);
            message.close();
         } finally {
            merged.release();
         }
      }
   }

   /**
    * Logs, at DEBUG level on the send logger, the bytes of a chunk between its
    * offset and its top, as space-separated decimal values.
    *
    * @param prefix text placed in front of the byte dump
    * @param chunk  the chunk about to be emitted
    */
   private static void logOutgoing(String prefix, Chunk chunk) {
      if ((LoggerProvider.send_logger != null)
          && (LoggerProvider.send_logger.isLoggable(BasicLevel.DEBUG))) {
         StringBuffer dump = new StringBuffer(prefix);
         for (int i = chunk.offset; i < chunk.top; i++) {
            dump.append(chunk.data[i]).append(' ');
         }
         LoggerProvider.send_logger.log(BasicLevel.DEBUG, dump.toString());
      }
   }

   /**
    * Builds the context shared by both kinds of session identifiers: a
    * "hostname" String element and a "port" Integer element.
    *
    * @param hostname host name to record
    * @param port     port to record
    * @return a new context holding both elements
    * @throws JonathanException if the context cannot be built
    */
   private Context newEndpointInfo(String hostname, int port) throws JonathanException {
      Context c = context_factory.newContext();
      c.addElement("hostname", String.class, hostname, (char) 0);
      c.addElement("port", Integer.class, new Integer(port), (char) 0);
      return c;
   }

   /**
    * Protocol graph of the TCP/IP protocol; it only carries the port on which
    * server sessions should be exported (0 meaning "any").
    */
   final class TcpIpProtocolGraph implements ProtocolGraph {

      /** Requested port, or 0 when no specific port is requested. */
      int port;

      TcpIpProtocolGraph(int port) {
         this.port = port;
      }

      TcpIpProtocolGraph() {
         this(0);
      }

      /**
       * Exports a session. An already registered server session factory is reused
       * when it accepts the pair (hls, this graph); otherwise a new server
       * connection factory is opened on the requested port, an accept loop is
       * scheduled for it, and it is added to the table.
       *
       * @param hls the upper-level session that will receive incoming messages
       * @return the identifier of the exported server session
       * @throws JonathanException if the server connection factory cannot be created
       */
      public SessionIdentifier export(Session_Low hls)
         throws JonathanException {
         synchronized (srv_lock) {
            for (int i = 0; i < num_srv_sessions; i++) {
               SrvSessionId existing = srv_session_factories[i].register(hls, this);
               if (existing != null) {
                  return existing;
               }
            }
            TcpIpSrvConnectionFactory srv_connection_factory =
               connection_mgr.newSrvConnectionFactory(port);
            SrvSessionId session_id = new SrvSessionId(srv_connection_factory);
            SrvSessionFactory srv_fac =
               new SrvSessionFactory(session_id, hls, TcpIpProtocol.this);
            scheduler.newJob().run(srv_fac);
            int len = srv_session_factories.length;
            if (num_srv_sessions == len) {
               // Table is full: grow it by four slots.
               SrvSessionFactory[] grown = new SrvSessionFactory[len + 4];
               System.arraycopy(srv_session_factories, 0, grown, 0, len);
               srv_session_factories = grown;
            }
            srv_session_factories[num_srv_sessions++] = srv_fac;
            return session_id;
         }
      }
   }

   /** Client-side session identifier: designates a remote (host, port) endpoint. */
   final class CltSessionIdentifier extends IpSessionIdentifier {

      CltSessionIdentifier(String hostname, int port) {
         super(hostname, port);
      }

      /** @return the enclosing protocol */
      public Protocol getProtocol() {
         return TcpIpProtocol.this;
      }

      /** Does nothing: a client identifier has nothing to unexport. */
      public void unexport() {}

      /**
       * Opens (or reuses) a client connection to the designated endpoint and
       * returns the session attached to it, after acquiring it once.
       * <p>
       * The session actually returned is the one reported by the connection,
       * which may differ from the freshly created candidate.
       *
       * @param hls the upper-level session that will receive incoming messages
       * @return the session to use for sending
       * @throws JonathanException if the connection cannot be established
       */
      public Session_High bind(Session_Low hls)
         throws JonathanException {
         CltSession session = new CltSession(hls, this);
         IpConnection connection =
            connection_mgr.newCltConnection(hostname, port, session);
         session = (CltSession) connection.getSession();
         session.acquire();
         session.connect(connection);
         return session;
      }

      /** @return a context holding the "hostname" and "port" of this identifier */
      public Context getInfo() throws JonathanException {
         return newEndpointInfo(hostname, port);
      }

      /** @return the result of {@link TcpIpProtocol#isLocal(IpSessionIdentifier)} for this identifier */
      public boolean isLocal() {
         return TcpIpProtocol.this.isLocal(this);
      }
   }

   /** Server-side session identifier, backed by a server connection factory. */
   final class SrvSessionId extends IpSessionIdentifier {

      /** Factory accepting the connections for this identifier. */
      TcpIpSrvConnectionFactory connection_factory;

      SrvSessionId(TcpIpSrvConnectionFactory connection_factory) {
         super(connection_factory.getHostName(),
               connection_factory.getPort());
         this.connection_factory = connection_factory;
      }

      /** @return the enclosing protocol */
      public Protocol getProtocol() {
         return TcpIpProtocol.this;
      }

      /** Closes the underlying server connection factory. */
      public void unexport() {
         connection_factory.close();
      }

      /**
       * Not supported on a server identifier.
       *
       * @throws JonathanException always (an InternalException)
       */
      public Session_High bind(Session_Low hls)
         throws JonathanException {
         throw new InternalException("Meaningless operation");
      }

      /** @return a context holding the "hostname" and "port" of this identifier */
      public Context getInfo() throws JonathanException {
         return newEndpointInfo(hostname, port);
      }

      /** @return the result of {@link TcpIpProtocol#isLocal(IpSessionIdentifier)} for this identifier */
      public boolean isLocal() {
         return TcpIpProtocol.this.isLocal(this);
      }
   }

   /**
    * Common part of client and server sessions. A session owns at most one
    * connection and one current incoming message ({@link TcpIpChunkProvider});
    * as a Runnable it delivers that incoming message to the upper-level session.
    */
   abstract class Session implements IpSession, Runnable {
      /** Current connection, or null when the session is not connected. */
      IpConnection connection;

      /** Chunk provider for the message currently being read, or null. */
      TcpIpChunkProvider tcp_message;

      /** Upper-level session receiving incoming messages and errors. */
      Session_Low hls;

      Session(Session_Low hls) {
         this.hls = hls;
      }

      /** Does nothing: this protocol adds no header to outgoing messages. */
      final public void prepare(Marshaller m) {}

      /**
       * Not supported.
       *
       * @throws JonathanException always (an InternalException)
       */
      final public ReplyInterface prepareInvocation(Marshaller m)
         throws JonathanException {
         throw new InternalException("TCP session don't handle invocations.");
      }

      /** @return always true */
      final public boolean direct() {
         return true;
      }

      /** @return the upper-level session given at construction time */
      public final Session_Low getHls() {
         return hls;
      }

      /**
       * Attaches a connection to this session. Does nothing if a connection is
       * already attached. A reader job is scheduled only when there is no
       * current incoming message.
       *
       * @param connection the connection to attach
       * @throws JonathanException declared for callers; not thrown by the visible code
       */
      public synchronized void connect(IpConnection connection)
         throws JonathanException {
         if (this.connection != null) {
            return;
         }
         this.connection = connection;
         if (tcp_message == null) {
            tcp_message = new TcpIpChunkProvider(this);
            scheduler.newJob().run(this);
         }
      }

      /** @return the current connection, possibly null */
      public final IpConnection getConnection() {
         return connection;
      }

      /**
       * Reader job: hands the current incoming message to the upper-level
       * session. When the upper level fails with an exception representing an
       * IOException or an EndOfMessageException, the session is unbound if it
       * still uses the connection the job started with (and nothing more is
       * reported); otherwise the stale message is deleted. In every other case
       * the exception is forwarded to the upper-level session.
       */
      final public void run() {
         TcpIpChunkProvider message = null;
         IpConnection current = null;
         try {
            synchronized (this) {
               if (this.connection == null) {
                  return;
               }
               message = tcp_message;
               current = this.connection;
            }
            hls.send(marshaller_factory.newUnMarshaller(message), this);
         } catch (JonathanException e) {
            if ((LoggerProvider.receive_logger != null)
                && (LoggerProvider.receive_logger.isLoggable(BasicLevel.INFO))) {
               LoggerProvider.receive_logger.log(BasicLevel.INFO, "Exception caught by TcpIpProtocol.");
            }
            Throwable f = e.represents();
            if (f instanceof IOException || f instanceof EndOfMessageException) {
               synchronized (this) {
                  if (current == this.connection) {
                     unbind();
                     return;
                  } else {
                     message.delete();
                  }
               }
            }
            hls.send(e, this);
         }
      }

      /**
       * Called when an incoming message is closed. If the session is connected
       * and the message is the current one, a new provider taking over the
       * remaining data is installed and a new reader job is scheduled; otherwise
       * the message is deleted.
       *
       * @param message the message that has just been closed
       */
      final synchronized void closeNotify(TcpIpChunkProvider message) {
         if (connection != null && message == tcp_message) {
            tcp_message = new TcpIpChunkProvider(message);
            scheduler.newJob().run(this);
         } else {
            if ((LoggerProvider.logger != null)
                && (LoggerProvider.logger.isLoggable(BasicLevel.DEBUG))) {
               LoggerProvider.logger.log(BasicLevel.DEBUG,
                                         "connection " + connection +
                                         " " + message + " " + tcp_message);
            }
            message.delete();
         }
      }

      /**
       * Called when an incoming message is deleted; forgets it if it is the
       * current one.
       *
       * @param message the message that has just been deleted
       */
      final synchronized void deleteNotify(TcpIpChunkProvider message) {
         if (message == tcp_message) {
            tcp_message = null;
         }
      }

      /** Deletes and forgets the current connection, if any. */
      void unbind() {
         if (connection != null) {
            connection.delete();
            connection = null;
         }
      }

      /** @return the enclosing protocol */
      TcpIpProtocol getProtocol() {
         return TcpIpProtocol.this;
      }
   }

   /** Server-side session: created by the accept loop for each incoming connection. */
   final class SrvSession extends Session {
      SrvSession(Session_Low hls) {
         super(hls);
      }

      /** Does nothing. */
      final public void close() {}

      /**
       * Sends a message on the session's connection. On an I/O failure the
       * session is unbound, the message is closed and a CommunicationException
       * is thrown.
       *
       * @param message the message to send
       * @throws JonathanException (a CommunicationException) if the emission fails
       *         or the session is already closed
       */
      public final void send(Marshaller message)
         throws JonathanException {
         try {
            TcpIpProtocol.this.send(message, connection);
         } catch (IOException e) {
            try {
               synchronized (this) {
                  if (connection == null) {
                     throw new CommunicationException("Session is closed");
                  }
                  unbind();
                  throw new CommunicationException(e);
               }
            } finally {
               message.close();
            }
         }
      }
   }

   /**
    * Client-side session. It is reference counted through acquire()/close() and
    * attempts one reconnection when an emission fails.
    */
   final class CltSession extends Session {
      /** Identifier of the remote endpoint, used to reconnect. */
      IpSessionIdentifier session_id;
      /** Number of acquire() calls not yet balanced by close(). */
      int acquired;

      CltSession(Session_Low hls, IpSessionIdentifier session_id)
         throws JonathanException {
         super(hls);
         this.session_id = session_id;
         acquired = 0;
      }

      /**
       * Sends a message on the session's connection. On an I/O failure the
       * session is rebound once and the emission retried; if the retry fails the
       * session is unbound. The message is closed whenever the first attempt
       * failed.
       *
       * @param message the message to send
       * @throws JonathanException a CommunicationException if the session is
       *         closed or the retry fails, a BindException if rebinding fails
       */
      public final void send(Marshaller message)
         throws JonathanException {
         try {
            TcpIpProtocol.this.send(message, connection);
         } catch (IOException e) {
            try {
               synchronized (this) {
                  if (connection == null) {
                     throw new CommunicationException("Session is closed");
                  }
                  try {
                     rebind();
                  } catch (JonathanException g) {
                     throw new org.objectweb.jonathan.apis.binding.BindException("Can't rebind TCP session");
                  }
                  try {
                     TcpIpProtocol.this.send(message, connection);
                  } catch (IOException f) {
                     unbind();
                     throw new CommunicationException(f);
                  }
               }
            } finally {
               message.close();
            }
         }
      }

      /**
       * Drops the current connection and connects to the endpoint designated by
       * session_id again. Callers must ensure a connection is currently attached.
       *
       * @throws JonathanException if the new connection cannot be obtained
       */
      void rebind() throws JonathanException {
         connection.delete();
         connection = null;
         String hostname = session_id.hostname;
         int port = session_id.port;
         IpConnection new_connection =
            connection_mgr.newCltConnection(hostname, port, this);
         connect(new_connection);
      }

      /**
       * Releases one reference on this session; when the count reaches zero the
       * connection, if any, is released and forgotten.
       */
      final public synchronized void close() {
         acquired--;
         if (acquired == 0 && connection != null) {
            connection.release();
            connection = null;
         }
      }

      /** Takes one more reference on this session. */
      final synchronized void acquire() {
         acquired++;
      }

      /**
       * Two client sessions are equal when both their session identifiers and
       * their upper-level sessions are equal.
       */
      public boolean equals(Object object) {
         if (object instanceof CltSession) {
            CltSession other = (CltSession) object;
            return other.session_id.equals(session_id) && other.hls.equals(hls);
         }
         return false;
      }

      /** Hash code consistent with {@link #equals(Object)}. */
      public int hashCode() {
         return session_id.hashCode() + hls.hashCode();
      }
   }

   /**
    * Accept loop for one exported server session: repeatedly waits for an
    * incoming connection and attaches a new {@link SrvSession} to it.
    */
   final class SrvSessionFactory implements Runnable {
      SrvSessionId session_id;
      Session_Low hls;
      /** Loop flag; cleared from another thread by release(), hence volatile. */
      volatile boolean cont;
      TcpIpProtocol protocol;
      /** Thread running the accept loop; null until run() has started. */
      volatile Thread runner;

      /**
       * @param session_id identifier of the exported server session
       * @param hls        upper-level session given to every accepted session
       * @param protocol   the owning protocol
       */
      SrvSessionFactory(SrvSessionId session_id, Session_Low hls,
                        TcpIpProtocol protocol) {
         this.hls = hls;
         this.session_id = session_id;
         this.protocol = protocol;
         cont = true;
      }

      /**
       * Tells whether this factory can serve an export request.
       *
       * @param hls            the upper-level session being exported
       * @param protocol_graph the graph carrying the requested port
       * @return this factory's session identifier if the requested port is 0 or
       *         equal to this factory's port and the upper-level sessions are
       *         equal; null otherwise
       */
      SrvSessionId register(Session_Low hls,
                            TcpIpProtocolGraph protocol_graph) {
         int port = protocol_graph.port;
         if ((port == 0 || port == session_id.port) && this.hls.equals(hls)) {
            return session_id;
         }
         return null;
      }

      /**
       * Runs the accept loop until release() is called or accepting fails. On a
       * failure that was not caused by release(), the session is unexported and
       * this factory is removed from the protocol's table.
       */
      public void run() {
         runner = Thread.currentThread();
         TcpIpSrvConnectionFactory connection_factory =
            session_id.connection_factory;
         while (cont) {
            try {
               SrvSession session = new SrvSession(hls);
               IpConnection connection =
                  connection_factory.newSrvConnection(session);
               session.connect(connection);
            } catch (JonathanException e) {
               if (cont) {
                  if (verbose) {
                     System.err.println("Stopping server socket on exception.");
                     e.printStackTrace();
                  }
                  if ((LoggerProvider.bind_logger != null)
                      && (LoggerProvider.bind_logger.isLoggable(BasicLevel.INFO))) {
                     LoggerProvider.bind_logger.log(BasicLevel.INFO, "Stopping server socket on exception.", e);
                  }
                  session_id.unexport();
                  remove(this);
                  cont = false;
               }
            }
         }
      }

      /**
       * Stops the accept loop: clears the loop flag, interrupts the loop's
       * thread if it has started, unexports the session and removes this
       * factory from the protocol's table.
       */
      void release() {
         cont = false;
         Thread loop = runner;
         if (loop != null) {
            loop.interrupt();
         }
         session_id.unexport();
         remove(this);
      }
   }
}


/**
 * Chunk provider reading an incoming message from an {@link IpConnection}.
 * <p>
 * The provider is itself the chunk handed to readers: its data, offset and top
 * fields mirror a backing chunk ({@code cache}) obtained from the protocol's
 * chunk factory. A null cache marks a provider that has been deleted or whose
 * data has been handed over to a successor.
 */
final class TcpIpChunkProvider extends Chunk implements ChunkProvider {
   static final byte[] empty_data = new byte[0];
   static final Chunk empty_chunk = new Chunk(empty_data, 0, 0);

   /** Session this provider reads for. */
   TcpIpProtocol.Session session;

   /** Connection the bytes are read from; set to null after an I/O failure. */
   IpConnection connection;
   /** Capacity of the current data array. */
   int max;

   /** Backing chunk owning the data array, or null once deleted or handed over. */
   Chunk cache;

   /**
    * Builds an empty provider reading from the session's current connection.
    *
    * @param session the owning session
    */
   TcpIpChunkProvider(TcpIpProtocol.Session session) {
      super(empty_data, 0, 0);
      max = 0;
      this.session = session;
      connection = session.connection;
      cache = empty_chunk;
   }

   /**
    * Builds a provider that takes over the state (data window, backing chunk,
    * session and connection) of a previous one. The previous provider loses its
    * backing chunk.
    *
    * @param message the provider to take over
    */
   TcpIpChunkProvider(TcpIpChunkProvider message) {
      super(message.data, message.offset, message.top);
      cache = message.cache;
      max = message.max;
      session = message.session;
      connection = message.connection;
      message.cache = null;
   }

   /**
    * Makes sure unread data is available and returns this chunk.
    * <p>
    * When the chunk is exhausted (top == offset), reads what the connection
    * reports as available, or a single byte when one byte or less is reported,
    * allocating a new backing chunk first when the current one lacks room.
    *
    * @return this provider
    * @throws JonathanException wrapping the IOException if reading fails; the
    *         provider is then deleted and its connection deleted and forgotten
    */
   public Chunk prepare() throws JonathanException {
      TcpIpProtocol protocol = session.getProtocol();
      if (top == offset) {
         try {
            int to_read = connection.available();
            if (to_read > 1) {
               if (to_read > max - top) {
                  cache.release();
                  cache = protocol.chunk_factory.newChunk(to_read);
                  data = cache.data;
                  offset = cache.offset;
                  top = cache.top;
                  max = data.length;
               }
               connection.receive(this, to_read);
               return this;
            } else {
               if (max - top == 0) {
                  cache.release();
                  cache = protocol.chunk_factory.newChunk();
                  data = cache.data;
                  offset = cache.offset;
                  top = cache.top;
                  max = data.length;
               }
               connection.receive(this, 1);
               return this;
            }
         } catch (IOException e) {
            delete();
            connection.delete();
            connection = null;
            throw new JonathanException(e);
         }
      } else {
         return this;
      }
   }

   /** Notifies the session that this message is closed, unless it no longer owns a backing chunk. */
   public void close() {
      if (cache != null) {
         session.closeNotify(this);
      }
   }

   /**
    * Safety net: logs an error and deletes the provider if it is collected
    * while still owning a backing chunk.
    */
   protected void finalize() {
      if (cache != null) {
         if ((LoggerProvider.logger != null)
             && (LoggerProvider.logger.isLoggable(BasicLevel.ERROR))) {
            LoggerProvider.logger.log(BasicLevel.ERROR,
                                      "Resource management error. Message has not been properly closed.");
         }
         delete();
      }
   }

   /** Releases the backing chunk, if any, and notifies the session. */
   final void delete() {
      if (cache != null) {
         cache.release();
         cache = null;
      }

      session.deleteNotify(this);
   }

   /**
    * Duplicates the unread part of this chunk through the backing chunk.
    *
    * @return the chunk returned by the backing chunk's duplicate(offset, top)
    */
   public Chunk duplicate() throws JonathanException {
      cache.top = top;
      return cache.duplicate(offset, top);
   }

   /**
    * Duplicates a window of this chunk through the backing chunk.
    *
    * @param off start of the window
    * @param t   end of the window
    * @return the chunk returned by the backing chunk's duplicate(off, t)
    */
   public Chunk duplicate(int off, int t) throws JonathanException {
      cache.top = top;
      return cache.duplicate(off, t);
   }

   /** Does nothing: the backing chunk is released through delete(). */
   public void release() {}
}