KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > quikj > server > framework > AceIPCServer


1 package com.quikj.server.framework;
2
3 import java.io.*;
4 import java.net.*;
5 import java.util.*;
6
7 public class AceIPCServer extends AceThread implements AceIPCEntityInterface
8 {
// LOCKING:
// Access to the attributes socket and clientList requires locking.
// Interactions include:
// - public methods called by the IPC user (dispose(), sendIPCMessage())
// - protected methods called by a child connection (connectionClosed(), sendHeartbeatMessage())
// - internal (this class) processing (processEvent())

/**
 * Creates an IPC server listening on the given UDP port.
 *
 * The datagram socket and its listener are created first (see
 * initNewSocket()); the configuration values are stored only once that
 * has succeeded.
 *
 * @param user_parm          value handed to the socket listener and placed in
 *                           every message delivered to the handler threads
 * @param name               thread name, forwarded to the AceThread constructor
 * @param port               local UDP port the datagram socket is bound to
 * @param max_connections    maximum number of simultaneously connected clients
 * @param hb_interval        heartbeat interval given to each connection and
 *                           reported in connect responses
 * @param unsol_msg_handler  thread that receives user messages carrying no
 *                           destination thread id; may be null
 * @param connect_handler    thread notified of new connections; may be null
 * @param disconnect_handler thread notified of dropped connections; may be null
 * @throws IOException  declared by the signature (presumably raised by the
 *                      superclass constructor -- confirm)
 * @throws AceException if the socket or its listener could not be initialised
 */
public AceIPCServer(long user_parm,
        String name,
        int port,
        int max_connections,
        int hb_interval,
        AceThread unsol_msg_handler,
        AceThread connect_handler,
        AceThread disconnect_handler)
        throws IOException, AceException {
    super(name);

    if (!initNewSocket(port, user_parm)) {
        throw new AceException("Datagram socket initialization failed");
    }

    this.userParm = user_parm;
    this.maxConnections = max_connections;
    this.hbInterval = hb_interval;
    this.unsolMsgHandler = unsol_msg_handler;
    this.connectHandler = connect_handler;
    this.disconnectHandler = disconnect_handler;
    this.port = port;
}
45     
46     public void dispose()
47     {
48     dropConnections();
49     dropSocket();
50     
51     if (this.isAlive() == true)
52         {
53         if (interruptWait (AceSignalMessage.SIGNAL_TERM,
54                    "Normal IPC Server dispose") == false)
55             {
56             // print log message
57
System.err.println (getName()
58                         + ": AceIPCServer.dispose() -- Could not interrupt own wait : "
59                         + getErrorMessage());
60             
61             super.dispose();
62             }
63         }
64     else
65         {
66         super.dispose();
67         
68         // print trace message
69
//System.out.println (getName() +
70
// ": AceIPCServer.dispose() -- IPC Server disposed");
71
}
72     }
73     
74     public void run()
75     {
76     sockListener.start();
77     
78     while (true)
79         {
80         AceMessageInterface message = waitMessage();
81         if (message == null)
82             {
83             // print trace message
84
//System.out.println (getName() +
85
// ": AceIPCServer.run() -- Null message encountered");
86
continue;
87             }
88         else if ((message instanceof AceSignalMessage) == true)
89             {
90             // print trace message
91
//System.out.println (getName() +
92
// ": AceIPCServer.run() -- Signal received, ID = "
93
// + ((AceSignalMessage)message).getSignalId()
94
// + ", signal message = "
95
// + ((AceSignalMessage)message).getMessage() + ' '
96
// + (new Date()) + ' ' +
97
// (new Date().getTime() & 0xFFFF));
98
super.dispose();
99             break;
100             }
101         
102         processEvent (message);
103         }
104     }
105
106     public AceMessageInterface waitIPCMessage ()
107     {
108     Thread JavaDoc thr = Thread.currentThread();
109     
110     if ((thr instanceof AceThread) == false)
111         {
112         writeErrorMessage ("This method is not being called from an object which is a sub-class of type AceThread");
113         return null;
114         }
115     
116     AceThread cthread = (AceThread)thr;
117     
118     while (true)
119         {
120         AceMessageInterface msg_received = cthread.waitMessage();
121         if ((msg_received instanceof AceIPCMessage) == true)
122             {
123             if (((AceIPCMessage)msg_received).getEntity() == this)
124                 {
125                 return msg_received;
126                 }
127             }
128         else if ((msg_received instanceof AceSignalMessage) == true)
129             {
130             return msg_received;
131             }
132         }
133     }
134
135     private boolean initNewSocket (int port, long user_parm)
136     {
137     // handles socket, sockListener
138

139     synchronized (socketLock)
140         {
141         try
142             {
143             socket = new DatagramSocket (port);
144             }
145         catch (SocketException ex)
146             {
147             // print log meessage
148
System.err.println (getName() +
149                         ": AceIPCServer.initNewSocket() -- Socket error creating DatagramSocket : "
150                         + ex.getMessage() + ' '
151                         + (new Date()) + ' ' +
152                         (new Date().getTime() & 0xFFFF));
153             return false;
154             }
155         
156         try
157             {
158             sockListener = new AceDatagram (user_parm,
159                             getName() + "_sockListener",
160                             this,
161                             socket,
162                             AceIPCMessage.MAX_IPC_MSG_SIZE);
163             }
164         catch (IOException ex)
165             {
166             // print log message
167
System.err.println (getName() +
168                         ": AceIPCServer.initNewSocket() -- IO error creating AceDatagram : "
169                         + ex.getMessage());
170             socket.close();
171             socket = null;
172             return false;
173             }
174         catch (AceException ex)
175             {
176             // print log message
177
System.err.println (getName() +
178                         ": AceIPCServer.initNewSocket() -- Ace error creating AceDatagram : "
179                         + ex.getMessage());
180             socket.close();
181             socket = null;
182             return false;
183             }
184         }
185     
186     return true;
187     }
188
189     private void dropSocket ()
190     {
191     synchronized (socketLock)
192         {
193         if (sockListener != null)
194             {
195             sockListener.dispose();
196             sockListener = null;
197             socket = null;
198             }
199         
200         flushMessages(); // except don't want to flush signal
201
}
202     }
203     
204     private void dropConnections ()
205     {
206     synchronized (clientList)
207         {
208         Collection elements = clientList.values();
209         for (Iterator i = elements.iterator(); i.hasNext() == true; )
210             {
211             HashMap element = (HashMap) i.next();
212             Collection subelements = element.values();
213             for (Iterator j = subelements.iterator(); j.hasNext() == true; )
214                 {
215                 AceIPCServerConnection conn = (AceIPCServerConnection) j.next();
216                 InetAddress addr = conn.getClientAddress();
217                 int port = conn.getClientPort();
218                 conn.dispose();
219                 // not sending disc because if it gets IOException,
220
// sendMessage() will get us back in here (deadlock).
221
// Fix later if needed.
222

223                 if (disconnectHandler != null) //notify user
224
{
225                     AceIPCMessage msg_for_user = new AceIPCMessage (AceIPCMessage.DISCONNECT,
226                                             this,
227                                             addr,
228                                             port,
229                                             userParm);
230                     if (disconnectHandler.sendMessage (msg_for_user) == false)
231                         {
232                         // print log message
233
System.err.println (getName()
234                                     + ": AceIPCServer.dropConnections() -- Could not send IPC message to the user disconnect handler thread : "
235                                     + getErrorMessage());
236                         }
237                     }
238                 }
239             element.clear();
240             }
241         
242         clientList.clear();
243         }
244     }
245     
246     private void dropConnection (AceIPCServerConnection conn, boolean send_disc_msg)
247     // called when a connection needs to be taken down
248
{
249     InetAddress client_addr = conn.getClientAddress();
250     int client_port = conn.getClientPort();
251     conn.dispose();
252     removeClient (conn);
253     if (send_disc_msg == true)
254         {
255         sendDisconnectMessage (client_addr, client_port);
256         }
257     }
258     
259     protected void connectionClosed (AceIPCServerConnection conn,
260                      boolean send_disc_msg)
261     // called by AceIPCServerConnection to inform parent (this)
262
{
263     removeClient (conn);
264     if (send_disc_msg == true)
265         {
266         sendDisconnectMessage (conn.getClientAddress(), conn.getClientPort());
267         }
268     }
269     
270     private void getNewSocket ()
271     {
272     dropConnections();
273     dropSocket();
274     
275     for (int i = 0; i < MAX_INIT_FAILURES_IN_A_ROW; i++)
276         {
277         if (initNewSocket (port, userParm) == true)
278             {
279             sockListener.start();
280             break;
281             }
282         }
283     }
284     
285     
286     private void processEvent (AceMessageInterface message)
287     {
288     if ((message instanceof AceDatagramMessage) == true)
289         {
290         if (((AceDatagramMessage)message).getStatus() == AceDatagramMessage.READ_COMPLETED)
291             {
292             // handle the message
293
try
294                 {
295                 AceIPCMessageParser parser = new AceIPCMessageParser
296                     (((AceDatagramMessage)message).getBuffer(),
297                      ((AceDatagramMessage)message).getLength());
298                 int msg_type = parser.getMessageType();
299                 
300                 // find associated client in list
301
AceIPCServerConnection client_conn = findClient
302                     (((AceDatagramMessage)message).getAddress(),
303                      ((AceDatagramMessage)message).getPort());
304                 
305                 switch (msg_type)
306                     {
307                     case AceIPCMessageInterface.CONN_REQ_MSG:
308                     {
309                         if (client_conn != null)
310                         {
311                             client_conn.dispose();
312                             removeClient (client_conn);
313                         }
314                         // could improve efficiency here
315

316                         processConnReqMessage ((AceIPCConnReqMessage) parser.getMessage(),
317                                    ((AceDatagramMessage)message).getAddress(),
318                                    ((AceDatagramMessage)message).getPort());
319                     }
320                     break;
321                     default:
322                     {
323                         switch (msg_type)
324                         {
325                         case AceIPCMessageInterface.HB_MSG:
326                             {
327                             if (client_conn != null)
328                                 {
329                                 if (client_conn.resetReceiveTiming() == false)
330                                     {
331                                     dropConnection (client_conn,
332                                             true);
333                                     }
334                                 }
335                             }
336                             break;
337                         case AceIPCMessageInterface.DISCONNECT_MSG:
338                             {
339                             if (client_conn != null)
340                                 {
341                                 client_conn.dispose();
342                                 removeClient (client_conn);
343                                 }
344                             }
345                             break;
346                         case AceIPCMessageInterface.USER_MSG:
347                             {
348                             if (client_conn != null)
349                                 {
350                                 if (client_conn.resetReceiveTiming() == true)
351                                     {
352                                     processUserMessage ((AceIPCUserMessage) parser.getMessage(),
353                                                 ((AceDatagramMessage)message).getAddress(),
354                                                 ((AceDatagramMessage)message).getPort());
355                                     }
356                                 else
357                                     {
358                                     dropConnection (client_conn,
359                                             true);
360                                     }
361                                 }
362                             }
363                             break;
364                         default:
365                             {
366                             // print log message
367
System.err.println (getName() +
368                                         ": AceIPCServer.processEvent() -- Unexpected message type received : "
369                                         + parser.getMessageType()
370                                         + ", msg follows: " + '\n'
371                                         + AceIPCMessage.dumpRawBytes
372                                         (((AceDatagramMessage)message).getBuffer(),
373                                          0,
374                                          ((AceDatagramMessage)message).getLength()));
375                             
376                             }
377                             break;
378                         }
379                     }
380                     break;
381                     }
382                 }
383             catch (AceException ex)
384                 {
385                 // print log message
386
System.err.println (getName() +
387                             ": AceIPCServer.processEvent() -- Error parsing message, AceException : "
388                             + ex.getMessage()
389                             + ", msg follows: " + '\n'
390                             + AceIPCMessage.dumpRawBytes
391                             (((AceDatagramMessage)message).getBuffer(),
392                              0,
393                              ((AceDatagramMessage)message).getLength()));
394                 return;
395                 }
396             }
397         else
398             {
399             // log error
400
System.err.println (getName() +
401                         ": AceIPCServer.processEvent() -- Error on datagram read, status = "
402                         + ((AceDatagramMessage)message).getStatus());
403             getNewSocket();
404             }
405         }
406     else
407         {
408         // print log message
409
System.err.println (getName() +
410                     ": AceIPCServer.processEvent() -- Unexpected Ace message type encountered : "
411                     + message.messageType());
412         }
413     
414     }
415     
416     private void processConnReqMessage (AceIPCConnReqMessage conn_message,
417                     InetAddress addr,
418                     int port)
419     {
420     int current_num_connections = 0;
421     synchronized (clientList)
422         {
423         Collection elements = clientList.values();
424         for (Iterator i = elements.iterator(); i.hasNext() == true; )
425             {
426             current_num_connections += ((HashMap) i.next()).size();
427             }
428         }
429     
430     if (current_num_connections < maxConnections)
431         {
432         try
433             {
434             AceIPCServerConnection client_conn = new AceIPCServerConnection (getName() + '_' + addr + '_' + port,
435                                              addr,
436                                              port,
437                                              hbInterval,
438                                              this);
439             if (sendConnectResponseMessage (AceIPCConnRespMessage.STATUS_OK,
440                             addr, port) == true)
441                 {
442                 addClient (client_conn,
443                        conn_message.getBytes(),
444                        conn_message.userDataOffset(),
445                        conn_message.userDataLength());
446                 client_conn.start();
447                 }
448             else
449                 {
450                 client_conn.dispose();
451                 }
452             }
453         catch (IOException ex)
454             {
455             // print log message
456
System.err.println (getName() +
457                         ": AceIPCServer.processConnReqMessage() -- IOException creating AceIPCServerConnection : "
458                         + ex.getMessage());
459             
460             boolean status = sendConnectResponseMessage (AceIPCConnRespMessage.STATUS_REFUSED,
461                                      addr, port);
462             }
463         }
464     else
465         {
466         boolean status = sendConnectResponseMessage (AceIPCConnRespMessage.STATUS_TRY_LATER,
467                                  addr, port);
468         }
469     }
470     
471     private void processUserMessage (AceIPCUserMessage received_message,
472                      InetAddress addr,
473                      int port)
474     {
475     int to_thread_id = received_message.getToThreadID();
476     
477     //System.out.println ("RECEIVED MESSAGE W/TO_THREAD_ID = "
478
// + to_thread_id
479
// + ", FROM_THREAD_ID = " + received_message.getFromThreadID() + ' '
480
// + (new Date()) + ' ' +
481
// (new Date().getTime() & 0xFFFF));
482

483     AceIPCMessage msg_for_user = new AceIPCMessage
484         (AceIPCMessage.MESSAGE_RECEIVED,
485          (to_thread_id > 0 ? true : false),
486          this,
487          received_message.getFromThreadID(),
488          received_message.getBytes(),
489          received_message.userDataOffset(),
490          received_message.userDataLength(),
491          addr,
492          port,
493          userParm);
494     
495     if (to_thread_id > 0)
496         {
497         AceThread to_thread = AceThread.getAceThreadObject (to_thread_id);
498         if (to_thread != null)
499             {
500             if (to_thread.sendMessage (msg_for_user) == false)
501                 {
502                 // print log message
503
System.err.println (getName()
504                             + ": AceIPCServer.processUserMessage() -- Could not send solicited IPC message to thread id = "
505                             + to_thread_id
506                             + " : "
507                             + getErrorMessage());
508                 }
509             }
510         else
511             {
512             // print trace message
513
//System.out.println (getName()
514
// + ": AceIPCServer.processUserMessage() -- Could not find to-thread, to-thread-id = "
515
// + to_thread_id);
516
}
517         }
518     else
519         {
520         if (unsolMsgHandler != null)
521             {
522             if (unsolMsgHandler.sendMessage (msg_for_user) == false)
523                 {
524                 // print log message
525
System.err.println (getName()
526                             + ": AceIPCServer.processUserMessage() -- Could not send IPC message to the unsolicited msg handler thread : "
527                             + getErrorMessage());
528                 }
529             }
530         }
531     }
532     
533     private boolean sendConnectResponseMessage (int status,
534                         InetAddress addr,
535                         int port)
536     {
537     //System.out.println ("Server: sending HB interval = " + hbInterval);
538
return (sendMessage (new AceIPCConnRespMessage (status, hbInterval),
539                  addr,
540                  port));
541     }
542     
543     private void sendDisconnectMessage (InetAddress addr, int port)
544     {
545     boolean status = sendMessage (new AceIPCDiscMessage(), addr, port);
546     }
547
548     protected boolean sendHeartbeatMessage (InetAddress addr, int port)
549     // called by AceIPCServerConnection when its time to send a heartbeat
550
{
551     return sendMessage (new AceIPCHeartbeatMessage(), addr, port);
552     }
553     
554     private boolean sendMessage (AceIPCMessageInterface message,
555                  InetAddress addr, int port)
556     {
557     boolean retval = true;
558
559     synchronized (socketLock)
560         {
561         if (socket == null)
562             {
563             return false;
564             }
565         
566         DatagramPacket dp = new DatagramPacket (message.getBytes(),
567                             message.getLength(),
568                             addr,
569                             port);
570         
571         try
572             {
573             
574             socket.send (dp);
575             // print trace message
576
//System.out.println (getName() + (new Date()) + ' ' +
577
// (new Date().getTime() & 0xFFFF) +
578
// ": *** TRACE *** MESSAGE SENT to address = "
579
// + addr.toString()
580
// + ", port = "
581
// + port
582
// + " ***** "
583
// + message.traceIPCMessage(true));
584
}
585         catch (IOException ex)
586             {
587             // print log message
588
System.err.println (getName() +
589                         ": AceIPCServer.sendMessage() -- IOException sending message on socket, error : "
590                         + ex.getMessage()
591                         + ", dest address = "
592                         + addr.toString()
593                         + ", dest port = "
594                         + port + ' '
595                         + (new Date()) + ' '
596                         + (new Date().getTime() & 0xFFFF)
597                         + ", message follows: \n"
598                         + message.traceIPCMessage(true));
599             
600             retval = false;
601             }
602         }
603     
604     if (retval == false)
605         {
606         // getNewSocket(); see comment in test program, broadcastMessage()
607
}
608     
609     return retval;
610     }
611
612     public boolean sendIPCMessage (byte[] message, int offset, int len,
613                    int to_thread_id, AceThread sender,
614                    InetAddress addr, int port)
615     {
616     Thread JavaDoc parent_thread = null;
617     if (sender == null)
618         {
619         parent_thread = Thread.currentThread();
620         }
621     else
622         {
623         parent_thread = sender;
624         }
625     
626     if ((parent_thread instanceof AceThread) == false)
627         {
628         writeErrorMessage ("The calling thread must be an instance of AceThread");
629         return false;
630         }
631             
632     boolean retval = true;
633     
634     // find associated client in list
635
AceIPCServerConnection client_conn = findClient (addr, port);
636     
637     if (client_conn == null)
638         {
639         writeErrorMessage ("The client is not currently connected");
640         retval = false;
641         }
642     else if (client_conn.resetSendTiming() == false)
643         {
644         dropConnection (client_conn, true);
645         writeErrorMessage ("Fatal timing error encountered, connection dropped");
646         retval = false;
647         }
648     else
649         {
650         AceIPCUserMessage ipc_msg = new AceIPCUserMessage (to_thread_id,
651                                    ((AceThread)parent_thread).getAceThreadId(),
652                                    message,
653                                    offset,
654                                    len);
655         
656         if (sendMessage (ipc_msg, addr, port) == false)
657             {
658             writeErrorMessage ("Socket error sending message, connection dropped");
659             retval = false;
660             }
661         }
662     
663     return retval;
664     }
665     
666     public boolean sendIPCMessage (byte[] message, int offset, int len, int to_thread_id,
667                    InetAddress addr, int port)
668     {
669     return sendIPCMessage (message, offset, len, to_thread_id, null, addr, port);
670     }
671
672     public boolean sendIPCMessage (byte[] message, int offset, int len, InetAddress addr, int port)
673     {
674     return sendIPCMessage (message, offset, len, 0, null, addr, port);
675     }
676     
677     public boolean sendIPCMessage (byte[] message, int offset, int len, AceThread sender,
678                    InetAddress addr, int port)
679     {
680     return sendIPCMessage (message, offset, len, 0, sender, addr, port);
681     }
682
683
684   
685     private int addressToInt (InetAddress addr)
686     {
687     int ret = 0;
688     
689     byte[] addr_bytes = addr.getAddress();
690     try
691         {
692         ret = (int) AceInputSocketStream.octetsToIntMsbFirst (addr_bytes, 0, 4);
693         }
694     catch (NumberFormatException JavaDoc ex)
695         {
696         // print log message
697
System.err.println (getName() +
698                     ": AceIPCServer.addressToInt() -- NumberFormatException converting InetAddress to int : "
699                     + ex.getMessage());
700         }
701     
702     return ret;
703     }
704
705     private void addClient (AceIPCServerConnection client_element,
706                 byte[] reg_data,
707                 int offset,
708                 int length)
709     {
710     synchronized (clientList)
711         {
712         int addr_int = addressToInt (client_element.getClientAddress());
713         HashMap element = (HashMap) clientList.get (new Integer JavaDoc (addr_int));
714         if (element == null) // first client from this inet addr
715
{
716             element = new HashMap ();
717             element.put (new Integer JavaDoc (client_element.getClientPort()),
718                      client_element);
719             clientList.put (new Integer JavaDoc (addr_int), element);
720             }
721         else
722             {
723             element.put (new Integer JavaDoc (client_element.getClientPort()),
724                      client_element);
725             }
726         }
727     
728     if (connectHandler != null)
729         {
730         AceIPCMessage msg_for_user = new AceIPCMessage (AceIPCMessage.CONNECTION_ESTABLISHED,
731                                 this,
732                                 reg_data, offset, length,
733                                 client_element.getClientAddress(),
734                                 client_element.getClientPort(),
735                                 userParm);
736         if (connectHandler.sendMessage (msg_for_user) == false)
737             {
738             // print log message
739
System.err.println (getName()
740                         + ": AceIPCServer.addClient() -- Could not send IPC message to the user connect handler thread : "
741                         + getErrorMessage());
742             }
743         }
744     }
745     
746     private void removeClient (AceIPCServerConnection client_element)
747     {
748     boolean element_removed = false;
749
750     synchronized (clientList)
751         {
752         int addr_int = addressToInt (client_element.getClientAddress());
753         HashMap element = (HashMap) clientList.get (new Integer JavaDoc (addr_int));
754         if (element != null)
755             {
756             AceIPCServerConnection subelement = (AceIPCServerConnection) element.get
757                 (new Integer JavaDoc (client_element.getClientPort()));
758             if (subelement != null)
759                 {
760                 element.remove (new Integer JavaDoc (client_element.getClientPort()));
761                 element_removed = true;
762                 if (element.isEmpty() == true)
763                     {
764                     clientList.remove (new Integer JavaDoc (addr_int));
765                     }
766                 }
767             }
768         }
769     
770     if (element_removed == true)
771         if (disconnectHandler != null)
772         {
773             AceIPCMessage msg_for_user = new AceIPCMessage (AceIPCMessage.DISCONNECT,
774                                     this,
775                                     client_element.getClientAddress(),
776                                     client_element.getClientPort(),
777                                     userParm);
778             if (disconnectHandler.sendMessage (msg_for_user) == false)
779             {
780                 // print log message
781
System.err.println (getName()
782                         + ": AceIPCServer.removeClient() -- Could not send IPC message to the user disconnect handler thread : "
783                         + getErrorMessage());
784             }
785         }
786     }
787     
788     private AceIPCServerConnection findClient (InetAddress addr, int port)
789     {
790     synchronized (clientList)
791         {
792         int addr_int = addressToInt (addr);
793         HashMap element = (HashMap) clientList.get (new Integer JavaDoc (addr_int));
794         if (element != null)
795             {
796             return ((AceIPCServerConnection) element.get (new Integer JavaDoc (port)));
797             }
798         }
799     
800     return null;
801     }
802
803     private static final int MAX_INIT_FAILURES_IN_A_ROW = 10;
804     
805     private DatagramSocket socket = null;
806     private AceDatagram sockListener = null;
807     private Object JavaDoc socketLock = new Object JavaDoc();
808     private HashMap clientList = new HashMap();
809     private int maxConnections;
810     private int hbInterval;
811     private AceThread unsolMsgHandler = null;
812     private AceThread connectHandler = null;
813     private AceThread disconnectHandler = null;
814     private long userParm;
815     private int port;
816
817
818     // test program
819
public static void main (String JavaDoc[] args)
820     {
821     class ServerUser extends AceThread
822     {
823         class DataSender extends AceThread
824         {
825         public DataSender (int send_data_interval,
826                    ServerUser parent)
827             throws IOException
828         {
829             super ();
830             sendInterval = send_data_interval;
831             this.parent = parent;
832             System.out.println ("DATA SENDER THREAD ID = " + getAceThreadId());
833         }
834
835         public void dispose()
836         {
837         }
838         
839         public void run()
840         {
841             int msg_counter = 0;
842             byte[] msg_data = new byte[4];
843
844             try
845             {
846                 while (true)
847                 {
848                     sleep (sendInterval);
849                     AceInputSocketStream.intToBytesMsbFirst (++msg_counter,
850                                        msg_data,
851                                        0);
852                     parent.broadcastMessage (msg_data, 0, msg_data.length);
853                 }
854             }
855             catch (InterruptedException JavaDoc ex)
856             {
857                 System.err.println ("DataSender sleep interrupted");
858             }
859         }
860         
861         private int sendInterval;
862         private ServerUser parent;
863         }
864
/**
 * Immutable address/port pair identifying one client that has an
 * established connection with the test server.
 *
 * Instances are created by the ServerUser thread and read by the DataSender
 * thread, so both fields are final: the pair can never change after
 * construction.
 */
class ConnectedClient
{
    // address of the client as given to the constructor (may be null if the
    // caller passed null; no validation is done here)
    private final InetAddress address;

    // port of the client as given to the constructor
    private final int port;

    /**
     * @param addr address of the connected client
     * @param port port of the connected client
     */
    public ConnectedClient (InetAddress addr, int port)
    {
        this.address = addr;
        this.port = port;
    }

    /**
     * @return the address passed to the constructor
     */
    public InetAddress getAddress()
    {
        return address;
    }

    /**
     * @return the port passed to the constructor
     */
    public int getPort()
    {
        return port;
    }
}
886         
887         public ServerUser (String JavaDoc name,
888                    int port,
889                    int max_connections,
890                    int hb_interval,
891                    int send_data_interval)
892         throws IOException, AceException
893         {
894         super(name);
895         
896         ipcServer = new AceIPCServer (5000,
897                           name,
898                           port,
899                           max_connections,
900                           hb_interval,
901                           this,
902                           this,
903                           this);
904         
905         if (send_data_interval > 0)
906             {
907             dataSender = new DataSender (send_data_interval, this);
908             }
909         
910         System.out.println (name + " THREAD ID = " + getAceThreadId());
911         }
912         
913         public void run()
914         {
915         ipcServer.start();
916
917         if (dataSender != null)
918             {
919             dataSender.start();
920             }
921
922         while (true)
923             {
924             AceMessageInterface message = waitMessage(); // this or next
925
//AceMessageInterface message = ipcServer.waitIPCMessage();
926
if (message == null)
927                 {
928                 System.err.println (getName() +
929                             " Null message encountered");
930                 continue;
931                 }
932             else if ((message instanceof AceSignalMessage) == true)
933                 {
934                 System.out.println (getName() +
935                             " Signal received, ID = "
936                             + ((AceSignalMessage)message).getSignalId()
937                             + ", signal message = "
938                             + ((AceSignalMessage)message).getMessage());
939                 ipcServer.dispose();
940                 if (dataSender != null)
941                     {
942                     dataSender.dispose();
943                     }
944                 super.dispose();
945                 break;
946                 }
947             
948             if ((message instanceof AceIPCMessage) == true)
949                 {
950                 AceIPCMessage msg = (AceIPCMessage) message;
951                 switch (msg.getEvent())
952                     {
953                     case AceIPCMessage.CONNECTION_ESTABLISHED:
954                     {
955                         InetAddress addr = msg.getFarEndAddress();
956                         int port = msg.getFarEndPort();
957                         System.out.println (getName() +
958                                 '_' + getAceThreadId()
959                                 + (new Date()) + ' ' +
960                                 (new Date().getTime() & 0xFFFF)
961                                 + " CONNECTION ESTABLISHED WITH CLIENT ADDR = "
962                                 + addr
963                                 + ", PORT = "
964                                 + port
965                                 + ", registration data size = "
966                                 + msg.getUserDataLength()
967                                 + ", reg bytes = "
968                                 + AceIPCMessage.dumpRawBytes (msg.getMessage(),
969                                             msg.getUserDataOffset(),
970                                             msg.getUserDataLength())
971                                 + ", userparm="
972                                 + msg.getUserParm());
973                         if (dataSender != null)
974                         {
975                             // add client to list
976
synchronized (clientList)
977                             {
978                                 if (clientList.add (new ConnectedClient (addr, port)) == false)
979                                 {
980                                     System.err.println (getName() +
981                                             " Couldn't add new client into list, addr = " + addr + ", port = " + port);
982                                 }
983                             }
984                         }
985                     }
986                     break;
987                     case AceIPCMessage.DISCONNECT:
988                     {
989                         System.out.println (getName() +
990                                 '_' + getAceThreadId() + (new Date()) + ' ' +
991                                 (new Date().getTime() & 0xFFFF) +
992                                 " CONNECTION DISCONNECTED, CLIENT ADDR = "
993                                 + msg.getFarEndAddress()
994                                 + ", PORT = "
995                                 + msg.getFarEndPort()
996                                 + ", userparm="
997                                 + msg.getUserParm());
998                         if (dataSender != null)
999                         {
1000                            // remove client from list
1001
synchronized (clientList)
1002                            {
1003                                int num_elements = clientList.size();
1004                                int i;
1005                                for (i = 0; i < num_elements; i++)
1006                                {
1007                                    ConnectedClient element = (ConnectedClient) clientList.get(i);
1008                                    if (element.getAddress().equals(msg.getFarEndAddress()))
1009                                    if (element.getPort() == msg.getFarEndPort())
1010                                        {
1011                                        break;
1012                                        }
1013                                }
1014                                if (i < num_elements)
1015                                {
1016                                    clientList.remove (i);
1017                                }
1018                            }
1019                        }
1020                    }
1021                    break;
1022                    case AceIPCMessage.MESSAGE_RECEIVED:
1023                    {
1024                        int msg_num = (int) AceInputSocketStream.octetsToIntMsbFirst (msg.getMessage(),
1025                                                    msg.getUserDataOffset(),
1026                                                    msg.getUserDataLength());
1027                
1028                        System.out.println (getName() +
1029                                '_' + getAceThreadId() + (new Date()) + ' ' +
1030                                (new Date().getTime() & 0xFFFF) +
1031                                " RECEIVED " +
1032                                ((msg.solicitedMessage() == true) ? "solicited " : "unsolicited ") +
1033                                "senderThreadID=" + msg.getSenderThreadId() +
1034                                " userparm=" + msg.getUserParm() +
1035                                " : " +
1036                                msg_num);
1037                
1038                       
1039                        if (dataSender == null)
1040                        {
1041                            // reply to the received message
1042
byte[] reply = new byte[4];
1043                            AceInputSocketStream.intToBytesMsbFirst (++msg_num,
1044                                               reply,
1045                                               0);
1046                            if (ipcServer.sendIPCMessage (reply, 0, reply.length,
1047                                          msg.getSenderThreadId(),
1048                                          this,
1049                                          msg.getFarEndAddress(),
1050                                          msg.getFarEndPort()) == false)
1051                            {
1052                                System.err.println (getName() + (new Date()) + ' ' +
1053                                        (new Date().getTime() & 0xFFFF) +
1054                                        " Message sending failed : "
1055                                        + getErrorMessage());
1056                            }
1057                        }
1058                    }
1059                    break;
1060                    default:
1061                    {
1062                        System.err.println (getName() +
1063                                " Unexpected IPC message event encountered : "
1064                                + msg.getEvent());
1065                    }
1066                    break;
1067                    }
1068                }
1069            else
1070                {
1071                System.err.println (getName() +
1072                            " Unexpected Ace message type encountered : "
1073                            + message.messageType());
1074                }
1075            }
1076
1077        }
1078
1079        public void broadcastMessage (byte[] msg_data, int offset, int length)
1080        {
1081        synchronized (clientList)
1082            {
1083            int num_elements = clientList.size();
1084            for (int i = 0; i < num_elements; i++)
1085                {
1086                ConnectedClient element = (ConnectedClient) clientList.get(i);
1087                if (ipcServer.sendIPCMessage (msg_data, offset, length, this,
1088                                  element.getAddress(),
1089                                  element.getPort()) == false)
1090                    {
1091                    System.err.println (getName() + (new Date()) + ' ' +
1092                                (new Date().getTime() & 0xFFFF) +
1093                                " Broadcast sending failed to client addr = "
1094                                + (element.getAddress()).toString()
1095                                + ", port = "
1096                                + element.getPort() + " : "
1097                                + ((AceThread)(Thread.currentThread())).getErrorMessage());
1098                    }
1099                // The following was needed when call to getNewSocket() uncommented
1100
// at bottom of AceIPCServer.sendMessage() and multiple
1101
// clients are killed during one DataSender interval. Related
1102
// to java linux socket bug, see
1103
// http://developer.java.sun.com/developer/bugParade/bugs/4301860.html
1104
// Without the following, in that scenario, the old socket doesn't
1105
// get cleaned up and is totally hosed.
1106
// Test scenario:
1107
// 2 client runs:
1108
// - java com.ace.server.framework.AceIPCClient 3000 localhost 1500
1109
// 1 server run:
1110
// - java com.ace.server.framework.AceIPCServer 3000 5 2000 3500
1111
// try
1112
// {
1113
// Thread.sleep(10);
1114
// }
1115
// catch (InterruptedException ex)
1116
// {
1117
// }
1118
}
1119            }
1120        }
1121        
1122        private AceIPCServer ipcServer;
1123        private DataSender dataSender = null;
1124        private ArrayList clientList = new ArrayList();
1125    }
1126    
1127    
1128    try
1129        {
1130        int port = 3000;
1131        int max_connections = 5;
1132        int hb_interval = 2000;
1133        int send_data_interval = 0;
1134
1135        if ((args.length != 0) && (args.length != 4))
1136            {
1137            System.out.println ("Arguments (all or nothing): <port>, <max connections>, <hb interval(ms)>, <send user data interval(ms) - if 0, sends upon receipt>");
1138            System.out.println ("Defaults: port=" + port + ", max connections=" + max_connections + ", hb interval=" + hb_interval + ", send user data interval=" + send_data_interval);
1139            System.exit (0);
1140            }
1141        
1142        if (args.length == 4)
1143            {
1144            try
1145                {
1146                port = Integer.parseInt (args[0]);
1147                max_connections = Integer.parseInt (args[1]);
1148                hb_interval = Integer.parseInt (args[2]);
1149                send_data_interval = Integer.parseInt (args[3]);
1150                }
1151            catch (NumberFormatException JavaDoc ex)
1152                {
1153                System.err.println ("Input must be numeric");
1154                System.exit (1);
1155                }
1156            }
1157
1158        AceTimer.Instance().start(); // start the timer thread
1159

1160        ServerUser user = new ServerUser ("TestServer",
1161                          port,
1162                          max_connections,
1163                          hb_interval,
1164                          send_data_interval);
1165    
1166        user.start();
1167        user.join();
1168        System.exit (0);
1169        }
1170    catch (IOException ex)
1171        {
1172        System.err.println ("IOException in main " + ex.getMessage());
1173        System.exit (1);
1174        }
1175    catch (AceException ex)
1176        {
1177        System.err.println ("AceException in main " + ex.getMessage());
1178        System.exit (1);
1179        }
1180    catch (InterruptedException JavaDoc ex)
1181        {
1182        System.err.println ("InterruptedException in main " + ex.getMessage());
1183        System.exit (1);
1184        }
1185    }
1186}
1187
Popular Tags