KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > quikj > application > web > talk > feature > operator > Operator


1 /*
2  * Operator.java
3  *
4  * Created on May 13, 2002, 7:23 AM
5  */

6
7 package com.quikj.application.web.talk.feature.operator;
8
9 import com.quikj.server.framework.*;
10 import com.quikj.server.web.*;
11 import com.quikj.application.web.talk.plugin.*;
12 import com.quikj.application.web.talk.messaging.*;
13 import com.quikj.application.web.talk.messaging.oamp.*;
14 import com.quikj.application.web.talk.feature.proactive.server.*;
15 import com.quikj.application.web.talk.feature.proactive.messaging.*;
16 import com.quikj.client.raccess.*;
17
18 import java.util.*;
19 import java.io.*;
20 import java.net.*;
21
22 /**
23  *
24  * @author amit
25  */

26 public class Operator extends AceThread
27 implements FeatureInterface, EndPointInterface, RemoteServiceInterface, GatekeeperInterface
28 {
29     /** Creates a new instance of Operator */
30     public Operator()
31     throws IOException
32     {
33         super("TalkFeatureOperator");
34     }
35     
36     public void dispose()
37     {
38         // drop any calls that may be in the subscriber queue
39
dropAllSubscribers();
40         
41         if (registered == true)
42         {
43             // send unregistration message
44
if (ServiceController.Instance().sendMessage(new UnregistrationEvent(userName)) == false)
45             {
46                 // print error message
47
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
48                 Thread.currentThread().getName()
49                 + "- Operator.dispose() -- Error sending unregistration message to the service controller");
50             }
51             
52             AceRMIImpl rs = AceRMIImpl.getInstance();
53             if (rs != null) // if remote service has been started
54
{
55                 rs.unregisterService ("com.quikj.application.web.talk.feature.operator.Operator:"
56                 + userName);
57             }
58             
59             registered = false;
60         }
61         
62         try
63         {
64             if (callQMessageTimerId != -1)
65             {
66                 AceTimer.Instance().cancelTimer(callQMessageTimerId);
67                 callQMessageTimerId = -1;
68             }
69             
70             if (opmCollectionTimerId != -1)
71             {
72                 AceTimer.Instance().cancelTimer(opmCollectionTimerId);
73                 opmCollectionTimerId = -1;
74             }
75             
76             if (proactiveTimer != -1)
77             {
78                 AceTimer.Instance().cancelTimer(proactiveTimer);
79                 proactiveTimer = -1;
80             }
81             
82             if (proactiveActive == true)
83             {
84                 AceLicenseManager.getInstance().returnUnits("ace-proactive-sessions", 1);
85             }
86         }
87         catch (IOException ex)
88         {
89             ; // ignore
90
}
91         
92         super.dispose();
93     }
94     
95     public String JavaDoc getUserName()
96     {
97         return userName;
98     }
99     
100     public boolean init(String JavaDoc name, Map params)
101     {
102         if (AceLicenseManager.getInstance().licenseFeature("operator-services-feature") == false)
103         {
104             AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
105             "Operator.init() -- Error obtaining license: "
106             + AceLicenseManager.getInstance().getErrorMessage());
107             return false;
108         }
109         
110         userName = name;
111         
112         if (initParams(params) == false)
113         {
114             return false;
115         }
116         
117         if (hostName == null)
118         {
119             try
120             {
121                 hostName = InetAddress.getLocalHost().getHostName();
122             }
123             catch (UnknownHostException ex)
124             {
125                 hostName = "Unknown";
126             }
127         }
128         
129         synchronized(counterLock)
130         {
131             identifier = hostName + ":feature:" + userName + ":" + (new Date()).getTime()
132             + ":" + counter++;
133         }
134         
135         // send registration message to the Service Controller
136
RegistrationRequestMessage reg = new RegistrationRequestMessage();
137         reg.setUserName(name);
138         reg.setPassword(password);
139         boolean ret = ServiceController.Instance().sendEvent(new MessageEvent(MessageEvent.REGISTRATION_REQUEST,
140         this,
141         reg,
142         null));
143         
144         if (ret == false)
145         {
146             // print error message
147
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
148             getName()
149             + "- Operator.init() -- could not send registration message to the service controller");
150             return false;
151         }
152         
153         // license check for proactive feature
154
initProactive();
155         
156         return true;
157     }
158     
159     private void initProactive()
160     {
161         if (proactiveMonitoring == true)
162         {
163             try
164             {
165                 ProactiveUserData data = ProactiveUserData.getInstance();
166                 if (data == null)
167                 {
168                     proactiveActive = false;
169                 }
170                 else
171                 {
172                     // get a license
173
if (AceLicenseManager.getInstance().consumeUnits("ace-proactive-sessions", 1)
174                     == false)
175                     {
176                         AceLogger.Instance().log(AceLogger.ERROR,
177                         AceLogger.SYSTEM_LOG,
178                         getName()
179                         + "- Operator.init() -- could not obtain license for the proactive features: "
180                         + AceLicenseManager.getInstance().getErrorMessage());
181                         proactiveActive = false;
182                     }
183                     else
184                     {
185                         proactiveActive = true;
186                     }
187                 }
188             }
189             catch (Exception JavaDoc ex)
190             {
191                 proactiveActive = false;
192             }
193         }
194         else
195         {
196             proactiveActive = false;
197         }
198     }
199     
200     private boolean initParams(Map params)
201     {
202         synchronized (paramLock)
203         {
204             String JavaDoc proactive_monitoring_s = (String JavaDoc)params.get("proactive-monitoring");
205             if (proactive_monitoring_s != null)
206             {
207                 if (proactive_monitoring_s.equals("yes") == true)
208                 {
209                     proactiveMonitoring = true;
210                 }
211                 else if (proactive_monitoring_s.equals("no") == true)
212                 {
213                     proactiveMonitoring = false;
214                 }
215                 else
216                 {
217                     AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
218                     getName()
219                     + "- Operator.initParams() -- proactive-monitoring must be either \"yes\" or \"no\"");
220                     return false;
221                 }
222             }
223             
224             String JavaDoc max_session_s = (String JavaDoc)params.get("max-sessions");
225             if (max_session_s != null)
226             {
227                 try
228                 {
229                     maxSessionsPerOperator = Integer.parseInt(max_session_s);
230                 }
231                 catch (NumberFormatException JavaDoc ex)
232                 {
233                     // print error message
234
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
235                     getName()
236                     + "- Operator.initParams() -- max-sessions must be numeric");
237                     return false;
238                 }
239             }
240             
241             password = (String JavaDoc)params.get("password");
242             
243             String JavaDoc max_operators_s = (String JavaDoc)params.get("max-operators");
244             if (max_operators_s != null)
245             {
246                 try
247                 {
248                     maxOperators = Integer.parseInt(max_operators_s);
249                 }
250                 catch (NumberFormatException JavaDoc ex)
251                 {
252                     // print error message
253
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
254                     getName()
255                     + "- Operator.initParams() -- max-operators must be numeric");
256                     return false;
257                 }
258             }
259             
260             String JavaDoc max_queue_s = (String JavaDoc)params.get("max-queue-size");
261             if (max_queue_s != null)
262             {
263                 try
264                 {
265                     maxQSize = Integer.parseInt(max_queue_s);
266                 }
267                 catch (NumberFormatException JavaDoc ex)
268                 {
269                     // print error message
270
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
271                     getName()
272                     + "- Operator.initParams() -- max-queue-size must be numeric");
273                     return false;
274                 }
275             }
276         }
277         return true;
278     }
279     
280     public void start()
281     {
282         super.start();
283     }
284     
285     public boolean sendEvent(AceMessageInterface message)
286     {
287         return super.sendMessage(message);
288     }
289     
290     public void run()
291     {
292         if (proactiveActive == true)
293         {
294             try
295             {
296                 // start the proactive timer
297
proactiveTimer = AceTimer.Instance().startTimer(PROACTIVE_TIMER_INTERVAL,
298                 PROACTIVE_UPDATE_TIMER);
299                 if (proactiveTimer == -1)
300                 {
301                     AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
302                     getName()
303                     + "- Operator.run() -- Could not start proactive timer - "
304                     + getErrorMessage());
305                 }
306             }
307             catch (IOException ex)
308             {
309                 // should not happen
310
}
311         }
312         
313         while (true)
314         {
315             AceMessageInterface message = waitMessage();
316             if (message == null)
317             {
318                 // print error message
319
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
320                 getName()
321                 + "- Operator.run() -- A null message was received while waiting for a message - "
322                 + getErrorMessage());
323                 
324                 break;
325             }
326             
327             if ((message instanceof AceSignalMessage) == true)
328             {
329                 // A signal message is received
330

331                 // print informational message
332
AceLogger.Instance().log(AceLogger.INFORMATIONAL, AceLogger.SYSTEM_LOG,
333                 getName()
334                 + " - Operator.run() -- A signal "
335                 + ((AceSignalMessage)message).getSignalId()
336                 + " is received : "
337                 + ((AceSignalMessage)message).getMessage());
338                 break;
339             }
340             else if ((message instanceof MessageEvent) == true)
341             {
342                 boolean ret = processMessageEvent((MessageEvent)message);
343                 if (ret == false)
344                 {
345                     break;
346                 }
347             }
348             else if ((message instanceof AceTimerMessage) == true)
349             {
350                 int parm = (int)((AceTimerMessage)message).getUserSpecifiedParm();
351                 boolean ret = true;
352                 
353                 switch (parm)
354                 {
355                     case CALL_Q_MESSAGE_TIMER:
356                     {
357                         ret = processCallQMessageTimerEvent((AceTimerMessage)message);
358                     }
359                     break;
360                     case OPM_TIMER:
361                     {
362                         ret = processOPMs((AceTimerMessage)message);
363                     }
364                     break;
365                     case PROACTIVE_UPDATE_TIMER:
366                     {
367                         ret = processProactiveTimer((AceTimerMessage)message);
368                     }
369                     break;
370                     default:
371                     {
372                         AceLogger.Instance().log(AceLogger.WARNING, AceLogger.SYSTEM_LOG,
373                         getName()
374                         + "- Operator.run() -- No handling for timer expiry with user parm = "
375                         + parm);
376                     }
377                     break;
378                 }
379                 
380                 if (ret == false)
381                 {
382                     break;
383                 }
384             }
385             else
386             {
387                 AceLogger.Instance().log(AceLogger.WARNING, AceLogger.SYSTEM_LOG,
388                 getName()
389                 + "- Operator.run() -- An unexpected message was received while waiting for a message - "
390                 + message.messageType());
391             }
392             
393         } // while
394

