1 24 package org.objectweb.joram.mom.dest; 25 26 import java.io.ByteArrayOutputStream ; 27 import java.io.PrintWriter ; 28 import java.lang.reflect.Method ; 29 import java.util.Enumeration ; 30 import java.util.Hashtable ; 31 import java.util.Properties ; 32 import java.util.Vector ; 33 34 import org.objectweb.joram.mom.notifications.ClientMessages; 35 import org.objectweb.joram.mom.notifications.ClusterRequest; 36 import org.objectweb.joram.mom.notifications.DestinationAdminRequestNot; 37 import org.objectweb.joram.mom.notifications.GetProxyIdListNot; 38 import org.objectweb.joram.mom.notifications.GetProxyIdNot; 39 import org.objectweb.joram.mom.notifications.Monit_FreeAccess; 40 import org.objectweb.joram.mom.notifications.Monit_FreeAccessRep; 41 import org.objectweb.joram.mom.notifications.Monit_GetCluster; 42 import org.objectweb.joram.mom.notifications.Monit_GetClusterRep; 43 import org.objectweb.joram.mom.notifications.Monit_GetDMQSettings; 44 import org.objectweb.joram.mom.notifications.Monit_GetDMQSettingsRep; 45 import org.objectweb.joram.mom.notifications.Monit_GetFather; 46 import org.objectweb.joram.mom.notifications.Monit_GetFatherRep; 47 import org.objectweb.joram.mom.notifications.Monit_GetNbMaxMsg; 48 import org.objectweb.joram.mom.notifications.Monit_GetNbMaxMsgRep; 49 import org.objectweb.joram.mom.notifications.Monit_GetNumberRep; 50 import org.objectweb.joram.mom.notifications.Monit_GetPendingMessages; 51 import org.objectweb.joram.mom.notifications.Monit_GetPendingRequests; 52 import org.objectweb.joram.mom.notifications.Monit_GetReaders; 53 import org.objectweb.joram.mom.notifications.Monit_GetStat; 54 import org.objectweb.joram.mom.notifications.Monit_GetStatRep; 55 import org.objectweb.joram.mom.notifications.Monit_GetSubscriptions; 56 import org.objectweb.joram.mom.notifications.Monit_GetUsersRep; 57 import org.objectweb.joram.mom.notifications.Monit_GetWriters; 58 import org.objectweb.joram.mom.notifications.RegisterDestNot; 59 import 
org.objectweb.joram.mom.notifications.RegisterTmpDestNot; 60 import org.objectweb.joram.mom.notifications.RegisteredDestNot; 61 import org.objectweb.joram.mom.notifications.RequestGroupNot; 62 import org.objectweb.joram.mom.notifications.SetDMQRequest; 63 import org.objectweb.joram.mom.notifications.SetFatherRequest; 64 import org.objectweb.joram.mom.notifications.SetNbMaxMsgRequest; 65 import org.objectweb.joram.mom.notifications.SetRightRequest; 66 import org.objectweb.joram.mom.notifications.SetThreshRequest; 67 import org.objectweb.joram.mom.notifications.SpecialAdminRequest; 68 import org.objectweb.joram.mom.notifications.UnclusterRequest; 69 import org.objectweb.joram.mom.notifications.UnsetFatherRequest; 70 import org.objectweb.joram.mom.notifications.UserAdminRequestNot; 71 import org.objectweb.joram.mom.proxies.AdminNotification; 72 import org.objectweb.joram.mom.proxies.SendReplyNot; 73 import org.objectweb.joram.mom.proxies.UserAgent; 74 import org.objectweb.joram.shared.JoramTracing; 75 import org.objectweb.joram.shared.admin.AddDomainRequest; 76 import org.objectweb.joram.shared.admin.AddServerRequest; 77 import org.objectweb.joram.shared.admin.AddServiceRequest; 78 import org.objectweb.joram.shared.admin.AdminReply; 79 import org.objectweb.joram.shared.admin.AdminRequest; 80 import org.objectweb.joram.shared.admin.CreateDestinationReply; 81 import org.objectweb.joram.shared.admin.CreateDestinationRequest; 82 import org.objectweb.joram.shared.admin.CreateUserReply; 83 import org.objectweb.joram.shared.admin.CreateUserRequest; 84 import org.objectweb.joram.shared.admin.DeleteDestination; 85 import org.objectweb.joram.shared.admin.DeleteUser; 86 import org.objectweb.joram.shared.admin.GetConfigRequest; 87 import org.objectweb.joram.shared.admin.GetDomainNames; 88 import org.objectweb.joram.shared.admin.GetDomainNamesRep; 89 import org.objectweb.joram.shared.admin.GetLocalServer; 90 import org.objectweb.joram.shared.admin.GetLocalServerRep; 91 import 
org.objectweb.joram.shared.admin.GetSubscriberIds; 92 import org.objectweb.joram.shared.admin.Monitor_GetCluster; 93 import org.objectweb.joram.shared.admin.Monitor_GetClusterRep; 94 import org.objectweb.joram.shared.admin.Monitor_GetDMQSettings; 95 import org.objectweb.joram.shared.admin.Monitor_GetDMQSettingsRep; 96 import org.objectweb.joram.shared.admin.Monitor_GetDestinations; 97 import org.objectweb.joram.shared.admin.Monitor_GetDestinationsRep; 98 import org.objectweb.joram.shared.admin.Monitor_GetFather; 99 import org.objectweb.joram.shared.admin.Monitor_GetFatherRep; 100 import org.objectweb.joram.shared.admin.Monitor_GetFreeAccess; 101 import org.objectweb.joram.shared.admin.Monitor_GetFreeAccessRep; 102 import org.objectweb.joram.shared.admin.Monitor_GetNbMaxMsg; 103 import org.objectweb.joram.shared.admin.Monitor_GetNbMaxMsgRep; 104 import org.objectweb.joram.shared.admin.Monitor_GetNumberRep; 105 import org.objectweb.joram.shared.admin.Monitor_GetPendingMessages; 106 import org.objectweb.joram.shared.admin.Monitor_GetPendingRequests; 107 import org.objectweb.joram.shared.admin.Monitor_GetReaders; 108 import org.objectweb.joram.shared.admin.Monitor_GetServersIds; 109 import org.objectweb.joram.shared.admin.Monitor_GetServersIdsRep; 110 import org.objectweb.joram.shared.admin.Monitor_GetStat; 111 import org.objectweb.joram.shared.admin.Monitor_GetStatRep; 112 import org.objectweb.joram.shared.admin.Monitor_GetSubscriptions; 113 import org.objectweb.joram.shared.admin.Monitor_GetUsers; 114 import org.objectweb.joram.shared.admin.Monitor_GetUsersRep; 115 import org.objectweb.joram.shared.admin.Monitor_GetWriters; 116 import org.objectweb.joram.shared.admin.QueueAdminRequest; 117 import org.objectweb.joram.shared.admin.RemoveDomainRequest; 118 import org.objectweb.joram.shared.admin.RemoveServerRequest; 119 import org.objectweb.joram.shared.admin.RemoveServiceRequest; 120 import org.objectweb.joram.shared.admin.SetCluster; 121 import 
org.objectweb.joram.shared.admin.SetDefaultDMQ; 122 import org.objectweb.joram.shared.admin.SetDefaultThreshold; 123 import org.objectweb.joram.shared.admin.SetDestinationDMQ; 124 import org.objectweb.joram.shared.admin.SetFather; 125 import org.objectweb.joram.shared.admin.SetNbMaxMsg; 126 import org.objectweb.joram.shared.admin.SetQueueThreshold; 127 import org.objectweb.joram.shared.admin.SetReader; 128 import org.objectweb.joram.shared.admin.SetRight; 129 import org.objectweb.joram.shared.admin.SetUserDMQ; 130 import org.objectweb.joram.shared.admin.SetUserThreshold; 131 import org.objectweb.joram.shared.admin.SetWriter; 132 import org.objectweb.joram.shared.admin.SpecialAdmin; 133 import org.objectweb.joram.shared.admin.StopServerRequest; 134 import org.objectweb.joram.shared.admin.UnsetCluster; 135 import org.objectweb.joram.shared.admin.UnsetDefaultDMQ; 136 import org.objectweb.joram.shared.admin.UnsetDefaultThreshold; 137 import org.objectweb.joram.shared.admin.UnsetDestinationDMQ; 138 import org.objectweb.joram.shared.admin.UnsetFather; 139 import org.objectweb.joram.shared.admin.UnsetQueueThreshold; 140 import org.objectweb.joram.shared.admin.UnsetReader; 141 import org.objectweb.joram.shared.admin.UnsetUserDMQ; 142 import org.objectweb.joram.shared.admin.UnsetUserThreshold; 143 import org.objectweb.joram.shared.admin.UnsetWriter; 144 import org.objectweb.joram.shared.admin.UpdateUser; 145 import org.objectweb.joram.shared.admin.UserAdminRequest; 146 import org.objectweb.joram.shared.excepts.AccessException; 147 import org.objectweb.joram.shared.excepts.MomException; 148 import org.objectweb.joram.shared.excepts.RequestException; 149 import org.objectweb.joram.shared.messages.Message; 150 import org.objectweb.util.monolog.api.BasicLevel; 151 152 import fr.dyade.aaa.agent.Agent; 153 import fr.dyade.aaa.agent.AgentId; 154 import fr.dyade.aaa.agent.AgentServer; 155 import fr.dyade.aaa.agent.DeleteNot; 156 import fr.dyade.aaa.agent.Notification; 157 import 
fr.dyade.aaa.agent.ServerConfigHelper; 158 import fr.dyade.aaa.agent.UnknownAgent; 159 import fr.dyade.aaa.agent.UnknownServerException; 160 import fr.dyade.aaa.agent.conf.A3CML; 161 import fr.dyade.aaa.agent.conf.A3CMLConfig; 162 import fr.dyade.aaa.agent.conf.A3CMLDomain; 163 import fr.dyade.aaa.agent.conf.A3CMLNetwork; 164 import fr.dyade.aaa.agent.conf.A3CMLServer; 165 166 170 public final class AdminTopicImpl extends TopicImpl implements AdminTopicImplMBean { 171 172 private static AdminTopicImpl ref; 173 174 public static AdminTopicImpl getReference() { 175 return ref; 176 } 177 178 179 private int serverId; 180 181 187 private Hashtable destinationsTable; 188 189 195 private Hashtable usersTable; 196 202 private Hashtable proxiesTable; 203 204 210 private Hashtable requestsTable; 211 212 private long msgCounter = 0; 213 214 218 private AgentId defaultDMQId; 219 220 private Integer defaultThreshold; 221 222 227 public AdminTopicImpl(AgentId topicId) { 228 super(topicId, topicId, null); 229 serverId = AgentServer.getServerId(); 230 destinationsTable = new Hashtable (); 231 usersTable = new Hashtable (); 232 proxiesTable = new Hashtable (); 233 requestsTable = new Hashtable (); 234 } 235 236 public String toString() { 237 return "AdminTopicImpl:" + destId.toString(); 238 } 239 240 241 252 public AgentId getProxyId(String name, 253 String pass, 254 String inaddr) throws Exception { 255 String userPass = null; 256 AgentId userProxId = null; 257 258 userPass = (String ) usersTable.get(name); 259 if (userPass != null) { 260 if (! 
userPass.equals(pass)) 261 throw new Exception ("Invalid password for user [" + name + "]"); 262 userProxId = (AgentId) proxiesTable.get(name); 263 if (userProxId == null) 264 throw new Exception ("No proxy deployed for user [" + name + "]"); 265 266 return userProxId; 267 } 268 else 269 throw new Exception ("User [" + name + "] does not exist"); 270 } 271 272 273 public String getName(AgentId proxyId) { 274 String name; 275 for (Enumeration e = proxiesTable.keys(); e.hasMoreElements();) { 276 name = (String ) e.nextElement(); 277 if (proxyId.equals(proxiesTable.get(name))) 278 return name; 279 } 280 return null; 281 } 282 283 284 public String getPassword(AgentId proxyId) { 285 String name; 286 for (Enumeration e = proxiesTable.keys(); e.hasMoreElements();) { 287 name = (String ) e.nextElement(); 288 if (proxyId.equals(proxiesTable.get(name))) 289 return (String ) usersTable.get(name);; 290 } 291 return null; 292 } 293 294 295 public boolean isTaken(String name) { 296 return usersTable.containsKey(name); 297 } 298 299 300 public AgentId getId() { 301 return destId; 302 } 303 304 309 public void AdminNotification(AgentId from, AdminNotification adminNot) { 310 String name = adminNot.getName(); 311 String pass = adminNot.getPass(); 312 313 usersTable.put(name, pass); 314 proxiesTable.put(name, adminNot.getProxyId()); 315 316 clients.put(adminNot.getProxyId(), new Integer (READWRITE)); 317 318 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 319 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, name + " successfully" 320 + " set as admin client."); 321 } 322 323 327 public void AdminRequestNot(AgentId from, AdminRequestNot adminNot) { 328 processAdminRequests(adminNot.replyTo, adminNot.msgId, adminNot.request, from); 330 } 331 332 339 public void AdminReply(AgentId from, 340 org.objectweb.joram.mom.notifications.AdminReply not) { 341 String requestId = not.getRequestId(); 342 if (requestId == null) return; 343 344 AgentId replyTo = (AgentId) 
requestsTable.remove(requestId); 345 if (replyTo == null) return; 346 347 AdminReply reply; 348 349 if (not instanceof Monit_GetUsersRep) 350 reply = doProcess((Monit_GetUsersRep) not); 351 else if (not instanceof Monit_FreeAccessRep) 352 reply = doProcess((Monit_FreeAccessRep) not); 353 else if (not instanceof Monit_GetDMQSettingsRep) 354 reply = doProcess((Monit_GetDMQSettingsRep) not); 355 else if (not instanceof Monit_GetFatherRep) 356 reply = doProcess((Monit_GetFatherRep) not); 357 else if (not instanceof Monit_GetClusterRep) 358 reply = doProcess((Monit_GetClusterRep) not); 359 else if (not instanceof Monit_GetNumberRep) 360 reply = doProcess((Monit_GetNumberRep) not); 361 else if (not instanceof Monit_GetStatRep) 362 reply = doProcess((Monit_GetStatRep) not); 363 else if (not instanceof Monit_GetNbMaxMsgRep) 364 reply = doProcess((Monit_GetNbMaxMsgRep) not); 365 else 366 reply = new AdminReply(not.getSuccess(), 367 not.getInfo(), 368 not.getReplyObject()); 369 370 distributeReply(replyTo, requestId, reply); 371 } 372 373 public void GetProxyIdNot(GetProxyIdNot not) { 374 try { 375 AgentId proxyId = getProxyId(not.getUserName(), 376 not.getPassword(), 377 not.getInAddr()); 378 not.Return(proxyId); 379 } catch (Exception exc) { 380 not.Throw(exc); 381 } 382 } 383 384 public void GetProxyIdListNot(GetProxyIdListNot not) { 385 Vector idList = new Vector (); 386 Enumeration ids = proxiesTable.elements(); 387 while (ids.hasMoreElements()) { 388 AgentId aid = (AgentId)ids.nextElement(); 389 idList.addElement(aid); 390 } 391 AgentId[] res = new AgentId[idList.size()]; 392 idList.copyInto(res); 393 not.Return(res); 394 } 395 396 public void RegisterTmpDestNot(RegisterTmpDestNot not) { 397 String destName = not.getTmpDestId().toString(); 398 if (not.toAdd()) { 399 String type; 400 String className; 401 if (not.isTopic()) { 402 type = "topic.tmp"; 403 className = Topic.class.getName(); 404 } else { 405 type = "queue.tmp"; 406 className = Queue.class.getName(); 407 } 
408 DestinationDesc destDesc = 409 new DestinationDesc( 410 not.getTmpDestId(), 411 destName, 412 className, 413 type); 414 destinationsTable.put(destName, destDesc); 415 } else { 416 destinationsTable.remove(destName); 417 } 418 } 419 420 public void RegisterDestNot(RegisterDestNot not) { 421 String name = not.getName(); 422 if (name == null || destinationsTable.contains(name)) 423 return; 424 425 DestinationDesc destDesc = 426 new DestinationDesc( 427 not.getId(), 428 not.getName(), 429 not.getClassName(), 430 not.getType()); 431 destinationsTable.put(name, destDesc); 432 } 433 434 public void RegisteredDestNot(AgentId from, RegisteredDestNot not) { 435 DestinationDesc destDesc = 436 (DestinationDesc) destinationsTable.get(not.getName()); 437 if (destDesc != null) 438 not.setDestination(destDesc.getId()); 439 forward(not.getReply(), not); 440 } 441 442 446 private AdminReply doProcess(Monit_GetUsersRep not) { 447 Vector users = not.getUsers(); 448 449 String name; 450 AgentId proxyId; 451 Monitor_GetUsersRep reply = new Monitor_GetUsersRep(); 452 453 for (Enumeration names = proxiesTable.keys(); names.hasMoreElements();) { 454 name = (String ) names.nextElement(); 455 proxyId = (AgentId) proxiesTable.get(name); 456 457 if (users.contains(proxyId)) 458 reply.addUser(name, proxyId.toString()); 459 } 460 return reply; 461 } 462 463 467 private AdminReply doProcess(Monit_FreeAccessRep not) { 468 return new Monitor_GetFreeAccessRep(not.getFreeReading(), 469 not.getFreeWriting()); 470 } 471 472 476 private AdminReply doProcess(Monit_GetDMQSettingsRep not) { 477 return new Monitor_GetDMQSettingsRep(not.getDMQId(), not.getThreshold()); 478 } 479 480 484 private AdminReply doProcess(Monit_GetFatherRep not) { 485 return new Monitor_GetFatherRep(not.getFatherId()); 486 } 487 488 492 private AdminReply doProcess(Monit_GetClusterRep not) { 493 return new Monitor_GetClusterRep(not.getTopics()); 494 } 495 496 500 private AdminReply doProcess(Monit_GetNumberRep not) { 501 return 
new Monitor_GetNumberRep(not.getNumber()); 502 } 503 504 508 private AdminReply doProcess(Monit_GetStatRep not) { 509 return new Monitor_GetStatRep(not.getStats()); 510 } 511 512 516 private AdminReply doProcess(Monit_GetNbMaxMsgRep not) { 517 return new Monitor_GetNbMaxMsgRep(not.getNbMaxMsg()); 518 } 519 520 526 public void setRightRequest(AgentId from, SetRightRequest request) throws AccessException { 527 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 528 JoramTracing.dbgDestination.log(BasicLevel.WARN, 529 "Unexpected request: " + request); 530 } 531 532 538 public void setDMQRequest(AgentId from, SetDMQRequest request) throws AccessException { 539 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 540 JoramTracing.dbgDestination.log(BasicLevel.WARN, 541 "Unexpected request: " + request); 542 } 543 544 public void requestGroupNot(AgentId from, RequestGroupNot not) { 545 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 546 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 547 "AdminTopicImpl.requestGroupNot(" + not + ')'); 548 Enumeration en = not.getClientMessages(); 549 while (en.hasMoreElements()) { 550 ClientMessages cm = (ClientMessages) en.nextElement(); 551 try { 552 clientMessages(from, cm); 553 } catch (Exception exc) { 554 } 555 } 556 } 557 558 public SetRightRequest preProcess(SetRightRequest req) { 559 return req; 561 } 562 public void postProcess(SetRightRequest req) { 563 } 565 566 571 public ClientMessages preProcess(AgentId from, ClientMessages msgs) { 572 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 573 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 574 "AdminTopicImpl.clientMessages(" + msgs + ')'); 575 if (! 
msgs.getPersistent() && !msgs.getAsyncSend()) { 576 forward(from, 579 new SendReplyNot( 580 msgs.getClientContext(), 581 msgs.getRequestId())); 582 } 583 584 processAdminRequests(msgs); 586 587 return null; 588 } 589 590 594 public void deleteNot(AgentId from, DeleteNot not) { 595 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 596 JoramTracing.dbgDestination.log(BasicLevel.WARN, 597 "Unexpected request: " + not); 598 } 599 600 606 public void clusterRequest(AgentId from, ClusterRequest request) throws AccessException { 607 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 608 JoramTracing.dbgDestination.log(BasicLevel.WARN, 609 "Unexpected request: " + request); 610 } 611 612 616 public void clusterTest(AgentId from, ClusterTest request) { 617 forward(from, new ClusterAck(request, false, 618 "Topic [" + destId 619 + "] is an admin topic")); 620 } 621 622 626 public void clusterAck(AgentId from, ClusterAck ack) { 627 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 628 JoramTracing.dbgDestination.log(BasicLevel.WARN, 629 "Unexpected notification: " + ack); 630 } 631 632 637 public void clusterNot(AgentId from, ClusterNot not) { 638 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 639 JoramTracing.dbgDestination.log(BasicLevel.WARN, 640 "Unexpected notification: " + not); 641 } 642 643 649 public void unclusterRequest(AgentId from, UnclusterRequest request) throws MomException { 650 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 651 JoramTracing.dbgDestination.log(BasicLevel.WARN, 652 "Unexpected request: " + request); 653 } 654 655 661 public void setFatherRequest(AgentId from, SetFatherRequest request) 662 throws MomException { 663 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 664 JoramTracing.dbgDestination.log(BasicLevel.WARN, 665 "Unexpected request: " + request); 666 } 667 668 672 public void fatherTest(AgentId from, FatherTest not) { 673 forward(from, new FatherAck(not, false, 674 
"Topic [" + destId 675 + "] can't accept topic [" + from 676 + "] as a son as it is an AdminTopic")); 677 } 678 679 683 public void fatherAck(AgentId from, FatherAck ack) { 684 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 685 JoramTracing.dbgDestination.log(BasicLevel.WARN, 686 "Unexpected notification: " + ack); 687 } 688 689 695 public void unsetFatherRequest(AgentId from, UnsetFatherRequest request) throws MomException { 696 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 697 JoramTracing.dbgDestination.log(BasicLevel.WARN, 698 "Unexpected request: " + request); 699 } 700 701 702 706 public void topicForwardNot(AgentId from, TopicForwardNot not) { 707 processAdminRequests(not.messages); 708 } 709 710 713 protected void doUnknownAgent(UnknownAgent uA) { 714 AgentId agId = uA.agent; 715 Notification not = uA.not; 716 717 if (not instanceof org.objectweb.joram.mom.notifications.AdminRequest) { 719 String reqId = 720 ((org.objectweb.joram.mom.notifications.AdminRequest) not).getId(); 721 722 if (reqId != null) { 723 AgentId replyTo = (AgentId) requestsTable.remove(reqId); 724 725 String info = strbuf.append("Request [") 726 .append(not.getClass().getName()) 727 .append("], sent to AdminTopic on server [") 728 .append(serverId) 729 .append("], successful [false]: unknown agent [") 730 .append(agId).append("]").toString(); 731 strbuf.setLength(0); 732 733 distributeReply(replyTo, reqId, new AdminReply(false, info)); 734 } 735 } else { 736 super.doUnknownAgent(uA); 737 } 738 } 739 740 744 private void processAdminRequests(ClientMessages not) { 745 Message msg; 746 String msgId = null; 747 AgentId replyTo = null; 748 String info = null; 749 AdminRequest request = null; 750 751 if (not == null) return; 752 753 Enumeration messages = not.getMessages().elements(); 754 755 while (messages.hasMoreElements()) { 756 nbMsgsReceiveSinceCreation = nbMsgsReceiveSinceCreation + 1; 757 msg = (Message) messages.nextElement(); 758 msgId = msg.id; 759 
replyTo = AgentId.fromString(msg.getReplyToId()); 760 request = null; 761 762 try { 763 request = (AdminRequest) msg.getObject(); 764 765 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 766 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 767 "--- " + this + ": got " 768 + msg.getObject()); 769 } catch (ClassCastException exc) { 770 JoramTracing.dbgDestination.log(BasicLevel.ERROR, 771 "--- " + this + ": got bad object"); 772 if (request == null) { 773 info = strbuf.append("Unexpected request to AdminTopic on server [") 774 .append(serverId).append("]: ").append(exc.getMessage()).toString(); 775 strbuf.setLength(0); 776 } else { 777 info = strbuf.append("Request [").append(request.getClass().getName()) 778 .append("], sent to AdminTopic on server [").append(serverId) 779 .append("], successful [false]: ") 780 .append(exc.getMessage()).toString(); 781 strbuf.setLength(0); 782 } 783 784 distributeReply(replyTo, msgId, new AdminReply(false, info)); 785 } catch (Exception exc) {} 786 787 processAdminRequests(replyTo, msgId, request, null); 788 } 789 } 790 791 795 private void processAdminRequests(AgentId replyTo, 796 String msgId, 797 AdminRequest request, 798 AgentId from) { 799 String info = null; 800 801 setSave(); 803 804 try { 805 if (request instanceof StopServerRequest) 806 doProcess((StopServerRequest) request, replyTo, msgId); 807 else if (request instanceof CreateDestinationRequest) 808 doProcess((CreateDestinationRequest) request, replyTo, msgId); 809 else if (request instanceof DeleteDestination) 810 doProcess((DeleteDestination) request, replyTo, msgId); 811 else if (request instanceof SetCluster) 812 doProcess((SetCluster) request, replyTo, msgId); 813 else if (request instanceof UnsetCluster) 814 doProcess((UnsetCluster) request, replyTo, msgId); 815 else if (request instanceof SetFather) 816 doProcess((SetFather) request, replyTo, msgId); 817 else if (request instanceof UnsetFather) 818 doProcess((UnsetFather) request, replyTo, msgId); 
819 else if (request instanceof CreateUserRequest) 820 doProcess((CreateUserRequest) request, replyTo, msgId); 821 else if (request instanceof UpdateUser) 822 doProcess((UpdateUser) request, replyTo, msgId); 823 else if (request instanceof DeleteUser) 824 doProcess((DeleteUser) request, replyTo, msgId); 825 else if (request instanceof SetRight) 826 doProcess((SetRight) request, replyTo, msgId); 827 else if (request instanceof SetDefaultDMQ) 828 doProcess((SetDefaultDMQ) request, replyTo, msgId); 829 else if (request instanceof SetDestinationDMQ) 830 doProcess((SetDestinationDMQ) request, replyTo, msgId); 831 else if (request instanceof SetUserDMQ) 832 doProcess((SetUserDMQ) request, replyTo, msgId); 833 else if (request instanceof SetNbMaxMsg) 834 doProcess((SetNbMaxMsg) request, replyTo, msgId); 835 else if (request instanceof SetDefaultThreshold) 836 doProcess((SetDefaultThreshold) request, replyTo, msgId); 837 else if (request instanceof SetQueueThreshold) 838 doProcess((SetQueueThreshold) request, replyTo, msgId); 839 else if (request instanceof SetUserThreshold) 840 doProcess((SetUserThreshold) request, replyTo, msgId); 841 else if (request instanceof UnsetDefaultDMQ) 842 doProcess((UnsetDefaultDMQ) request, replyTo, msgId); 843 else if (request instanceof UnsetDestinationDMQ) 844 doProcess((UnsetDestinationDMQ) request, replyTo, msgId); 845 else if (request instanceof UnsetUserDMQ) 846 doProcess((UnsetUserDMQ) request, replyTo, msgId); 847 else if (request instanceof UnsetDefaultThreshold) 848 doProcess((UnsetDefaultThreshold) request, replyTo, msgId); 849 else if (request instanceof UnsetQueueThreshold) 850 doProcess((UnsetQueueThreshold) request, replyTo, msgId); 851 else if (request instanceof UnsetUserThreshold) 852 doProcess((UnsetUserThreshold) request, replyTo, msgId); 853 else if (request instanceof Monitor_GetServersIds) 854 doProcess((Monitor_GetServersIds) request, replyTo, msgId); 855 else if (request instanceof GetDomainNames) 856 
doProcess((GetDomainNames) request, replyTo, msgId); 857 else if (request instanceof GetLocalServer) 858 doProcess((GetLocalServer) request, replyTo, msgId); 859 else if (request instanceof Monitor_GetDestinations) 860 doProcess((Monitor_GetDestinations) request, replyTo, msgId); 861 else if (request instanceof Monitor_GetUsers) 862 doProcess((Monitor_GetUsers) request, replyTo, msgId); 863 else if (request instanceof Monitor_GetReaders) 864 doProcess((Monitor_GetReaders) request, replyTo, msgId); 865 else if (request instanceof Monitor_GetWriters) 866 doProcess((Monitor_GetWriters) request, replyTo, msgId); 867 else if (request instanceof Monitor_GetFreeAccess) 868 doProcess((Monitor_GetFreeAccess) request, replyTo, msgId); 869 else if (request instanceof Monitor_GetDMQSettings) 870 doProcess((Monitor_GetDMQSettings) request, replyTo, msgId); 871 else if (request instanceof Monitor_GetFather) 872 doProcess((Monitor_GetFather) request, replyTo, msgId); 873 else if (request instanceof Monitor_GetCluster) 874 doProcess((Monitor_GetCluster) request, replyTo, msgId); 875 else if (request instanceof Monitor_GetPendingMessages) 876 doProcess((Monitor_GetPendingMessages) request, replyTo, msgId); 877 else if (request instanceof Monitor_GetPendingRequests) 878 doProcess((Monitor_GetPendingRequests) request, replyTo, msgId); 879 else if (request instanceof Monitor_GetStat) 880 doProcess((Monitor_GetStat) request, replyTo, msgId); 881 else if (request instanceof Monitor_GetNbMaxMsg) 882 doProcess((Monitor_GetNbMaxMsg) request, replyTo, msgId); 883 else if (request instanceof Monitor_GetSubscriptions) 884 doProcess((Monitor_GetSubscriptions) request, replyTo, msgId); 885 else if (request instanceof SpecialAdmin) 886 doProcess((SpecialAdmin) request, replyTo, msgId); 887 else if (request instanceof AddServerRequest) 888 doProcess((AddServerRequest) request, replyTo, msgId, from); 889 else if (request instanceof AddDomainRequest) 890 doProcess((AddDomainRequest) request, 
replyTo, msgId, from); 891 else if (request instanceof RemoveServerRequest) 892 doProcess((RemoveServerRequest) request, replyTo, msgId, from); 893 else if (request instanceof RemoveDomainRequest) 894 doProcess((RemoveDomainRequest) request, replyTo, msgId, from); 895 else if (request instanceof GetConfigRequest) 896 doProcess((GetConfigRequest) request, replyTo, msgId); 897 else if (request instanceof UserAdminRequest) 898 doProcess((UserAdminRequest) request, replyTo, msgId); 899 else if (request instanceof GetSubscriberIds) 900 doProcess((GetSubscriberIds) request, replyTo, msgId); 901 else if (request instanceof QueueAdminRequest) 902 doProcess((QueueAdminRequest) request, replyTo, msgId); 903 } catch (UnknownServerException exc) { 904 info = strbuf.append("Request [").append(request.getClass().getName()) 906 .append("], successful [false]: ") 907 .append(exc.getMessage()).toString(); 908 strbuf.setLength(0); 909 910 distributeReply(replyTo, msgId, new AdminReply(false, info)); 911 } catch (MomException exc) { 912 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 913 JoramTracing.dbgDestination.log(BasicLevel.WARN, exc); 914 915 if (request == null) { 916 info = strbuf.append("Unexpected request to AdminTopic on server [") 917 .append(serverId).append("]: ").append(exc.getMessage()).toString(); 918 strbuf.setLength(0); 919 } else { 920 info = strbuf.append("Request [").append(request.getClass().getName()) 921 .append("], sent to AdminTopic on server [").append(serverId) 922 .append("], successful [false]: ") 923 .append(exc.getMessage()).toString(); 924 strbuf.setLength(0); 925 } 926 927 distributeReply(replyTo, msgId, new AdminReply(false, info)); 928 } 929 } 930 931 935 private void doProcess(StopServerRequest request, 936 AgentId replyTo, 937 String msgId) throws UnknownServerException { 938 if (checkServerId(request.getServerId())) { 939 distributeReply(replyTo, msgId, 941 new AdminReply(true, "Server stopped")); 942 AgentServer.stop(false, 500L, 
true); 943 } else { 944 forward(AdminTopic.getDefault((short) request.getServerId()), 946 new AdminRequestNot(replyTo, msgId, request)); 947 } 948 } 949 950 957 private void doProcess(CreateDestinationRequest request, 958 AgentId replyTo, 959 String msgId) 960 throws UnknownServerException, RequestException { 961 962 if (checkServerId(request.getServerId())) { 963 String destName = request.getDestinationName(); 965 boolean destNameActivated = (destName != null && ! destName.equals("")); 966 967 DestinationDesc destDesc; 968 969 Agent dest = null; 970 String info; 971 Properties properties = request.getProperties(); 972 973 if (destNameActivated && destinationsTable.containsKey(destName)) { 975 destDesc = (DestinationDesc) destinationsTable.get(destName); 976 if (! destDesc.isAssignableTo(request.getExpectedType())) { 977 throw new RequestException("Destination type not compliant"); 978 } 979 info = strbuf.append("Request [").append(request.getClass().getName()) 980 .append("], processed by AdminTopic on server [").append(serverId) 981 .append("], successful [true]: destination [") 982 .append(destName).append("] has been retrieved").toString(); 983 strbuf.setLength(0); 984 } else { 985 String className = request.getClassName(); 987 Class clazz; 988 String destType; 989 try { 990 clazz = Class.forName(className); 991 dest = (Agent) clazz.newInstance(); 992 if (destName != null) { 993 dest.name = destName; 994 } 995 ((AdminDestinationItf) dest).init(this.destId, properties); 996 997 Method getTypeM = clazz.getMethod("getDestinationType", new Class [0]); 998 destType = (String )getTypeM.invoke(null, new Object [0]); 999 } catch (Exception exc) { 1000 if (exc instanceof ClassCastException ) { 1001 throw new RequestException( 1002 "Class [" + className + 1003 "] is not a Destination class."); 1004 } else { 1005 throw new RequestException( 1006 "Could not instanciate Destination class [" + 1007 className + "]: " + exc); 1008 } 1009 } 1010 1011 AgentId createdDestId = 
dest.getId(); 1012 1013 if (! destNameActivated) 1014 destName = createdDestId.toString(); 1015 1016 destDesc = new DestinationDesc(createdDestId, destName, 1017 className, destType); 1018 if (! destDesc.isAssignableTo(request.getExpectedType())) { 1019 throw new RequestException("Destination type not compliant"); 1020 } 1021 1022 try { 1023 dest.deploy(); 1024 destinationsTable.put(destName, destDesc); 1025 1026 info = strbuf.append("Request [").append(request.getClass().getName()) 1027 .append("], processed by AdminTopic on server [").append(serverId) 1028 .append("], successful [true]: ").append(className).append(" [") 1029 .append(createdDestId.toString()).append("] has been created and deployed") 1030 .toString(); 1031 strbuf.setLength(0); 1032 } 1033 catch (Exception exc) { 1034 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) 1035 JoramTracing.dbgDestination.log(BasicLevel.ERROR, "xxx", exc); 1036 1037 1038 throw new RequestException("Error while deploying Destination [" + 1039 clazz + "]: " + exc); 1040 } 1041 } 1042 1043 distributeReply( 1044 replyTo, 1045 msgId, 1046 new CreateDestinationReply( 1047 destDesc.getId().toString(), 1048 destDesc.getName(), 1049 destDesc.getType(), 1050 info)); 1051 1052 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1053 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 1054 } else { 1055 forward(AdminTopic.getDefault((short) request.getServerId()), 1057 new AdminRequestNot(replyTo, msgId, request)); 1058 } 1059 } 1060 1061 1065 private void doProcess(DeleteDestination request, 1066 AgentId replyTo, 1067 String msgId) throws UnknownServerException 1068 { 1069 AgentId destId = AgentId.fromString(request.getId()); 1070 1071 if (checkServerId(destId.getTo())) { 1073 String info; 1075 1076 Enumeration destinations = destinationsTable.elements(); 1077 while (destinations.hasMoreElements()) { 1078 DestinationDesc destDesc = 1079 (DestinationDesc)destinations.nextElement(); 1080 if 
(destDesc.getId().equals(destId)) { 1081 destinationsTable.remove(destDesc.getName()); 1082 break; 1083 } 1084 } 1085 1086 forward(destId, new DeleteNot()); 1087 1088 info = strbuf.append("Request [").append(request.getClass().getName()) 1089 .append("], sent to AdminTopic on server [").append(serverId) 1090 .append("], successful [true]: destination [").append(destId) 1091 .append("], successfuly notified for deletion").toString(); 1092 strbuf.setLength(0); 1093 1094 distributeReply(replyTo, msgId, new AdminReply(true, info)); 1095 1096 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1097 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 1098 } else { 1099 forward(AdminTopic.getDefault(destId.getTo()), 1101 new AdminRequestNot(replyTo, msgId, request)); 1102 } 1103 } 1104 1105 1109 private void doProcess(SetCluster request, 1110 AgentId replyTo, 1111 String msgId) throws UnknownServerException 1112 { 1113 AgentId initId = AgentId.fromString(request.getInitId()); 1114 AgentId topId = AgentId.fromString(request.getTopId()); 1115 1116 if (checkServerId(initId.getTo())) { 1117 forward(initId, new ClusterRequest(msgId, topId)); 1119 if (replyTo != null) requestsTable.put(msgId, replyTo); 1120 } else { 1121 forward(AdminTopic.getDefault(initId.getTo()), 1123 new AdminRequestNot(replyTo, msgId, request)); 1124 } 1125 } 1126 1127 1131 private void doProcess(UnsetCluster request, 1132 AgentId replyTo, 1133 String msgId) throws UnknownServerException 1134 { 1135 AgentId topId = AgentId.fromString(request.getTopId()); 1136 1137 if (checkServerId(topId.getTo())) { 1138 forward(topId, new UnclusterRequest(msgId)); 1140 if (replyTo != null) requestsTable.put(msgId, replyTo); 1141 } else { 1142 forward(AdminTopic.getDefault(topId.getTo()), 1144 new AdminRequestNot(replyTo, msgId, request)); 1145 } 1146 } 1147 1148 1152 private void doProcess(SetFather request, 1153 AgentId replyTo, 1154 String msgId) throws UnknownServerException 1155 { 1156 AgentId fatherId 
= AgentId.fromString(request.getFather()); 1157 AgentId sonId = AgentId.fromString(request.getSon()); 1158 1159 if (checkServerId(sonId.getTo())) { 1160 forward(sonId, new SetFatherRequest(msgId, fatherId)); 1162 if (replyTo != null) requestsTable.put(msgId, replyTo); 1163 } else { 1164 forward(AdminTopic.getDefault(sonId.getTo()), 1166 new AdminRequestNot(replyTo, msgId, request)); 1167 } 1168 } 1169 1170 1174 private void doProcess(UnsetFather request, 1175 AgentId replyTo, 1176 String msgId) throws UnknownServerException 1177 { 1178 AgentId topId = AgentId.fromString(request.getTopId()); 1179 1180 if (checkServerId(topId.getTo())) { 1181 forward(topId, new UnsetFatherRequest(msgId)); 1183 if (replyTo != null) requestsTable.put(msgId, replyTo); 1184 } else { 1185 forward(AdminTopic.getDefault(topId.getTo()), 1187 new AdminRequestNot(replyTo, msgId, request)); 1188 } 1189 } 1190 1191 1199 private void doProcess(CreateUserRequest request, 1200 AgentId replyTo, 1201 String msgId) 1202 throws UnknownServerException, RequestException 1203 { 1204 if (checkServerId(request.getServerId())) { 1205 String name = request.getUserName(); 1207 String pass = request.getUserPass(); 1208 1209 AgentId proxId = (AgentId) proxiesTable.get(name); 1210 String info; 1211 1212 if (proxId != null) { 1214 if (! 
pass.equals((String ) usersTable.get(name))) { 1215 throw new RequestException("User [" + name + "] already exists" 1216 + " but with a different password."); 1217 } 1218 info = strbuf.append("Request [").append(request.getClass().getName()) 1219 .append("], processed by AdminTopic on server [").append(serverId) 1220 .append("], successful [true]: proxy [").append(proxId.toString()) 1221 .append("] of user [").append(name) 1222 .append("] has been retrieved").toString(); 1223 strbuf.setLength(0); 1224 } else { 1225 UserAgent proxy = new UserAgent(); 1226 if (name != null) { 1227 proxy.name = name; 1228 } 1229 proxId = proxy.getId(); 1230 1231 try { 1232 proxy.deploy(); 1233 usersTable.put(name, request.getUserPass()); 1234 proxiesTable.put(name, proxy.getId()); 1235 1236 info = strbuf.append("Request [").append(request.getClass().getName()) 1237 .append("], processed by AdminTopic on server [").append(serverId) 1238 .append("], successful [true]: proxy [") 1239 .append(proxId.toString()).append("] for user [").append(name) 1240 .append("] has been created and deployed").toString(); 1241 strbuf.setLength(0); 1242 } 1243 catch (Exception exc) { 1244 throw new RequestException("User proxy not deployed: " + exc); 1245 } 1246 } 1247 1248 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1249 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 1250 1251 distributeReply(replyTo, 1252 msgId, 1253 new CreateUserReply(proxId.toString(), info)); 1254 } else { 1255 forward(AdminTopic.getDefault((short) request.getServerId()), 1257 new AdminRequestNot(replyTo, msgId, request)); 1258 } 1259 } 1260 1261 1268 private void doProcess(UpdateUser request, 1269 AgentId replyTo, 1270 String msgId) 1271 throws RequestException, UnknownServerException 1272 { 1273 String name = request.getUserName(); 1274 AgentId proxId = AgentId.fromString(request.getProxId()); 1275 1276 if (checkServerId(proxId.getTo())) { 1277 String info; 1279 1280 if (! 
usersTable.containsKey(name)) 1282 throw new RequestException("User [" + name + "] does not exist"); 1283 1284 String newName = request.getNewName(); 1285 if (! newName.equals(name) 1288 && (usersTable.containsKey(newName))) 1289 throw new RequestException("Name [" + newName + "] already used"); 1290 1291 String newPass = request.getNewPass(); 1292 1293 if (usersTable.containsKey(name)) { 1294 usersTable.remove(name); 1295 proxiesTable.remove(name); 1296 usersTable.put(newName, request.getNewPass()); 1297 proxiesTable.put(newName, proxId); 1298 } 1299 1300 info = strbuf.append("Request [").append(request.getClass().getName()) 1301 .append("], processed by AdminTopic on server [").append(serverId) 1302 .append("], successful [true]: user [").append(name) 1303 .append("] has been updated to [").append(newName). 1304 append("]").toString(); 1305 strbuf.setLength(0); 1306 1307 distributeReply(replyTo, msgId, new AdminReply(true, info)); 1308 1309 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1310 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 1311 } else { 1312 forward(AdminTopic.getDefault(proxId.getTo()), 1314 new AdminRequestNot(replyTo, msgId, request)); 1315 } 1316 } 1317 1318 1322 private void doProcess(DeleteUser request, 1323 AgentId replyTo, 1324 String msgId) throws UnknownServerException 1325 { 1326 String name = request.getUserName(); 1327 AgentId proxId = AgentId.fromString(request.getProxId()); 1328 1329 if (checkServerId(proxId.getTo())) { 1330 String info; 1332 1333 if (usersTable.containsKey(name)) { 1334 forward(proxId, new DeleteNot()); 1335 usersTable.remove(name); 1336 proxiesTable.remove(name); 1337 1338 info = strbuf.append("Request [").append(request.getClass().getName()) 1339 .append("], sent to AdminTopic on server [").append(serverId) 1340 .append("], successful [true]: proxy [").append(proxId) 1341 .append("], of user [").append(name) 1342 .append("] has been notified of deletion").toString(); 1343 
strbuf.setLength(0); 1344 } else { 1345 info = strbuf.append("Request [").append(request.getClass().getName()) 1346 .append("], sent to AdminTopic on server [").append(serverId) 1347 .append("], successful [false]: user [").append(name) 1348 .append(" does not exist").toString(); 1349 strbuf.setLength(0); 1350 } 1351 distributeReply(replyTo, msgId, new AdminReply(true, info)); 1352 1353 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1354 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 1355 } else { 1356 forward(AdminTopic.getDefault(proxId.getTo()), 1358 new AdminRequestNot(replyTo, msgId, request)); 1359 } 1360 } 1361 1362 1366 private void doProcess(SetRight request, 1367 AgentId replyTo, 1368 String msgId) throws UnknownServerException 1369 { 1370 AgentId destId = AgentId.fromString(request.getDestId()); 1371 1372 if (checkServerId(destId.getTo())) { 1373 1375 AgentId userId = null; 1376 if (request.getUserProxId() != null) 1377 userId = AgentId.fromString(request.getUserProxId()); 1378 1379 int right = 0; 1380 if (request instanceof SetReader) 1381 right = READ; 1382 else if (request instanceof SetWriter) 1383 right = WRITE; 1384 else if (request instanceof UnsetReader) 1385 right = - READ; 1386 else if (request instanceof UnsetWriter) 1387 right = - WRITE; 1388 1389 forward(destId, new SetRightRequest(msgId, userId, right)); 1390 if (replyTo != null) requestsTable.put(msgId, replyTo); 1391 } else { 1392 forward(AdminTopic.getDefault(destId.getTo()), 1394 new AdminRequestNot(replyTo, msgId, request)); 1395 } 1396 } 1397 1398 1404 private void doProcess(SetDefaultDMQ request, 1405 AgentId replyTo, 1406 String msgId) throws UnknownServerException 1407 { 1408 if (checkServerId(request.getServerId())) { 1409 String info; 1411 1412 AgentId dmqId = null; 1413 if (request.getDmqId() != null) 1414 dmqId = AgentId.fromString(request.getDmqId()); 1415 1416 DeadMQueueImpl.id = dmqId; 1417 1418 info = strbuf.append("Request 
[").append(request.getClass().getName()) 1419 .append("], sent to AdminTopic on server [").append(serverId) 1420 .append("], successful [true]: dmq [").append(dmqId.toString()) 1421 .append("], has been successfuly set as the default one").toString(); 1422 strbuf.setLength(0); 1423 1424 distributeReply(replyTo, msgId, new AdminReply(true, info)); 1425 1426 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1427 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 1428 } else { 1429 forward(AdminTopic.getDefault((short) request.getServerId()), 1431 new AdminRequestNot(replyTo, msgId, request)); 1432 } 1433 } 1434 1435 1439 private void doProcess(SetDestinationDMQ request, 1440 AgentId replyTo, 1441 String msgId) throws UnknownServerException 1442 { 1443 AgentId destId = AgentId.fromString(request.getDestId()); 1444 1445 if (checkServerId(destId.getTo())) { 1446 AgentId dmqId = AgentId.fromString(request.getDmqId()); 1448 forward(destId, new SetDMQRequest(msgId, dmqId)); 1449 if (replyTo != null) requestsTable.put(msgId, replyTo); 1450 } else { 1451 forward(AdminTopic.getDefault(destId.getTo()), 1453 new AdminRequestNot(replyTo, msgId, request)); 1454 } 1455 } 1456 1457 1461 private void doProcess(SetUserDMQ request, 1462 AgentId replyTo, 1463 String msgId) throws UnknownServerException 1464 { 1465 AgentId userId = AgentId.fromString(request.getUserProxId()); 1466 1467 if (checkServerId(userId.getTo())) { 1468 AgentId dmqId = AgentId.fromString(request.getDmqId()); 1470 forward(userId, new SetDMQRequest(msgId, dmqId)); 1471 if (replyTo != null) requestsTable.put(msgId, replyTo); 1472 } else { 1473 forward(AdminTopic.getDefault(userId.getTo()), 1475 new AdminRequestNot(replyTo, msgId, request)); 1476 } 1477 } 1478 1479 1485 private void doProcess(SetDefaultThreshold request, 1486 AgentId replyTo, 1487 String msgId) throws UnknownServerException 1488 { 1489 if (checkServerId(request.getServerId())) { 1490 String info; 1492 1493 DeadMQueueImpl.threshold = 
new Integer (request.getThreshold()); 1494 1495 info = strbuf.append("Request [").append(request.getClass().getName()) 1496 .append("], sent to AdminTopic on server [").append(serverId) 1497 .append("], successful [true]: default threshold [") 1498 .append(request.getThreshold()).append("] has been set").toString(); 1499 strbuf.setLength(0); 1500 1501 distributeReply(replyTo, msgId, new AdminReply(true, info)); 1502 1503 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1504 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 1505 } else { 1506 forward(AdminTopic.getDefault((short) request.getServerId()), 1508 new AdminRequestNot(replyTo, msgId, request)); 1509 } 1510 } 1511 1512 1516 private void doProcess(SetNbMaxMsg request, 1517 AgentId replyTo, 1518 String msgId) 1519 throws UnknownServerException { 1520 AgentId destId = AgentId.fromString(request.getId()); 1521 1522 if (checkServerId(destId.getTo())) { 1523 int nbMaxMsg = request.getNbMaxMsg(); 1525 String subName = request.getSubName(); 1526 forward(destId, new SetNbMaxMsgRequest(msgId, nbMaxMsg, subName)); 1527 if (replyTo != null) requestsTable.put(msgId, replyTo); 1528 } else { 1529 forward(AdminTopic.getDefault(destId.getTo()), 1531 new AdminRequestNot(replyTo, msgId, request)); 1532 } 1533 } 1534 1535 1540 private void doProcess(SetQueueThreshold request, 1541 AgentId replyTo, 1542 String msgId) throws UnknownServerException 1543 { 1544 AgentId destId = AgentId.fromString(request.getQueueId()); 1545 1546 if (checkServerId(destId.getTo())) { 1547 int thresh = request.getThreshold(); 1549 forward(destId, new SetThreshRequest(msgId, new Integer (thresh))); 1550 if (replyTo != null) requestsTable.put(msgId, replyTo); 1551 } else { 1552 forward(AdminTopic.getDefault(destId.getTo()), 1554 new AdminRequestNot(replyTo, msgId, request)); 1555 } 1556 } 1557 1558 1563 private void doProcess(SetUserThreshold request, 1564 AgentId replyTo, 1565 String msgId) throws UnknownServerException 1566 { 1567 
AgentId userId = AgentId.fromString(request.getUserProxId()); 1568 1569 if (checkServerId(userId.getTo())) { 1570 int thresh = request.getThreshold(); 1572 forward(userId, new SetThreshRequest(msgId, new Integer (thresh))); 1573 if (replyTo != null) requestsTable.put(msgId, replyTo); 1574 } else { 1575 forward(AdminTopic.getDefault(userId.getTo()), 1577 new AdminRequestNot(replyTo, msgId, request)); 1578 } 1579 } 1580 1581 1587 private void doProcess(UnsetDefaultDMQ request, 1588 AgentId replyTo, 1589 String msgId) throws UnknownServerException 1590 { 1591 if (checkServerId(request.getServerId())) { 1592 String info; 1594 1595 DeadMQueueImpl.id = null; 1596 1597 info = strbuf.append("Request [").append(request.getClass().getName()) 1598 .append("], sent to AdminTopic on server [").append(serverId) 1599 .append("], successful [true]: default dmq has been unset").toString(); 1600 strbuf.setLength(0); 1601 1602 distributeReply(replyTo, msgId, new AdminReply(true, info)); 1603 1604 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1605 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 1606 } else { 1607 forward(AdminTopic.getDefault((short) request.getServerId()), 1609 new AdminRequestNot(replyTo, msgId, request)); 1610 } 1611 } 1612 1613 1617 private void doProcess(UnsetDestinationDMQ request, 1618 AgentId replyTo, 1619 String msgId) throws UnknownServerException 1620 { 1621 AgentId destId = AgentId.fromString(request.getDestId()); 1622 1623 if (checkServerId(destId.getTo())) { 1624 forward(destId, new SetDMQRequest(msgId, null)); 1626 if (replyTo != null) requestsTable.put(msgId, replyTo); 1627 } else { 1628 forward(AdminTopic.getDefault(destId.getTo()), 1630 new AdminRequestNot(replyTo, msgId, request)); 1631 } 1632 } 1633 1634 1638 private void doProcess(UnsetUserDMQ request, 1639 AgentId replyTo, 1640 String msgId) throws UnknownServerException 1641 { 1642 AgentId userId = AgentId.fromString(request.getUserProxId()); 1643 1644 if 
(checkServerId(userId.getTo())) { 1645 forward(userId, new SetDMQRequest(msgId, null)); 1647 if (replyTo != null)requestsTable.put(msgId, replyTo); 1648 } else { 1649 forward(AdminTopic.getDefault(userId.getTo()), 1651 new AdminRequestNot(replyTo, msgId, request)); 1652 } 1653 } 1654 1655 1661 private void doProcess(UnsetDefaultThreshold request, 1662 AgentId replyTo, 1663 String msgId) throws UnknownServerException 1664 { 1665 if (checkServerId(request.getServerId())) { 1666 String info; 1668 1669 DeadMQueueImpl.threshold = null; 1670 1671 info = strbuf.append("Request [").append(request.getClass().getName()) 1672 .append("], sent to AdminTopic on server [").append(serverId) 1673 .append("], successful [true]: default threshold has been unset") 1674 .toString(); 1675 strbuf.setLength(0); 1676 1677 distributeReply(replyTo, msgId, new AdminReply(true, info)); 1678 1679 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1680 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 1681 "Default threshold unset."); 1682 } else { 1683 forward(AdminTopic.getDefault((short) request.getServerId()), 1685 new AdminRequestNot(replyTo, msgId, request)); 1686 } 1687 } 1688 1689 1693 private void doProcess(UnsetQueueThreshold request, 1694 AgentId replyTo, 1695 String msgId) throws UnknownServerException 1696 { 1697 AgentId destId = AgentId.fromString(request.getQueueId()); 1698 1699 if (checkServerId(destId.getTo())) { 1700 forward(destId, new SetThreshRequest(msgId, null)); 1702 if (replyTo != null) requestsTable.put(msgId, replyTo); 1703 } else { 1704 forward(AdminTopic.getDefault(destId.getTo()), 1706 new AdminRequestNot(replyTo, msgId, request)); 1707 } 1708 } 1709 1710 1714 private void doProcess(UnsetUserThreshold request, 1715 AgentId replyTo, 1716 String msgId) throws UnknownServerException 1717 { 1718 AgentId userId = AgentId.fromString(request.getUserProxId()); 1719 1720 if (checkServerId(userId.getTo())) { 1721 forward(userId, new SetThreshRequest(msgId, null)); 
1723 if (replyTo != null) requestsTable.put(msgId, replyTo); 1724 } else { 1725 forward(AdminTopic.getDefault(userId.getTo()), 1727 new AdminRequestNot(replyTo, msgId, request)); 1728 } 1729 } 1730 1731 1737 private void doProcess(Monitor_GetServersIds request, 1738 AgentId replyTo, 1739 String msgId) throws UnknownServerException { 1740 if (checkServerId(request.getServerId())) { 1741 try { 1742 int[] ids; 1743 String [] names; 1744 String [] hostNames; 1745 String domainName = request.getDomainName(); 1746 Enumeration servers; 1747 A3CMLConfig config = AgentServer.getConfig(); 1748 int serversCount; 1749 if (domainName != null) { 1750 A3CMLDomain domain = config.getDomain(domainName); 1751 servers = domain.servers.elements(); 1752 serversCount = domain.servers.size(); 1753 } else { 1754 servers = config.servers.elements(); 1755 serversCount = config.servers.size(); 1756 } 1757 ids = new int[serversCount]; 1758 names = new String [serversCount]; 1759 hostNames = new String [serversCount]; 1760 int i = 0; 1761 while (servers.hasMoreElements()) { 1762 A3CMLServer server = (A3CMLServer)servers.nextElement(); 1763 ids[i] = (int)server.sid; 1764 names[i] = server.name; 1765 hostNames[i] = server.hostname; 1766 i++; 1767 } 1768 Monitor_GetServersIdsRep reply = new Monitor_GetServersIdsRep( 1769 ids, names, hostNames); 1770 distributeReply(replyTo, msgId, reply); 1771 } catch (Exception exc) { 1772 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1773 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 1774 distributeReply(replyTo, msgId, 1775 new AdminReply(false, exc.toString())); 1776 } 1777 } else { 1778 forward(AdminTopic.getDefault((short) request.getServerId()), 1780 new AdminRequestNot(replyTo, msgId, request)); 1781 } 1782 } 1783 1784 private void doProcess(GetLocalServer request, 1785 AgentId replyTo, 1786 String msgId) throws UnknownServerException { 1787 try { 1788 A3CMLConfig config = AgentServer.getConfig(); 1789 A3CMLServer a3cmlServer 
= config.getServer(AgentServer.getServerId()); 1790 distributeReply(replyTo, msgId, 1791 new GetLocalServerRep(a3cmlServer.sid, 1792 a3cmlServer.name, 1793 a3cmlServer.hostname)); 1794 } catch (Exception exc) { 1795 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1796 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 1797 distributeReply(replyTo, msgId, 1798 new AdminReply(false, exc.toString())); 1799 } 1800 } 1801 1802 private void doProcess(GetDomainNames request, 1803 AgentId replyTo, 1804 String msgId) { 1805 try { 1806 A3CMLConfig config = AgentServer.getConfig(); 1807 A3CMLServer server = config.getServer((short)request.getServerId()); 1808 String [] domainNames = new String [server.networks.size()]; 1809 for (int i = 0; i < server.networks.size(); i++) { 1810 A3CMLNetwork nw = (A3CMLNetwork)server.networks.elementAt(i); 1811 domainNames[i] = nw.domain; 1812 } 1813 GetDomainNamesRep reply = new GetDomainNamesRep(domainNames); 1814 distributeReply(replyTo, msgId, reply); 1815 } catch (Exception exc) { 1816 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 1817 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 1818 distributeReply(replyTo, msgId, 1819 new AdminReply(false, exc.toString())); 1820 } 1821 } 1822 1823 1829 private void doProcess(Monitor_GetDestinations request, 1830 AgentId replyTo, 1831 String msgId) throws UnknownServerException 1832 { 1833 if (checkServerId(request.getServerId())) { 1834 Enumeration destinations = destinationsTable.elements(); 1835 String [] ids = new String [destinationsTable.size()]; 1836 String [] names = new String [destinationsTable.size()]; 1837 String [] types = new String [destinationsTable.size()]; 1838 int i = 0; 1839 while (destinations.hasMoreElements()) { 1840 DestinationDesc destDesc = 1841 (DestinationDesc)destinations.nextElement(); 1842 ids[i] = destDesc.getId().toString(); 1843 names[i] = destDesc.getName(); 1844 types[i] = destDesc.getType(); 1845 i++; 1846 } 1847 
Monitor_GetDestinationsRep reply =
        new Monitor_GetDestinationsRep(ids, names, types);
      distributeReply(replyTo, msgId, reply);
    } else {
      forward(AdminTopic.getDefault((short) request.getServerId()),
              new AdminRequestNot(replyTo, msgId, request));
    }
  }

  /**
   * Processes a Monitor_GetUsers request: replies with every user name and
   * proxy identifier held in proxiesTable, or forwards the request to the
   * AdminTopic of the requested server when checkServerId rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetUsers request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException
  {
    if (! checkServerId(request.getServerId())) {
      forward(AdminTopic.getDefault((short) request.getServerId()),
              new AdminRequestNot(replyTo, msgId, request));
      return;
    }

    Monitor_GetUsersRep reply = new Monitor_GetUsersRep();

    Enumeration userNames = proxiesTable.keys();
    while (userNames.hasMoreElements()) {
      String userName = (String) userNames.nextElement();
      AgentId proxyId = (AgentId) proxiesTable.get(userName);
      reply.addUser(userName, proxyId.toString());
    }

    distributeReply(replyTo, msgId, reply);
  }

  /**
   * Processes a Monitor_GetReaders request: sends a Monit_GetReaders to the
   * destination, or forwards the request to the AdminTopic of the
   * destination's server when checkServerId rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetReaders request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException
  {
    AgentId destId = AgentId.fromString(request.getDest());

    if (! checkServerId(destId.getTo())) {
      forward(AdminTopic.getDefault(destId.getTo()),
              new AdminRequestNot(replyTo, msgId, request));
      return;
    }

    forward(destId, new Monit_GetReaders(msgId));
    // Record the requester under the message id (presumably to route the reply).
    if (replyTo != null) requestsTable.put(msgId, replyTo);
  }

  /**
   * Processes a Monitor_GetWriters request: sends a Monit_GetWriters to the
   * destination, or forwards the request to the AdminTopic of the
   * destination's server when checkServerId rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetWriters request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException
  {
    AgentId destId = AgentId.fromString(request.getDest());

    if (! checkServerId(destId.getTo())) {
      forward(AdminTopic.getDefault(destId.getTo()),
              new AdminRequestNot(replyTo, msgId, request));
      return;
    }

    forward(destId, new Monit_GetWriters(msgId));
    if (replyTo != null) requestsTable.put(msgId, replyTo);
  }

  /**
   * Processes a Monitor_GetFreeAccess request: sends a Monit_FreeAccess to
   * the destination, or forwards the request to the AdminTopic of the
   * destination's server when checkServerId rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void
doProcess(Monitor_GetFreeAccess request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException
  {
    AgentId destId = AgentId.fromString(request.getDest());

    if (! checkServerId(destId.getTo())) {
      forward(AdminTopic.getDefault(destId.getTo()),
              new AdminRequestNot(replyTo, msgId, request));
      return;
    }

    forward(destId, new Monit_FreeAccess(msgId));
    // Record the requester under the message id (presumably to route the reply).
    if (replyTo != null) requestsTable.put(msgId, replyTo);
  }

  /**
   * Processes a Monitor_GetDMQSettings request.
   *
   * When the request carries a server id other than -1, the default
   * settings (DeadMQueueImpl.id and DeadMQueueImpl.threshold) are returned,
   * or the request is forwarded to that server's AdminTopic. Otherwise, if
   * a target is given, a Monit_GetDMQSettings is sent to it, or the request
   * is forwarded to the AdminTopic of the target's server.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetDMQSettings request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException
  {
    if (request.getServerId() != -1) {
      // Server-level request: default DMQ settings.
      if (! checkServerId(request.getServerId())) {
        forward(AdminTopic.getDefault((short) request.getServerId()),
                new AdminRequestNot(replyTo, msgId, request));
        return;
      }

      String id = (DeadMQueueImpl.id != null)
        ? DeadMQueueImpl.id.toString()
        : null;
      distributeReply(
        replyTo, msgId,
        new Monitor_GetDMQSettingsRep(id, DeadMQueueImpl.threshold));
      return;
    }

    if (request.getTarget() == null) {
      // NOTE(review): neither a server id nor a target was given; nothing
      // is done and no reply is sent - confirm this is intended.
      return;
    }

    AgentId targetId = AgentId.fromString(request.getTarget());

    if (! checkServerId(targetId.getTo())) {
      forward(AdminTopic.getDefault(targetId.getTo()),
              new AdminRequestNot(replyTo, msgId, request));
      return;
    }

    forward(targetId, new Monit_GetDMQSettings(msgId));
    if (replyTo != null)
      requestsTable.put(msgId, replyTo);
  }

  /**
   * Processes a Monitor_GetFather request: sends a Monit_GetFather to the
   * topic, or forwards the request to the AdminTopic of the topic's server
   * when checkServerId rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetFather request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException
  {
    AgentId topicId = AgentId.fromString(request.getTopic());

    if (checkServerId(topicId.getTo())) {
      forward(topicId, new Monit_GetFather(msgId));
      if (replyTo != null)
        requestsTable.put(msgId, replyTo);
    } else {
      forward(AdminTopic.getDefault(topicId.getTo()),
new AdminRequestNot(replyTo, msgId, request));
    }
  }

  /**
   * Processes a Monitor_GetCluster request: sends a Monit_GetCluster to the
   * topic, or forwards the request to the AdminTopic of the topic's server
   * when checkServerId rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetCluster request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException
  {
    AgentId topicId = AgentId.fromString(request.getTopic());

    if (! checkServerId(topicId.getTo())) {
      forward(AdminTopic.getDefault(topicId.getTo()),
              new AdminRequestNot(replyTo, msgId, request));
      return;
    }

    forward(topicId, new Monit_GetCluster(msgId));
    // Record the requester under the message id (presumably to route the reply).
    if (replyTo != null) requestsTable.put(msgId, replyTo);
  }

  /**
   * Processes a Monitor_GetPendingMessages request: sends a
   * Monit_GetPendingMessages to the destination, or forwards the request to
   * the AdminTopic of the destination's server when checkServerId rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetPendingMessages request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException
  {
    AgentId destId = AgentId.fromString(request.getDest());

    if (! checkServerId(destId.getTo())) {
      forward(AdminTopic.getDefault(destId.getTo()),
              new AdminRequestNot(replyTo, msgId, request));
      return;
    }

    forward(destId, new Monit_GetPendingMessages(msgId));
    if (replyTo != null) requestsTable.put(msgId, replyTo);
  }

  /**
   * Processes a Monitor_GetPendingRequests request: sends a
   * Monit_GetPendingRequests to the destination, or forwards the request to
   * the AdminTopic of the destination's server when checkServerId rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetPendingRequests request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException
  {
    AgentId destId = AgentId.fromString(request.getDest());

    if (! checkServerId(destId.getTo())) {
      forward(AdminTopic.getDefault(destId.getTo()),
              new AdminRequestNot(replyTo, msgId, request));
      return;
    }

    forward(destId, new Monit_GetPendingRequests(msgId));
    if (replyTo != null) requestsTable.put(msgId, replyTo);
  }

  /**
   * Processes a Monitor_GetStat request: sends a Monit_GetStat to the
   * destination, or forwards the request to the AdminTopic of the
   * destination's server when checkServerId rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetStat request,
                         AgentId replyTo,
                         String msgId)
    throws UnknownServerException {
    AgentId destId = AgentId.fromString(request.getDest());

    if (checkServerId(destId.getTo())) {
      forward(destId, new Monit_GetStat(msgId));
      if (replyTo != null)
        requestsTable.put(msgId, replyTo);
    } else {
forward(AdminTopic.getDefault(destId.getTo()),
              new AdminRequestNot(replyTo, msgId, request));
    }
  }

  /**
   * Processes a Monitor_GetNbMaxMsg request: sends a Monit_GetNbMaxMsg
   * carrying the subscription name to the target agent, or forwards the
   * request to the AdminTopic of the target's server when checkServerId
   * rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetNbMaxMsg request,
                         AgentId replyTo,
                         String msgId)
    throws UnknownServerException {
    AgentId destId = AgentId.fromString(request.getId());

    if (! checkServerId(destId.getTo())) {
      forward(AdminTopic.getDefault(destId.getTo()),
              new AdminRequestNot(replyTo, msgId, request));
      return;
    }

    String subName = request.getSubName();
    forward(destId, new Monit_GetNbMaxMsg(msgId, subName));
    // Record the requester under the message id (presumably to route the reply).
    if (replyTo != null) requestsTable.put(msgId, replyTo);
  }

  /**
   * Processes a Monitor_GetSubscriptions request: sends a
   * Monit_GetSubscriptions to the destination, or forwards the request to
   * the AdminTopic of the destination's server when checkServerId rejects it.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(Monitor_GetSubscriptions request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException
  {
    AgentId destId = AgentId.fromString(request.getDest());

    if (! checkServerId(destId.getTo())) {
      forward(AdminTopic.getDefault(destId.getTo()),
              new AdminRequestNot(replyTo, msgId, request));
      return;
    }

    forward(destId, new Monit_GetSubscriptions(msgId));
    if (replyTo != null) requestsTable.put(msgId, replyTo);
  }

  /**
   * Processes a SpecialAdmin request: wraps it in a SpecialAdminRequest
   * sent to the target destination. A request targeting this topic itself
   * is refused with a negative AdminReply. Requests whose destination
   * server is rejected by checkServerId are forwarded to that server's
   * AdminTopic.
   *
   * @exception UnknownServerException  declared for checkServerId.
   */
  private void doProcess(SpecialAdmin request,
                         AgentId replyTo,
                         String msgId) throws UnknownServerException {
    AgentId destId = AgentId.fromString(request.getDestId());

    if (checkServerId(destId.getTo())) {
      if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
        JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                        "AdminTopicImpl.doProcess " +
                                        "SpecialAdminRequest destId=" + destId);

      if (getId().equals(destId)) {
        distributeReply(replyTo, msgId,
                        new AdminReply(false, "destId mustn't be TopicAdmin."));
        return;
      }

      forward(destId, new SpecialAdminRequest(msgId, request));
      if (replyTo != null)
        requestsTable.put(msgId, replyTo);
    } else {
      forward(AdminTopic.getDefault(destId.getTo()),
              new AdminRequestNot(replyTo,
msgId, request)); 2164 } 2165 } 2166 2167 private void doProcess(AddDomainRequest request, 2168 AgentId replyTo, 2169 String msgId, 2170 AgentId from) { 2171 try { 2172 ServerConfigHelper helper = new ServerConfigHelper(true); 2173 if (helper.addDomain(request.getDomainName(), 2174 request.getServerId(), 2175 request.getPort())) { 2176 distributeReply(replyTo, msgId, 2177 new AdminReply(true, "Domain added")); 2178 } 2179 if (from == null) { 2180 broadcastRequest(request, 2181 -1, 2182 replyTo, msgId); 2183 } 2184 } catch (ServerConfigHelper.NameAlreadyUsedException exc) { 2185 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2186 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2187 distributeReply(replyTo, msgId, 2188 new AdminReply( 2189 false, 2190 AdminReply.NAME_ALREADY_USED, 2191 exc.getMessage(), null)); 2192 } catch (ServerConfigHelper.StartFailureException exc) { 2193 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2194 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2195 distributeReply(replyTo, msgId, 2196 new AdminReply( 2197 false, 2198 AdminReply.START_FAILURE, 2199 exc.getMessage(), null)); 2200 } catch (Exception exc) { 2201 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2202 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2203 distributeReply(replyTo, msgId, 2204 new AdminReply(false, exc.toString())); 2205 } 2206 } 2207 2208 private void doProcess(RemoveDomainRequest request, 2209 AgentId replyTo, 2210 String msgId, 2211 AgentId from) { 2212 try { 2213 ServerConfigHelper helper = new ServerConfigHelper(true); 2214 if (helper.removeDomain(request.getDomainName())) { 2215 distributeReply(replyTo, msgId, 2216 new AdminReply(true, "Domain removed")); 2217 } 2218 if (from == null) { 2219 broadcastRequest(request, 2220 -1, 2221 replyTo, msgId); 2222 } 2223 } catch (Exception exc) { 2224 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2225 
JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2226 distributeReply(replyTo, msgId, 2227 new AdminReply(false, exc.toString())); 2228 } 2229 } 2230 2231 private void doProcess(AddServerRequest request, 2232 AgentId replyTo, 2233 String msgId, 2234 AgentId from) { 2235 try { 2236 ServerConfigHelper helper = new ServerConfigHelper(false); 2237 helper.addServer( 2238 request.getServerId(), 2239 request.getHostName(), 2240 request.getDomainName(), 2241 request.getPort(), 2242 request.getServerName()); 2243 helper.addService( 2244 request.getServerId(), 2245 "org.objectweb.joram.mom.proxies.ConnectionManager", 2246 "root root"); 2247 String [] serviceNames = request.getServiceNames(); 2248 String [] serviceArgs = request.getServiceArgs(); 2249 for (int i = 0; i < serviceNames.length; i++) { 2250 helper.addService(request.getServerId(), 2251 serviceNames[i], 2252 serviceArgs[i]); 2253 } 2254 helper.commit(); 2255 distributeReply(replyTo, msgId, 2256 new AdminReply(true, "Server added")); 2257 if (from == null) { 2258 broadcastRequest(request, 2259 request.getServerId(), 2260 replyTo, msgId); 2261 } 2262 } catch (ServerConfigHelper.ServerIdAlreadyUsedException exc) { 2263 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2264 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2265 distributeReply(replyTo, msgId, 2266 new AdminReply( 2267 false, 2268 AdminReply.SERVER_ID_ALREADY_USED, 2269 exc.getMessage(), null)); 2270 } catch (Exception exc) { 2271 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2272 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2273 distributeReply(replyTo, msgId, 2274 new AdminReply(false, exc.toString())); 2275 } 2276 } 2277 2278 private void doProcess(RemoveServerRequest request, 2279 AgentId replyTo, 2280 String msgId, 2281 AgentId from) { 2282 try { 2283 ServerConfigHelper helper = new ServerConfigHelper(true); 2284 helper.removeServer(request.getServerId()); 2285 
distributeReply(replyTo, msgId, 2286 new AdminReply(true, "Server removed")); 2287 if (from == null) { 2288 broadcastRequest(request, 2289 request.getServerId(), 2290 replyTo, msgId); 2291 } 2292 } catch (UnknownServerException exc) { 2293 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2294 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2295 distributeReply(replyTo, msgId, 2296 new AdminReply( 2297 false, 2298 AdminReply.UNKNOWN_SERVER, 2299 exc.getMessage(), null)); 2300 } catch (Exception exc) { 2301 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2302 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2303 distributeReply(replyTo, msgId, 2304 new AdminReply(false, exc.toString())); 2305 } 2306 } 2307 2308 private void doProcess(AddServiceRequest request, 2309 AgentId replyTo, 2310 String msgId, 2311 AgentId from) { 2312 try { 2313 ServerConfigHelper helper = new ServerConfigHelper(true); 2314 helper.addService( 2315 request.getServerId(), 2316 request.getClassName(), 2317 request.getArgs()); 2318 distributeReply(replyTo, msgId, 2319 new AdminReply(true, "Service added")); 2320 if (from == null) { 2321 broadcastRequest(request, 2322 -1, 2323 replyTo, msgId); 2324 } 2325 } catch (Exception exc) { 2326 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2327 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2328 distributeReply(replyTo, msgId, 2329 new AdminReply(false, exc.toString())); 2330 } 2331 } 2332 2333 private void doProcess(RemoveServiceRequest request, 2334 AgentId replyTo, 2335 String msgId, 2336 AgentId from) { 2337 try { 2338 ServerConfigHelper helper = new ServerConfigHelper(true); 2339 helper.removeService( 2340 request.getServerId(), 2341 request.getClassName()); 2342 distributeReply(replyTo, msgId, 2343 new AdminReply(true, "Service removed")); 2344 if (from == null) { 2345 broadcastRequest(request, 2346 -1, 2347 replyTo, msgId); 2348 } 2349 } catch (Exception exc) { 2350 if 
(JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2351 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2352 distributeReply(replyTo, msgId, 2353 new AdminReply(false, exc.toString())); 2354 } 2355 } 2356 2357 private void broadcastRequest(AdminRequest req, 2358 int avoidServerId, 2359 AgentId replyTo, 2360 String msgId) { 2361 AdminRequestNot not = new AdminRequestNot(replyTo, msgId, req); 2362 Enumeration ids = AgentServer.getServersIds(); 2363 while (ids.hasMoreElements()) { 2364 short id = ((Short ) ids.nextElement()).shortValue(); 2365 if (id != AgentServer.getServerId() && 2366 id != avoidServerId) { 2367 forward(AdminTopic.getDefault(id), not); 2368 } 2369 } 2370 } 2371 2372 private void doProcess(GetConfigRequest request, 2373 AgentId replyTo, 2374 String msgId) { 2375 try { 2376 A3CMLConfig a3cmlConfig = AgentServer.getConfig(); 2377 ByteArrayOutputStream baos = 2378 new ByteArrayOutputStream (); 2379 PrintWriter out = new PrintWriter (baos); 2380 A3CML.toXML(a3cmlConfig, out); 2381 out.flush(); 2382 baos.flush(); 2383 baos.close(); 2384 String config = baos.toString(); 2385 distributeReply(replyTo, msgId, 2386 new AdminReply(true, config)); 2387 } catch (Exception exc) { 2388 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2389 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2390 distributeReply(replyTo, msgId, 2391 new AdminReply(false, exc.toString())); 2392 } 2393 } 2394 2395 private void doProcess(UserAdminRequest request, 2396 AgentId replyTo, 2397 String requestMsgId) 2398 throws UnknownServerException { 2399 AgentId userId = AgentId.fromString(request.getUserId()); 2400 if (checkServerId(userId.getTo())) { 2401 forward(userId, new UserAdminRequestNot( 2403 request, replyTo, requestMsgId, createMessageId())); 2404 } else { 2405 forward(AdminTopic.getDefault(userId.getTo()), 2407 new AdminRequestNot( 2408 replyTo, requestMsgId, request)); 2409 } 2410 } 2411 2412 private void doProcess(GetSubscriberIds 
request, 2413 AgentId replyTo, 2414 String requestMsgId) 2415 throws UnknownServerException { 2416 try { 2417 AgentId topicId = AgentId.fromString(request.getTopicId()); 2418 forward(topicId, new DestinationAdminRequestNot( 2419 request, replyTo, requestMsgId, createMessageId())); 2420 } catch (Exception exc) { 2421 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2422 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2423 distributeReply(replyTo, requestMsgId, 2424 new AdminReply(false, exc.toString())); 2425 } 2426 } 2427 2428 private void doProcess(QueueAdminRequest request, 2429 AgentId replyTo, 2430 String requestMsgId) 2431 throws UnknownServerException { 2432 try { 2433 AgentId queueId = AgentId.fromString(request.getQueueId()); 2434 forward(queueId, new DestinationAdminRequestNot( 2435 request, replyTo, requestMsgId, createMessageId())); 2436 } catch (Exception exc) { 2437 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2438 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "", exc); 2439 distributeReply(replyTo, requestMsgId, 2440 new AdminReply(false, exc.toString())); 2441 } 2442 } 2443 2444 2452 private boolean checkServerId(int serverId) throws UnknownServerException 2453 { 2454 if (serverId == this.serverId) 2455 return true; 2456 2457 Enumeration ids = AgentServer.getServersIds(); 2458 while (ids.hasMoreElements()) { 2459 if (((Short ) ids.nextElement()).intValue() == serverId) 2460 return false; 2461 } 2462 2463 throw new UnknownServerException("server#" + serverId + " is unknow."); 2464 } 2465 2466 private String createMessageId() { 2467 msgCounter++; 2468 return "ID:" + destId.toString() + ":" + msgCounter; 2469 } 2470 2471 2479 private void distributeReply(AgentId to, String msgId, AdminReply reply) { 2480 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 2481 JoramTracing.dbgDestination.log( 2482 BasicLevel.DEBUG, 2483 "AdminTopicImpl.distributeReply(" + 2484 to + ',' + msgId + ',' + reply + ')'); 
2485 2486 if (to == null) 2487 return; 2488 2489 Message message = new Message(); 2490 message.id = createMessageId(); 2491 message.correlationId = msgId; 2492 message.timestamp = System.currentTimeMillis(); 2493 message.setDestination(destId.toString(), Topic.TOPIC_TYPE); 2494 try { 2495 message.setObject(reply); 2496 ClientMessages clientMessages = new ClientMessages(-1, -1, message); 2497 forward(to, clientMessages); 2498 nbMsgsDeliverSinceCreation = nbMsgsDeliverSinceCreation + 1; 2499 } catch (Exception exc) { 2500 JoramTracing.dbgDestination.log( 2501 BasicLevel.ERROR, "", exc); 2502 } 2503 } 2504 2505 2506 2507 private void writeObject(java.io.ObjectOutputStream out) 2508 throws java.io.IOException { 2509 defaultDMQId = DeadMQueueImpl.id; 2510 defaultThreshold = DeadMQueueImpl.threshold; 2511 out.defaultWriteObject(); 2512 } 2513 2514 2515 private void readObject(java.io.ObjectInputStream in) 2516 throws java.io.IOException , ClassNotFoundException { 2517 in.defaultReadObject(); 2518 ref = this; 2519 DeadMQueueImpl.id = defaultDMQId; 2520 DeadMQueueImpl.threshold = defaultThreshold; 2521 } 2522 2523 static class AdminRequestNot extends Notification { 2524 String msgId = null; 2525 AgentId replyTo = null; 2526 AdminRequest request = null; 2527 2528 AdminRequestNot(AgentId replyTo, 2529 String msgId, 2530 AdminRequest request) { 2531 this.msgId = msgId; 2532 this.replyTo = replyTo; 2533 this.request = request; 2534 } 2535 } 2536 2537 static class DestinationDesc 2538 implements java.io.Serializable { 2539 private AgentId id; 2540 private String name; 2541 private String className; 2542 private String type; 2543 2544 public DestinationDesc(AgentId id, 2545 String name, 2546 String className, 2547 String type) { 2548 this.id = id; 2549 this.name = name; 2550 this.className = className.intern(); 2551 this.type = type.intern(); 2552 } 2553 2554 public final AgentId getId() { 2555 return id; 2556 } 2557 2558 public final String getName() { 2559 return name; 2560 } 
2561 2562 public final String getClassName() { 2563 return className; 2564 } 2565 2566 public final String getType() { 2567 return type; 2568 } 2569 2570 public boolean isAssignableTo(String assignedType) { 2571 return type.startsWith(assignedType); 2572 } 2573 2574 public String toString() { 2575 return '(' + super.toString() + 2576 ",id=" + id + 2577 ",name=" + name + 2578 ",className=" + className + 2579 ",type=" + type + ')'; 2580 } 2581 } 2582} 2583 | Popular Tags |