1 package com.quikj.server.framework; 2 3 import java.io.*; 4 import java.net.*; 5 import java.util.*; 6 7 public class AceIPCServer extends AceThread implements AceIPCEntityInterface 8 { 9 16 public AceIPCServer (long user_parm, 17 String name, 18 int port, 19 int max_connections, 20 int hb_interval, 21 AceThread unsol_msg_handler, 22 AceThread connect_handler, 23 AceThread disconnect_handler) 24 throws IOException, AceException 25 { 26 super(name); 27 28 if (initNewSocket (port, user_parm) == false) 29 { 30 throw new AceException ("Datagram socket initialization failed"); 31 } 32 33 userParm = user_parm; 34 maxConnections = max_connections; 35 hbInterval = hb_interval; 36 unsolMsgHandler = unsol_msg_handler; 37 connectHandler = connect_handler; 38 disconnectHandler = disconnect_handler; 39 this.port = port; 40 41 } 45 46 public void dispose() 47 { 48 dropConnections(); 49 dropSocket(); 50 51 if (this.isAlive() == true) 52 { 53 if (interruptWait (AceSignalMessage.SIGNAL_TERM, 54 "Normal IPC Server dispose") == false) 55 { 56 System.err.println (getName() 58 + ": AceIPCServer.dispose() -- Could not interrupt own wait : " 59 + getErrorMessage()); 60 61 super.dispose(); 62 } 63 } 64 else 65 { 66 super.dispose(); 67 68 } 72 } 73 74 public void run() 75 { 76 sockListener.start(); 77 78 while (true) 79 { 80 AceMessageInterface message = waitMessage(); 81 if (message == null) 82 { 83 continue; 87 } 88 else if ((message instanceof AceSignalMessage) == true) 89 { 90 super.dispose(); 99 break; 100 } 101 102 processEvent (message); 103 } 104 } 105 106 public AceMessageInterface waitIPCMessage () 107 { 108 Thread thr = Thread.currentThread(); 109 110 if ((thr instanceof AceThread) == false) 111 { 112 writeErrorMessage ("This method is not being called from an object which is a sub-class of type AceThread"); 113 return null; 114 } 115 116 AceThread cthread = (AceThread)thr; 117 118 while (true) 119 { 120 AceMessageInterface msg_received = cthread.waitMessage(); 121 if ((msg_received instanceof AceIPCMessage) == true) 122 { 123 if (((AceIPCMessage)msg_received).getEntity() == this) 124 { 125 return msg_received; 126 } 127 } 128 else if ((msg_received instanceof AceSignalMessage) == true) 129 { 130 return msg_received; 131 } 132 } 133 } 134 135 private boolean initNewSocket (int port, long user_parm) 136 { 137 139 synchronized (socketLock) 140 { 141 try 142 { 143 socket = new DatagramSocket (port); 144 } 145 catch (SocketException ex) 146 { 147 System.err.println (getName() + 149 ": AceIPCServer.initNewSocket() -- Socket error creating DatagramSocket : " 150 + ex.getMessage() + ' ' 151 + (new Date()) + ' ' + 152 (new Date().getTime() & 0xFFFF)); 153 return false; 154 } 155 156 try 157 { 158 sockListener = new AceDatagram (user_parm, 159 getName() + "_sockListener", 160 this, 161 socket, 162 AceIPCMessage.MAX_IPC_MSG_SIZE); 163 } 164 catch (IOException ex) 165 { 166 System.err.println (getName() + 168 ": AceIPCServer.initNewSocket() -- IO error creating AceDatagram : " 169 + ex.getMessage()); 170 socket.close(); 171 socket = null; 172 return false; 173 } 174 catch (AceException ex) 175 { 176 System.err.println (getName() + 178 ": AceIPCServer.initNewSocket() -- Ace error creating AceDatagram : " 179 + ex.getMessage()); 180 socket.close(); 181 socket = null; 182 return false; 183 } 184 } 185 186 return true; 187 } 188 189 private void dropSocket () 190 { 191 synchronized (socketLock) 192 { 193 if (sockListener != null) 194 { 195 sockListener.dispose(); 196 sockListener = null; 197 socket = null; 198 } 199 200 flushMessages(); } 202 } 203 204 private void dropConnections () 205 { 206 synchronized (clientList) 207 { 208 Collection elements = clientList.values(); 209 for (Iterator i = elements.iterator(); i.hasNext() == true; ) 210 { 211 HashMap element = (HashMap) i.next(); 212 Collection subelements = element.values(); 213 for (Iterator j = subelements.iterator(); j.hasNext() == true; ) 214 { 215 AceIPCServerConnection conn = (AceIPCServerConnection) j.next(); 216 InetAddress addr = conn.getClientAddress(); 217 int port = conn.getClientPort(); 218 conn.dispose(); 219 223 if (disconnectHandler != null) { 225 AceIPCMessage msg_for_user = new AceIPCMessage (AceIPCMessage.DISCONNECT, 226 this, 227 addr, 228 port, 229 userParm); 230 if (disconnectHandler.sendMessage (msg_for_user) == false) 231 { 232 System.err.println (getName() 234 + ": AceIPCServer.dropConnections() -- Could not send IPC message to the user disconnect handler thread : " 235 + getErrorMessage()); 236 } 237 } 238 } 239 element.clear(); 240 } 241 242 clientList.clear(); 243 } 244 } 245 246 private void dropConnection (AceIPCServerConnection conn, boolean send_disc_msg) 247 { 249 InetAddress client_addr = conn.getClientAddress(); 250 int client_port = conn.getClientPort(); 251 conn.dispose(); 252 removeClient (conn); 253 if (send_disc_msg == true) 254 { 255 sendDisconnectMessage (client_addr, client_port); 256 } 257 } 258 259 protected void connectionClosed (AceIPCServerConnection conn, 260 boolean send_disc_msg) 261 { 263 removeClient (conn); 264 if (send_disc_msg == true) 265 { 266 sendDisconnectMessage (conn.getClientAddress(), conn.getClientPort()); 267 } 268 } 269 270 private void getNewSocket () 271 { 272 dropConnections(); 273 dropSocket(); 274 275 for (int i = 0; i < MAX_INIT_FAILURES_IN_A_ROW; i++) 276 { 277 if (initNewSocket (port, userParm) == true) 278 { 279 sockListener.start(); 280 break; 281 } 282 } 283 } 284 285 286 private void processEvent (AceMessageInterface message) 287 { 288 if ((message instanceof AceDatagramMessage) == true) 289 { 290 if (((AceDatagramMessage)message).getStatus() == AceDatagramMessage.READ_COMPLETED) 291 { 292 try 294 { 295 AceIPCMessageParser parser = new AceIPCMessageParser 296 (((AceDatagramMessage)message).getBuffer(), 297 ((AceDatagramMessage)message).getLength()); 298 int msg_type = parser.getMessageType(); 299 300 AceIPCServerConnection client_conn = findClient 302 (((AceDatagramMessage)message).getAddress(), 303 ((AceDatagramMessage)message).getPort()); 304 305 switch (msg_type) 306 { 307 case AceIPCMessageInterface.CONN_REQ_MSG: 308 { 309 if (client_conn != null) 310 { 311 client_conn.dispose(); 312 removeClient (client_conn); 313 } 314 316 processConnReqMessage ((AceIPCConnReqMessage) parser.getMessage(), 317 ((AceDatagramMessage)message).getAddress(), 318 ((AceDatagramMessage)message).getPort()); 319 } 320 break; 321 default: 322 { 323 switch (msg_type) 324 { 325 case AceIPCMessageInterface.HB_MSG: 326 { 327 if (client_conn != null) 328 { 329 if (client_conn.resetReceiveTiming() == false) 330 { 331 dropConnection (client_conn, 332 true); 333 } 334 } 335 } 336 break; 337 case AceIPCMessageInterface.DISCONNECT_MSG: 338 { 339 if (client_conn != null) 340 { 341 client_conn.dispose(); 342 removeClient (client_conn); 343 } 344 } 345 break; 346 case AceIPCMessageInterface.USER_MSG: 347 { 348 if (client_conn != null) 349 { 350 if (client_conn.resetReceiveTiming() == true) 351 { 352 processUserMessage ((AceIPCUserMessage) parser.getMessage(), 353 ((AceDatagramMessage)message).getAddress(), 354 ((AceDatagramMessage)message).getPort()); 355 } 356 else 357 { 358 dropConnection (client_conn, 359 true); 360 } 361 } 362 } 363 break; 364 default: 365 { 366 System.err.println (getName() + 368 ": AceIPCServer.processEvent() -- Unexpected message type received : " 369 + parser.getMessageType() 370 + ", msg follows: " + '\n' 371 + AceIPCMessage.dumpRawBytes 372 (((AceDatagramMessage)message).getBuffer(), 373 0, 374 ((AceDatagramMessage)message).getLength())); 375 376 } 377 break; 378 } 379 } 380 break; 381 } 382 } 383 catch (AceException ex) 384 { 385 System.err.println (getName() + 387 ": AceIPCServer.processEvent() -- Error parsing message, AceException : " 388 + ex.getMessage() 389 + ", msg follows: " + '\n' 390 + AceIPCMessage.dumpRawBytes 391 (((AceDatagramMessage)message).getBuffer(), 392 0, 393 ((AceDatagramMessage)message).getLength())); 394 return; 395 } 396 } 397 else 398 { 399 System.err.println (getName() + 401 ": AceIPCServer.processEvent() -- Error on datagram read, status = " 402 + ((AceDatagramMessage)message).getStatus()); 403 getNewSocket(); 404 } 405 } 406 else 407 { 408 System.err.println (getName() + 410 ": AceIPCServer.processEvent() -- Unexpected Ace message type encountered : " 411 + message.messageType()); 412 } 413 414 } 415 416 private void processConnReqMessage (AceIPCConnReqMessage conn_message, 417 InetAddress addr, 418 int port) 419 { 420 int current_num_connections = 0; 421 synchronized (clientList) 422 { 423 Collection elements = clientList.values(); 424 for (Iterator i = elements.iterator(); i.hasNext() == true; ) 425 { 426 current_num_connections += ((HashMap) i.next()).size(); 427 } 428 } 429 430 if (current_num_connections < maxConnections) 431 { 432 try 433 { 434 AceIPCServerConnection client_conn = new AceIPCServerConnection (getName() + '_' + addr + '_' + port, 435 addr, 436 port, 437 hbInterval, 438 this); 439 if (sendConnectResponseMessage (AceIPCConnRespMessage.STATUS_OK, 440 addr, port) == true) 441 { 442 addClient (client_conn, 443 conn_message.getBytes(), 444 conn_message.userDataOffset(), 445 conn_message.userDataLength()); 446 client_conn.start(); 447 } 448 else 449 { 450 client_conn.dispose(); 451 } 452 } 453 catch (IOException ex) 454 { 455 System.err.println (getName() + 457 ": AceIPCServer.processConnReqMessage() -- IOException creating AceIPCServerConnection : " 458 + ex.getMessage()); 459 460 boolean status = sendConnectResponseMessage (AceIPCConnRespMessage.STATUS_REFUSED, 461 addr, port); 462 } 463 } 464 else 465 { 466 boolean status = sendConnectResponseMessage (AceIPCConnRespMessage.STATUS_TRY_LATER, 467 addr, port); 468 } 469 } 470 471 private void processUserMessage (AceIPCUserMessage received_message, 472 InetAddress addr, 473 int port) 474 { 475 int to_thread_id = received_message.getToThreadID(); 476 477 483 AceIPCMessage msg_for_user = new AceIPCMessage 484 (AceIPCMessage.MESSAGE_RECEIVED, 485 (to_thread_id > 0 ? true : false), 486 this, 487 received_message.getFromThreadID(), 488 received_message.getBytes(), 489 received_message.userDataOffset(), 490 received_message.userDataLength(), 491 addr, 492 port, 493 userParm); 494 495 if (to_thread_id > 0) 496 { 497 AceThread to_thread = AceThread.getAceThreadObject (to_thread_id); 498 if (to_thread != null) 499 { 500 if (to_thread.sendMessage (msg_for_user) == false) 501 { 502 System.err.println (getName() 504 + ": AceIPCServer.processUserMessage() -- Could not send solicited IPC message to thread id = " 505 + to_thread_id 506 + " : " 507 + getErrorMessage()); 508 } 509 } 510 else 511 { 512 } 517 } 518 else 519 { 520 if (unsolMsgHandler != null) 521 { 522 if (unsolMsgHandler.sendMessage (msg_for_user) == false) 523 { 524 System.err.println (getName() 526 + ": AceIPCServer.processUserMessage() -- Could not send IPC message to the unsolicited msg handler thread : " 527 + getErrorMessage()); 528 } 529 } 530 } 531 } 532 533 private boolean sendConnectResponseMessage (int status, 534 InetAddress addr, 535 int port) 536 { 537 return (sendMessage (new AceIPCConnRespMessage (status, hbInterval), 539 addr, 540 port)); 541 } 542 543 private void sendDisconnectMessage (InetAddress addr, int port) 544 { 545 boolean status = sendMessage (new AceIPCDiscMessage(), addr, port); 546 } 547 548 protected boolean sendHeartbeatMessage (InetAddress addr, int port) 549 { 551 return sendMessage (new AceIPCHeartbeatMessage(), addr, port); 552 } 553 554 private boolean sendMessage (AceIPCMessageInterface message, 555 InetAddress addr, int port) 556 { 557 boolean retval = true; 558 559 synchronized (socketLock) 560 { 561 if (socket == null) 562 { 563 return false; 564 } 565 566 DatagramPacket dp = new DatagramPacket (message.getBytes(), 567 message.getLength(), 568 addr, 569 port); 570 571 try 572 { 573 574 socket.send (dp); 575 } 585 catch (IOException ex) 586 { 587 System.err.println (getName() + 589 ": AceIPCServer.sendMessage() -- IOException sending message on socket, error : " 590 + ex.getMessage() 591 + ", dest address = " 592 + addr.toString() 593 + ", dest port = " 594 + port + ' ' 595 + (new Date()) + ' ' 596 + (new Date().getTime() & 0xFFFF) 597 + ", message follows: \n" 598 + message.traceIPCMessage(true)); 599 600 retval = false; 601 } 602 } 603 604 if (retval == false) 605 { 606 } 608 609 return retval; 610 } 611 612 public boolean sendIPCMessage (byte[] message, int offset, int len, 613 int to_thread_id, AceThread sender, 614 InetAddress addr, int port) 615 { 616 Thread parent_thread = null; 617 if (sender == null) 618 { 619 parent_thread = Thread.currentThread(); 620 } 621 else 622 { 623 parent_thread = sender; 624 } 625 626 if ((parent_thread instanceof AceThread) == false) 627 { 628 writeErrorMessage ("The calling thread must be an instance of AceThread"); 629 return false; 630 } 631 632 boolean retval = true; 633 634 AceIPCServerConnection client_conn = findClient (addr, port); 636 637 if (client_conn == null) 638 { 639 writeErrorMessage ("The client is not currently connected"); 640 retval = false; 641 } 642 else if (client_conn.resetSendTiming() == false) 643 { 644 dropConnection (client_conn, true); 645 writeErrorMessage ("Fatal timing error encountered, connection dropped"); 646 retval = false; 647 } 648 else 649 { 650 AceIPCUserMessage ipc_msg = new AceIPCUserMessage (to_thread_id, 651 ((AceThread)parent_thread).getAceThreadId(), 652 message, 653 offset, 654 len); 655 656 if (sendMessage (ipc_msg, addr, port) == false) 657 { 658 writeErrorMessage ("Socket error sending message, connection dropped"); 659 retval = false; 660 } 661 } 662 663 return retval; 664 } 665 666 public boolean sendIPCMessage (byte[] message, int offset, int len, int to_thread_id, 667 InetAddress addr, int port) 668 { 669 return sendIPCMessage (message, offset, len, to_thread_id, null, addr, port); 670 } 671 672 public boolean sendIPCMessage (byte[] message, int offset, int len, InetAddress addr, int port) 673 { 674 return sendIPCMessage (message, offset, len, 0, null, addr, port); 675 } 676 677 public boolean sendIPCMessage (byte[] message, int offset, int len, AceThread sender, 678 InetAddress addr, int port) 679 { 680 return sendIPCMessage (message, offset, len, 0, sender, addr, port); 681 } 682 683 684 685 private int addressToInt (InetAddress addr) 686 { 687 int ret = 0; 688 689 byte[] addr_bytes = addr.getAddress(); 690 try 691 { 692 ret = (int) AceInputSocketStream.octetsToIntMsbFirst (addr_bytes, 0, 4); 693 } 694 catch (NumberFormatException ex) 695 { 696 System.err.println (getName() + 698 ": AceIPCServer.addressToInt() -- NumberFormatException converting InetAddress to int : " 699 + ex.getMessage()); 700 } 701 702 return ret; 703 } 704 705 private void addClient (AceIPCServerConnection client_element, 706 byte[] reg_data, 707 int offset, 708 int length) 709 { 710 synchronized (clientList) 711 { 712 int addr_int = addressToInt (client_element.getClientAddress()); 713 HashMap element = (HashMap) clientList.get (new Integer (addr_int)); 714 if (element == null) { 716 element = new HashMap (); 717 element.put (new Integer (client_element.getClientPort()), 718 client_element); 719 clientList.put (new Integer (addr_int), element); 720 } 721 else 722 { 723 element.put (new Integer (client_element.getClientPort()), 724 client_element); 725 } 726 } 727 728 if (connectHandler != null) 729 { 730 AceIPCMessage msg_for_user = new AceIPCMessage (AceIPCMessage.CONNECTION_ESTABLISHED, 731 this, 732 reg_data, offset, length, 733 client_element.getClientAddress(), 734 client_element.getClientPort(), 735 userParm); 736 if (connectHandler.sendMessage (msg_for_user) == false) 737 { 738 System.err.println (getName() 740 + ": AceIPCServer.addClient() -- Could not send IPC message to the user connect handler thread : " 741 + getErrorMessage()); 742 } 743 } 744 } 745 746 private void removeClient (AceIPCServerConnection client_element) 747 { 748 boolean element_removed = false; 749 750 synchronized (clientList) 751 { 752 int addr_int = addressToInt (client_element.getClientAddress()); 753 HashMap element = (HashMap) clientList.get (new Integer (addr_int)); 754 if (element != null) 755 { 756 AceIPCServerConnection subelement = (AceIPCServerConnection) element.get 757 (new Integer (client_element.getClientPort())); 758 if (subelement != null) 759 { 760 element.remove (new Integer (client_element.getClientPort())); 761 element_removed = true; 762 if (element.isEmpty() == true) 763 { 764 clientList.remove (new Integer (addr_int)); 765 } 766 } 767 } 768 } 769 770 if (element_removed == true) 771 if (disconnectHandler != null) 772 { 773 AceIPCMessage msg_for_user = new AceIPCMessage (AceIPCMessage.DISCONNECT, 774 this, 775 client_element.getClientAddress(), 776 client_element.getClientPort(), 777 userParm); 778 if (disconnectHandler.sendMessage (msg_for_user) == false) 779 { 780 System.err.println (getName() 782 + ": AceIPCServer.removeClient() -- Could not send IPC message to the user disconnect handler thread : " 783 + getErrorMessage()); 784 } 785 } 786 } 787 788 private AceIPCServerConnection findClient (InetAddress addr, int port) 789 { 790 synchronized (clientList) 791 { 792 int addr_int = addressToInt (addr); 793 HashMap element = (HashMap) clientList.get (new Integer (addr_int)); 794 if (element != null) 795 { 796 return ((AceIPCServerConnection) element.get (new Integer (port))); 797 } 798 } 799 800 return null; 801 } 802 803 private static final int MAX_INIT_FAILURES_IN_A_ROW = 10; 804 805 private DatagramSocket socket = null; 806 private AceDatagram sockListener = null; 807 private Object socketLock = new Object (); 808 private HashMap clientList = new HashMap(); 809 private int maxConnections; 810 private int hbInterval; 811 private AceThread unsolMsgHandler = null; 812 private AceThread connectHandler = null; 813 private AceThread disconnectHandler = null; 814 private long userParm; 815 private int port; 816 817 818 public static void main (String [] args) 820 { 821 class ServerUser extends AceThread 822 { 823 class DataSender extends AceThread 824 { 825 public DataSender (int send_data_interval, 826 ServerUser parent) 827 throws IOException 828 { 829 super (); 830 sendInterval = send_data_interval; 831 this.parent = parent; 832 System.out.println ("DATA SENDER THREAD ID = " + getAceThreadId()); 833 } 834 835 public void dispose() 836 { 837 } 838 839 public void run() 840 { 841 int msg_counter = 0; 842 byte[] msg_data = new byte[4]; 843 844 try 845 { 846 while (true) 847 { 848 sleep (sendInterval); 849 AceInputSocketStream.intToBytesMsbFirst (++msg_counter, 850 msg_data, 851 0); 852 parent.broadcastMessage (msg_data, 0, msg_data.length); 853 } 854 } 855 catch (InterruptedException ex) 856 { 857 System.err.println ("DataSender sleep interrupted"); 858 } 859 } 860 861 private int sendInterval; 862 private ServerUser parent; 863 } 864 865 class ConnectedClient 866 { 867 public ConnectedClient (InetAddress addr, int port) 868 { 869 this.address = addr; 870 this.port = port; 871 } 872 873 public InetAddress getAddress() 874 { 875 return address; 876 } 877 878 public int getPort() 879 { 880 return port; 881 } 882 883 private InetAddress address; 884 private int port; 885 } 886 887 public ServerUser (String name, 888 int port, 889 int max_connections, 890 int hb_interval, 891 int send_data_interval) 892 throws IOException, AceException 893 { 894 super(name); 895 896 ipcServer = new AceIPCServer (5000, 897 name, 898 port, 899 max_connections, 900 hb_interval, 901 this, 902 this, 903 this); 904 905 if (send_data_interval > 0) 906 { 907 dataSender = new DataSender (send_data_interval, this); 908 } 909 910 System.out.println (name + " THREAD ID = " + getAceThreadId()); 911 } 912 913 public void run() 914 { 915 ipcServer.start(); 916 917 if (dataSender != null) 918 { 919 dataSender.start(); 920 } 921 922 while (true) 923 { 924 AceMessageInterface message = waitMessage(); if (message == null) 927 { 928 System.err.println (getName() + 929 " Null message encountered"); 930 continue; 931 } 932 else if ((message instanceof AceSignalMessage) == true) 933 { 934 System.out.println (getName() + 935 " Signal received, ID = " 936 + ((AceSignalMessage)message).getSignalId() 937 + ", signal message = " 938 + ((AceSignalMessage)message).getMessage()); 939 ipcServer.dispose(); 940 if (dataSender != null) 941 { 942 dataSender.dispose(); 943 } 944 super.dispose(); 945 break; 946 } 947 948 if ((message instanceof AceIPCMessage) == true) 949 { 950 AceIPCMessage msg = (AceIPCMessage) message; 951 switch (msg.getEvent()) 952 { 953 case AceIPCMessage.CONNECTION_ESTABLISHED: 954 { 955 InetAddress addr = msg.getFarEndAddress(); 956 int port = msg.getFarEndPort(); 957 System.out.println (getName() + 958 '_' + getAceThreadId() 959 + (new Date()) + ' ' + 960 (new Date().getTime() & 0xFFFF) 961 + " CONNECTION ESTABLISHED WITH CLIENT ADDR = " 962 + addr 963 + ", PORT = " 964 + port 965 + ", registration data size = " 966 + msg.getUserDataLength() 967 + ", reg bytes = " 968 + AceIPCMessage.dumpRawBytes (msg.getMessage(), 969 msg.getUserDataOffset(), 970 msg.getUserDataLength()) 971 + ", userparm=" 972 + msg.getUserParm()); 973 if (dataSender != null) 974 { 975 synchronized (clientList) 977 { 978 if (clientList.add (new ConnectedClient (addr, port)) == false) 979 { 980 System.err.println (getName() + 981 " Couldn't add new client into list, addr = " + addr + ", port = " + port); 982 } 983 } 984 } 985 } 986 break; 987 case AceIPCMessage.DISCONNECT: 988 { 989 System.out.println (getName() + 990 '_' + getAceThreadId() + (new Date()) + ' ' + 991 (new Date().getTime() & 0xFFFF) + 992 " CONNECTION DISCONNECTED, CLIENT ADDR = " 993 + msg.getFarEndAddress() 994 + ", PORT = " 995 + msg.getFarEndPort() 996 + ", userparm=" 997 + msg.getUserParm()); 998 if (dataSender != null) 999 { 1000 synchronized (clientList) 1002 { 1003 int num_elements = clientList.size(); 1004 int i; 1005 for (i = 0; i < num_elements; i++) 1006 { 1007 ConnectedClient element = (ConnectedClient) clientList.get(i); 1008 if (element.getAddress().equals(msg.getFarEndAddress())) 1009 if (element.getPort() == msg.getFarEndPort()) 1010 { 1011 break; 1012 } 1013 } 1014 if (i < num_elements) 1015 { 1016 clientList.remove (i); 1017 } 1018 } 1019 } 1020 } 1021 break; 1022 case AceIPCMessage.MESSAGE_RECEIVED: 1023 { 1024 int msg_num = (int) AceInputSocketStream.octetsToIntMsbFirst (msg.getMessage(), 1025 msg.getUserDataOffset(), 1026 msg.getUserDataLength()); 1027 1028 System.out.println (getName() + 1029 '_' + getAceThreadId() + (new Date()) + ' ' + 1030 (new Date().getTime() & 0xFFFF) + 1031 " RECEIVED " + 1032 ((msg.solicitedMessage() == true) ? "solicited " : "unsolicited ") + 1033 "senderThreadID=" + msg.getSenderThreadId() + 1034 " userparm=" + msg.getUserParm() + 1035 " : " + 1036 msg_num); 1037 1038 1039 if (dataSender == null) 1040 { 1041 byte[] reply = new byte[4]; 1043 AceInputSocketStream.intToBytesMsbFirst (++msg_num, 1044 reply, 1045 0); 1046 if (ipcServer.sendIPCMessage (reply, 0, reply.length, 1047 msg.getSenderThreadId(), 1048 this, 1049 msg.getFarEndAddress(), 1050 msg.getFarEndPort()) == false) 1051 { 1052 System.err.println (getName() + (new Date()) + ' ' + 1053 (new Date().getTime() & 0xFFFF) + 1054 " Message sending failed : " 1055 + getErrorMessage()); 1056 } 1057 } 1058 } 1059 break; 1060 default: 1061 { 1062 System.err.println (getName() + 1063 " Unexpected IPC message event encountered : " 1064 + msg.getEvent()); 1065 } 1066 break; 1067 } 1068 } 1069 else 1070 { 1071 System.err.println (getName() + 1072 " Unexpected Ace message type encountered : " 1073 + message.messageType()); 1074 } 1075 } 1076 1077 } 1078 1079 public void broadcastMessage (byte[] msg_data, int offset, int length) 1080 { 1081 synchronized (clientList) 1082 { 1083 int num_elements = clientList.size(); 1084 for (int i = 0; i < num_elements; i++) 1085 { 1086 ConnectedClient element = (ConnectedClient) clientList.get(i); 1087 if (ipcServer.sendIPCMessage (msg_data, offset, length, this, 1088 element.getAddress(), 1089 element.getPort()) == false) 1090 { 1091 System.err.println (getName() + (new Date()) + ' ' + 1092 (new Date().getTime() & 0xFFFF) + 1093 " Broadcast sending failed to client addr = " 1094 + (element.getAddress()).toString() 1095 + ", port = " 1096 + element.getPort() + " : " 1097 + ((AceThread)(Thread.currentThread())).getErrorMessage()); 1098 } 1099 } 1119 } 1120 } 1121 1122 private AceIPCServer ipcServer; 1123 private DataSender dataSender = null; 1124 private ArrayList clientList = new ArrayList(); 1125 } 1126 1127 1128 try 1129 { 1130 int port = 3000; 1131 int max_connections = 5; 1132 int hb_interval = 2000; 1133 int send_data_interval = 0; 1134 1135 if ((args.length != 0) && (args.length != 4)) 1136 { 1137 System.out.println ("Arguments (all or nothing): <port>, <max connections>, <hb interval(ms)>, <send user data interval(ms) - if 0, sends upon receipt>"); 1138 System.out.println ("Defaults: port=" + port + ", max connections=" + max_connections + ", hb interval=" + hb_interval + ", send user data interval=" + send_data_interval); 1139 System.exit (0); 1140 } 1141 1142 if (args.length == 4) 1143 { 1144 try 1145 { 1146 port = Integer.parseInt (args[0]); 1147 max_connections = Integer.parseInt (args[1]); 1148 hb_interval = Integer.parseInt (args[2]); 1149 send_data_interval = Integer.parseInt (args[3]); 1150 } 1151 catch (NumberFormatException ex) 1152 { 1153 System.err.println ("Input must be numeric"); 1154 System.exit (1); 1155 } 1156 } 1157 1158 AceTimer.Instance().start(); 1160 ServerUser user = new ServerUser ("TestServer", 1161 port, 1162 max_connections, 1163 hb_interval, 1164 send_data_interval); 1165 1166 user.start(); 1167 user.join(); 1168 System.exit (0); 1169 } 1170 catch (IOException ex) 1171 { 1172 System.err.println ("IOException in main " + ex.getMessage()); 1173 System.exit (1); 1174 } 1175 catch (AceException ex) 1176 { 1177 System.err.println ("AceException in main " + ex.getMessage()); 1178 System.exit (1); 1179 } 1180 catch (InterruptedException ex) 1181 { 1182 System.err.println ("InterruptedException in main " + ex.getMessage()); 1183 System.exit (1); 1184 } 1185 } 1186} 1187 | Popular Tags |