395         dispose();
396     }
397     
398     private void startCallQMessageTimer()
399     {
400         try
401         {
402             // start the periodic timer
403
callQMessageTimerId = AceTimer.Instance().startTimer(CALL_Q_MESSAGE_INTERVAL,
404             CALL_Q_MESSAGE_TIMER);
405             if (callQMessageTimerId == -1)
406             {
407                 // print error message
408
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
409                 getName()
410                 + "- Operator.startCallQMessageTimer() -- Could not start the call queue message timer - "
411                 + getErrorMessage());
412                 
413                 // and continue
414
}
415         }
416         catch (IOException ex)
417         {
418             // should not happend, ignore
419
;
420         }
421     }
422     
423     private void startOPMTimer(Date from_time)
424     {
425         try
426         {
427             Calendar cal = Calendar.getInstance();
428             cal.setTime(from_time);
429             cal.add(Calendar.SECOND, 60 - cal.get(Calendar.SECOND));
430             opmCollectionTime = cal.getTime();
431             
432             opmCollectionTimerId = AceTimer.Instance().startTimer(opmCollectionTime, OPM_TIMER);
433             if (opmCollectionTimerId == -1)
434             {
435                 // print error message
436
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
437                 getName()
438                 + "- Operator.startOPMTimer() -- Could not start the measurements collection timer - "
439                 + getErrorMessage());
440                 
441                 // and continue
442
}
443         }
444         catch (IOException ex)
445         {
446             // should not happend, ignore
447
;
448         }
449     }
450     
451     private boolean processProactiveTimer(AceTimerMessage event)
452     {
453         // iterate through the operator queue and send out web site visitor info
454
ListIterator iter = operatorQueue.listIterator(0);
455         
456         ArrayList proactive_sessions = null;
457         HashMap changes = null;
458         
459         while (iter.hasNext() == true)
460         {
461             OperatorElement element = (OperatorElement)iter.next();
462             EndPointInterface ep =
463             EndPointList.Instance().findRegisteredEndPoint(element.getOperatorInfo().getUser());
464             if (ep == null)
465             {
466                 continue;
467             }
468             
469             if (element.isProactiveSubscribersSent() == false)
470             {
471                 if (proactive_sessions == null)
472                 {
473                     proactive_sessions = ProactiveUserData.getInstance().listSessions(userName);
474                 }
475                 
476                 // send the information to the client
477
UserToUserMessage msg = new UserToUserMessage();
478                 msg.setApplicationClass("com.quikj.application.web.talk.feature.proactive.client.ProactiveOperatorListHandler");
479                 ProactiveNotificationMessage not = new ProactiveNotificationMessage();
480                 
481                 int num = proactive_sessions.size();
482                 for (int i = 0; i < num; i++)
483                 {
484                     UnregisteredInfo info = (UnregisteredInfo)proactive_sessions.get(i);
485                     if (info.getEndPoint() != null)
486                     {
487                         ProactiveSessionElement ele = new ProactiveSessionElement();
488                         ele.setOperation(ProactiveSessionElement.OPERATION_ADD);
489                         ele.setSessionId(info.getIdent().getUniqueIdentifier());
490                         ele.setUrl(((WebInfo)info.getWebInfo().getLast()).getUrl());
491                         ele.setAccessTime(info.getLastAccessTime());
492                         ele.setPageCount(info.getPageCount());
493                         not.addProactiveSessionElement(ele);
494                     }
495                 }
496                 
497                 msg.setApplicationMessage(not.format());
498                 
499                 // send the message to the end-point
500
MessageEvent ev = new MessageEvent(MessageEvent.CLIENT_REQUEST_MESSAGE,
501                 this, msg, null);
502                 if (ep.sendEvent(ev) == false)
503                 {
504                     AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
505                     getName()
506                     + "- Operator.processProactiveTimer -- Could not send user to user message to endpoint "
507                     + element.getOperatorInfo().getUser());
508                 }
509                 else
510                 {
511                     element.setProactiveSubscribersSent(true);
512                 }
513             }
514             
515             if (changes == null)
516             {
517                 changes = ProactiveUserData.getInstance().listChanges(userName);
518             }
519             
520             // send the incremental information to the client
521
if (changes.size() > 0)
522             {
523                 UserToUserMessage msg = new UserToUserMessage();
524                 msg.setApplicationClass("com.quikj.application.web.talk.feature.proactive.client.ProactiveOperatorListHandler");
525                 ProactiveNotificationMessage not = new ProactiveNotificationMessage();
526                 
527                 Set set = changes.keySet();
528                 Iterator it = set.iterator();
529                 
530                 Date now = new Date();
531                 while (it.hasNext() == true)
532                 {
533                     String JavaDoc ident = (String JavaDoc)it.next();
534                     Integer JavaDoc type = (Integer JavaDoc)changes.get(ident);
535                     UnregisteredIdentifier key = new UnregisteredIdentifier(userName, ident);
536                     
537                     ProactiveSessionElement ele = new ProactiveSessionElement();
538                     ele.setOperation(type.intValue());
539                     ele.setSessionId(ident);
540                     
541                     if (type.intValue() == ProactiveUserData.START)
542                     {
543                         UnregisteredInfo inf = ProactiveUserData.getInstance().get(key);
544                         if (inf != null)
545                         {
546                             ele.setAccessTime(inf.getLastAccessTime());
547                             ele.setUrl(((WebInfo)inf.getWebInfo().getLast()).getUrl());
548                             ele.setPageCount(inf.getPageCount());
549                         }
550                         else
551                         {
552                             ele.setAccessTime(now);
553                             ele.setPageCount(0);
554                         }
555                     }
556                     else // STOP
557
{
558                         ele.setAccessTime(now);
559                     }
560                     
561                     not.addProactiveSessionElement(ele);
562                 }
563                 
564                 msg.setApplicationMessage(not.format());
565                 
566                 // send the message to the end-point
567
MessageEvent ev = new MessageEvent(MessageEvent.CLIENT_REQUEST_MESSAGE,
568                 this, msg, null);
569                 if (ep.sendEvent(ev) == false)
570                 {
571                     AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
572                     getName()
573                     + "- Operator.processProactiveTimer -- Could not send user to user message to endpoint "
574                     + element.getOperatorInfo().getUser());
575                 }
576             }
577         }
578         
579         // re-start the timer
580
try
581         {
582             // start the proactive timer
583
proactiveTimer = AceTimer.Instance().startTimer(PROACTIVE_TIMER_INTERVAL,
584             PROACTIVE_UPDATE_TIMER);
585             if (proactiveTimer == -1)
586             {
587                 AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
588                 getName()
589                 + "- Operator.processProactiveTimer -- Could not re-start proactive timer - "
590                 + getErrorMessage());
591             }
592         }
593         catch (IOException ex)
594         {
595             // should not happen
596
}
597         
598         return true;
599     }
600     
601     private boolean processCallQMessageTimerEvent(AceTimerMessage event)
602     {
603         // the CALL Q timer must have expired
604
ListIterator iter = subscriberQueue.listIterator();
605         
606         while (iter.hasNext() == true)
607         {
608             SubscriberElement subs = (SubscriberElement)iter.next();
609             
610             // send a progress message to the guy
611
SetupResponseMessage response = new SetupResponseMessage();
612             response.setSessionId(subs.getSessionId());
613             
614             MediaElements media = new MediaElements();
615             TextElement telem = new TextElement();
616             telem.setMessage(java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
617             ServiceController.getLocale(subs.getEndpoint().getParam("language"))).getString("Operator_Services:_All_operators_are_currently_busy_assisting_other_customers,_please_hold_for_the_next_available_representative") + '\n');
618             media.addMediaElement(telem);
619             response.setMediaElements(media);
620             
621             if (subs.getEndpoint().sendEvent(new MessageEvent(MessageEvent.SETUP_RESPONSE,
622             this,
623             SetupResponseMessage.PROG,
624             java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
625             ServiceController.getLocale(subs.getEndpoint().getParam("language"))).getString("Operator_Services:_Please_hold_while_we_transfer_you_to_an_operator"),
626             response,
627             null)) == false)
628             {
629                 AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
630                 Thread.currentThread().getName()
631                 + "- Operator.processCallQMessageTimerEvent() -- Error sending progress event to the calling party");
632                 
633                 return true;
634             }
635         }
636         
637         startCallQMessageTimer(); // restart
638

