KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > quikj > server > framework > AceIPCClient


1 package com.quikj.server.framework;
2
3 import java.io.*;
4 import java.net.*;
5 import java.util.*;
6
7 public class AceIPCClient extends AceThread implements AceIPCEntityInterface
8 {
9     // LOCKING:
10
// Activity is locked to prevent any conflict between the following, using ipcLock:
11
// - public methods called by ipc user (inside dispose(), sendIPCMessage())
12
// - internal (this class) processing (inside run())
13

14     public AceIPCClient (long user_parm,
15              String JavaDoc name,
16              String JavaDoc server_host,
17              int server_port,
18              AceThread unsol_msg_handler,
19              AceThread connect_handler,
20              AceThread disconnect_handler,
21              byte[] registration_data,
22              int reg_data_offset,
23              int reg_data_len)
24     throws IOException, AceException
25     {
26     super(name);
27     
28     try
29         {
30         serverAddress = InetAddress.getByName (server_host);
31         }
32     catch (UnknownHostException ex)
33         {
34         throw new AceException ("Could not resolve inet address of host "
35                     + server_host
36                     + ", error " + ex.getMessage());
37         }
38     
39     if (initNewSocket (user_parm) == false)
40         {
41         throw new AceException ("Datagram socket initialization failed");
42         }
43     
44     serverPort = server_port;
45     unsolMsgHandler = unsol_msg_handler;
46     connectHandler = connect_handler;
47     disconnectHandler = disconnect_handler;
48     registrationData = registration_data;
49     regDataOffset = reg_data_offset;
50     regDataLength = reg_data_len;
51     userParm = user_parm;
52     
53     // print trace message
54
//System.out.println (name +
55
// ": AceIPCClient.AceIPCClient() -- IPC Client created");
56
}
57     
58     public void dispose()
59     {
60     synchronized (ipcLock)
61         {
62         dropConnection();
63         dropSocket();
64         }
65     
66     if (this.isAlive() == true)
67         {
68         if (interruptWait (AceSignalMessage.SIGNAL_TERM,
69                    "Normal IPC Client dispose") == false)
70             {
71             // print log message
72
System.err.println (getName()
73                         + ": AceIPCClient.dispose() -- Could not interrupt own wait : "
74                         + getErrorMessage());
75             
76             super.dispose();
77             }
78         }
79     else
80         {
81         super.dispose();
82         
83         // print trace message
84
//System.out.println (getName() +
85
// ": AceIPCClient.dispose() -- IPC Client disposed");
86
}
87     }
88     
89     public void run()
90     {
91     if (initConnection (CONNECT_SHORT_TIMER) == false)
92         {
93         dropSocket();
94         }
95     else
96         {
97         sockListener.start();
98         }
99     
100     while (true)
101         {
102         AceMessageInterface message = waitMessage();
103         if (message == null)
104             {
105             // print trace message
106
//System.out.println (getName() +
107
// ": AceIPCClient.run() -- Null message encountered, state = "
108
// + state);
109
continue;
110             }
111         else if ((message instanceof AceSignalMessage) == true)
112             {
113             // print trace message
114
//System.out.println (getName() +
115
// ": AceIPCClient.run() -- Signal received, ID = "
116
// + ((AceSignalMessage)message).getSignalId()
117
// + ", signal message = "
118
// + ((AceSignalMessage)message).getMessage());
119
super.dispose();
120             break;
121             }
122         
123         synchronized (ipcLock)
124             {
125             switch (state)
126                 {
127                 case STATE_CONNECTING:
128                 {
129                     processConnectingEvent (message);
130                 }
131                 break;
132                 case STATE_CONNECTED:
133                 {
134                     processConnectedEvent (message);
135                 }
136                 break;
137                 case STATE_WAITING_BEFORE_RETRY:
138                 {
139                     processWaitingToRetryEvent (message);
140                 }
141                 break;
142                 case STATE_DISCONNECTED:
143                 break;
144                 default:
145                 {
146                     // print log message
147
System.err.println (getName() +
148                             ": AceIPCClient.run() -- Bad state encountered : "
149                             + state);
150                     reconnect();
151                 }
152                 break;
153                 }
154             }
155         }
156     }
157     
158     public AceMessageInterface waitIPCMessage ()
159     {
160     Thread JavaDoc thr = Thread.currentThread();
161     
162     if ((thr instanceof AceThread) == false)
163         {
164         writeErrorMessage ("This method is not being called from an object which is a sub-class of type AceThread");
165         return null;
166         }
167     
168     AceThread cthread = (AceThread)thr;
169     
170     while (true)
171         {
172         AceMessageInterface msg_received = cthread.waitMessage();
173         if ((msg_received instanceof AceIPCMessage) == true)
174             {
175             if (((AceIPCMessage)msg_received).getEntity() == this)
176                 {
177                 return msg_received;
178                 }
179             }
180         else if ((msg_received instanceof AceSignalMessage) == true)
181             {
182             return msg_received;
183             }
184         }
185     }
186     
187     private boolean initNewSocket (long user_parm)
188     {
189     // handles socket, sockListener
190
try
191         {
192         socket = new DatagramSocket ();
193         }
194     catch (SocketException ex)
195         {
196         // print log meessage
197
System.err.println (getName() +
198                     ": AceIPCClient.initNewSocket() -- Socket error creating DatagramSocket : "
199                     + ex.getMessage());
200         return false;
201         }
202     
203     try
204         {
205         sockListener = new AceDatagram (user_parm,
206                         getName() + "_sockListener",
207                         this,
208                         socket,
209                         AceIPCMessage.MAX_IPC_MSG_SIZE);
210         }
211     catch (IOException ex)
212         {
213         // print log message
214
System.err.println (getName() +
215                     ": AceIPCClient.initNewSocket() -- IO error creating AceDatagram : "
216                     + ex.getMessage());
217         socket.close();
218         socket = null;
219         return false;
220         }
221     catch (AceException ex)
222         {
223         // print log message
224
System.err.println (getName() +
225                     ": AceIPCClient.initNewSocket() -- Ace error creating AceDatagram : "
226                     + ex.getMessage());
227         socket.close();
228         socket = null;
229         return false;
230         }
231     
232     return true;
233     }
234
235     private boolean initNewSocket ()
236     {
237     return initNewSocket (userParm);
238     }
239
240     private void dropSocket ()
241     {
242     if (sockListener != null)
243         {
244         //System.out.println ("AceIPCClient.dropSocket() - dropping socket, nulling socket.");
245
sockListener.dispose();
246         sockListener = null;
247         socket = null;
248         }
249     }
250
251     private boolean initConnection (int timer_value)
252     {
253     // handles sending conn msg, setting state, start timing
254
// returns false if:
255
// - message sending fails MAX_INIT_FAILURES_IN_A_ROW times, each time on a new socket
256
// - can't init a new socket
257
// - can't do timing
258

259     state = STATE_DISCONNECTED; // ensure init state, this method called from anywhere
260

261     for (int i = 0; i < MAX_INIT_FAILURES_IN_A_ROW; i++)
262         {
263         if (sendConnectRequestMessage () == true)
264             {
265             try
266                 {
267                 sendTimerId = AceTimer.Instance().startTimer (timer_value,
268                                           this,
269                                           0);
270                 if (sendTimerId < 0)
271                     {
272                     // print log message
273
System.err.println (getName() +
274                                 ": AceIPCClient.initConnection() -- Failure starting timer, returned ID = "
275                                 + sendTimerId);
276                     sendDisconnectMessage();
277                     return false;
278                     }
279                 else
280                     {
281                     state = STATE_CONNECTING;
282                     return true;
283                     }
284                 }
285             catch (IOException ex)
286                 {
287                 // print log message
288
System.err.println (getName() +
289                             ": AceIPCClient.initConnection() -- IOException starting timer : "
290                             + ex.getMessage());
291                 sendDisconnectMessage();
292                 return false;
293                 }
294             }
295         else
296             {
297             dropSocket();
298             if (initNewSocket() == false)
299                 {
300                 return false;
301                 }
302             }
303         }
304     
305     return false;
306     }
307
308     private void dropConnection ()
309     {
310     dropConnection (true);
311     }
312         
313     private void dropConnection (boolean send_disc_msg)
314     {
315     if (send_disc_msg == true)
316         {
317         switch (state)
318             {
319             case STATE_CONNECTED:
320             case STATE_CONNECTING:
321             {
322                 sendDisconnectMessage();
323             }
324             break;
325             
326             default:
327             break;
328             }
329         }
330     
331     state = STATE_DISCONNECTED;
332     
333     flushMessages(); // except don't want to flush signal
334

335     try
336         {
337         AceTimer.Instance().cancelAllTimers (this);
338         }
339     catch (IOException ex)
340         {
341         // print log message
342
System.err.println (getName() +
343                     ": AceIPCClient.dropConnection() -- Error canceling timers : "
344                     + ex.getMessage());
345         return;
346         }
347     }
348     
349     private void reconnect () // reuse current socket
350
{
351     dropConnection (true);
352     if (initConnection (CONNECT_SHORT_TIMER) == false)
353         {
354         dropSocket();
355         }
356     }
357
358     private void reconnectWithNewSocket ()
359     {
360     dropConnection (false);
361
362     dropSocket();
363
364     if (initNewSocket() == true)
365         {
366         if (initConnection (CONNECT_SHORT_TIMER) == false)
367             {
368             dropSocket();
369             }
370         else
371             {
372             sockListener.start();
373             }
374         }
375     }
376     
377     private void stopTimer (int timer_id)
378     {
379     try
380         {
381         boolean status = AceTimer.Instance().cancelTimer (timer_id, this);
382         }
383     catch (IOException ex)
384         {
385         // print log message
386
System.err.println (getName() +
387                     ": AceIPCClient.stopTimer() -- IOException canceling timer ID = "
388                     + timer_id
389                     + " : "
390                     + ex.getMessage());
391         return;
392         }
393     }
394
395     
396
397     private void processConnectingEvent (AceMessageInterface message)
398     {
399     //System.out.println ("IPCClient processConnectingEvent() - message : " + message.messageType());
400
if ((message instanceof AceDatagramMessage) == true)
401         {
402         
403         stopTimer (sendTimerId);
404         
405         if (((AceDatagramMessage)message).getStatus() == AceDatagramMessage.READ_COMPLETED)
406             {
407             // a message was received
408
try
409                 {
410                 AceIPCMessageParser parser = new AceIPCMessageParser
411                     (((AceDatagramMessage)message).getBuffer(),
412                      ((AceDatagramMessage)message).getLength());
413                 switch (parser.getMessageType())
414                     {
415                     case AceIPCMessageInterface.CONN_RESP_MSG:
416                     {
417                         processConnRespMessage ((AceIPCConnRespMessage) parser.getMessage());
418                     }
419                     break;
420                     default:
421                     {
422                         state = STATE_DISCONNECTED;
423                         flushMessages(); // except don't want to flush signal
424

425                         if (initConnection (CONNECT_SHORT_TIMER) == false)
426                         {
427                             dropSocket();
428                         }
429                     }
430                     break;
431                     }
432                 }
433             catch (AceException ex)
434                 {
435                 // print log message
436
System.err.println (getName() +
437                             ": AceIPCClient.processConnectingEvent() -- Error parsing message, AceException : "
438                             + ex.getMessage()
439                             + ", msg follows: " + '\n'
440                             + AceIPCMessage.dumpRawBytes
441                             (((AceDatagramMessage)message).getBuffer(),
442                              0,
443                              ((AceDatagramMessage)message).getLength()));
444                 return;
445                 }
446             }
447         else
448             {
449             //System.out.println ("AceIPCClient.processConnectingEvent() - RECEIVED ACEDATAGRAMMESSAGE WITH STATUS = READ_ERROR");
450
reconnectWithNewSocket();
451             }
452         }
453     else if ((message instanceof AceTimerMessage) == true)
454         {
455         if (initConnection (CONNECT_LONG_TIMER) == false)
456             {
457             dropSocket();
458             }
459         }
460     else
461         {
462         // print log message
463
System.err.println (getName() +
464                     ": AceIPCClient.processConnectingEvent() -- Unexpected Ace message type encountered : "
465                     + message.messageType());
466         }
467     }
468     
469     private void processConnRespMessage (AceIPCConnRespMessage conn_message)
470     {
471     switch (conn_message.getStatus())
472         {
473         case AceIPCConnRespMessage.STATUS_OK:
474         {
475             state = STATE_CONNECTED;
476             hbInterval = conn_message.getHbInterval();
477             //System.out.println ("Client: received HB interval = " + hbInterval);
478
// start send/receive timers
479
try
480             {
481                 sendTimerId = AceTimer.Instance().startTimer (hbInterval,
482                                       this,
483                                       0);
484                 receiveTimerId = AceTimer.Instance().startTimer ((hbInterval *
485                                           AceIPCHeartbeatMessage.TOLERANCE_FACTOR),
486                                          this,
487                                          0);
488                 if ((sendTimerId < 0) || (receiveTimerId < 0))
489                 {
490                     // print log message
491
System.err.println (getName() +
492                             ": AceIPCClient.processConnRespMessage() -- Failure starting one or more timers, returned IDs = "
493                             + sendTimerId + ", "
494                             + receiveTimerId);
495                     dropConnection();
496                     dropSocket();
497                     return;
498                 }
499                 else
500                 {
501                     if (sendHeartbeatMessage() == false)
502                     {
503                         reconnectWithNewSocket();
504                     }
505                     else
506                     {
507                         if (connectHandler != null)
508                         {
509                             AceIPCMessage msg_for_user = new AceIPCMessage (AceIPCMessage.CONNECTION_ESTABLISHED,
510                                                     this,
511                                                     serverAddress,
512                                                     serverPort,
513                                                     userParm);
514                             if (connectHandler.sendMessage (msg_for_user) == false)
515                             {
516                                 // print log message
517
System.err.println (getName()
518                                         + ": AceIPCClient.processConnRespMessage() -- Could not send IPC message to the user connect handler thread : "
519                                         + getErrorMessage());
520                             }
521                         }
522                     }
523                 }
524             }
525             catch (IOException ex)
526             {
527                 // print log message
528
System.err.println (getName() +
529                         ": AceIPCClient.processConnRespMessage() -- IOException starting one or more timers : "
530                         + ex.getMessage());
531                 dropConnection();
532                 dropSocket();
533                 return;
534             }
535         }
536         break;
537         default:
538         {
539             state = STATE_WAITING_BEFORE_RETRY;
540             try
541             {
542                 sendTimerId = AceTimer.Instance().startTimer (TRY_AGAIN_LATER_TIMER,
543                                       this,
544                                       0);
545                 if (sendTimerId < 0)
546                 {
547                     // print log message
548
System.err.println (getName() +
549                             ": AceIPCClient.processConnRespMessage() -- Failure starting timer, returned ID = "
550                             + sendTimerId);
551                     dropConnection();
552                     dropSocket();
553                     return;
554                 }
555             }
556             catch (IOException ex)
557             {
558                 // print log message
559
System.err.println (getName() +
560                         ": AceIPCClient.processConnRespMessage() -- IOException starting timer : "
561                         + ex.getMessage());
562                 dropConnection();
563                 dropSocket();
564                 return;
565             }
566         }
567         break;
568         }
569     }
570     
571     private void processConnectedEvent (AceMessageInterface message)
572     {
573     if ((message instanceof AceDatagramMessage) == true)
574         {
575         if (((AceDatagramMessage)message).getStatus() == AceDatagramMessage.READ_COMPLETED)
576             {
577             // reset receive message timing
578
stopTimer (receiveTimerId);
579             try
580                 {
581                 receiveTimerId = AceTimer.Instance().startTimer ((hbInterval *
582                                           AceIPCHeartbeatMessage.TOLERANCE_FACTOR),
583                                          this,
584                                          0);
585                 if (receiveTimerId < 0)
586                     {
587                     // print log messagge
588
System.err.println (getName() +
589                                 ": AceIPCClient.processConnectedEvent() -- Failure starting timer, returned ID = "
590                                 + receiveTimerId);
591                     notifyUserOfDisc();
592                     dropConnection();
593                     dropSocket();
594                     return;
595                     }
596                 }
597             catch (IOException ex)
598                 {
599                 // print log message
600
System.err.println (getName() +
601                             ": AceIPCClient.processConnectedEvent() -- IOException starting timer : "
602                             + ex.getMessage());
603                 notifyUserOfDisc();
604                 dropConnection();
605                 dropSocket();
606                 return;
607                 }
608             
609             // handle the message
610
try
611                 {
612                 AceIPCMessageParser parser = new AceIPCMessageParser
613                     (((AceDatagramMessage)message).getBuffer(),
614                      ((AceDatagramMessage)message).getLength());
615                 switch (parser.getMessageType())
616                     {
617                     case AceIPCMessageInterface.HB_MSG:
618                     {
619                         if (resetSendTimer(true) == false)
620                         {
621                             notifyUserOfDisc();
622                             dropConnection();
623                             dropSocket();
624                         }
625                         else
626                         {
627                             if (sendHeartbeatMessage() == false)
628                             {
629                                 notifyUserOfDisc();
630                                 reconnectWithNewSocket();
631                             }
632                         }
633                     }
634                     break;
635                     case AceIPCMessageInterface.DISCONNECT_MSG:
636                     {
637                         notifyUserOfDisc();
638                         state = STATE_DISCONNECTED;
639                         flushMessages(); // except don't want to flush signal
640
try
641                         {
642                             AceTimer.Instance().cancelAllTimers (this);
643                         }
644                         catch (IOException ex)
645                         {
646                             // print log message
647
System.err.println (getName() +
648                                     ": AceIPCClient.processConnectedEvent() -- Error canceling timers : "
649                                     + ex.getMessage());
650                         }
651                         finally
652                         {
653                             if (initConnection (CONNECT_SHORT_TIMER) == false)
654                             {
655                                 dropSocket();
656                             }
657                         }
658                     }
659                     break;
660                     case AceIPCMessageInterface.USER_MSG:
661                     {
662                         AceIPCUserMessage received_message =
663                         (AceIPCUserMessage) parser.getMessage();
664                         int to_thread_id = received_message.getToThreadID();
665                         
666                         //System.out.println (" RECEIVED MESSAGE W/TO_THREAD_ID = " + to_thread_id
667
// + ", FROM_THREAD_ID = " + received_message.getFromThreadID() + ' '
668
// + (new Date()) + ' ' +
669
// (new Date().getTime() & 0xFFFF));
670
AceIPCMessage msg_for_user = new AceIPCMessage
671                         (AceIPCMessage.MESSAGE_RECEIVED,
672                          (to_thread_id > 0 ? true : false),
673                          this,
674                          received_message.getFromThreadID(),
675                          received_message.getBytes(),
676                          received_message.userDataOffset(),
677                          received_message.userDataLength(),
678                          ((AceDatagramMessage)message).getAddress(),
679                          ((AceDatagramMessage)message).getPort(),
680                          userParm);
681                         
682                         if (to_thread_id > 0)
683                         {
684                             AceThread to_thread = AceThread.getAceThreadObject (to_thread_id);
685                             if (to_thread != null)
686                             {
687                                 if (to_thread.sendMessage (msg_for_user) == false)
688                                 {
689                                     // print log message
690
System.err.println (getName()
691                                             + ": AceIPCClient.processConnectedEvent() -- Could not send solicited IPC message to thread id = "
692                                             + to_thread_id
693                                             + " : "
694                                             + getErrorMessage());
695                                 }
696                             }
697                             else
698                             {
699                                 // print trace msg
700
//System.out.println (getName()
701
// + ": AceIPCClient.processConnectedEvent() -- Could not find to-thread, to-thread-id = "
702
// + to_thread_id);
703
}
704                         }
705                         else
706                         {
707                             if (unsolMsgHandler != null)
708                             {
709                                 if (unsolMsgHandler.sendMessage (msg_for_user) == false)
710                                 {
711                                     // print log message
712
System.err.println (getName()
713                                             + ": AceIPCClient.processConnectedEvent() -- Could not send IPC message to the unsolicited msg handler thread : "
714                                             + getErrorMessage());
715                                 }
716                             }
717                         }
718                     }
719                     break;
720                     default:
721                     {
722                         // print log message
723
System.err.println (getName() +
724                                 ": AceIPCClient.processConnectedEvent() -- Unexpected message type received : "
725                                 + parser.getMessageType()
726                                 + ", msg follows: " + '\n'
727                                 + AceIPCMessage.dumpRawBytes
728                                 (((AceDatagramMessage)message).getBuffer(),
729                                  0,
730                                  ((AceDatagramMessage)message).getLength()));
731                         notifyUserOfDisc();
732                         reconnect();
733                     }
734                     break;
735                     }
736                 }
737             catch (AceException ex)
738                 {
739                 // print log message
740
System.err.println (getName() +
741                             ": AceIPCClient.processConnectedEvent() -- Error parsing message, AceException : "
742                             + ex.getMessage()
743                             + ", msg follows: " + '\n'
744                             + AceIPCMessage.dumpRawBytes
745                             (((AceDatagramMessage)message).getBuffer(),
746                              0,
747                              ((AceDatagramMessage)message).getLength()));
748                 return;
749                 }
750             }
751         else
752             {
753             notifyUserOfDisc();
754             reconnectWithNewSocket();
755             }
756         }
757     else if ((message instanceof AceTimerMessage) == true)
758         {
759         if (((AceTimerMessage)message).getTimerId() == sendTimerId)
760             {
761             if (resetSendTimer(false) == false)
762                 {
763                 notifyUserOfDisc();
764                 dropConnection();
765                 dropSocket();
766                 }
767             else
768                 {
769                 if (sendHeartbeatMessage() == false)
770                     {
771                     notifyUserOfDisc();
772                     reconnectWithNewSocket();
773                     }
774                 }
775             }
776         else if (((AceTimerMessage)message).getTimerId() == receiveTimerId)
777             {
778             // print info message
779
System.err.println (getName() +
780                         ": AceIPCClient.processConnectedEvent() -- Receive timer expired: LOST HEARTBEAT");
781             notifyUserOfDisc();
782             reconnectWithNewSocket();
783             }
784         else
785             {
786             // print log message
787
System.err.println (getName() +
788                         ": AceIPCClient.processConnectedEvent() -- Message received with unexpected timer ID = "
789                         + ((AceTimerMessage)message).getTimerId());
790             }
791         }
792     else
793         {
794         // print log message
795
System.err.println (getName() +
796                     ": AceIPCClient.processConnectedEvent() -- Unexpected Ace message type encountered : "
797                     + message.messageType());
798         }
799     
800     }
801     
802     private void notifyUserOfDisc ()
803     {
804     if (disconnectHandler != null)
805         {
806         AceIPCMessage msg_for_user = new AceIPCMessage (AceIPCMessage.DISCONNECT,
807                                 this,
808                                 serverAddress,
809                                 serverPort,
810                                 userParm);
811         if (disconnectHandler.sendMessage (msg_for_user) == false)
812             {
813             // print log message
814
System.err.println (getName()
815                         + ": AceIPCClient.notifyUserOfDisc() -- Could not send IPC message to the user disconnect handler thread : "
816                         + getErrorMessage());
817             }
818         }
819     }
820     
821     private void processWaitingToRetryEvent (AceMessageInterface message)
822     {
823     if ((message instanceof AceTimerMessage) == true)
824         {
825         if (initConnection (CONNECT_SHORT_TIMER) == false)
826             {
827             dropSocket();
828             }
829         }
830     else
831         {
832         // print log message
833
System.err.println (getName() +
834                     ": AceIPCClient.processWaitingToRetryEvent() -- Unexpected Ace message type encountered : "
835                     + message.messageType());
836         }
837     }
838     
839     private boolean sendConnectRequestMessage ()
840     {
841     return (sendMessage (new AceIPCConnReqMessage(registrationData,
842                               regDataOffset,
843                               regDataLength)));
844     }
845
846     private void sendDisconnectMessage ()
847     {
848     boolean status = sendMessage (new AceIPCDiscMessage());
849     }
850
851     private boolean sendHeartbeatMessage ()
852     {
853     return (sendMessage (new AceIPCHeartbeatMessage()));
854     }
855     
856     private boolean sendMessage (AceIPCMessageInterface message)
857     {
858     DatagramPacket dp = new DatagramPacket (message.getBytes(),
859                         message.getLength(),
860                         serverAddress,
861                         serverPort);
862     
863     try
864         {
865         socket.send (dp);
866         // print trace message
867
//System.out.println (getName() + (new Date()) + ' ' +
868
// (new Date().getTime() & 0xFFFF) +
869
// ": *** TRACE *** MESSAGE SENT to address = "
870
// + serverAddress.toString()
871
// + ", port = "
872
// + serverPort
873
// + " ***** "
874
// + message.traceIPCMessage(true));
875
}
876     catch (IOException ex)
877         {
878         // print log message
879
System.err.println (getName() +
880                     ": AceIPCClient.sendMessage() -- IOException sending message on socket, error : "
881                     + ex.getMessage()
882                     + ", dest address = "
883                     + serverAddress.toString()
884                     + ", dest port = "
885                     + serverPort
886                     + ", message follows: \n"
887                     + message.traceIPCMessage(true));
888         return false;
889         }
890     
891     return true;
892     }
893
894     private boolean resetSendTimer (boolean currently_running)
895     {
896     if (currently_running == true)
897         {
898         stopTimer (sendTimerId);
899         }
900
901     try
902         {
903         sendTimerId = AceTimer.Instance().startTimer (hbInterval,
904                                   this,
905                                   0);
906         if (sendTimerId < 0)
907             {
908             // print log message
909
System.err.println (getName() +
910                         ": AceIPCClient.resetSendTimer() -- Failure starting timer, returned ID = "
911                         + sendTimerId);
912             return false;
913             }
914         }
915     catch (IOException ex)
916         {
917         // print log message
918
System.err.println (getName() +
919                     ": AceIPCClient.resetSendTimer() -- IOException starting timer : "
920                     + ex.getMessage());
921         return false;
922         }
923     
924     return true;
925     }
926     
927     public boolean sendIPCMessage (byte[] message, int offset, int len,
928                    int to_thread_id, AceThread sender)
929     {
930     Thread JavaDoc parent_thread = null;
931     if (sender == null)
932         {
933         parent_thread = Thread.currentThread();
934         }
935     else
936         {
937         parent_thread = sender;
938         }
939     
940     if ((parent_thread instanceof AceThread) == false)
941         {
942         writeErrorMessage ("The calling thread must be an instance of AceThread");
943         return false;
944         }
945             
946     boolean retval = true;
947     
948     synchronized (ipcLock)
949         {
950         if (state != STATE_CONNECTED)
951             {
952             writeErrorMessage ("The client is not currently connected");
953             retval = false;
954             }
955         else if (resetSendTimer(true) == false)
956             {
957             dropConnection();
958             dropSocket();
959             writeErrorMessage ("Fatal timing error encountered");
960             retval = false;
961             }
962         else
963             {
964             AceIPCUserMessage ipc_msg = new AceIPCUserMessage (to_thread_id,
965                                        ((AceThread)parent_thread).getAceThreadId(),
966                                        message,
967                                        offset,
968                                        len);
969             
970             if (sendMessage (ipc_msg) == false)
971                 {
972                 reconnectWithNewSocket();
973                 writeErrorMessage ("Socket error sending message, attempting reconnect");
974                 retval = false;
975                 }
976             }
977         }
978     
979     return retval;
980     }
981     
982     public boolean sendIPCMessage (byte[] message, int offset, int len, int to_thread_id)
983     {
984     return sendIPCMessage (message, offset, len, to_thread_id, null);
985     }
986
987     public boolean sendIPCMessage (byte[] message, int offset, int len)
988     {
989     return sendIPCMessage (message, offset, len, 0, null);
990     }
991     
992     public boolean sendIPCMessage (byte[] message, int offset, int len, AceThread sender)
993     {
994     return sendIPCMessage (message, offset, len, 0, sender);
995     }
996
997     private static final int CONNECT_SHORT_TIMER = 5 * 1000;
998     private static final int CONNECT_LONG_TIMER = 15 * 1000;
999     private static final int TRY_AGAIN_LATER_TIMER = 60 * 1000;
1000    private static final int MAX_INIT_FAILURES_IN_A_ROW = 5;
1001
1002    private static final int STATE_CONNECTING = 1;
1003    private static final int STATE_CONNECTED = 2;
1004    private static final int STATE_DISCONNECTED = 3;
1005    private static final int STATE_WAITING_BEFORE_RETRY = 4;
1006    
1007    private DatagramSocket socket = null;
1008    private AceDatagram sockListener = null;
1009    private Object JavaDoc ipcLock = new Object JavaDoc();
1010    private InetAddress serverAddress;
1011    private int serverPort;
1012    private int hbInterval;
1013    private AceThread unsolMsgHandler = null;
1014    private AceThread connectHandler = null;
1015    private AceThread disconnectHandler = null;
1016    private byte[] registrationData = null;
1017    private int regDataOffset = 0;
1018    private int regDataLength = 0;
1019    private long userParm;
1020    
1021    private int state = STATE_DISCONNECTED;
1022    private int sendTimerId = -1;
1023    private int receiveTimerId = -1;
1024
1025    // test program
1026
public static void main (String JavaDoc[] args)
1027    {
1028
1029    class ClientUser extends AceThread
1030    {
1031        class DataSender extends AceThread
1032        {
1033        public DataSender (int send_data_interval,
1034                   ClientUser parent)
1035            throws IOException
1036        {
1037            super ();
1038            sendInterval = send_data_interval;
1039            this.parent = parent;
1040            System.out.println ("DATA SENDER THREAD ID = " + getAceThreadId());
1041        }
1042        
1043        public void dispose()
1044        {
1045        }
1046        
1047        public void run()
1048        {
1049            int msg_counter = 0;
1050            byte[] msg_data = new byte[4];
1051        
1052            try
1053            {
1054                while (true)
1055                {
1056                    sleep (sendInterval);
1057                    AceInputSocketStream.intToBytesMsbFirst (++msg_counter,
1058                                       msg_data,
1059                                       0);
1060                    parent.sendMessage (msg_data, 0, msg_data.length);
1061                }
1062            }
1063            catch (InterruptedException JavaDoc ex)
1064            {
1065                System.err.println ("DataSender sleep interrupted");
1066            }
1067        }
1068        
1069        private int sendInterval;
1070        private ClientUser parent;
1071        }
1072    
1073    
1074        public ClientUser (String JavaDoc name,
1075                   int port,
1076                   String JavaDoc server_hostname,
1077                   int send_data_interval)
1078        throws IOException, AceException
1079        {
1080        super(name);
1081        
1082// // Connect with no registration data
1083
// ipcClient = new AceIPCClient (1000,
1084
// name,
1085
// server_hostname,
1086
// port,
1087
// this,
1088
// this,
1089
// this,
1090
// null,
1091
// 0,
1092
// 0);
1093

1094        // OR, Connect with registration data
1095
byte[] reg_data = {2, 4, 6, 8};
1096        ipcClient = new AceIPCClient (1000,
1097                          name,
1098                          server_hostname,
1099                          port,
1100                          this,
1101                          this,
1102                          this,
1103                          reg_data,
1104                          0,
1105                          reg_data.length);
1106        
1107        if (send_data_interval > 0)
1108            {
1109            dataSender = new DataSender (send_data_interval, this);
1110            }
1111
1112        System.out.println (name + " THREAD ID = " + getAceThreadId());
1113        }
1114
1115        public void run()
1116        {
1117        ipcClient.start();
1118
1119        if (dataSender != null)
1120            {
1121            dataSender.start();
1122            }
1123
1124        while (true)
1125            {
1126            //AceMessageInterface message = waitMessage(); // this or next
1127
AceMessageInterface message = ipcClient.waitIPCMessage();
1128            if (message == null)
1129                {
1130                System.err.println (getName() +
1131                            " Null message encountered");
1132                continue;
1133                }
1134            else if ((message instanceof AceSignalMessage) == true)
1135                {
1136                System.out.println (getName() +
1137                            " Signal received, ID = "
1138                            + ((AceSignalMessage)message).getSignalId()
1139                            + ", signal message = "
1140                            + ((AceSignalMessage)message).getMessage());
1141                ipcClient.dispose();
1142                if (dataSender != null)
1143                    {
1144                    dataSender.dispose();
1145                    }
1146                super.dispose();
1147                break;
1148                }
1149            
1150            if ((message instanceof AceIPCMessage) == true)
1151                {
1152                AceIPCMessage msg = (AceIPCMessage) message;
1153                switch (msg.getEvent())
1154                    {
1155                    case AceIPCMessage.CONNECTION_ESTABLISHED:
1156                    {
1157                        InetAddress addr = msg.getFarEndAddress();
1158                        int port = msg.getFarEndPort();
1159                        System.out.println (getName() +
1160                                '_' + getAceThreadId() + (new Date()) + ' ' +
1161                                (new Date().getTime() & 0xFFFF) +
1162                                " CONNECTION ESTABLISHED WITH SERVER ADDR = "
1163                                + addr
1164                                + ", PORT = "
1165                                + port
1166                                + ", userparm="
1167                                + msg.getUserParm());
1168                    }
1169                    break;
1170                    case AceIPCMessage.DISCONNECT:
1171                    {
1172                        System.out.println (getName() +
1173                                '_' + getAceThreadId() + (new Date()) + ' ' +
1174                                (new Date().getTime() & 0xFFFF) +
1175                                " CONNECTION DISCONNECTED, SERVER ADDR = "
1176                                + msg.getFarEndAddress()
1177                                + ", PORT = "
1178                                + msg.getFarEndPort()
1179                                + ", userparm="
1180                                + msg.getUserParm());
1181                    }
1182                    break;
1183                    case AceIPCMessage.MESSAGE_RECEIVED:
1184                    {
1185                        int msg_num = (int) AceInputSocketStream.octetsToIntMsbFirst (msg.getMessage(),
1186                                                    msg.getUserDataOffset(),
1187                                                    msg.getUserDataLength());
1188                        
1189                        System.out.println (getName() +
1190                                '_' + getAceThreadId() + (new Date()) + ' ' +
1191                                (new Date().getTime() & 0xFFFF) +
1192                                " RECEIVED " +
1193                                ((msg.solicitedMessage() == true) ? "solicited " : "unsolicited ") +
1194                                "senderThreadID=" + msg.getSenderThreadId() +
1195                                " userparm=" + msg.getUserParm() +
1196                                " : " +
1197                                msg_num);
1198                        
1199                        if (dataSender == null)
1200                        {
1201                            // reply to the received message
1202
byte[] reply = new byte[4];
1203                            AceInputSocketStream.intToBytesMsbFirst (++msg_num,
1204                                               reply,
1205                                               0);
1206                            if (ipcClient.sendIPCMessage (reply, 0, reply.length,
1207                                          msg.getSenderThreadId(),
1208                                          this) == false)
1209                            {
1210                                System.err.println (getName() + (new Date()) + ' ' +
1211                                        (new Date().getTime() & 0xFFFF) +
1212                                        " Message sending failed : "
1213                                        + getErrorMessage());
1214                            }
1215                        }
1216                    }
1217                    break;
1218                    default:
1219                    {
1220                        System.err.println (getName() +
1221                                " Unexpected IPC message event encountered : "
1222                                + msg.getEvent());
1223                    }
1224                    break;
1225                    }
1226                }
1227            else
1228                {
1229                System.err.println (getName() +
1230                            " Unexpected Ace message type encountered : "
1231                            + message.messageType());
1232                }
1233            }
1234
1235        }
1236
1237        public void sendMessage (byte[] msg_data, int offset, int length)
1238        {
1239        if (ipcClient.sendIPCMessage (msg_data, offset, length, this) == false)
1240            {
1241            System.err.println (getName() + (new Date()) + ' ' +
1242                        (new Date().getTime() & 0xFFFF) +
1243                        " Message sending failed : "
1244                        + ((AceThread)(Thread.currentThread())).getErrorMessage());
1245            }
1246        }
1247        
1248        private AceIPCClient ipcClient;
1249        private DataSender dataSender = null;
1250    }
1251
1252    
1253
1254    
1255    try
1256        {
1257        int port = 3000;
1258        String JavaDoc hostname = "localhost";
1259        int send_data_interval = 0;
1260
1261        if ((args.length != 0) && (args.length != 3))
1262            {
1263            System.out.println ("Arguments (all or nothing): <server port>, <server hostname>, <send user data interval(ms) - if 0, sends upon receipt>");
1264            System.out.println ("Defaults: port=" + port + ", hostname=" + hostname + ", send user data interval=" + send_data_interval);
1265            System.exit (0);
1266            }
1267        
1268        if (args.length == 3)
1269            {
1270            try
1271                {
1272                port = Integer.parseInt (args[0]);
1273                hostname = args[1];
1274                send_data_interval = Integer.parseInt (args[2]);
1275                }
1276            catch (NumberFormatException JavaDoc ex)
1277                {
1278                System.err.println ("Input must be numeric");
1279                System.exit (1);
1280                }
1281            }
1282
1283        AceTimer.Instance().start(); // start the timer thread
1284

1285        ClientUser user = new ClientUser ("TestClient",
1286                          port,
1287                          hostname,
1288                          send_data_interval);
1289        
1290        user.start();
1291        user.join();
1292        System.exit (0);
1293        }
1294    catch (IOException ex)
1295        {
1296        System.err.println ("IOException in main " + ex.getMessage());
1297        System.exit (1);
1298        }
1299    catch (AceException ex)
1300        {
1301        System.err.println ("AceException in main " + ex.getMessage());
1302        System.exit (1);
1303        }
1304    catch (InterruptedException JavaDoc ex)
1305        {
1306        System.err.println ("InterruptedException in main " + ex.getMessage());
1307        System.exit (1);
1308        }
1309    }
1310}
1311
Popular Tags