1 package com.quikj.server.framework; 2 3 import java.io.*; 4 import java.net.*; 5 import java.util.*; 6 7 public class AceIPCClient extends AceThread implements AceIPCEntityInterface 8 { 9 14 public AceIPCClient (long user_parm, 15 String name, 16 String server_host, 17 int server_port, 18 AceThread unsol_msg_handler, 19 AceThread connect_handler, 20 AceThread disconnect_handler, 21 byte[] registration_data, 22 int reg_data_offset, 23 int reg_data_len) 24 throws IOException, AceException 25 { 26 super(name); 27 28 try 29 { 30 serverAddress = InetAddress.getByName (server_host); 31 } 32 catch (UnknownHostException ex) 33 { 34 throw new AceException ("Could not resolve inet address of host " 35 + server_host 36 + ", error " + ex.getMessage()); 37 } 38 39 if (initNewSocket (user_parm) == false) 40 { 41 throw new AceException ("Datagram socket initialization failed"); 42 } 43 44 serverPort = server_port; 45 unsolMsgHandler = unsol_msg_handler; 46 connectHandler = connect_handler; 47 disconnectHandler = disconnect_handler; 48 registrationData = registration_data; 49 regDataOffset = reg_data_offset; 50 regDataLength = reg_data_len; 51 userParm = user_parm; 52 53 } 57 58 public void dispose() 59 { 60 synchronized (ipcLock) 61 { 62 dropConnection(); 63 dropSocket(); 64 } 65 66 if (this.isAlive() == true) 67 { 68 if (interruptWait (AceSignalMessage.SIGNAL_TERM, 69 "Normal IPC Client dispose") == false) 70 { 71 System.err.println (getName() 73 + ": AceIPCClient.dispose() -- Could not interrupt own wait : " 74 + getErrorMessage()); 75 76 super.dispose(); 77 } 78 } 79 else 80 { 81 super.dispose(); 82 83 } 87 } 88 89 public void run() 90 { 91 if (initConnection (CONNECT_SHORT_TIMER) == false) 92 { 93 dropSocket(); 94 } 95 else 96 { 97 sockListener.start(); 98 } 99 100 while (true) 101 { 102 AceMessageInterface message = waitMessage(); 103 if (message == null) 104 { 105 continue; 110 } 111 else if ((message instanceof AceSignalMessage) == true) 112 { 113 super.dispose(); 120 break; 121 } 122 123 synchronized (ipcLock) 124 { 125 switch (state) 126 { 127 case STATE_CONNECTING: 128 { 129 processConnectingEvent (message); 130 } 131 break; 132 case STATE_CONNECTED: 133 { 134 processConnectedEvent (message); 135 } 136 break; 137 case STATE_WAITING_BEFORE_RETRY: 138 { 139 processWaitingToRetryEvent (message); 140 } 141 break; 142 case STATE_DISCONNECTED: 143 break; 144 default: 145 { 146 System.err.println (getName() + 148 ": AceIPCClient.run() -- Bad state encountered : " 149 + state); 150 reconnect(); 151 } 152 break; 153 } 154 } 155 } 156 } 157 158 public AceMessageInterface waitIPCMessage () 159 { 160 Thread thr = Thread.currentThread(); 161 162 if ((thr instanceof AceThread) == false) 163 { 164 writeErrorMessage ("This method is not being called from an object which is a sub-class of type AceThread"); 165 return null; 166 } 167 168 AceThread cthread = (AceThread)thr; 169 170 while (true) 171 { 172 AceMessageInterface msg_received = cthread.waitMessage(); 173 if ((msg_received instanceof AceIPCMessage) == true) 174 { 175 if (((AceIPCMessage)msg_received).getEntity() == this) 176 { 177 return msg_received; 178 } 179 } 180 else if ((msg_received instanceof AceSignalMessage) == true) 181 { 182 return msg_received; 183 } 184 } 185 } 186 187 private boolean initNewSocket (long user_parm) 188 { 189 try 191 { 192 socket = new DatagramSocket (); 193 } 194 catch (SocketException ex) 195 { 196 System.err.println (getName() + 198 ": AceIPCClient.initNewSocket() -- Socket error creating DatagramSocket : " 199 + ex.getMessage()); 200 return false; 201 } 202 203 try 204 { 205 sockListener = new AceDatagram (user_parm, 206 getName() + "_sockListener", 207 this, 208 socket, 209 AceIPCMessage.MAX_IPC_MSG_SIZE); 210 } 211 catch (IOException ex) 212 { 213 System.err.println (getName() + 215 ": AceIPCClient.initNewSocket() -- IO error creating AceDatagram : " 216 + ex.getMessage()); 217 socket.close(); 218 socket = null; 219 return false; 220 } 221 catch (AceException ex) 222 { 223 System.err.println (getName() + 225 ": AceIPCClient.initNewSocket() -- Ace error creating AceDatagram : " 226 + ex.getMessage()); 227 socket.close(); 228 socket = null; 229 return false; 230 } 231 232 return true; 233 } 234 235 private boolean initNewSocket () 236 { 237 return initNewSocket (userParm); 238 } 239 240 private void dropSocket () 241 { 242 if (sockListener != null) 243 { 244 sockListener.dispose(); 246 sockListener = null; 247 socket = null; 248 } 249 } 250 251 private boolean initConnection (int timer_value) 252 { 253 259 state = STATE_DISCONNECTED; 261 for (int i = 0; i < MAX_INIT_FAILURES_IN_A_ROW; i++) 262 { 263 if (sendConnectRequestMessage () == true) 264 { 265 try 266 { 267 sendTimerId = AceTimer.Instance().startTimer (timer_value, 268 this, 269 0); 270 if (sendTimerId < 0) 271 { 272 System.err.println (getName() + 274 ": AceIPCClient.initConnection() -- Failure starting timer, returned ID = " 275 + sendTimerId); 276 sendDisconnectMessage(); 277 return false; 278 } 279 else 280 { 281 state = STATE_CONNECTING; 282 return true; 283 } 284 } 285 catch (IOException ex) 286 { 287 System.err.println (getName() + 289 ": AceIPCClient.initConnection() -- IOException starting timer : " 290 + ex.getMessage()); 291 sendDisconnectMessage(); 292 return false; 293 } 294 } 295 else 296 { 297 dropSocket(); 298 if (initNewSocket() == false) 299 { 300 return false; 301 } 302 } 303 } 304 305 return false; 306 } 307 308 private void dropConnection () 309 { 310 dropConnection (true); 311 } 312 313 private void dropConnection (boolean send_disc_msg) 314 { 315 if (send_disc_msg == true) 316 { 317 switch (state) 318 { 319 case STATE_CONNECTED: 320 case STATE_CONNECTING: 321 { 322 sendDisconnectMessage(); 323 } 324 break; 325 326 default: 327 break; 328 } 329 } 330 331 state = STATE_DISCONNECTED; 332 333 flushMessages(); 335 try 336 { 337 AceTimer.Instance().cancelAllTimers (this); 338 } 339 catch (IOException ex) 340 { 341 System.err.println (getName() + 343 ": AceIPCClient.dropConnection() -- Error canceling timers : " 344 + ex.getMessage()); 345 return; 346 } 347 } 348 349 private void reconnect () { 351 dropConnection (true); 352 if (initConnection (CONNECT_SHORT_TIMER) == false) 353 { 354 dropSocket(); 355 } 356 } 357 358 private void reconnectWithNewSocket () 359 { 360 dropConnection (false); 361 362 dropSocket(); 363 364 if (initNewSocket() == true) 365 { 366 if (initConnection (CONNECT_SHORT_TIMER) == false) 367 { 368 dropSocket(); 369 } 370 else 371 { 372 sockListener.start(); 373 } 374 } 375 } 376 377 private void stopTimer (int timer_id) 378 { 379 try 380 { 381 boolean status = AceTimer.Instance().cancelTimer (timer_id, this); 382 } 383 catch (IOException ex) 384 { 385 System.err.println (getName() + 387 ": AceIPCClient.stopTimer() -- IOException canceling timer ID = " 388 + timer_id 389 + " : " 390 + ex.getMessage()); 391 return; 392 } 393 } 394 395 396 397 private void processConnectingEvent (AceMessageInterface message) 398 { 399 if ((message instanceof AceDatagramMessage) == true) 401 { 402 403 stopTimer (sendTimerId); 404 405 if (((AceDatagramMessage)message).getStatus() == AceDatagramMessage.READ_COMPLETED) 406 { 407 try 409 { 410 AceIPCMessageParser parser = new AceIPCMessageParser 411 (((AceDatagramMessage)message).getBuffer(), 412 ((AceDatagramMessage)message).getLength()); 413 switch (parser.getMessageType()) 414 { 415 case AceIPCMessageInterface.CONN_RESP_MSG: 416 { 417 processConnRespMessage ((AceIPCConnRespMessage) parser.getMessage()); 418 } 419 break; 420 default: 421 { 422 state = STATE_DISCONNECTED; 423 flushMessages(); 425 if (initConnection (CONNECT_SHORT_TIMER) == false) 426 { 427 dropSocket(); 428 } 429 } 430 break; 431 } 432 } 433 catch (AceException ex) 434 { 435 System.err.println (getName() + 437 ": AceIPCClient.processConnectingEvent() -- Error parsing message, AceException : " 438 + ex.getMessage() 439 + ", msg follows: " + '\n' 440 + AceIPCMessage.dumpRawBytes 441 (((AceDatagramMessage)message).getBuffer(), 442 0, 443 ((AceDatagramMessage)message).getLength())); 444 return; 445 } 446 } 447 else 448 { 449 reconnectWithNewSocket(); 451 } 452 } 453 else if ((message instanceof AceTimerMessage) == true) 454 { 455 if (initConnection (CONNECT_LONG_TIMER) == false) 456 { 457 dropSocket(); 458 } 459 } 460 else 461 { 462 System.err.println (getName() + 464 ": AceIPCClient.processConnectingEvent() -- Unexpected Ace message type encountered : " 465 + message.messageType()); 466 } 467 } 468 469 private void processConnRespMessage (AceIPCConnRespMessage conn_message) 470 { 471 switch (conn_message.getStatus()) 472 { 473 case AceIPCConnRespMessage.STATUS_OK: 474 { 475 state = STATE_CONNECTED; 476 hbInterval = conn_message.getHbInterval(); 477 try 480 { 481 sendTimerId = AceTimer.Instance().startTimer (hbInterval, 482 this, 483 0); 484 receiveTimerId = AceTimer.Instance().startTimer ((hbInterval * 485 AceIPCHeartbeatMessage.TOLERANCE_FACTOR), 486 this, 487 0); 488 if ((sendTimerId < 0) || (receiveTimerId < 0)) 489 { 490 System.err.println (getName() + 492 ": AceIPCClient.processConnRespMessage() -- Failure starting one or more timers, returned IDs = " 493 + sendTimerId + ", " 494 + receiveTimerId); 495 dropConnection(); 496 dropSocket(); 497 return; 498 } 499 else 500 { 501 if (sendHeartbeatMessage() == false) 502 { 503 reconnectWithNewSocket(); 504 } 505 else 506 { 507 if (connectHandler != null) 508 { 509 AceIPCMessage msg_for_user = new AceIPCMessage (AceIPCMessage.CONNECTION_ESTABLISHED, 510 this, 511 serverAddress, 512 serverPort, 513 userParm); 514 if (connectHandler.sendMessage (msg_for_user) == false) 515 { 516 System.err.println (getName() 518 + ": AceIPCClient.processConnRespMessage() -- Could not send IPC message to the user connect handler thread : " 519 + getErrorMessage()); 520 } 521 } 522 } 523 } 524 } 525 catch (IOException ex) 526 { 527 System.err.println (getName() + 529 ": AceIPCClient.processConnRespMessage() -- IOException starting one or more timers : " 530 + ex.getMessage()); 531 dropConnection(); 532 dropSocket(); 533 return; 534 } 535 } 536 break; 537 default: 538 { 539 state = STATE_WAITING_BEFORE_RETRY; 540 try 541 { 542 sendTimerId = AceTimer.Instance().startTimer (TRY_AGAIN_LATER_TIMER, 543 this, 544 0); 545 if (sendTimerId < 0) 546 { 547 System.err.println (getName() + 549 ": AceIPCClient.processConnRespMessage() -- Failure starting timer, returned ID = " 550 + sendTimerId); 551 dropConnection(); 552 dropSocket(); 553 return; 554 } 555 } 556 catch (IOException ex) 557 { 558 System.err.println (getName() + 560 ": AceIPCClient.processConnRespMessage() -- IOException starting timer : " 561 + ex.getMessage()); 562 dropConnection(); 563 dropSocket(); 564 return; 565 } 566 } 567 break; 568 } 569 } 570 571 private void processConnectedEvent (AceMessageInterface message) 572 { 573 if ((message instanceof AceDatagramMessage) == true) 574 { 575 if (((AceDatagramMessage)message).getStatus() == AceDatagramMessage.READ_COMPLETED) 576 { 577 stopTimer (receiveTimerId); 579 try 580 { 581 receiveTimerId = AceTimer.Instance().startTimer ((hbInterval * 582 AceIPCHeartbeatMessage.TOLERANCE_FACTOR), 583 this, 584 0); 585 if (receiveTimerId < 0) 586 { 587 System.err.println (getName() + 589 ": AceIPCClient.processConnectedEvent() -- Failure starting timer, returned ID = " 590 + receiveTimerId); 591 notifyUserOfDisc(); 592 dropConnection(); 593 dropSocket(); 594 return; 595 } 596 } 597 catch (IOException ex) 598 { 599 System.err.println (getName() + 601 ": AceIPCClient.processConnectedEvent() -- IOException starting timer : " 602 + ex.getMessage()); 603 notifyUserOfDisc(); 604 dropConnection(); 605 dropSocket(); 606 return; 607 } 608 609 try 611 { 612 AceIPCMessageParser parser = new AceIPCMessageParser 613 (((AceDatagramMessage)message).getBuffer(), 614 ((AceDatagramMessage)message).getLength()); 615 switch (parser.getMessageType()) 616 { 617 case AceIPCMessageInterface.HB_MSG: 618 { 619 if (resetSendTimer(true) == false) 620 { 621 notifyUserOfDisc(); 622 dropConnection(); 623 dropSocket(); 624 } 625 else 626 { 627 if (sendHeartbeatMessage() == false) 628 { 629 notifyUserOfDisc(); 630 reconnectWithNewSocket(); 631 } 632 } 633 } 634 break; 635 case AceIPCMessageInterface.DISCONNECT_MSG: 636 { 637 notifyUserOfDisc(); 638 state = STATE_DISCONNECTED; 639 flushMessages(); try 641 { 642 AceTimer.Instance().cancelAllTimers (this); 643 } 644 catch (IOException ex) 645 { 646 System.err.println (getName() + 648 ": AceIPCClient.processConnectedEvent() -- Error canceling timers : " 649 + ex.getMessage()); 650 } 651 finally 652 { 653 if (initConnection (CONNECT_SHORT_TIMER) == false) 654 { 655 dropSocket(); 656 } 657 } 658 } 659 break; 660 case AceIPCMessageInterface.USER_MSG: 661 { 662 AceIPCUserMessage received_message = 663 (AceIPCUserMessage) parser.getMessage(); 664 int to_thread_id = received_message.getToThreadID(); 665 666 AceIPCMessage msg_for_user = new AceIPCMessage 671 (AceIPCMessage.MESSAGE_RECEIVED, 672 (to_thread_id > 0 ? true : false), 673 this, 674 received_message.getFromThreadID(), 675 received_message.getBytes(), 676 received_message.userDataOffset(), 677 received_message.userDataLength(), 678 ((AceDatagramMessage)message).getAddress(), 679 ((AceDatagramMessage)message).getPort(), 680 userParm); 681 682 if (to_thread_id > 0) 683 { 684 AceThread to_thread = AceThread.getAceThreadObject (to_thread_id); 685 if (to_thread != null) 686 { 687 if (to_thread.sendMessage (msg_for_user) == false) 688 { 689 System.err.println (getName() 691 + ": AceIPCClient.processConnectedEvent() -- Could not send solicited IPC message to thread id = " 692 + to_thread_id 693 + " : " 694 + getErrorMessage()); 695 } 696 } 697 else 698 { 699 } 704 } 705 else 706 { 707 if (unsolMsgHandler != null) 708 { 709 if (unsolMsgHandler.sendMessage (msg_for_user) == false) 710 { 711 System.err.println (getName() 713 + ": AceIPCClient.processConnectedEvent() -- Could not send IPC message to the unsolicited msg handler thread : " 714 + getErrorMessage()); 715 } 716 } 717 } 718 } 719 break; 720 default: 721 { 722 System.err.println (getName() + 724 ": AceIPCClient.processConnectedEvent() -- Unexpected message type received : " 725 + parser.getMessageType() 726 + ", msg follows: " + '\n' 727 + AceIPCMessage.dumpRawBytes 728 (((AceDatagramMessage)message).getBuffer(), 729 0, 730 ((AceDatagramMessage)message).getLength())); 731 notifyUserOfDisc(); 732 reconnect(); 733 } 734 break; 735 } 736 } 737 catch (AceException ex) 738 { 739 System.err.println (getName() + 741 ": AceIPCClient.processConnectedEvent() -- Error parsing message, AceException : " 742 + ex.getMessage() 743 + ", msg follows: " + '\n' 744 + AceIPCMessage.dumpRawBytes 745 (((AceDatagramMessage)message).getBuffer(), 746 0, 747 ((AceDatagramMessage)message).getLength())); 748 return; 749 } 750 } 751 else 752 { 753 notifyUserOfDisc(); 754 reconnectWithNewSocket(); 755 } 756 } 757 else if ((message instanceof AceTimerMessage) == true) 758 { 759 if (((AceTimerMessage)message).getTimerId() == sendTimerId) 760 { 761 if (resetSendTimer(false) == false) 762 { 763 notifyUserOfDisc(); 764 dropConnection(); 765 dropSocket(); 766 } 767 else 768 { 769 if (sendHeartbeatMessage() == false) 770 { 771 notifyUserOfDisc(); 772 reconnectWithNewSocket(); 773 } 774 } 775 } 776 else if (((AceTimerMessage)message).getTimerId() == receiveTimerId) 777 { 778 System.err.println (getName() + 780 ": AceIPCClient.processConnectedEvent() -- Receive timer expired: LOST HEARTBEAT"); 781 notifyUserOfDisc(); 782 reconnectWithNewSocket(); 783 } 784 else 785 { 786 System.err.println (getName() + 788 ": AceIPCClient.processConnectedEvent() -- Message received with unexpected timer ID = " 789 + ((AceTimerMessage)message).getTimerId()); 790 } 791 } 792 else 793 { 794 System.err.println (getName() + 796 ": AceIPCClient.processConnectedEvent() -- Unexpected Ace message type encountered : " 797 + message.messageType()); 798 } 799 800 } 801 802 private void notifyUserOfDisc () 803 { 804 if (disconnectHandler != null) 805 { 806 AceIPCMessage msg_for_user = new AceIPCMessage (AceIPCMessage.DISCONNECT, 807 this, 808 serverAddress, 809 serverPort, 810 userParm); 811 if (disconnectHandler.sendMessage (msg_for_user) == false) 812 { 813 System.err.println (getName() 815 + ": AceIPCClient.notifyUserOfDisc() -- Could not send IPC message to the user disconnect handler thread : " 816 + getErrorMessage()); 817 } 818 } 819 } 820 821 private void processWaitingToRetryEvent (AceMessageInterface message) 822 { 823 if ((message instanceof AceTimerMessage) == true) 824 { 825 if (initConnection (CONNECT_SHORT_TIMER) == false) 826 { 827 dropSocket(); 828 } 829 } 830 else 831 { 832 System.err.println (getName() + 834 ": AceIPCClient.processWaitingToRetryEvent() -- Unexpected Ace message type encountered : " 835 + message.messageType()); 836 } 837 } 838 839 private boolean sendConnectRequestMessage () 840 { 841 return (sendMessage (new AceIPCConnReqMessage(registrationData, 842 regDataOffset, 843 regDataLength))); 844 } 845 846 private void sendDisconnectMessage () 847 { 848 boolean status = sendMessage (new AceIPCDiscMessage()); 849 } 850 851 private boolean sendHeartbeatMessage () 852 { 853 return (sendMessage (new AceIPCHeartbeatMessage())); 854 } 855 856 private boolean sendMessage (AceIPCMessageInterface message) 857 { 858 DatagramPacket dp = new DatagramPacket (message.getBytes(), 859 message.getLength(), 860 serverAddress, 861 serverPort); 862 863 try 864 { 865 socket.send (dp); 866 } 876 catch (IOException ex) 877 { 878 System.err.println (getName() + 880 ": AceIPCClient.sendMessage() -- IOException sending message on socket, error : " 881 + ex.getMessage() 882 + ", dest address = " 883 + serverAddress.toString() 884 + ", dest port = " 885 + serverPort 886 + ", message follows: \n" 887 + message.traceIPCMessage(true)); 888 return false; 889 } 890 891 return true; 892 } 893 894 private boolean resetSendTimer (boolean currently_running) 895 { 896 if (currently_running == true) 897 { 898 stopTimer (sendTimerId); 899 } 900 901 try 902 { 903 sendTimerId = AceTimer.Instance().startTimer (hbInterval, 904 this, 905 0); 906 if (sendTimerId < 0) 907 { 908 System.err.println (getName() + 910 ": AceIPCClient.resetSendTimer() -- Failure starting timer, returned ID = " 911 + sendTimerId); 912 return false; 913 } 914 } 915 catch (IOException ex) 916 { 917 System.err.println (getName() + 919 ": AceIPCClient.resetSendTimer() -- IOException starting timer : " 920 + ex.getMessage()); 921 return false; 922 } 923 924 return true; 925 } 926 927 public boolean sendIPCMessage (byte[] message, int offset, int len, 928 int to_thread_id, AceThread sender) 929 { 930 Thread parent_thread = null; 931 if (sender == null) 932 { 933 parent_thread = Thread.currentThread(); 934 } 935 else 936 { 937 parent_thread = sender; 938 } 939 940 if ((parent_thread instanceof AceThread) == false) 941 { 942 writeErrorMessage ("The calling thread must be an instance of AceThread"); 943 return false; 944 } 945 946 boolean retval = true; 947 948 synchronized (ipcLock) 949 { 950 if (state != STATE_CONNECTED) 951 { 952 writeErrorMessage ("The client is not currently connected"); 953 retval = false; 954 } 955 else if (resetSendTimer(true) == false) 956 { 957 dropConnection(); 958 dropSocket(); 959 writeErrorMessage ("Fatal timing error encountered"); 960 retval = false; 961 } 962 else 963 { 964 AceIPCUserMessage ipc_msg = new AceIPCUserMessage (to_thread_id, 965 ((AceThread)parent_thread).getAceThreadId(), 966 message, 967 offset, 968 len); 969 970 if (sendMessage (ipc_msg) == false) 971 { 972 reconnectWithNewSocket(); 973 writeErrorMessage ("Socket error sending message, attempting reconnect"); 974 retval = false; 975 } 976 } 977 } 978 979 return retval; 980 } 981 982 public boolean sendIPCMessage (byte[] message, int offset, int len, int to_thread_id) 983 { 984 return sendIPCMessage (message, offset, len, to_thread_id, null); 985 } 986 987 public boolean sendIPCMessage (byte[] message, int offset, int len) 988 { 989 return sendIPCMessage (message, offset, len, 0, null); 990 } 991 992 public boolean sendIPCMessage (byte[] message, int offset, int len, AceThread sender) 993 { 994 return sendIPCMessage (message, offset, len, 0, sender); 995 } 996 997 private static final int CONNECT_SHORT_TIMER = 5 * 1000; 998 private static final int CONNECT_LONG_TIMER = 15 * 1000; 999 private static final int TRY_AGAIN_LATER_TIMER = 60 * 1000; 1000 private static final int MAX_INIT_FAILURES_IN_A_ROW = 5; 1001 1002 private static final int STATE_CONNECTING = 1; 1003 private static final int STATE_CONNECTED = 2; 1004 private static final int STATE_DISCONNECTED = 3; 1005 private static final int STATE_WAITING_BEFORE_RETRY = 4; 1006 1007 private DatagramSocket socket = null; 1008 private AceDatagram sockListener = null; 1009 private Object ipcLock = new Object (); 1010 private InetAddress serverAddress; 1011 private int serverPort; 1012 private int hbInterval; 1013 private AceThread unsolMsgHandler = null; 1014 private AceThread connectHandler = null; 1015 private AceThread disconnectHandler = null; 1016 private byte[] registrationData = null; 1017 private int regDataOffset = 0; 1018 private int regDataLength = 0; 1019 private long userParm; 1020 1021 private int state = STATE_DISCONNECTED; 1022 private int sendTimerId = -1; 1023 private int receiveTimerId = -1; 1024 1025 public static void main (String [] args) 1027 { 1028 1029 class ClientUser extends AceThread 1030 { 1031 class DataSender extends AceThread 1032 { 1033 public DataSender (int send_data_interval, 1034 ClientUser parent) 1035 throws IOException 1036 { 1037 super (); 1038 sendInterval = send_data_interval; 1039 this.parent = parent; 1040 System.out.println ("DATA SENDER THREAD ID = " + getAceThreadId()); 1041 } 1042 1043 public void dispose() 1044 { 1045 } 1046 1047 public void run() 1048 { 1049 int msg_counter = 0; 1050 byte[] msg_data = new byte[4]; 1051 1052 try 1053 { 1054 while (true) 1055 { 1056 sleep (sendInterval); 1057 AceInputSocketStream.intToBytesMsbFirst (++msg_counter, 1058 msg_data, 1059 0); 1060 parent.sendMessage (msg_data, 0, msg_data.length); 1061 } 1062 } 1063 catch (InterruptedException ex) 1064 { 1065 System.err.println ("DataSender sleep interrupted"); 1066 } 1067 } 1068 1069 private int sendInterval; 1070 private ClientUser parent; 1071 } 1072 1073 1074 public ClientUser (String name, 1075 int port, 1076 String server_hostname, 1077 int send_data_interval) 1078 throws IOException, AceException 1079 { 1080 super(name); 1081 1082 1094 byte[] reg_data = {2, 4, 6, 8}; 1096 ipcClient = new AceIPCClient (1000, 1097 name, 1098 server_hostname, 1099 port, 1100 this, 1101 this, 1102 this, 1103 reg_data, 1104 0, 1105 reg_data.length); 1106 1107 if (send_data_interval > 0) 1108 { 1109 dataSender = new DataSender (send_data_interval, this); 1110 } 1111 1112 System.out.println (name + " THREAD ID = " + getAceThreadId()); 1113 } 1114 1115 public void run() 1116 { 1117 ipcClient.start(); 1118 1119 if (dataSender != null) 1120 { 1121 dataSender.start(); 1122 } 1123 1124 while (true) 1125 { 1126 AceMessageInterface message = ipcClient.waitIPCMessage(); 1128 if (message == null) 1129 { 1130 System.err.println (getName() + 1131 " Null message encountered"); 1132 continue; 1133 } 1134 else if ((message instanceof AceSignalMessage) == true) 1135 { 1136 System.out.println (getName() + 1137 " Signal received, ID = " 1138 + ((AceSignalMessage)message).getSignalId() 1139 + ", signal message = " 1140 + ((AceSignalMessage)message).getMessage()); 1141 ipcClient.dispose(); 1142 if (dataSender != null) 1143 { 1144 dataSender.dispose(); 1145 } 1146 super.dispose(); 1147 break; 1148 } 1149 1150 if ((message instanceof AceIPCMessage) == true) 1151 { 1152 AceIPCMessage msg = (AceIPCMessage) message; 1153 switch (msg.getEvent()) 1154 { 1155 case AceIPCMessage.CONNECTION_ESTABLISHED: 1156 { 1157 InetAddress addr = msg.getFarEndAddress(); 1158 int port = msg.getFarEndPort(); 1159 System.out.println (getName() + 1160 '_' + getAceThreadId() + (new Date()) + ' ' + 1161 (new Date().getTime() & 0xFFFF) + 1162 " CONNECTION ESTABLISHED WITH SERVER ADDR = " 1163 + addr 1164 + ", PORT = " 1165 + port 1166 + ", userparm=" 1167 + msg.getUserParm()); 1168 } 1169 break; 1170 case AceIPCMessage.DISCONNECT: 1171 { 1172 System.out.println (getName() + 1173 '_' + getAceThreadId() + (new Date()) + ' ' + 1174 (new Date().getTime() & 0xFFFF) + 1175 " CONNECTION DISCONNECTED, SERVER ADDR = " 1176 + msg.getFarEndAddress() 1177 + ", PORT = " 1178 + msg.getFarEndPort() 1179 + ", userparm=" 1180 + msg.getUserParm()); 1181 } 1182 break; 1183 case AceIPCMessage.MESSAGE_RECEIVED: 1184 { 1185 int msg_num = (int) AceInputSocketStream.octetsToIntMsbFirst (msg.getMessage(), 1186 msg.getUserDataOffset(), 1187 msg.getUserDataLength()); 1188 1189 System.out.println (getName() + 1190 '_' + getAceThreadId() + (new Date()) + ' ' + 1191 (new Date().getTime() & 0xFFFF) + 1192 " RECEIVED " + 1193 ((msg.solicitedMessage() == true) ? "solicited " : "unsolicited ") + 1194 "senderThreadID=" + msg.getSenderThreadId() + 1195 " userparm=" + msg.getUserParm() + 1196 " : " + 1197 msg_num); 1198 1199 if (dataSender == null) 1200 { 1201 byte[] reply = new byte[4]; 1203 AceInputSocketStream.intToBytesMsbFirst (++msg_num, 1204 reply, 1205 0); 1206 if (ipcClient.sendIPCMessage (reply, 0, reply.length, 1207 msg.getSenderThreadId(), 1208 this) == false) 1209 { 1210 System.err.println (getName() + (new Date()) + ' ' + 1211 (new Date().getTime() & 0xFFFF) + 1212 " Message sending failed : " 1213 + getErrorMessage()); 1214 } 1215 } 1216 } 1217 break; 1218 default: 1219 { 1220 System.err.println (getName() + 1221 " Unexpected IPC message event encountered : " 1222 + msg.getEvent()); 1223 } 1224 break; 1225 } 1226 } 1227 else 1228 { 1229 System.err.println (getName() + 1230 " Unexpected Ace message type encountered : " 1231 + message.messageType()); 1232 } 1233 } 1234 1235 } 1236 1237 public void sendMessage (byte[] msg_data, int offset, int length) 1238 { 1239 if (ipcClient.sendIPCMessage (msg_data, offset, length, this) == false) 1240 { 1241 System.err.println (getName() + (new Date()) + ' ' + 1242 (new Date().getTime() & 0xFFFF) + 1243 " Message sending failed : " 1244 + ((AceThread)(Thread.currentThread())).getErrorMessage()); 1245 } 1246 } 1247 1248 private AceIPCClient ipcClient; 1249 private DataSender dataSender = null; 1250 } 1251 1252 1253 1254 1255 try 1256 { 1257 int port = 3000; 1258 String hostname = "localhost"; 1259 int send_data_interval = 0; 1260 1261 if ((args.length != 0) && (args.length != 3)) 1262 { 1263 System.out.println ("Arguments (all or nothing): <server port>, <server hostname>, <send user data interval(ms) - if 0, sends upon receipt>"); 1264 System.out.println ("Defaults: port=" + port + ", hostname=" + hostname + ", send user data interval=" + send_data_interval); 1265 System.exit (0); 1266 } 1267 1268 if (args.length == 3) 1269 { 1270 try 1271 { 1272 port = Integer.parseInt (args[0]); 1273 hostname = args[1]; 1274 send_data_interval = Integer.parseInt (args[2]); 1275 } 1276 catch (NumberFormatException ex) 1277 { 1278 System.err.println ("Input must be numeric"); 1279 System.exit (1); 1280 } 1281 } 1282 1283 AceTimer.Instance().start(); 1285 ClientUser user = new ClientUser ("TestClient", 1286 port, 1287 hostname, 1288 send_data_interval); 1289 1290 user.start(); 1291 user.join(); 1292 System.exit (0); 1293 } 1294 catch (IOException ex) 1295 { 1296 System.err.println ("IOException in main " + ex.getMessage()); 1297 System.exit (1); 1298 } 1299 catch (AceException ex) 1300 { 1301 System.err.println ("AceException in main " + ex.getMessage()); 1302 System.exit (1); 1303 } 1304 catch (InterruptedException ex) 1305 { 1306 System.err.println ("InterruptedException in main " + ex.getMessage()); 1307 System.exit (1); 1308 } 1309 } 1310} 1311 | Popular Tags |