639         return true;
640     }
641     
642     private boolean processOPMs(AceTimerMessage event)
643     {
644         // collect sampled OPMs
645

646         opms.collectOPM(opmCollectionTime, OPM_ACTIVE_OPERATOR_COUNT, operatorQueue.size());
647         
648         opms.collectOPM(opmCollectionTime, OPM_USERS_WAITING, subscriberQueue.size());
649         
650         // determine number of users being served, note it will also reflect opr-opr calls
651
int num_opr_calls = 0;
652         ListIterator iter = operatorQueue.listIterator(0);
653         while (iter.hasNext() == true)
654         {
655             num_opr_calls += ((OperatorElement)iter.next()).getOperatorInfo().getCallCount();
656         } //end while
657

658         opms.collectOPM(opmCollectionTime, OPM_USERS_TALKING, num_opr_calls);
659         
660         //check OPM storage interval mark, if it's time, store the data in the database
661

662         Calendar cal = Calendar.getInstance();
663         cal.setTime(opmCollectionTime);
664         if ((cal.get(Calendar.MINUTE) % OPM_STORAGE_INTERVAL) == 0)
665         {
666             // process non-sampled OPMs for this interval
667

668             // accumulate/reset current user wait times:
669
long curr_time = cal.getTime().getTime();
670             iter = subscriberQueue.listIterator();
671             while (iter.hasNext() == true)
672             {
673                 SubscriberElement subs = (SubscriberElement)iter.next();
674                 int wait_time = (int)(curr_time - subs.getStartWaitTime()) / 1000;
675                 if (wait_time >= 0)
676                 {
677                     opms.collectOPM(opmCollectionTime, OPM_USER_WAIT_TIME, wait_time);
678                     subs.setStartWaitTime(curr_time);
679                 }
680             }
681             
682             // for non-sampled OPMs, if no entries this period, add one with value zero so that
683
// the opm is present in the db
684
if (opms.getNumCollectedOPMs(OPM_USER_WAIT_TIME) == 0)
685             {
686                 opms.collectOPM(opmCollectionTime, OPM_USER_WAIT_TIME, 0);
687             }
688             
689             // average collected OPMs
690
opms.averageOPMs(opmCollectionTime);
691             
692             // store the averages in the database
693
if (opms.storeOPMs() == false)
694             {
695                 AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
696                 Thread.currentThread().getName()
697                 + "- Operator.processOPMs() -- Failure storing Operator feature OPMs in the database.");
698             }
699             
700             // clear the opms for the next interval
701
opms.clearOPMs();
702         }
703         
704         startOPMTimer(opmCollectionTime); // restart
705

