package com.quikj.application.web.talk.feature.messagebox.server;

import com.quikj.server.framework.*;
import com.quikj.server.web.*;
import com.quikj.application.web.talk.plugin.*;
import com.quikj.application.web.talk.messaging.*;
import com.quikj.application.web.talk.messaging.oamp.*;

import java.util.*;
import java.io.*;
import java.net.*;
import javax.mail.internet.*;

import java.sql.*;

/**
 * Message-box feature endpoint.
 *
 * <p>The feature registers itself with the service controller under the
 * feature name. When a call is set up towards it, it resolves the mailbox
 * (e-mail) address of the user the call was transferred from - first from the
 * registered end-point list, otherwise through an asynchronous database
 * query - answers the call, and queues the first text the caller sends as an
 * e-mail to that address before disconnecting the call.
 *
 * <p>All call processing happens on this object's own thread (see
 * {@link #run()}); {@code callList} doubles as the monitor guarding both
 * {@code callList} and {@code pendingDbOps}.
 */
public class MessageBox extends AceThread implements FeatureInterface, EndPointInterface
{
    /** Base name of the resource bundle holding the user-visible texts. */
    private static final String LANGUAGE_BUNDLE
            = "com.quikj.application.web.talk.feature.messagebox.server.language";

    private static String hostName;
    private static int counter = 0;
    private static final Object counterLock = new Object();

    private String identifier;
    private String userName = null;
    private CallPartyElement selfInfo;
    private boolean registered = false;
    private String password;
    private String defaultFrom;
    private final Object paramLock = new Object();
    private final HashMap keyValuePair = new HashMap();
    private int msgCountSuccess;

    /** Active calls: Long session id -> CallInfo. Also used as the lock for pendingDbOps. */
    private final Hashtable callList = new Hashtable();

    /** Outstanding mailbox look-ups: Long session id -> DbOperation. */
    private final Hashtable pendingDbOps = new Hashtable();

    /**
     * Creates the feature thread, named "Ace-TalkMessageService".
     *
     * @throws IOException declared by the constructor contract; propagated from
     *         the superclass constructor if it fails
     */
    public MessageBox() throws IOException
    {
        super("Ace-TalkMessageService");
    }

    /**
     * Tears the feature down: cancels every pending database look-up, sends a
     * disconnect for every call still in the call list, unregisters from the
     * service controller if registered, and disposes the underlying thread.
     */
    public void dispose()
    {
        synchronized (callList)
        {
            Enumeration calls = callList.elements();
            while (calls.hasMoreElements())
            {
                CallInfo call_info = (CallInfo) calls.nextElement();

                DbOperation db_op = (DbOperation) pendingDbOps.get(new Long(call_info.getSessionId()));
                if (db_op != null)
                {
                    db_op.cancel();
                }

                DisconnectMessage message = buildDisconnect(call_info.getSessionId(), 0,
                        localizedText(call_info.getEndpoint(), "Message_Service_disconnected"));

                if (!ServiceController.Instance().sendMessage(
                        new MessageEvent(MessageEvent.DISCONNECT_MESSAGE, this, message, null)))
                {
                    AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                            getName()
                            + "- MessageBox.dispose() -- Error sending disconnect message to the service controller");
                }
            }

            // NOTE(review): the lists are cleared directly, so the call count
            // published through EndPointList is not refreshed here - confirm
            // that is intended during shutdown.
            callList.clear();
            pendingDbOps.clear();
        }

        if (registered)
        {
            if (!ServiceController.Instance().sendMessage(new UnregistrationEvent(userName)))
            {
                AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                        Thread.currentThread().getName()
                        + "- MessageBox.dispose() -- Error sending unregistration message to the service controller");
            }