706         return true;
707     }
708     
709     private boolean processMessageEvent(MessageEvent message)
710     {
711         switch (message.getEventType())
712         {
713             case MessageEvent.REGISTRATION_RESPONSE:
714                 return processRegistrationResponseEvent(message);
715                 
716             case MessageEvent.CLIENT_REQUEST_MESSAGE:
717                 return processClientRequestMessage(message);
718                 
719             case MessageEvent.SETUP_REQUEST:
720                 return processSetupRequestEvent(message);
721                 
722             case MessageEvent.DISCONNECT_MESSAGE:
723                 return processDisconnectMessage(message);
724                 
725             default:
726                 // ignore other messages
727
break;
728         }
729         
730         return true; // ignore unknown message event
731
}
732     
733     private boolean processRegistrationResponseEvent(MessageEvent event)
734     {
735         if (registered == true) // if already registered
736
{
737             // something must be wrong
738

739             // print error message
740
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
741             getName()
742             + "- Operator.processRegistrationResponseEvent() -- A registration response event is received for this feature that is already registered");
743             return false;
744         }
745         
746         // check the status
747
if (event.getResponseStatus() != AceHTTPMessage.OK)
748         {
749             // print error message
750
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
751             getName()
752             + "- Operator.processRegistrationResponseEvent() -- Registration failed, status: "
753             + event.getResponseStatus());
754             return false;
755         }
756         
757         RegistrationResponseMessage resp_message = (RegistrationResponseMessage)event.getMessage();
758         if (resp_message != null)
759         {
760             selfInfo = resp_message.getCallPartyInfo();
761             com.quikj.application.web.talk.messaging.GroupElement gel = resp_message.getGroup();
762             
763             if (gel != null)
764             {
765                 int size = gel.numElements();
766                 
767                 // add all the operators to the queue
768
for (int i = 0; i < size; i++)
769                 {
770                     if (isMyOperator(gel.elementAt(i)) == true)
771                     {
772                         addToQueue(gel.elementAt(i));
773                     }
774                 }
775             }
776         }
777         
778         // set group name
779
UserElement user_data = EndPointList.Instance().findRegisteredUserData(this);
780         if (user_data != null)
781         {
782             StringBuffer JavaDoc groupname = new StringBuffer JavaDoc();
783             
784             String JavaDoc[] groups = user_data.getOwnsGroups(); // at least 1, should be only 1 for this app
785
for (int i = 0; i < groups.length; i++)
786             {
787                 groupname.append(groups[i]);
788                 if (i < (groups.length-1))
789                 {
790                     groupname.append(',');
791                 }
792             }
793             groupName = groupname.toString();
794         }
795         
796         opms = new OPMUtil();
797         opms.setTableName(OPM_TABLE_NAME);
798         opms.setKeyColumnValue(groupName);
799         startOPMTimer(new Date());
800         
801         AceRMIImpl rs = AceRMIImpl.getInstance();
802         if (rs != null) // if remote service has been started
803
{
804             rs.registerService ("com.quikj.application.web.talk.feature.operator.Operator:"
805             + userName,
806             this);
807         }
808         registered = true;
809         return true;
810     }
811     
812     private boolean processSetupRequestEvent(MessageEvent event)
813     {
814         if ((event.getMessage() instanceof SetupRequestMessage) == true)
815         {
816             // peg call received
817
callsReceived++;
818             
819             SetupRequestMessage setup = (SetupRequestMessage)event.getMessage();
820             
821             SubscriberElement subs = new SubscriberElement();
822             subs.setSessionId(setup.getSessionId());
823             subs.setEndpoint(event.getFrom());
824             
825             int max_q_size = 0;
826             synchronized (paramLock)
827             {
828                 max_q_size = maxQSize;
829             }
830             
831             if (max_q_size > 0) // if the max queue has been initialized
832
{
833                 if (subscriberQueue.size() >= max_q_size)
834                 {
835                     // send a BUSY message
836
SetupResponseMessage resp = new SetupResponseMessage();
837                     resp.setSessionId(setup.getSessionId());
838                     
839                     // send the message
840
if (ServiceController.Instance().sendMessage(new MessageEvent(MessageEvent.SETUP_RESPONSE,
841                     this,
842                     SetupResponseMessage.BUSY,
843                     java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
844                     ServiceController.getLocale(event.getFrom().getParam("language"))).getString("All_operators_are_currently_busy,_please_try_again_later"),
845                     resp,
846                     null)) == false)
847                     {
848                         // print error message
849
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
850                         Thread.currentThread().getName()
851                         + "- Operator.processSetupRequestEvent() -- Error sending BUSY message to the service controller");
852                     }
853                     
854                     // peg call missed
855
callsMissed++;
856                     
857                     return true;
858                 }
859             }
860             
861             // send a call progress message
862
SetupResponseMessage response = new SetupResponseMessage();
863             response.setSessionId(setup.getSessionId());
864             
865             CalledNameElement called = new CalledNameElement();
866             called.setCallParty(selfInfo);
867             response.setCalledParty(called);
868             
869             MediaElements media = new MediaElements();
870             TextElement telem = new TextElement();
871             telem.setMessage(java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
872             ServiceController.getLocale(event.getFrom().getParam("language"))).getString("Operator_Services:_Please_hold_while_we_transfer_you_to_an_operator"));
873             media.addMediaElement(telem);
874             response.setMediaElements(media);
875             
876             if (event.getFrom().sendEvent(new MessageEvent(MessageEvent.SETUP_RESPONSE,
877             this,
878             SetupResponseMessage.PROG,
879             java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
880             ServiceController.getLocale(event.getFrom().getParam("language"))).getString("Operator_Services:_Please_hold,_while_the_call_is_being_transferred_to_an_operator"),
881             response,
882             null)) == false)
883             {
884                 AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
885                 Thread.currentThread().getName()
886                 + "- Operator.processSetupRequestEvent() -- Error sending progress event to the calling party");
887                 
888                 return true;
889             }
890             
891             // add the subscriber to the call queue
892
subs.setRequestId(event.getRequestId());
893             subs.setStartWaitTime(new Date().getTime());
894             
895             subscriberQueue.addLast(subs);
896             
897             if (callQMessageTimerId == -1)
898             {
899                 startCallQMessageTimer();
900             }
901             
902             // Check if any operator is available, etc.
903
// if yes, transfer the call
904
checkQueueStatus(true);
905         }
906         return true;
907     }
908     
909     private boolean processDisconnectMessage(MessageEvent event)
910     {
911         if ((event.getMessage() instanceof DisconnectMessage) == true)
912         {
913             DisconnectMessage disc = (DisconnectMessage)event.getMessage();
914             long session = disc.getSessionId();
915             
916             ListIterator iter = subscriberQueue.listIterator();
917             
918             while (iter.hasNext() == true)
919             {
920                 SubscriberElement element = (SubscriberElement)iter.next();
921                 if (element.getSessionId() == session)
922                 {
923                     iter.remove();
924                     
925                     // peg wait time opm
926
int wait_time = (int) (new Date().getTime() - element.getStartWaitTime())/1000;
927                     opms.collectOPM(OPM_USER_WAIT_TIME, wait_time);
928                     
929                     break;
930                 }
931             }
932         }
933         return true;
934     }
935     
936     private boolean processClientRequestMessage(MessageEvent event)
937     {
938         if ((event.getMessage() instanceof GroupActivityMessage) == true)
939         {
940             GroupActivityMessage gam = (GroupActivityMessage)event.getMessage();
941             com.quikj.application.web.talk.messaging.GroupElement gm = gam.getGroup();
942             
943             int num = gm.numElements();
944             for (int i = 0; i < num; i++)
945             {
946                 GroupMemberElement ge = gm.elementAt(i);
947                 int operation = ge.getOperation();
948                 switch (operation)
949                 {
950                     case GroupMemberElement.OPERATION_ADD_LIST:
951                         if (isMyOperator(ge) == true)
952                         {
953                             int max_operators = 0;
954                             synchronized (paramLock)
955                             {
956                                 max_operators = maxOperators;
957                             }
958                             
959                             if (max_operators > 0) // if a size has been specified
960
{
961                                 if (operatorQueue.size() >= max_operators)
962                                 {
963                                     AceLogger.Instance().log(AceLogger.INFORMATIONAL,
964                                     AceLogger.SYSTEM_LOG,
965                                     Thread.currentThread().getName()
966                                     + "- Operator.processClientRequestMessage() -- "
967                                     + "Operator queue " + userName
968                                     + " has reached its capacity, ubable to add new user");
969                                     
970                                     return true;
971                                 }
972                             }
973                             
974                             addToQueue(ge);
975                             
976                             // check if anyone is waiting in the subscriber queue
977
// if, yes, maybe, an operator is available for a call
978
checkQueueStatus(false);
979                         }
980                         break;
981                         
982                     case GroupMemberElement.OPERATION_MOD_LIST:
983                         if (isMyOperator(ge) == true)
984                         {
985                             adjustQueueEntry(ge);
986                             
987                             // check if anyone is waiting in the subscriber queue
988
// if, yes, maybe an operator is available for a call
989
checkQueueStatus(false);
990                         }
991                         break;
992                         
993                     case GroupMemberElement.OPERATION_REM_LIST:
994                         removeFromQueue(ge);
995                         
996                         // check if all operators are gone, if yes and if there
997
// are subscribers in the queue, drop them
998
checkQueueStatus(false);
999                         break;
1000                }
1001            }
1002            
1003        }
1004        else if ((event.getMessage() instanceof UserToUserMessage) == true)
1005        {
1006            UserToUserMessage msg = (UserToUserMessage)event.getMessage();
1007            
1008            ProactiveMessageParser parser = null;
1009            try
1010            {
1011                parser = new ProactiveMessageParser(XMLBuilder.Instance().getDocumentBuilder());
1012            }
1013            catch (AceMessageException ex)
1014            {
1015                // print error message
1016
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
1017                Thread.currentThread().getName()
1018                + "- Operator.processClientRequestMessage -- Could not obtain XML parser: "
1019                + ex.getMessage());
1020                return true;
1021            }
1022            
1023            if (parser.parse(msg.getApplicationMessage()) == false)
1024            {
1025                // print error message
1026
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
1027                Thread.currentThread().getName()
1028                + "- Operator.processClientRequestMessage -- Could not parse message: "
1029                + parser.getErrorMessage());
1030                return true;
1031            }
1032            
1033            ProactiveMessageInterface pmi = parser.getMessage();
1034            if ((pmi instanceof CallRequestMessage) == true)
1035            {
1036                CallRequestMessage crqm = (CallRequestMessage)pmi;
1037                int call_status = CallResponseMessage.OK;
1038                String JavaDoc reason = "OK";
1039                
1040                UnregisteredIdentifier id = new UnregisteredIdentifier(userName,
1041                crqm.getCalled());
1042                UnregisteredInfo info = ProactiveUserData.getInstance().get(id);
1043                if (info == null)
1044                {
1045                    call_status = CallResponseMessage.NOT_PRESENT;
1046                    reason =
1047                    java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
1048                    ServiceController.getLocale(event.getFrom().getParam("language"))).getString("User_not_found");
1049                }
1050                
1051                EndPointInterface ep = null;
1052                if (call_status == CallResponseMessage.OK)
1053                {
1054                    ep = info.getEndPoint();
1055                    if (ep == null)
1056                    {
1057                        call_status = CallResponseMessage.NOT_PRESENT;
1058                        reason = java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
1059                        ServiceController.getLocale(event.getFrom().getParam("language"))).getString("User_not_found");
1060                    }
1061                }
1062                
1063                if (call_status == CallResponseMessage.OK)
1064                {
1065                    if (info.isHelped() == true)
1066                    {
1067                        call_status = CallResponseMessage.TAKEN;
1068                        reason = java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
1069                        ServiceController.getLocale(event.getFrom().getParam("language"))).getString("User_is_being_attended_by_another_operator"); }
1070                }
1071                
1072                if (call_status == CallResponseMessage.OK)
1073                {
1074                    ConversationInitiationMessage conv = new ConversationInitiationMessage();
1075                    conv.setGroup(userName);
1076                    conv.setSessionId(crqm.getCalled());
1077                    conv.setOperator(crqm.getOperator());
1078                    
1079                    // send this event to the end-point
1080
if (ep.sendEvent(conv) == false)
1081                    {
1082                        call_status = CallResponseMessage.SEND_ERROR;
1083                        reason = java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
1084                        ServiceController.getLocale(event.getFrom().getParam("language"))).getString("Could_not_send_the_request_to_the_user");
1085                    }
1086                }
1087                
1088                UserToUserMessage utu = new UserToUserMessage();
1089                utu.setApplicationClass(""); // to keep the parser happy
1090

1091                CallResponseMessage crm = new CallResponseMessage();
1092                crm.setStatus(call_status);
1093                utu.setApplicationMessage(crm.format());
1094                
1095                MessageEvent ev = new MessageEvent(MessageEvent.CLIENT_RESPONSE_MESSAGE,
1096                this,
1097                AceHTTPMessage.OK,
1098                reason,
1099                utu,
1100                null,
1101                event.getRequestId());
1102                if (event.getFrom().sendEvent(ev) == false)
1103                {
1104                    AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
1105                    getName()
1106                    + "- Operator.processProactiveTimer -- Could not send user to user response message to endpoint "
1107                    + event.getFrom().getIdentifier());
1108                }
1109            }
1110            else if ((pmi instanceof InformationRequestMessage) == true)
1111            {
1112                InformationRequestMessage irm = (InformationRequestMessage)pmi;
1113                UnregisteredIdentifier ident = new UnregisteredIdentifier(userName,
1114                irm.getSessionId());
1115                UnregisteredInfo info = ProactiveUserData.getInstance().get(ident);
1116                
1117                UserToUserMessage utu = new UserToUserMessage();
1118                utu.setApplicationClass(""); // to keep the parser happy
1119

1120                InformationResponseMessage inrm = new InformationResponseMessage();
1121                inrm.setSessionId(irm.getSessionId());
1122                
1123                if (info == null)
1124                {
1125                    inrm.setStatus(InformationResponseMessage.STATUS_NOT_FOUND);
1126                    utu.setApplicationMessage(inrm.format());
1127                    
1128                    MessageEvent ev = new MessageEvent(MessageEvent.CLIENT_RESPONSE_MESSAGE,
1129                    this,
1130                    AceHTTPMessage.OK,
1131                    "OK",
1132                    utu,
1133                    null,
1134                    event.getRequestId());
1135                    if (event.getFrom().sendEvent(ev) == false)
1136                    {
1137                        AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
1138                        getName()
1139                        + "- Operator.processProactiveTimer -- Could not send user to user response message to endpoint "
1140                        + event.getFrom().getIdentifier());
1141                    }
1142                    
1143                    return true;
1144                } // not found
1145