            registered = false;
        }

        super.dispose();
    }

    /**
     * Initializes the feature and sends the registration request.
     *
     * @param name   feature name; used as the user name to register with
     * @param params feature parameters ("password", optional "from")
     * @return {@code false} if the mail service is not running, a parameter is
     *         invalid, or the registration request could not be sent
     */
    public boolean init(String name, Map params)
    {
        if (AceMailService.getInstance() == null)
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    "Feature " + name
                    + " error : Ace mail service must be active for this feature to operate");
            return false;
        }

        userName = name;

        if (!initParams(params))
        {
            return false;
        }

        if (hostName == null)
        {
            try
            {
                hostName = InetAddress.getLocalHost().getHostName();
            }
            catch (UnknownHostException ex)
            {
                hostName = "Unknown";
            }
        }

        // Host, feature name, timestamp and a process-wide counter together
        // form the end-point identifier.
        synchronized (counterLock)
        {
            identifier = hostName + ":feature:" + userName + ":" + (new java.util.Date()).getTime()
                    + ":" + counter++;
        }

        RegistrationRequestMessage reg = new RegistrationRequestMessage();
        reg.setUserName(name);
        reg.setPassword(password);

        boolean sent = ServiceController.Instance().sendEvent(
                new MessageEvent(MessageEvent.REGISTRATION_REQUEST, this, reg, null));
        if (!sent)
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    getName()
                    + "- MessageBox.init() -- could not send registration message to the service controller");
            return false;
        }

        return true;
    }

    /**
     * Reads the "password" and "from" parameters.
     *
     * <p>The "from" address is validated <em>before</em> it replaces the current
     * default sender, so a malformed value supplied through
     * {@link #resynchParam(Map)} (which ignores the result) can no longer
     * overwrite a previously valid address.
     *
     * @param params parameter map supplied by the framework
     * @return {@code false} if "from" is present but is not a valid internet
     *         e-mail address
     */
    private boolean initParams(Map params)
    {
        synchronized (paramLock)
        {
            password = (String) params.get("password");

            String from = (String) params.get("from");
            if (from != null)
            {
                try
                {
                    // Constructed only for its parsing side effect (syntax check).
                    new InternetAddress(from);
                }
                catch (AddressException ex)
                {
                    AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                            "Feature " + userName
                            + " error : The format of the 'from' parameter is not a valid internet email address");
                    return false;
                }
            }

            defaultFrom = from;
        }

        return true;
    }

    /** Starts the feature thread. */
    public void start()
    {
        super.start();
    }

    /**
     * Message loop. Dispatches message events and database responses until a
     * null message or a signal is received, or an event handler asks to stop,
     * then disposes the feature.
     */
    public void run()
    {
        while (true)
        {
            AceMessageInterface message = waitMessage();
            if (message == null)
            {
                AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                        getName()
                        + "- MessageBox.run() -- A null message was received while waiting for a message - "
                        + getErrorMessage());
                break;
            }

            if (message instanceof AceSignalMessage)
            {
                AceSignalMessage signal = (AceSignalMessage) message;
                AceLogger.Instance().log(AceLogger.INFORMATIONAL, AceLogger.SYSTEM_LOG,
                        getName()
                        + " - MessageBox.run() -- A signal "
                        + signal.getSignalId()
                        + " is received : "
                        + signal.getMessage());
                break;
            }

            if (message instanceof MessageEvent)
            {
                if (!processMessageEvent((MessageEvent) message))
                {
                    break;
                }
            }
            else if (message instanceof AceSQLMessage)
            {
                processSqlMessage((AceSQLMessage) message);
            }
            else
            {
                AceLogger.Instance().log(AceLogger.WARNING, AceLogger.SYSTEM_LOG,
                        getName()
                        + "- MessageBox.run() -- An unexpected message was received while waiting for a message - "
                        + message.messageType());
            }
        }

        dispose();
    }

    /**
     * Routes a database response to the DbOperation that issued the query and
     * forgets the operation once it reports completion.
     */
    private void processSqlMessage(AceSQLMessage message)
    {
        DbOperation op = (DbOperation) message.getUserParm();
        if (op == null)
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    getName()
                    + "- MessageBox.run() -- No database handler for database event.");
            return;
        }

        synchronized (callList)
        {
            // Hashtable.contains() searches the VALUES, i.e. the pending operations.
            if (!pendingDbOps.contains(op))
            {
                AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                        getName()
                        + "- MessageBox.run() -- database handler not in db operation list.");
                return;
            }

            if (op.processResponse(message))
            {
                pendingDbOps.remove(new Long(op.getSessionId()));
            }
        }
    }

    /**
     * Dispatches a message event by type.
     *
     * @return {@code false} if the message loop must terminate
     */
    private boolean processMessageEvent(MessageEvent message)
    {
        switch (message.getEventType())
        {
            case MessageEvent.REGISTRATION_RESPONSE:
                return processRegistrationResponseEvent(message);

            case MessageEvent.SETUP_REQUEST:
                return processSetupRequestEvent(message);

            case MessageEvent.DISCONNECT_MESSAGE:
                return processDisconnectMessage(message);

            case MessageEvent.RTP_MESSAGE:
                return processRTPMessage(message);

            default:
                // Other event types are ignored.
                return true;
        }
    }

    /**
     * Handles the answer to the registration request sent from {@link #init}.
     *
     * @return {@code false} (stop the feature) on a duplicate response or a
     *         non-OK status
     */
    private boolean processRegistrationResponseEvent(MessageEvent event)
    {
        if (registered)
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    getName()
                    + "- MessageBox.processRegistrationResponseEvent() -- A registration response event is received for this feature that is already registered");
            return false;
        }

        if (event.getResponseStatus() != AceHTTPMessage.OK)
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    getName()
                    + "- MessageBox.processRegistrationResponseEvent() -- Registration failed, status: "
                    + event.getResponseStatus());
            return false;
        }

        RegistrationResponseMessage resp_message = (RegistrationResponseMessage) event.getMessage();
        if (resp_message != null)
        {
            selfInfo = resp_message.getCallPartyInfo();
        }

        registered = true;
        return true;
    }

    /**
     * Handles an incoming call. If the transferred-from user is found in the
     * registered end-point list the call is answered (or dropped when the user
     * has no mailbox address) immediately; otherwise a database look-up is
     * started and the call is answered from
     * {@link DbOperation#processResponse(AceSQLMessage)}.
     *
     * @return {@code false} only when a setup request arrives before the
     *         feature is registered
     */
    private boolean processSetupRequestEvent(MessageEvent event)
    {
        if (!(event.getMessage() instanceof SetupRequestMessage))
        {
            return true;
        }

        if (!registered)
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    getName()
                    + "- MessageBox.processSetupRequestEvent() -- A setup request event is received but this endpoint is not registered");
            return false;
        }

        EndPointInterface calling_party = event.getFrom();
        SetupRequestMessage incoming_message = (SetupRequestMessage) event.getMessage();
        long session_id = incoming_message.getSessionId();
        String messagebox_user = incoming_message.getTransferFrom();

        if (messagebox_user == null)
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    getName()
                    + "- MessageBox.processSetupRequestEvent() -- No transfer-from info available");

            dropCall(session_id, 0, localizedText(calling_party, "Message_processing_failure"));
            return true;
        }

        String key = incoming_message.getEncryptedKey();

        UserElement user_data = EndPointList.Instance().findRegisteredUserData(messagebox_user);
        if (user_data != null)
        {
            if (!hasMailboxAddress(user_data))
            {
                dropCall(session_id, 0, localizedText(calling_party,
                        "Message_service_is_not_enabled_for_this_user._Please_try_your_call_again_later."));
                return true;
            }

            CallInfo call = createCall(session_id, calling_party, messagebox_user, key, incoming_message);
            call.setMailboxAddress(user_data.getAddress());

            synchronized (callList)
            {
                addToCallList(session_id, call);
            }

            answerCall(call);
            return true;
        }

        // The user is not in the registered list: look the mailbox up in the database.
        CallInfo call = createCall(session_id, calling_party, messagebox_user, key, incoming_message);
        DbOperation op = new DbOperation(call, ServiceController.Instance().getDatabase());

        if (!op.initiate())
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    "MessageBox.processSetupRequestEvent() -- Failure initiating DB operation for user "
                    + messagebox_user
                    + ", error : "
                    + op.getLastError());

            dropCall(session_id, 0, localizedText(calling_party, "Message_processing_database_failure"));
            return true;
        }

        synchronized (callList)
        {
            pendingDbOps.put(new Long(session_id), op);
            addToCallList(session_id, call);
        }

        return true;
    }

    /**
     * Builds the CallInfo for an incoming call, carrying over the encrypted key
     * and, when the request names a calling party, that party's e-mail address
     * as the sender address.
     */
    private CallInfo createCall(long session_id, EndPointInterface calling_party,
                                String messagebox_user, String key,
                                SetupRequestMessage request)
    {
        CallInfo call = new CallInfo(session_id, calling_party, messagebox_user);
        call.setEncryptedKey(key);

        if (request.getCallingNameElement() != null
                && request.getCallingNameElement().getCallParty() != null)
        {
            call.setFromAddress(request.getCallingNameElement().getCallParty().getEmail());
        }

        return call;
    }

    /** Tells whether the user record carries a non-empty mailbox address. */
    private static boolean hasMailboxAddress(UserElement user_data)
    {
        return (user_data.getAddress() != null) && (user_data.getAddress().length() > 0);
    }

    /**
     * Handles a disconnect from the far end: cancels a pending look-up for the
     * session, if any, and removes the call from the call list.
     *
     * @return always {@code true}
     */
    private boolean processDisconnectMessage(MessageEvent event)
    {
        if (!(event.getMessage() instanceof DisconnectMessage))
        {
            return true;
        }

        long session_id = ((DisconnectMessage) event.getMessage()).getSessionId();
        if (getCallInfo(session_id) == null)
        {
            return true;
        }

        synchronized (callList)
        {
            Long session_key = new Long(session_id);
            DbOperation db_op = (DbOperation) pendingDbOps.get(session_key);
            if (db_op != null)
            {
                db_op.cancel();
                pendingDbOps.remove(session_key);
            }

            removeFromCallList(session_id);
        }

        return true;
    }

    /**
     * Handles caller input on a connected call: every text element with a
     * non-null message is queued as an e-mail to the mailbox address, and the
     * call is dropped with a success or failure text.
     *
     * @return always {@code true}
     */
    private boolean processRTPMessage(MessageEvent event)
    {
        if (!(event.getMessage() instanceof RTPMessage))
        {
            return true;
        }

        RTPMessage message = (RTPMessage) event.getMessage();
        long session_id = message.getSessionId();

        CallInfo call_info = getCallInfo(session_id);
        if (call_info == null)
        {
            return true;
        }

        if (!call_info.isConnected())
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    getName()
                    + "- MessageBox.processRTPMessage() -- An RTP message is received for a call that is not connected");
            return true;
        }

        MediaElements media = message.getMediaElements();
        if (media == null)
        {
            // Nothing to collect; previously this dereferenced null.
            return true;
        }

        // NOTE(review): a message carrying several text elements is mailed and
        // the call dropped once per element (original behaviour, kept as is).
        int num = media.numMediaElements();
        for (int i = 0; i < num; i++)
        {
            MediaElementInterface medium = media.elementAt(i);
            if (!(medium instanceof com.quikj.application.web.talk.messaging.TextElement))
            {
                continue;
            }

            String text = ((com.quikj.application.web.talk.messaging.TextElement) medium).getMessage();
            if (text == null)
            {
                continue;
            }

            if (AceMailService.getInstance().addToMailQueue(buildMail(call_info, text)))
            {
                dropCall(session_id, 0,
                        localizedText(call_info.getEndpoint(), "Your_message_is_being_sent"));
                msgCountSuccess++;
            }
            else
            {
                dropCall(session_id, 0,
                        localizedText(call_info.getEndpoint(), "Sorry,_your_message_couldn't_be_sent"));
            }
        }

        return true;
    }

    /**
     * Builds the outgoing mail for a collected message. The sender is the
     * caller's address when one is known, else the configured default "from"
     * address; when neither exists no sender is set.
     */
    private AceMailMessage buildMail(CallInfo call_info, String text)
    {
        AceMailMessage out_mail = new AceMailMessage();
        out_mail.addTo(call_info.getMailboxAddress());

        boolean caller_has_address = (call_info.getFromAddress() != null)
                && (call_info.getFromAddress().length() >= 1);
        if (caller_has_address)
        {
            out_mail.setFrom(call_info.getFromAddress());
        }
        else if (defaultFrom != null)
        {
            out_mail.setFrom(defaultFrom);
        }

        out_mail.setBody(text);
        out_mail.setSubject("ACE MAILBOX MESSAGE");
        return out_mail;
    }

    /**
     * Sends a disconnect for the session to the service controller and removes
     * the call from the call list.
     */
    private void dropCall(long session_id, int reason_code, String reason_text)
    {
        DisconnectMessage message = buildDisconnect(session_id, reason_code, reason_text);

        if (!ServiceController.Instance().sendMessage(
                new MessageEvent(MessageEvent.DISCONNECT_MESSAGE, this, message, null)))
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    getName()
                    + "- MessageBox.dropCall() -- Error sending disconnect message to the service controller");
        }

        synchronized (callList)
        {
            removeFromCallList(session_id);
        }
    }

    /** Builds a disconnect message carrying the given reason code and text. */
    private static DisconnectMessage buildDisconnect(long session_id, int reason_code, String reason_text)
    {
        DisconnectReasonElement disc_reason = new DisconnectReasonElement();
        disc_reason.setReasonCode(reason_code);
        disc_reason.setReasonText(reason_text);

        DisconnectMessage message = new DisconnectMessage();
        message.setSessionId(session_id);
        message.setDisconnectReason(disc_reason);
        return message;
    }

    /**
     * Answers the call with a CONNECT setup response that carries the prompt
     * text, and marks the call connected when the response could be sent.
     */
    private void answerCall(CallInfo call)
    {
        SetupResponseMessage response = new SetupResponseMessage();
        response.setSessionId(call.getSessionId());

        CalledNameElement cp = new CalledNameElement();
        cp.setCallParty(selfInfo);
        cp.setTerminal("com.quikj.application.web.talk.feature.messagebox.client.MessageBoxClient");
        response.setCalledParty(cp);

        MediaElements media = new MediaElements();
        TextElement telem = new TextElement();
        telem.setMessage(localizedText(call.getEndpoint(),
                "No_one_is_available._To_leave_a_message,_type_the_message_below_and_click_on_the_\"Send\"_button"));
        media.addMediaElement(telem);
        response.setMediaElements(media);

        if (!ServiceController.Instance().sendMessage(
                new MessageEvent(MessageEvent.SETUP_RESPONSE,
                        this,
                        SetupResponseMessage.CONNECT,
                        localizedText(call.getEndpoint(), "Answered"),
                        response,
                        null)))
        {
            AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                    getName()
                    + "- MessageBox.answerCall() -- Error sending message (CONNECT) to the service controller");
            return;
        }

        call.setConnected(true);
    }

    /**
     * Looks up a user-visible text in the feature's resource bundle, using the
     * locale derived from the end point's "language" parameter.
     */
    private static String localizedText(EndPointInterface endpoint, String key)
    {
        return java.util.ResourceBundle.getBundle(LANGUAGE_BUNDLE,
                ServiceController.getLocale(endpoint.getParam("language"))).getString(key);
    }

    /** Adds the call and publishes the new call count. */
    private void addToCallList(long session_id, CallInfo call)
    {
        callList.put(new Long(session_id), call);

        EndPointList.Instance().setCallCount(this, callList.size());
        ServiceController.Instance().groupNotifyOfCallCountChange(this);
    }

    /** Removes the call and, if it was present, publishes the new call count. */
    private void removeFromCallList(long session_id)
    {
        if (callList.remove(new Long(session_id)) != null)
        {
            EndPointList.Instance().setCallCount(this, callList.size());
            ServiceController.Instance().groupNotifyOfCallCountChange(this);
        }
    }

    /** @return the call for the session, or {@code null} if there is none */
    private CallInfo getCallInfo(long session_id)
    {
        return (CallInfo) callList.get(new Long(session_id));
    }

    /** @return the identifier built in {@link #init(String, Map)} */
    public String getIdentifier()
    {
        return identifier;
    }

    /** @return the string parameter stored under {@code key}, or {@code null} */
    public String getParam(String key)
    {
        synchronized (keyValuePair)
        {
            return (String) keyValuePair.get(key);
        }
    }

    /** @return the parameter object stored under {@code key}, or {@code null} */
    public Object getParamObject(String key)
    {
        synchronized (keyValuePair)
        {
            return keyValuePair.get(key);
        }
    }

    /** @return the feature name this end point registered with */
    public String getUserName()
    {
        return userName;
    }

    /** Removes the parameter stored under {@code key}, if any. */
    public void removeParam(String key)
    {
        synchronized (keyValuePair)
        {
            keyValuePair.remove(key);
        }
    }

    /**
     * Re-reads the feature parameters. The outcome is not reported; an invalid
     * "from" value leaves the current default sender untouched.
     */
    public void resynchParam(Map params)
    {
        initParams(params);
    }

    /** Queues a message for this feature's thread. */
    public boolean sendEvent(AceMessageInterface message)
    {
        return super.sendMessage(message);
    }

    /** Stores a string parameter under {@code key}. */
    public void setParam(String key, String value)
    {
        synchronized (keyValuePair)
        {
            keyValuePair.put(key, value);
        }
    }

    /** Stores an arbitrary parameter object under {@code key}. */
    public void setParamObject(String key, Object value)
    {
        synchronized (keyValuePair)
        {
            keyValuePair.put(key, value);
        }
    }

    /**
     * @return a statistics row: feature name, full name, and number of messages
     *         successfully queued since the last {@link #clearStatsCounts()}
     */
    public RowElement getStatsData()
    {
        RowElement ele = new RowElement();

        ele.addListItem(userName);
        // selfInfo is only known once a registration response carrying a
        // message has been processed; report an empty name until then instead
        // of failing with a NullPointerException.
        ele.addListItem(selfInfo == null ? "" : selfInfo.getFullName());
        ele.addListItem(Integer.toString(msgCountSuccess));

        return ele;
    }

    /** @return the column headings matching {@link #getStatsData()} */
    public MetaDataElement getStatsMetaData()
    {
        MetaDataElement ele = new MetaDataElement();

        ele.addListItem("Name");
        ele.addListItem("Full Name");
        ele.addListItem("Msgs Left Last Interval");

        return ele;
    }

    /** Resets the successful-message counter. */
    public void clearStatsCounts()
    {
        msgCountSuccess = 0;
    }

    /**
     * One asynchronous database look-up of the mailbox user's record for a
     * call that could not be resolved from the registered end-point list.
     */
    private class DbOperation
    {
        private CallInfo call;
        private AceSQL database;
        private int operationId;
        private String lastError = "";

        public DbOperation(CallInfo call, AceSQL database)
        {
            this.call = call;
            this.database = database;
        }

        /**
         * Submits the user query; this operation is passed along as the user
         * parameter of the request.
         *
         * @return {@code false} on failure; see {@link #getLastError()}
         */
        public boolean initiate()
        {
            try
            {
                Statement statement = UserTable.getQueryStatement(database.getConnection(),
                        call.getMailboxUser());

                operationId = database.executeSQL(statement, (String[]) null, this);
                if (operationId == -1)
                {
                    lastError = MessageBox.this.getErrorMessage();
                    return false;
                }

                return true;
            }
            catch (SQLException ex)
            {
                lastError = "Error creating SQL statement : " + ex.getMessage();
                return false;
            }
        }

        /** @return the description of the last failure of {@link #initiate()} */
        public String getLastError()
        {
            return lastError;
        }

        /**
         * Consumes the query result: answers the call when the user has a
         * mailbox address, otherwise drops it with an explanatory text.
         *
         * @return always {@code true}: the operation is finished and can be
         *         removed from the pending list
         */
        public boolean processResponse(AceSQLMessage message)
        {
            if (message.getStatus() == AceSQLMessage.SQL_ERROR)
            {
                AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                        MessageBox.this.getName()
                        + "- DbOperation.processResponse() -- Database error result.");

                // NOTE(review): unlike the other drop reasons this text is not
                // taken from the resource bundle.
                MessageBox.this.dropCall(call.getSessionId(), 0, "Message processing database error");
                return true;
            }

            ResultSet r = message.getResultSet();
            UserElement user_data = new UserElement();
            user_data.setName(call.getMailboxUser());

            try
            {
                if ((r == null) || !r.next())
                {
                    MessageBox.this.dropCall(call.getSessionId(), 0,
                            localizedText(call.getEndpoint(),
                                    "Message_collection_not_possible_for_this_user._Please_try_your_call_again_later."));
                    return true;
                }

                UserTable.processQueryResult(user_data, r);
            }
            catch (SQLException ex)
            {
                AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG,
                        MessageBox.this.getName()
                        + "- DbOperation.processResponse() -- Exception processing database result : "
                        + ex.getMessage());

                MessageBox.this.dropCall(call.getSessionId(), 0,
                        localizedText(call.getEndpoint(),
                                "Message_processing_database_error_encountered._Please_try_your_call_again_later."));
                return true;
            }

            if (hasMailboxAddress(user_data))
            {
                call.setMailboxAddress(user_data.getAddress());
                MessageBox.this.answerCall(call);
            }
            else
            {
                MessageBox.this.dropCall(call.getSessionId(), 0,
                        localizedText(call.getEndpoint(),
                                "Message_collection_is_not_enabled_for_this_user._Please_try_your_call_again_later."));
            }

            return true;
        }

        /** Cancels the outstanding query. */
        public void cancel()
        {
            database.cancelSQL(operationId, MessageBox.this);
        }

        /** @return the session id of the call this look-up belongs to */
        public long getSessionId()
        {
            return call.getSessionId();
        }
    }
}