1146                // information found
1147
if (info.getEndPoint() == null)
1148                {
1149                    inrm.setStatus(InformationResponseMessage.STATUS_INACTIVE);
1150                }
1151                else
1152                {
1153                    inrm.setStatus(InformationResponseMessage.STATUS_OK);
1154                }
1155                
1156                inrm.setPageCount(info.getPageCount());
1157                inrm.setLastAccessTime(info.getLastAccessTime());
1158                inrm.setServed(info.isHelped());
1159                
1160                LinkedList l = info.getWebInfo();
1161                Iterator iter = l.iterator();
1162                while (iter.hasNext() == true)
1163                {
1164                    WebInfo winfo = (WebInfo)iter.next();
1165                    WebInfoElement wel = new WebInfoElement();
1166                    wel.setAccessTime(winfo.getAccessTime());
1167                    wel.setUrl(winfo.getUrl());
1168                    inrm.addWebInfoElement(wel);
1169                }
1170                
1171                utu.setApplicationMessage(inrm.format());
1172                
1173                MessageEvent ev = new MessageEvent(MessageEvent.CLIENT_RESPONSE_MESSAGE,
1174                this,
1175                AceHTTPMessage.OK,
1176                "OK",
1177                utu,
1178                null,
1179                event.getRequestId());
1180                if (event.getFrom().sendEvent(ev) == false)
1181                {
1182                    AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
1183                    getName()
1184                    + "- Operator.processProactiveTimer -- Could not send user to user response message to endpoint "
1185                    + event.getFrom().getIdentifier());
1186                }
1187                
1188            }
1189            else
1190            {
1191                AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
1192                Thread.currentThread().getName()
1193                + "- Operator.processClientRequestMessage -- Unknown message: "
1194                + pmi.getClass().getName());
1195                return true;
1196            }
1197        }
1198        return true;
1199    }
1200    
1201    private boolean isMyOperator(GroupMemberElement element)
1202    {
1203        UserElement my_info =
1204        EndPointList.Instance().findRegisteredUserData(selfInfo.getUserName());
1205        
1206        UserElement op_info =
1207        EndPointList.Instance().findRegisteredUserData(element.getUser());
1208        
1209        if (op_info == null)
1210        {
1211            return false;
1212        }
1213        
1214        String JavaDoc[] my_groups = my_info.getOwnsGroups();
1215        
1216        for (int i = 0; i < my_groups.length; i++)
1217        {
1218            if (op_info.belongsToGroup(my_groups[i]) == true)
1219            {
1220                return true;
1221            }
1222        }
1223        
1224        return false;
1225    }
1226    
1227    private void dropAllSubscribers()
1228    {
1229        ListIterator iter = subscriberQueue.listIterator();
1230        long curr_time = new Date().getTime();
1231        
1232        while (iter.hasNext() == true)
1233        {
1234            SubscriberElement element = (SubscriberElement)iter.next();
1235            // send a BUSY message
1236
SetupResponseMessage resp = new SetupResponseMessage();
1237            resp.setSessionId(element.getSessionId());
1238            
1239            // send the message
1240
if (ServiceController.Instance().sendMessage(new MessageEvent(MessageEvent.SETUP_RESPONSE,
1241            this,
1242            SetupResponseMessage.NOANS,
1243            java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
1244            ServiceController.getLocale(element.getEndpoint().getParam("language"))).getString("No_operators_are_currently_available,_please_try_again_later"),
1245            resp,
1246            null)) == false)
1247            {
1248                // print error message
1249
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
1250                Thread.currentThread().getName()
1251                + "- Operator.dropAllSubscribers() -- Error sending NOANS message to the service controller");
1252            }
1253            
1254            //peg missed call
1255
callsMissed++;
1256            
1257            // peg wait time opm
1258
int wait_time = (int) (curr_time - element.getStartWaitTime())/1000;
1259            opms.collectOPM(OPM_USER_WAIT_TIME, wait_time);
1260        }
1261        subscriberQueue.clear();
1262    }
1263    
1264    private boolean transferCallToOperator(SubscriberElement subscriber,
1265    GroupMemberElement operator)
1266    {
1267        DisconnectMessage message = new DisconnectMessage();
1268        message.setSessionId(subscriber.getSessionId());
1269        DisconnectReasonElement reason = new DisconnectReasonElement();
1270        reason.setReasonCode(0);
1271        
1272        String JavaDoc user = null;
1273        if (operator.getFullName() != null)
1274        {
1275            user = operator.getFullName();
1276        }
1277        else
1278        {
1279            user = operator.getUser();
1280        }
1281        reason.setReasonText(java.util.ResourceBundle.getBundle("com.quikj.application.web.talk.feature.operator.language",
1282        ServiceController.getLocale(subscriber.getEndpoint().getParam("language"))).getString("You_are_being_connected_to_operator_") + ' ' + user);
1283        message.setDisconnectReason(reason);
1284        
1285        CalledNameElement called = new CalledNameElement();
1286        CallPartyElement party = new CallPartyElement();
1287        party.setName(operator.getUser());
1288        party.setFullName(operator.getFullName());
1289        called.setCallParty(party);
1290        message.setCalledInfo(called);
1291        
1292        if (ServiceController.Instance().sendMessage(new MessageEvent(MessageEvent.DISCONNECT_MESSAGE,
1293        this,
1294        message,
1295        null)) == false)
1296        {
1297            // print error message
1298
AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
1299            Thread.currentThread().getName()
1300            + "- Operator.transferCallToOperator() -- Error sending disconnect message to the service controller");
1301            return false;
1302        }
1303        
1304        return true;
1305    }
1306    
1307    private void checkQueueStatus(boolean processing_setup_request)
1308    {
1309        if (subscriberQueue.size() <= 0) // no one there
1310
{
1311            return;
1312        }
1313        
1314        if (operatorQueue.size() <= 0) // no operator
1315
{
1316            dropAllSubscribers();
1317            return;
1318        }
1319        
1320        OperatorElement operator = (OperatorElement)operatorQueue.removeFirst();
1321        
1322        synchronized (paramLock)
1323        {
1324            if (operator.getOperatorInfo().getCallCount() >= maxSessionsPerOperator)
1325            {
1326                // if the operators' hands are full
1327
operatorQueue.addFirst(operator); // add him back
1328
return;
1329            }
1330        }
1331        
1332        // subscriber available, operator available, transfer call
1333
SubscriberElement subscriber = (SubscriberElement)subscriberQueue.removeFirst();
1334        
1335        if (transferCallToOperator(subscriber, operator.getOperatorInfo()) == true)
1336        {
1337            operator.getOperatorInfo().setCallCount(operator.getOperatorInfo().getCallCount()
1338            + 1);
1339            
1340            // add operator back to queue with re-adjusted count
1341
addToQueue(operator.getOperatorInfo(), operator.isProactiveSubscribersSent());
1342            
1343            if ((processing_setup_request == true) && (subscriberQueue.size() == 0))
1344            {
1345                callsProcessedImmediately++;
1346            }
1347            
1348            // peg wait time opm
1349
int wait_time = (int) (new Date().getTime() - subscriber.getStartWaitTime())/1000;
1350            opms.collectOPM(OPM_USER_WAIT_TIME, wait_time);
1351        }
1352        else // call transfer failed
1353
{
1354            operatorQueue.addFirst(operator); // add him back
1355
subscriberQueue.addFirst(subscriber);
1356        }
1357    }
1358    
1359    
1360    private void adjustQueueEntry(GroupMemberElement operator)
1361    {
1362        ListIterator iter = operatorQueue.listIterator(0);
1363        String JavaDoc name = operator.getUser();
1364        int call_count = operator.getCallCount();
1365        boolean entry_removed = false;
1366        
1367        OperatorElement element = null;
1368        while (iter.hasNext() == true)
1369        {
1370            element = (OperatorElement)iter.next();
1371            if (name.equals(element.getOperatorInfo().getUser()) == true)
1372            {
1373                if (call_count == element.getOperatorInfo().getCallCount())
1374                {
1375                    // if there is no change in count, there is nothing to
1376
// be done
1377
return;
1378                }
1379                else
1380                {
1381                    iter.remove(); // remove it temporarily
1382
entry_removed = true;
1383                    break;
1384                }
1385            }
1386        } //end while
1387

1388        boolean proactive_sent = false;
1389        if (entry_removed == true)
1390        {
1391            // copy the operator name to the new element
1392
operator.setFullName(element.getOperatorInfo().getFullName());
1393            proactive_sent = element.isProactiveSubscribersSent();
1394        }
1395        
1396        addToQueue(operator, proactive_sent);
1397    }
1398    
1399    private void removeFromQueue(GroupMemberElement operator)
1400    {
1401        ListIterator iter = operatorQueue.listIterator(0);
1402        String JavaDoc name = operator.getUser();
1403        while (iter.hasNext() == true)
1404        {
1405            OperatorElement element = (OperatorElement)iter.next();
1406            if (name.equals(element.getOperatorInfo().getUser()) == true)
1407            {
1408                iter.remove();
1409                break;
1410            }
1411        }
1412    }
1413    
1414    private void addToQueue(GroupMemberElement gel, boolean pro)
1415    {
1416        OperatorElement operator = new OperatorElement();
1417        operator.setOperatorInfo(gel);
1418        operator.setProactiveSubscribersSent(pro);
1419        
1420        ListIterator iter = operatorQueue.listIterator(0);
1421        boolean added = false;
1422        
1423        int call_count = gel.getCallCount();
1424        while (iter.hasNext() == true)
1425        {
1426            GroupMemberElement element = ((OperatorElement)iter.next()).getOperatorInfo();
1427            if (element.getCallCount() > call_count)
1428            {
1429                try
1430                {
1431                    iter.previous(); // go back to the previous entry
1432
iter.add(operator);
1433                }
1434                catch (NoSuchElementException ex)
1435                {
1436                    operatorQueue.addFirst(operator);
1437                }
1438                added = true;
1439                break;
1440            }
1441        }
1442        
1443        if (added == false)
1444        {
1445            operatorQueue.addLast(operator);
1446        }
1447    }
1448    
1449    private void addToQueue(GroupMemberElement gel)
1450    {
1451        addToQueue(gel, false);
1452    }
1453    
1454    public String JavaDoc getIdentifier()
1455    {
1456        return identifier;
1457    }
1458    
1459    public void resynchParam(Map params)
1460    {
1461        // set default values
1462
proactiveMonitoring = false;
1463        maxSessionsPerOperator = 1;
1464        maxOperators = -1;
1465        maxQSize = -1;
1466        initParams(params);
1467        
1468        if (proactiveMonitoring == true)
1469        {
1470            if (proactiveActive == false)
1471            {
1472                // the proactive feature just got turned on
1473
try
1474                {
1475                    ProactiveUserData data = ProactiveUserData.getInstance();
1476                    if (data == null)
1477                    {
1478                        proactiveActive = false;
1479                    }
1480                    else
1481                    {
1482                        // get a license
1483
if (AceLicenseManager.getInstance().consumeUnits("ace-proactive-sessions", 1)
1484                        == false)
1485                        {
1486                            AceLogger.Instance().log(AceLogger.ERROR,
1487                            AceLogger.SYSTEM_LOG,
1488                            getName()
1489                            + "- Operator.resynchParam() -- could not obtain license for the proactive features: "
1490                            + AceLicenseManager.getInstance().getErrorMessage());
1491                            proactiveActive = false;
1492                        }
1493                        else
1494                        {
1495                            proactiveActive = true;
1496                            
1497                            try
1498                            {
1499                                // start the proactive timer
1500
proactiveTimer = AceTimer.Instance().startTimer(PROACTIVE_TIMER_INTERVAL, this,
1501                                PROACTIVE_UPDATE_TIMER);
1502                                if (proactiveTimer == -1)
1503                                {
1504                                    AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
1505                                    getName()
1506                                    + "- Operator.resynchParam() -- Could not start proactive timer - "
1507                                    + getErrorMessage());
1508                                }
1509                            }
1510                            catch (IOException ex)
1511                            {
1512                                // should not happen
1513
}
1514                        }
1515                    }
1516                }
1517                catch (Exception JavaDoc ex)
1518                {
1519                    proactiveActive = false;
1520                }
1521            }
1522        }
1523        else // if proactiveMonitoring == false
1524
{
1525            if (proactiveActive == true)
1526            {
1527                // the proactive feature just got turned off
1528

1529                // first, return the license
1530
AceLicenseManager.getInstance().returnUnits("ace-proactive-sessions", 1);
1531                
1532                // stop the proactive timer
1533
if (proactiveTimer != -1)
1534                {
1535                    try
1536                    {
1537                        AceTimer.Instance().cancelTimer(proactiveTimer);
1538                    }
1539                    catch (IOException ex)
1540                    {
1541                        // should not happen
1542
}
1543                    
1544                    proactiveTimer = -1;
1545                }
1546                
1547                proactiveActive = false;
1548            }
1549        }
1550    }
1551    
1552    public void setParam(String JavaDoc key, String JavaDoc value)
1553    {
1554        synchronized (keyValuePair)
1555        {
1556            keyValuePair.put(key, value);
1557        }
1558    }
1559    
1560    public String JavaDoc getParam(String JavaDoc key)
1561    {
1562        synchronized (keyValuePair)
1563        {
1564            return (String JavaDoc)keyValuePair.get(key);
1565        }
1566    }
1567    
1568    public void setParamObject(String JavaDoc key, Object JavaDoc value)
1569    {
1570        synchronized (keyValuePair)
1571        {
1572            keyValuePair.put(key, value);
1573        }
1574    }
1575    
1576    public Object JavaDoc getParamObject(String JavaDoc key)
1577    {
1578        synchronized (keyValuePair)
1579        {
1580            return keyValuePair.get(key);
1581        }
1582    }
1583    
1584    public void removeParam(String JavaDoc key)
1585    {
1586        synchronized (keyValuePair)
1587        {
1588            keyValuePair.remove(key);
1589        }
1590    }
1591    
1592    public RowElement getStatsData()
1593    {
1594        RowElement ele = new RowElement();
1595        
1596        ele.addListItem(userName);
1597        ele.addListItem(selfInfo.getFullName());
1598        ele.addListItem(new Integer JavaDoc(operatorQueue.size()).toString());
1599        ele.addListItem(new Integer JavaDoc(subscriberQueue.size()).toString());
1600        ele.addListItem(new Integer JavaDoc(callsReceived).toString());
1601        ele.addListItem(new Integer JavaDoc(callsMissed).toString());
1602        ele.addListItem(new Integer JavaDoc(callsProcessedImmediately).toString());
1603        
1604        return ele;
1605    }
1606    
1607    public MetaDataElement getStatsMetaData()
1608    {
1609        MetaDataElement ele = new MetaDataElement();
1610        
1611        ele.addListItem("Name ");
1612        ele.addListItem("Full Name ");
1613        ele.addListItem("# Active Operators");
1614        ele.addListItem("# Queued Callers");
1615        ele.addListItem("Calls Received");
1616        ele.addListItem("Calls Missed");
1617        ele.addListItem("Calls Processed Immediately");
1618        
1619        return ele;
1620    }
1621    
1622    public void clearStatsCounts()
1623    {
1624        callsReceived = 0;
1625        callsMissed = 0;
1626        callsProcessedImmediately = 0;
1627    }
1628    
1629    public String JavaDoc getRMIParam(String JavaDoc key)
1630    {
1631        if (key.equals("operator-queue-size") == true)
1632        {
1633            return (new Integer JavaDoc(operatorQueue.size())).toString();
1634        }
1635        else if (key.equals("all-operators-busy") == true)
1636        {
1637            int max_q_size = 0;
1638            synchronized (paramLock)
1639            {
1640                max_q_size = maxQSize;
1641            }
1642            
1643            if (max_q_size > 0) // if the max queue has been initialized
1644
{
1645                if (subscriberQueue.size() >= max_q_size)
1646                {
1647                    return "true";
1648                }
1649                else
1650                {
1651                    return "false";
1652                }
1653            }
1654            else
1655            {
1656                return "false";
1657            }
1658            
1659        }
1660        else if (key.equals("subscriber-queue-size") == true)
1661        {
1662            return (new Integer JavaDoc(subscriberQueue.size())).toString();
1663        }
1664        
1665        return null;
1666    }
1667    
1668    public boolean setRMIParam(String JavaDoc key, String JavaDoc value)
1669    {
1670        return false;
1671    }
1672    
1673    public boolean allow(EndPointInterface ep, EndPointInfo info)
1674    {
1675        if (operatorQueue.size() >= maxOperators)
1676        {
1677            return false;
1678        }
1679        
1680        return true;
1681    }
1682    
1683    private static String JavaDoc hostName;
1684    private static int counter = 0;
1685    private static Object JavaDoc counterLock = new Object JavaDoc();
1686    private String JavaDoc identifier;
1687    private String JavaDoc userName = null;
1688    private String JavaDoc groupName = null;
1689    private LinkedList operatorQueue= new LinkedList();
1690    private LinkedList subscriberQueue = new LinkedList();
1691    private int maxSessionsPerOperator = 1;
1692    private CallPartyElement selfInfo;
1693    private boolean registered = false;
1694    private int maxOperators = -1;
1695    private int maxQSize = -1;
1696    private String JavaDoc password;
1697    private Object JavaDoc paramLock = new Object JavaDoc();
1698    
1699    private HashMap keyValuePair = new HashMap();
1700    
1701    // Timer IDs
1702
private static final int CALL_Q_MESSAGE_TIMER = 0; // timer parm
1703

1704    // OPM_STORAGE_INTERVAL must divide evenly into 60
1705
private static final int OPM_TIMER = 1; // timer parm
1706

1707    private static final int PROACTIVE_UPDATE_TIMER = 3;
1708    
1709    // call queue message timing
1710
private static final long CALL_Q_MESSAGE_INTERVAL = 2 * 60 * 100; // a minute
1711
private int callQMessageTimerId = -1;
1712    
1713    // operational measurements collection
1714
private Date opmCollectionTime; // collect at top of each minute
1715
private static final int OPM_STORAGE_INTERVAL = 15; // store in DB every 15 min
1716
private int opmCollectionTimerId = -1;
1717    
1718    private OPMUtil opms;
1719    private static final String JavaDoc OPM_TABLE_NAME = "opm_operator_tbl";
1720    private static final String JavaDoc OPM_ACTIVE_OPERATOR_COUNT = "actv_ops"; // sampled
1721
private static final String JavaDoc OPM_USERS_WAITING = "users_waiting"; // sampled
1722
private static final String JavaDoc OPM_USERS_TALKING = "users_talking"; // sampled
1723
private static final String JavaDoc OPM_USER_WAIT_TIME = "user_wait_time";
1724    
1725    // realtime monitoring statistics
1726
private int callsReceived = 0;
1727    private int callsMissed = 0;
1728    private int callsProcessedImmediately = 0;
1729    
1730    private static final long PROACTIVE_TIMER_INTERVAL = 5 * 1000L;
1731    private boolean proactiveActive = false;
1732    private int proactiveTimer = -1;
1733    private boolean proactiveMonitoring = false;
1734}
1735
Popular Tags