1 22 23 package org.snmp4j.agent.agentx.master; 24 25 import java.io.IOException ; 26 import java.util.*; 27 28 import org.snmp4j.CommandResponderEvent; 29 import org.snmp4j.PDU; 30 import org.snmp4j.TransportMapping; 31 import org.snmp4j.agent.*; 32 import org.snmp4j.agent.agentx.*; 33 import org.snmp4j.agent.agentx.master.AgentXQueue.AgentXQueueEntry; 34 import org.snmp4j.agent.agentx.master.index.AgentXIndexRegistry; 35 import org.snmp4j.agent.mo.snmp.AgentCapabilityList; 36 import org.snmp4j.agent.mo.snmp.SNMPv2MIB.SysUpTimeImpl; 37 import org.snmp4j.agent.mo.snmp.SysUpTime; 38 import org.snmp4j.agent.request.*; 39 import org.snmp4j.agent.security.VACM; 40 import org.snmp4j.log.LogAdapter; 41 import org.snmp4j.log.LogFactory; 42 import org.snmp4j.mp.SnmpConstants; 43 import org.snmp4j.smi.*; 44 import org.snmp4j.transport.ConnectionOrientedTransportMapping; 45 import org.snmp4j.transport.TransportStateEvent; 46 import org.snmp4j.transport.TransportStateListener; 47 48 public class AgentXCommandProcessor extends CommandProcessor implements 49 AgentXCommandListener, TransportStateListener, 50 AgentXResponseListener { 51 52 public static final int MAX_REPROCESSING_DEFAULT = 100; 53 54 private static final LogAdapter LOGGER = 55 LogFactory.getLogger(AgentXCommandProcessor.class); 56 57 private static final OctetString DEFAULT_CONTEXT = new OctetString(); 58 59 private AgentXQueue agentXQueue; 60 private AgentX agentX; 61 private Map sessions = new HashMap(); 62 private Map peers = new HashMap(10); 63 private Set registrations = new TreeSet(new AgentXRegEntryComparator()); 64 private MOServer server; 65 private int nextSessionID = 1; 66 private byte defaultTimeout = AgentXProtocol.DEFAULT_TIMEOUT_SECONDS; 67 private int maxConsecutiveTimeouts = 68 AgentXProtocol.DEFAULT_MAX_CONSECUTIVE_TIMEOUTS; 69 private int maxParseErrors = 70 AgentXProtocol.DEFAULT_MAX_PARSE_ERRORS; 71 private Map contextInfo = new HashMap(10); 72 private boolean acceptNewContexts = false; 73 74 private int nextPacketID = 0; 75 76 protected AgentXIndexRegistry indexRegistry = new AgentXIndexRegistry(); 77 78 private transient Vector agentXMasterListeners; 79 80 private int maxReprocessing = MAX_REPROCESSING_DEFAULT; 81 82 public AgentXCommandProcessor(OctetString contextEngineID, 83 AgentXQueue queue, 84 AgentX agentX, 85 MOServer server) { 86 super(contextEngineID); 87 this.agentXQueue = queue; 88 this.agentX = agentX; 89 this.server = server; 90 if (this.agentXQueue.getServer4BulkOptimization() == null) { 91 this.agentXQueue.setServer4BulkOptimization(server); 92 } 93 } 94 95 private synchronized int createNextPacketID() { 96 return nextPacketID++; 97 } 98 99 public void setMaxReprocessing(int maxReprocessing) { 100 this.maxReprocessing = maxReprocessing; 101 } 102 103 public int getMaxReprocessing() { 104 return maxReprocessing; 105 } 106 107 119 public void setMaxParseErrors(int maxParseErrors) { 120 this.maxParseErrors = maxParseErrors; 121 } 122 123 131 public int getMaxParseErrors() { 132 return maxParseErrors; 133 } 134 135 protected void finalizeRequest(CommandResponderEvent command, 136 Request req, 137 MOServer server) { 138 boolean complete = req.isComplete(); 139 AgentXQueueEntry entry = agentXQueue.get(req.getTransactionID()); 140 if (entry != null) { 141 Collection pending = entry.getPending(); 142 entry.updateTimestamp(); 143 for (Iterator it = pending.iterator(); it.hasNext(); ) { 144 AgentXPending p = (AgentXPending) it.next(); 145 if (pending != null) { 146 AgentXPDU agentXPDU = p.getAgentXPDU(); 147 AgentXMasterSession session = p.getSession(); 148 agentXPDU.setSessionID(session.getSessionID()); 149 agentXPDU.setTransactionID(req.getTransactionID()); 150 agentXPDU.setPacketID(createNextPacketID()); 151 p.updateTimestamp(); 152 try { 153 agentX.send(agentXPDU, 154 session.createAgentXTarget(), 155 session.getPeer().getTransport(), 156 p, this); 157 } 158 catch (IOException ex) { 159 LOGGER.error("Failed to send AgentX subrequest: " + 160 ex.getMessage()); 161 ((SubRequest) p.getReferences().next()). 162 getStatus().setErrorStatus(PDU.genErr); 163 break; 164 } 165 } 166 } 167 } 168 else { 169 if (complete) { 170 agentXQueue.removeAll(req.getTransactionID()); 171 } 172 else { 173 if (req.getReprocessCounter() < this.maxReprocessing) { 175 reprocessRequest(server, (SnmpRequest)req); 176 } 177 else { 178 req.setErrorStatus(PDU.genErr); 179 LOGGER.warn("The following request has been repeocessed "+ 180 req.getReprocessCounter()+" which exceeds the agent's "+ 181 "upper limit of "+this.maxReprocessing+": "+ 182 req); 183 } 184 } 185 super.finalizeRequest(command, req, server); 186 } 187 } 188 189 protected synchronized int getNextSessionID() { 190 return nextSessionID++; 191 } 192 193 public MOServer getServer() { 194 return server; 195 } 196 197 public byte getDefaultTimeout() { 198 return defaultTimeout; 199 } 200 201 206 public int getMaxConsecutiveTimeouts() { 207 return maxConsecutiveTimeouts; 208 } 209 210 216 public boolean isAcceptNewContexts() { 217 return acceptNewContexts; 218 } 219 220 public void setDefaultTimeout(byte defaultTimeout) { 221 this.defaultTimeout = defaultTimeout; 222 } 223 224 231 public void setMaxConsecutiveTimeouts(int maxConsecutiveTimeouts) { 232 this.maxConsecutiveTimeouts = maxConsecutiveTimeouts; 233 } 234 235 241 public void setAcceptNewContexts(boolean acceptNewContexts) { 242 this.acceptNewContexts = acceptNewContexts; 243 } 244 245 public void processCommand(AgentXCommandEvent event) { 246 boolean pendingClose = false; 247 if (event.isException()) { 248 AgentXPeer peer = getPeer(event.getPeerAddress()); 249 if (peer != null) { 250 peer.incParseErrors(); 251 LOGGER.warn("AgentX parse exception from peer '"+peer+ 252 "' : " + event.getException()); 253 if ((maxParseErrors >= 0) && (peer.getParseErrors() > maxParseErrors)) { 254 LOGGER.warn("Removing peer due to excessive parse errors: " +peer); 255 closePeer(peer.getAddress(), AgentXProtocol.REASON_PARSE_ERROR); 256 } 257 } 258 else { 259 LOGGER.error("AgentX parse exception from unknown peer '"+ 260 event.getPeerAddress()+ 261 "' : " + event.getException()); 262 } 263 } 264 else { 265 AgentXPDU pdu = event.getCommand(); 266 AgentXMasterSession session = getSession(pdu); 267 AgentXResponsePDU response = null; 268 if (LOGGER.isDebugEnabled()) { 269 LOGGER.debug("Processing AgentX PDU "+pdu+" for session "+session); 270 } 271 switch (pdu.getType()) { 272 case AgentXPDU.AGENTX_RESPONSE_PDU: { 273 LOGGER.error( 274 "Internal error: received AgentX response without request"); 275 return; 276 } 277 case AgentXPDU.AGENTX_OPEN_PDU: { 278 response = openSession((AgentXOpenPDU) pdu, event); 279 session = getSession(response.getSessionID()); 280 break; 281 } 282 case AgentXPDU.AGENTX_CLOSE_PDU: { 283 response = closeSession((AgentXClosePDU)pdu, session); 284 pendingClose = true; 285 break; 286 } 287 case AgentXPDU.AGENTX_REGISTER_PDU: { 288 response = register((AgentXRegisterPDU)pdu, event, session); 289 break; 290 } 291 case AgentXPDU.AGENTX_UNREGISTER_PDU: { 292 response = unregister((AgentXUnregisterPDU)pdu, event, session); 293 break; 294 } 295 case AgentXPDU.AGENTX_ADDAGENTCAPS_PDU: { 296 response = addAgentCaps((AgentXAddAgentCapsPDU)pdu, session); 297 break; 298 } 299 case AgentXPDU.AGENTX_REMOVEAGENTCAPS_PDU: { 300 response = removeAgentCaps((AgentXRemoveAgentCapsPDU)pdu, session); 301 break; 302 } 303 case AgentXPDU.AGENTX_NOTIFY_PDU: { 304 response = notify((AgentXNotifyPDU)pdu, session); 305 break; 306 } 307 case AgentXPDU.AGENTX_PING_PDU: { 308 response = ping((AgentXPingPDU)pdu, session); 309 break; 310 } 311 case AgentXPDU.AGENTX_INDEXALLOCATE_PDU: { 312 response = indexAllocate((AgentXIndexAllocatePDU)pdu, session); 313 break; 314 } 315 case AgentXPDU.AGENTX_INDEXDEALLOCATE_PDU: { 316 response = indexDeallocate((AgentXIndexDeallocatePDU)pdu, session); 317 break; 318 } 319 default: 320 LOGGER.warn("Unknown AgentX PDU type received: " + pdu); 321 } 322 if ((response != null) && (session != null)) { 323 sendResponse(response, session); 324 } 325 if (pendingClose) { 326 if (session != null) { 327 closePeer(session.getPeer()); 328 } 329 } 330 } 331 event.setProcessed(true); 332 } 333 334 private void closePeer(AgentXPeer peer) { 335 TransportMapping transport = peer.getTransport(); 336 if (transport instanceof ConnectionOrientedTransportMapping) { 337 try { 338 if (((ConnectionOrientedTransportMapping) 339 transport).close(peer.getAddress())) { 340 if (LOGGER.isInfoEnabled()) { 341 LOGGER.info("Closed sub-agent connection to " + 342 peer.getAddress()); 343 } 344 } 345 else { 346 LOGGER.warn("Failed to close sub-agent connection to " + 347 peer.getAddress()); 348 } 349 } 350 catch (IOException ex) { 351 LOGGER.error("Failed to close transport mapping "+ 352 peer.getTransport()+" because: "+ 353 ex.getMessage(), ex); 354 } 355 } 356 } 357 358 public AgentXResponsePDU indexDeallocate(AgentXIndexDeallocatePDU pdu, 359 AgentXMasterSession session) { 360 AgentXResponsePDU response = createResponse(pdu, session); 361 boolean contextSupported = server.isContextSupported(pdu.getContext()); 362 if (contextSupported) { 363 VariableBinding[] vbs = pdu.getVariableBindings(); 364 deallocateIndexes(response, pdu, session, vbs, true); 366 if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) { 367 deallocateIndexes(response, pdu, session, vbs, false); 369 response.setVariableBindings(vbs); 370 } 371 } 372 else { 373 response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT); 374 } 375 return response; 376 } 377 378 private boolean checkIfContextIsSupported(OctetString context) { 379 boolean contextSupported = server.isContextSupported(context); 380 if (LOGGER.isDebugEnabled()) { 381 LOGGER.debug("Checking context '"+context+"' is supported"); 382 } 383 if (isAcceptNewContexts() && !contextSupported) { 384 server.addContext(context); 385 contextSupported = server.isContextSupported(context); 386 if (LOGGER.isInfoEnabled()) { 387 LOGGER.info("Adding new context '"+context+ 388 "' on subagent request returned: "+contextSupported); 389 } 390 } 391 return contextSupported; 392 } 393 394 public AgentXResponsePDU indexAllocate(AgentXIndexAllocatePDU pdu, 395 AgentXMasterSession session) { 396 AgentXResponsePDU response = createResponse(pdu, session); 397 response.setVariableBindings(pdu.getVariableBindings()); 398 boolean contextSupported = checkIfContextIsSupported(pdu.getContext()); 399 if (contextSupported) { 400 VariableBinding[] vbs = pdu.getVariableBindings(); 401 allocateIndexes(response, pdu, session, vbs, true); 403 if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) { 404 allocateIndexes(response, pdu, session, vbs, false); 406 response.setVariableBindings(vbs); 407 } 408 } 409 else { 410 response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT); 411 } 412 return response; 413 } 414 415 private int allocateIndexes(AgentXResponsePDU response, 416 AgentXIndexAllocatePDU pdu, 417 AgentXMasterSession session, 418 VariableBinding[] vbs, 419 boolean testOnly) { 420 int status = AgentXProtocol.AGENTX_SUCCESS; 421 int i=0; 422 for (; (i<vbs.length) && (status == AgentXProtocol.AGENTX_SUCCESS); i++) { 423 VariableBinding vb = vbs[i]; 424 if (pdu.isFlagSet(AgentXProtocol.FLAG_ANY_INDEX)) { 425 status = indexRegistry.anyIndex(session.getSessionID(), 426 pdu.getContext(), vb, testOnly); 427 } 428 else if (pdu.isFlagSet(AgentXProtocol.FLAG_NEW_INDEX)) { 429 status = indexRegistry.newIndex(session.getSessionID(), 430 pdu.getContext(), vb, testOnly); 431 } 432 else { 433 status = indexRegistry.allocate(session.getSessionID(), 434 pdu.getContext(), vb, testOnly); 435 } 436 } 437 response.setErrorStatus(status); 438 if (status != AgentXProtocol.AGENTX_SUCCESS) { 439 response.setErrorIndex(i); 440 } 441 return status; 442 } 443 444 private int deallocateIndexes(AgentXResponsePDU response, 445 AgentXIndexDeallocatePDU pdu, 446 AgentXMasterSession session, 447 VariableBinding[] vbs, 448 boolean testOnly) { 449 int status = AgentXProtocol.AGENTX_SUCCESS; 450 int i=0; 451 for (; (i<vbs.length) && (status == AgentXProtocol.AGENTX_SUCCESS); i++) { 452 VariableBinding vb = vbs[i]; 453 status = indexRegistry.release(session.getSessionID(), 454 pdu.getContext(), vb, testOnly); 455 } 456 response.setErrorStatus(status); 457 if (status != AgentXProtocol.AGENTX_SUCCESS) { 458 response.setErrorIndex(i); 459 } 460 return status; 461 } 462 463 protected void processAgentXSearchResponse(AgentXPending pending, 464 AgentXResponsePDU pdu) { 465 if (pdu.getErrorStatus() != PDU.noError) { 466 processsErrorResponse(pending, pdu); 467 } 468 else { 469 if (pending.getAgentXPDU().getType() == AgentXPDU.AGENTX_GETBULK_PDU) { 471 processAgentXNextResponse(pending, pdu, Integer.MAX_VALUE); 472 } 473 else { 474 processAgentXNextResponse(pending, pdu, 475 ((AgentXRequestPDU)pending.getAgentXPDU()). 476 getRanges().length); 477 } 478 } 479 } 480 481 private SubRequestIterator 482 processAgentXNextResponse(AgentXPending pending, 483 AgentXResponsePDU pdu, 484 int subRequestIndexUpperBound) throws 485 NoSuchElementException 486 { 487 VariableBinding[] vbs = pdu.getVariableBindings(); 488 AgentXRequestPDU axReqPDU = (AgentXRequestPDU) pending.getAgentXPDU(); 489 SubRequestIterator subRequests = pending.getReferences(); 490 for (int i=0; (i<subRequestIndexUpperBound) && subRequests.hasNext(); i++) { 491 SnmpSubRequest sreq = (SnmpSubRequest) subRequests.nextSubRequest(); 492 processNextSubRequest(vbs, axReqPDU, i, i, sreq); 493 } 494 return subRequests; 495 } 496 497 private void processNextSubRequest(VariableBinding[] vbs, 498 AgentXRequestPDU axReqPDU, 499 int vbIndex, 500 int rangeIndex, 501 SnmpSubRequest sreq) { 502 MOScope srange = axReqPDU.getRanges()[rangeIndex]; 503 if (vbIndex < vbs.length) { 504 VariableBinding vb = vbs[vbIndex]; 505 if (vb.getSyntax() == SMIConstants.EXCEPTION_END_OF_MIB_VIEW) { 506 processEndOfMibView(sreq, srange, vb.getOid()); 507 } 508 else if (!srange.covers(vb.getOid())) { 509 processEndOfMibView(sreq, srange, null); 510 } 511 else if ((vb.isException()) || 512 (super.vacm.isAccessAllowed(sreq.getSnmpRequest(). 513 getViewName(), 514 vb.getOid()) != VACM.VACM_OK)) { 515 DefaultMOContextScope nscope = (DefaultMOContextScope) sreq.getScope(); 516 nscope.substractScope(srange); 517 nscope.setUpperBound(null); 518 nscope.setUpperIncluded(true); 519 sreq.setQuery(null); 521 sreq.getStatus().setProcessed(false); 522 } 523 else { 524 sreq.getVariableBinding().setOid(vb.getOid()); 525 sreq.getVariableBinding().setVariable(vb.getVariable()); 526 sreq.getStatus().setPhaseComplete(true); 527 if (LOGGER.isDebugEnabled()) { 528 LOGGER.debug("Assigned next subrequest "+sreq); 529 } 530 sreq.updateNextRepetition(); 532 } 533 } 534 else { 535 processEndOfMibView(sreq, srange, null); 537 } 538 } 539 540 private static void processEndOfMibView(SnmpSubRequest sreq, MOScope srange, 541 OID oid) { 542 if (srange.getUpperBound() == null) { 543 SubRequestIterator tail = sreq.repetitions(); 546 while (tail.hasNext()) { 547 SubRequest sr = tail.nextSubRequest(); 548 if (oid == null) { 549 sr.getVariableBinding().setOid(srange.getLowerBound()); 550 } 551 else { 552 sreq.getVariableBinding().setOid(oid); 553 } 554 sr.getVariableBinding().setVariable(Null.endOfMibView); 555 sr.getStatus().setPhaseComplete(true); 556 } 557 return; 558 } 559 else { 560 sreq.getStatus().setProcessed(false); 561 } 562 DefaultMOContextScope nscope = (DefaultMOContextScope) sreq.getScope(); 563 nscope.substractScope(srange); 564 nscope.setUpperBound(null); 565 nscope.setUpperIncluded(true); 566 sreq.setQuery(null); 568 } 569 570 protected void processAgentXBulkResponse(AgentXPending pending, 571 AgentXResponsePDU pdu) { 572 if (pdu.getErrorStatus() != PDU.noError) { 573 processsErrorResponse(pending, pdu); 574 } 575 else { 576 AgentXGetBulkPDU requestPDU = (AgentXGetBulkPDU) pending.getAgentXPDU(); 577 VariableBinding[] vbs = pdu.getVariableBindings(); 578 int numBindings = vbs.length; 579 int repeaters = 580 requestPDU.getRanges().length - requestPDU.getNonRepeaters(); 581 if (numBindings - requestPDU.getNonRepeaters() > 582 requestPDU.getMaxRepetitions() * repeaters) { 583 LOGGER.warn("Bulk response with more repetitions ("+ 584 ((numBindings - requestPDU.getNonRepeaters())/ repeaters)+ 585 ") than max rep. "+requestPDU.getMaxRepetitions()); 586 numBindings = requestPDU.getMaxRepetitions() * repeaters 587 + requestPDU.getNonRepeaters(); 588 } 589 if (numBindings == 0) { 590 AgentXRequestPDU axReqPDU = (AgentXRequestPDU) pending.getAgentXPDU(); 593 SubRequestIterator subRequests = pending.getReferences(); 594 for (int i=0; subRequests.hasNext(); i++) { 595 SnmpSubRequest sreq = (SnmpSubRequest) subRequests.nextSubRequest(); 596 MOScope srange = axReqPDU.getRanges()[i]; 597 processEndOfMibView(sreq, srange, null); 598 } 599 } 600 else { 601 SubRequestIterator it = 603 processAgentXNextResponse(pending, pdu, requestPDU.getNonRepeaters()); 604 int nonRep = requestPDU.getNonRepeaters(); 605 for (int c = 0; 606 (c+nonRep < requestPDU.getRanges().length) && it.hasNext(); c++) { 607 int rangeIndex = c + nonRep; 608 SnmpSubRequest sreq = (SnmpSubRequest) it.nextSubRequest(); 609 SubRequestIterator rsreq = sreq.repetitions(); 610 for (int r = 0; (nonRep + (r * repeaters) + c < numBindings) && 611 rsreq.hasNext(); r++) { 612 SnmpSubRequest repetition = (SnmpSubRequest) rsreq.nextSubRequest(); 613 617 processNextSubRequest(vbs, requestPDU, nonRep + (r * repeaters) + c, 618 rangeIndex, repetition); 619 } 620 } 621 } 622 } 623 } 624 625 protected static void processsErrorResponse(AgentXPending pending, 626 AgentXResponsePDU pdu) throws 627 NoSuchElementException 628 { 629 SubRequestIterator subRequests = pending.getReferences(); 630 for (int i=1; i<pdu.getErrorIndex(); i++) { 631 if (subRequests.hasNext()) { 632 subRequests.next(); 633 } 634 else { 635 pending.getRequest().setErrorStatus(PDU.genErr); 636 return; 637 } 638 } 639 if (subRequests.hasNext()) { 640 SubRequest sreq = subRequests.nextSubRequest(); 641 RequestStatus status = sreq.getStatus(); 642 status.setErrorStatus(pdu.getErrorStatus()); 643 } 644 else { 645 pending.getRequest().setErrorStatus(PDU.genErr); 646 } 647 } 648 649 650 private static boolean checkAgentXResponse(AgentXResponsePDU pdu, 651 AgentXPending pending) { 652 switch (pending.getAgentXPDU().getType()) { 653 case AgentXPDU.AGENTX_GET_PDU: 654 case AgentXPDU.AGENTX_GETNEXT_PDU: { 655 if (((AgentXRequestPDU) pending.getAgentXPDU()).getRanges().length != 656 pdu.size()) { 657 pending.getRequest().setErrorStatus(PDU.genErr); 658 return false; 659 } 660 break; 661 } 662 default: { 663 } 665 } 666 return true; 667 } 668 669 protected AgentXResponsePDU ping(AgentXPingPDU pdu, 670 AgentXMasterSession session) { 671 AgentXResponsePDU response = createResponse(pdu, session); 672 if (!checkIfContextIsSupported(pdu.getContext())) { 673 response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT); 674 return response; 675 } 676 return response; 677 } 678 679 protected AgentXResponsePDU notify(AgentXNotifyPDU pdu, 680 AgentXMasterSession session) { 681 AgentXResponsePDU response = createResponse(pdu, session); 682 if (session != null) { 683 if (!checkIfContextIsSupported(pdu.getContext())) { 684 response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT); 685 return response; 686 } 687 VariableBinding[] vbs = pdu.getVariableBindings(); 688 response.setVariableBindings(vbs); 689 int payloadIndex = 1; 690 OID trapoid = null; 691 TimeTicks timestamp = new TimeTicks(getContextSysUpTime(DEFAULT_CONTEXT)); 692 693 if (vbs.length >= 1) { 694 if (SnmpConstants.sysUpTime.equals(vbs[0].getOid())) { 695 payloadIndex++; 696 if ((vbs.length < 2) || 697 (!SnmpConstants.snmpTrapOID.equals(vbs[1].getOid()))) { 698 response.setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR); 699 response.setErrorIndex(2); 700 } 701 else { 702 timestamp = (TimeTicks) vbs[0].getVariable(); 703 trapoid = (OID) vbs[1].getVariable(); 704 } 705 } 706 else if (SnmpConstants.snmpTrapOID.equals(vbs[0].getOid())) { 707 trapoid = (OID) vbs[0].getVariable(); 708 } 709 else { 710 response.setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR); 711 response.setErrorIndex(1); 712 } 713 } 714 if (trapoid != null) { 715 VariableBinding[] pvbs = new VariableBinding[vbs.length - payloadIndex]; 716 System.arraycopy(vbs, payloadIndex, pvbs, 0, pvbs.length); 717 notify(pdu.getContext(), trapoid, timestamp, pvbs); 718 } 719 } 720 return response; 721 } 722 723 protected TimeTicks getContextSysUpTime(OctetString context) { 724 MasterContextInfo info = (MasterContextInfo) contextInfo.get(context); 725 SysUpTime contextSysUpTime; 726 if (info == null) { 727 MOContextScope scope = 728 new DefaultMOContextScope(context, 729 SnmpConstants.sysUpTime, true, 730 SnmpConstants.sysUpTime, true); 731 ManagedObject mo = server.lookup(new DefaultMOQuery(scope)); 732 if (mo instanceof SysUpTime) { 733 contextSysUpTime = (SysUpTime) mo; 734 } 735 else { 736 739 LOGGER.warn("SysUpTime could not be found in '"+context+ 740 "' context, using a new instance instead"); 741 contextSysUpTime = new SysUpTimeImpl(); 742 } 743 contextInfo.put(context, 744 new MasterContextInfo(context, contextSysUpTime)); 745 } 746 else { 747 contextSysUpTime = info.getUpTime(); 748 } 749 if (contextSysUpTime != null) { 750 return contextSysUpTime.get(); 751 } 752 return null; 753 } 754 755 public AgentXResponsePDU addAgentCaps(AgentXAddAgentCapsPDU pdu, 756 AgentXMasterSession session) { 757 AgentXResponsePDU response = createResponse(pdu, session); 758 if (session != null) { 759 if (!checkIfContextIsSupported(pdu.getContext())) { 760 response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT); 761 return response; 762 } 763 AgentCapabilityList agentCaps = getAgentCaps(pdu.getContext()); 764 if (agentCaps != null) { 765 OID index = agentCaps.addSysOREntry(pdu.getId(), pdu.getDescr()); 766 session.addAgentCaps(pdu.getId(), index); 767 } 768 } 769 return response; 770 } 771 772 protected AgentCapabilityList getAgentCaps(OctetString contextName) { 773 MOContextScope scope = 774 new DefaultMOContextScope(contextName, 775 SnmpConstants.sysOREntry, true, 776 SnmpConstants.sysOREntry, true); 777 ManagedObject mo = server.lookup(new DefaultMOQuery(scope)); 778 if (mo instanceof AgentCapabilityList) { 779 return (AgentCapabilityList)mo; 780 } 781 else { 782 LOGGER.warn("SysOREntry managed object for context "+contextName+ 783 " not found, instead found: "+mo); 784 } 785 return null; 786 } 787 788 public AgentXResponsePDU removeAgentCaps(AgentXRemoveAgentCapsPDU pdu, 789 AgentXMasterSession session) { 790 AgentXResponsePDU response = createResponse(pdu, session); 791 if (session != null) { 792 OID index = session.removeAgentCaps(pdu.getId()); 793 AgentCapabilityList agentCaps = getAgentCaps(pdu.getContext()); 794 if (agentCaps != null) { 795 Object ac = agentCaps.removeSysOREntry(index); 796 if (ac == null) { 797 response.setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_AGENTCAPS); 798 } 799 } 800 else { 801 response.setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_AGENTCAPS); 802 } 803 } 804 return response; 805 } 806 807 public AgentXResponsePDU closeSession(AgentXClosePDU pdu, 808 AgentXMasterSession session) { 809 if (LOGGER.isInfoEnabled()) { 810 LOGGER.info("Subagent is closing session "+session+ 811 " because "+pdu.getReason()); 812 } 813 AgentXResponsePDU response = createResponse(pdu, session); 814 if (session != null) { 815 removeSession(session.getSessionID()); 816 removeAllRegistrations(session); 817 session.setClosed(true); 818 } 819 return response; 820 } 821 822 public void closeSession(AgentXMasterSession session, 823 byte reason) { 824 if (LOGGER.isInfoEnabled()) { 825 LOGGER.info("Closing sub-agent session "+session+" because "+reason); 826 } 827 AgentXClosePDU closePDU = new AgentXClosePDU(reason); 828 try { 829 agentX.send(closePDU, 830 session.createAgentXTarget(), 831 session.getPeer().getTransport(), 832 new AgentXPendingClose(session, closePDU), this); 833 } 834 catch (IOException ex) { 835 LOGGER.error("Failed to send CloseSessionPDU to close session "+session+ 836 ": "+ex.getMessage(), ex); 837 } 838 removeSession(session.getSessionID()); 839 removeAllRegistrations(session); 840 session.setClosed(true); 841 } 842 843 844 protected synchronized void removeAllRegistrations(AgentXMasterSession session) { 845 if (LOGGER.isDebugEnabled()) { 846 LOGGER.debug("Removing all registrations (out of "+registrations.size()+ 847 ") of session "+session); 848 } 849 for (Iterator it = registrations.iterator(); it.hasNext(); ) { 850 AgentXRegEntry r = (AgentXRegEntry) it.next(); 851 if (r.getSession().equals(session)) { 852 removeRegistration(r, it); 853 } 854 } 855 } 856 857 protected AgentXMasterSession getSession(int sessionID) { 858 return (AgentXMasterSession) sessions.get(new Integer (sessionID)); 859 } 860 861 protected synchronized AgentXMasterSession getSession(AgentXPDU pdu) { 862 int sessionID = pdu.getSessionID(); 863 return getSession(sessionID); 864 } 865 866 protected AgentXResponsePDU register(AgentXRegisterPDU pdu, 867 AgentXCommandEvent command, 868 AgentXMasterSession session) { 869 AgentXResponsePDU response = createResponse(pdu, session); 870 if (session != null) { 871 if (!checkIfContextIsSupported(pdu.getContext())) { 872 response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT); 873 return response; 874 } 875 AgentXRegEntry regEntry = 876 new AgentXRegEntry(session, 877 pdu.getRegion(), 878 pdu.getPriority(), 879 pdu.getContext(), 880 pdu.getTimeout()); 881 if (isDuplicate(regEntry)) { 882 response.setErrorStatus(AgentXProtocol.AGENTX_DUPLICATE_REGISTRATION); 883 return response; 884 } 885 AgentXMasterEvent event = 886 new AgentXMasterEvent(this, AgentXMasterEvent.REGISTRATION_TO_ADD, 887 regEntry); 888 fireMasterChanged(event); 889 if (event.getVetoReason() == AgentXProtocol.AGENTX_SUCCESS) { 890 try { 891 addRegistration(regEntry); 892 } 893 catch (DuplicateRegistrationException drex) { 894 if (LOGGER.isDebugEnabled()) { 895 drex.printStackTrace(); 896 } 897 response.setErrorStatus(AgentXProtocol.AGENTX_DUPLICATE_REGISTRATION); 898 return response; 899 } 900 } 901 else { 902 response.setErrorStatus(event.getVetoReason()); 903 } 904 } 905 return response; 906 } 907 908 protected AgentXResponsePDU unregister(AgentXUnregisterPDU pdu, 909 AgentXCommandEvent event, 910 AgentXMasterSession session) { 911 AgentXResponsePDU response = createResponse(pdu, session); 912 if (session != null) { 913 AgentXRegEntry regEntry = 914 new AgentXRegEntry(session, 915 pdu.getRegion(), 916 pdu.getPriority(), 917 pdu.getContext(), 918 pdu.getTimeout()); 919 boolean found = false; 920 for (Iterator it = registrations.iterator(); it.hasNext(); ) { 921 AgentXRegEntry r = (AgentXRegEntry) it.next(); 922 if (r.equals(regEntry)) { 923 found = true; 924 if (!removeRegistration(r, it)) { 925 response.setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_REGISTRATION); 926 } 927 break; 928 } 929 } 930 if (!found) { 931 response.setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_REGISTRATION); 932 } 933 } 934 return response; 935 } 936 937 protected synchronized boolean isDuplicate(AgentXRegEntry registration) { 938 if (registrations.contains(registration)) { 939 if (LOGGER.isDebugEnabled()) { 940 LOGGER.debug("Identical registration attempt for "+registration); 941 } 942 return true; 943 } 944 AgentXNodeQuery query = 945 new AgentXNodeQuery(registration.getContext(), 946 registration.getRegion(), 947 AgentXNodeQuery.QUERY_NON_AGENTX_NODES); 948 ManagedObject mo = server.lookup(query); 949 if (mo != null) { 950 if (LOGGER.isDebugEnabled()) { 952 LOGGER.debug("New registration is rejected as duplicate because it "+ 953 "overlaps with non AgentX managed object: "+mo); 954 } 955 return true; 956 } 957 return false; 958 } 959 960 protected synchronized void addRegistration(AgentXRegEntry registration) 961 throws DuplicateRegistrationException 962 { 963 registrations.add(registration); 964 if (registration.getRegion().isRange()) { 965 AgentXRegion r = registration.getRegion(); 966 long start = r.getLowerBoundSubID() & 0xFFFFFFFFL; 967 long stop = r.getUpperBoundSubID() & 0xFFFFFFFFL; 968 if (start > stop) { 969 LOGGER.warn("Empty range registration "+registration); 970 } 971 else { 972 for (long s = start; s <= stop; s++) { 973 OID root = new OID(r.getLowerBound()); 974 root.set(r.getRangeSubID()-1, (int)s); 975 AgentXRegion sr = new AgentXRegion(root, root.nextPeer()); 976 addRegion(registration, sr); 977 } 978 } 979 } 980 else { 981 addRegion(registration, registration.getRegion()); 982 } 983 AgentXMasterEvent e = 984 new AgentXMasterEvent(this, AgentXMasterEvent.REGISTRATION_ADDED, 985 registration); 986 fireMasterChanged(e); 987 } 988 989 private static AgentXNodeQuery nextQuery(AgentXNodeQuery lastQuery, 990 AgentXNode lastNode) { 991 if (lastNode != null) { 992 lastQuery.getMutableScope().setLowerBound( 993 lastNode.getScope().getUpperBound()); 994 lastQuery.getMutableScope().setLowerIncluded( 995 !lastNode.getScope().isUpperIncluded()); 996 } 997 return lastQuery; 998 } 999 1000 protected synchronized void addRegion(AgentXRegEntry registration, 1001 AgentXRegion region) throws 1002 DuplicateRegistrationException 1003 { 1004 if (region.isRange()) { 1005 String errText = "Regions with range cannot be added"; 1006 LOGGER.error(errText); 1007 throw new IllegalArgumentException (errText); 1008 } 1009 AgentXNodeQuery query = 1010 new AgentXNodeQuery(registration.getContext(), 1011 region, 1012 AgentXNodeQuery.QUERY_AGENTX_NODES); 1013 AgentXNode lastNode = null; 1014 AgentXNode node = (AgentXNode)server.lookup(query); 1015 if (node != null) { 1016 LinkedList splitted = new LinkedList(); 1017 AgentXRegion r1 = new AgentXRegion(region); 1018 for (; (node != null); 1019 node = (AgentXNode) server.lookup(nextQuery(query, lastNode))) { 1020 AgentXRegion r2 = new AgentXRegion(node.getScope().getLowerBound(), 1021 node.getScope().getUpperBound()); 1022 if (LOGGER.isDebugEnabled()) { 1023 LOGGER.debug("Affected region r2="+r2+ 1024 " from registered region r1="+r1); 1025 } 1026 if (r2.covers(r1)) { 1027 if (LOGGER.isDebugEnabled()) { 1028 LOGGER.debug("Region r2 covers r1 (r1="+r1+",r2="+r2+")"); 1029 } 1030 oldRegionCoversNew(registration, node, splitted, r1, r2); 1031 r1 = null; 1032 } 1033 else if (r1.covers(r2)) { 1034 if (LOGGER.isDebugEnabled()) { 1035 LOGGER.debug("Region r1 covers r2 (r1="+r1+",r2="+r2+")"); 1036 } 1037 r1 = newRegionCoversOld(registration, lastNode, 1038 node, splitted, r1, r2); 1039 } 1040 else if ((r1.isOverlapping(r2)) && 1041 (r2.getLowerBound().compareTo(r1.getLowerBound()) < 0)) { 1042 if (LOGGER.isDebugEnabled()) { 1043 LOGGER.debug("Region r1 ovelaps r2 and r2 < r1 (r1="+r1+ 1044 ",r2="+r2+")"); 1045 } 1046 if (LOGGER.isDebugEnabled()) { 1047 LOGGER.debug("Shrinking node "+node+ 1048 " to "+r1.getLowerBound()); 1049 } 1050 node.shrink(r1.getLowerBound()); 1051 AgentXNode r2b = 1052 node.getClone(new AgentXRegion(r1.getLowerBound(), 1053 r2.getUpperBound())); 1054 r2b.addRegistration(registration); 1055 splitted.add(r2b); 1056 r1 = new AgentXRegion(r2.getUpperBound(), r1.getLowerBound()); 1057 } 1058 else { 1060 if (LOGGER.isDebugEnabled()) { 1061 LOGGER.debug("Region r1 ovelaps r2 and r1 < r2 (r1="+ 1062 r1+",r2="+r2+")"); 1063 } 1064 if (LOGGER.isDebugEnabled()) { 1065 LOGGER.debug("Shrinking node "+node+ 1066 " to "+r1.getUpperBound()); 1067 } 1068 node.shrink(r1.getUpperBound()); 1069 AgentXNode r2b = 1070 node.getClone(new AgentXRegion(r1.getUpperBound(), 1071 r2.getUpperBound())); 1072 node.addRegistration(registration); 1073 splitted.add(r2b); 1074 AgentXNode r1a = 1075 new AgentXNode(new AgentXRegion(r1.getLowerBound(), 1076 r2.getLowerBound()), registration); 1077 splitted.add(r1a); 1078 r1 = null; 1079 } 1080 if (r1 != null) { 1081 if (r1.isEmpty()) { 1082 splitted.add(new AgentXNode(region, registration)); 1083 } 1084 else { 1085 splitted.add(new AgentXNode(r1, registration)); 1086 } 1087 } 1088 lastNode = node; 1089 } 1090 for (Iterator it = splitted.iterator(); it.hasNext(); ) { 1091 AgentXNode n = (AgentXNode) it.next(); 1092 server.register(n, registration.getContext()); 1093 if (LOGGER.isDebugEnabled()) { 1094 LOGGER.debug("Registered splitted AgentX node: "+n); 1095 } 1096 } 1097 } 1098 else { 1099 node = new AgentXNode(region, registration); 1100 server.register(node, registration.getContext()); 1101 if (LOGGER.isDebugEnabled()) { 1102 LOGGER.debug("Registered AgentX node: "+node); 1103 } 1104 } 1105 } 1106 1107 protected boolean removeRegistration(AgentXRegEntry registration, 1108 Iterator regIterator) { 1109 LinkedList remove = new LinkedList(); 1110 AgentXRegion queryRegion = new AgentXRegion(registration.getRegion()); 1111 queryRegion.setUpperIncluded(true); 1112 AgentXNodeQuery query = 1113 new AgentXNodeQuery(registration.getContext(), 1114 queryRegion, 1115 AgentXNodeQuery.QUERY_AGENTX_NODES); 1116 AgentXNode lastNode = null; 1117 AgentXNode node = (AgentXNode)server.lookup(query); 1118 if (node != null) { 1119 for (; (node != null); 1120 node = (AgentXNode) server.lookup(nextQuery(query, lastNode))) { 1121 if (node == lastNode) { 1122 break; 1123 } 1124 if ((node.removeRegistration(registration)) && 1125 (node.getRegistrationCount() == 0)) { 1126 remove.add(node); 1127 } 1128 else { 1129 if ((lastNode != null) && 1130 (lastNode.getRegistrationCount() == 1) && 1131 (node.getRegistrationCount() == 1) && 1132 (lastNode.getScope().getUpperBound().equals( 1133 node.getScope().getLowerBound())) && 1134 (node.getActiveRegistration().equals( 1135 lastNode.getActiveRegistration()))) { 1136 AgentXRegion r = 1137 new AgentXRegion(node.getScope().getLowerBound(), 1138 lastNode.getScope().getUpperBound()); 1139 if (node.getActiveRegistration().getRegion().covers(r)) { 1140 remove.add(node); 1141 lastNode.expand(node.getScope().getUpperBound(), false); 1142 } 1143 } 1144 } 1145 lastNode = node; 1146 } 1147 } 1148 else { 1149 LOGGER.warn("A registration is removed with not associated subtree: "+ 1150 registration); 1151 } 1152 for (Iterator it = remove.iterator(); it.hasNext(); ) { 1153 AgentXNode rnode = (AgentXNode) it.next(); 1154 server.unregister(rnode, registration.getContext()); 1155 } 1156 if (regIterator != null) { 1157 regIterator.remove(); 1158 if (LOGGER.isDebugEnabled()) { 1159 LOGGER.debug("Removed registration "+registration+ 1160 " by session close, "+registrations.size()+" left."); 1161 } 1162 fireMasterChanged(new AgentXMasterEvent(this, 1163 AgentXMasterEvent.REGISTRATION_REMOVED, 1164 registration)); 1165 return true; 1166 } 1167 else if (registrations.remove(registration)) { 1168 if (LOGGER.isDebugEnabled()) { 1169 LOGGER.debug("Removed registration "+registration+ 1170 ", "+registrations.size()+" left."); 1171 } 1172 fireMasterChanged(new AgentXMasterEvent(this, 1173 AgentXMasterEvent.REGISTRATION_REMOVED, 1174 registration)); 1175 return true; 1176 } 1177 return false; 1178 } 1179 1180 private static AgentXRegion newRegionCoversOld(AgentXRegEntry registration, 1181 AgentXNode lastNode, 1182 AgentXNode node, 1183 LinkedList splitted, 1184 AgentXRegion r1, 1185 AgentXRegion r2) { 1186 AgentXNode r1a = null; 1187 if (lastNode != null) { 1188 AgentXRegion r = 1189 new AgentXRegion(lastNode.getScope().getUpperBound(), 1190 r2.getLowerBound()); 1191 r1a = new AgentXNode(r, registration); 1192 } 1193 else { 1194 AgentXRegion r = 1195 new AgentXRegion(r1.getLowerBound(), r2.getLowerBound()); 1196 r1a = new AgentXNode(r, registration); 1197 } 1198 if (!splitted.isEmpty()) { 1199 if (LOGGER.isDebugEnabled()) { 1200 LOGGER.debug("Shrinking node "+splitted.getLast()+ 1201 " to "+r2.getLowerBound()); 1202 } 1203 ((AgentXNode)splitted.getLast()).shrink(r2.getLowerBound()); 1204 } 1205 node.addRegistration(registration); 1206 if ((r1.getLowerBound().equals(r2.getLowerBound())) || 1207 ((!splitted.isEmpty()) && 1208 (((AgentXNode)splitted.getLast()). 1209 getScope().equals(r1a.getScope())))) { 1210 r1a = null; 1211 } 1212 else { 1213 splitted.add(r1a); 1214 } 1215 return new AgentXRegion(r2.getUpperBound(), r1.getUpperBound()); 1216 } 1217 1218 private static void oldRegionCoversNew(AgentXRegEntry registration, 1219 AgentXNode node, 1220 List splitted, 1221 AgentXRegion r1, 1222 AgentXRegion r2) { 1223 AgentXRegion r = new AgentXRegion(r1.getUpperBound(), 1224 node.getScope().getUpperBound()); 1225 AgentXNode r2c = node.getClone(r); 1226 if (r2.getLowerBound().equals(r1.getLowerBound())) { 1227 if (LOGGER.isDebugEnabled()) { 1228 LOGGER.debug("Shrinking node "+node+" to "+r1.getUpperBound()); 1229 } 1230 node.shrink(r1.getUpperBound()); 1231 node.addRegistration(registration); 1232 } 1233 else { 1234 if (LOGGER.isDebugEnabled()) { 1235 LOGGER.debug("Shrinking node "+node+" to "+r1.getLowerBound()); 1236 } 1237 node.shrink(r1.getLowerBound()); 1238 AgentXNode r2b = node.getClone(r1); 1239 r2b.addRegistration(registration); 1240 splitted.add(r2b); 1241 } 1242 splitted.add(r2c); 1243 } 1244 1245 public AgentXResponsePDU openSession(AgentXOpenPDU pdu, 1246 AgentXCommandEvent event) { 1247 AgentXMasterSession session = 1248 new AgentXMasterSession(getNextSessionID(), agentXQueue, 1249 pdu.getSubagentID(), pdu.getSubagentDescr()); 1250 AgentXPeer peer = getPeer(event.getPeerAddress()); 1251 if (peer == null) { 1252 peer = new AgentXPeer(event.getPeerTransport(), event.getPeerAddress()); 1253 addPeer(peer); 1254 LOGGER.warn("Added peer during session opening: "+peer+ 1255 " (peer should have been there already due "+ 1256 "to connection setup)"); 1257 } 1258 session.setPeer(peer); 1259 session.setAgentXVersion(pdu.getVersion() & 0xFF); 1260 if (pdu.getTimeout() != 0) { 1261 session.setTimeout(pdu.getTimeout()); 1262 } 1263 else { 1264 session.setTimeout(defaultTimeout); 1265 } 1266 int sessionAccepted = acceptSession(session); 1267 if (sessionAccepted == AgentXProtocol.AGENTX_SUCCESS) { 1268 addSession(session); 1269 if (LOGGER.isInfoEnabled()) { 1270 LOGGER.info("Session " + session + " opened from "+peer.getAddress()); 1271 } 1272 } 1273 else { 1274 LOGGER.warn("Session open rejected because "+sessionAccepted+" for "+ 1275 session+" from "+event.getPeerAddress()); 1276 } 1277 AgentXResponsePDU response = createResponse(pdu, session); 1278 response.setErrorStatus((short)sessionAccepted); 1279 return response; 1280 } 1281 1282 protected synchronized void addPeer(AgentXPeer peer) { 1283 peers.put(peer.getAddress(), peer); 1284 fireMasterChanged(new AgentXMasterEvent(this, 1285 AgentXMasterEvent.PEER_ADDED, 1286 peer)); 1287 } 1288 1289 protected synchronized AgentXPeer getPeer(Address address) { 1290 return (AgentXPeer) peers.get(address); 1291 } 1292 1293 protected int acceptSession(AgentXMasterSession session) { 1294 AgentXMasterEvent event = 1295 new AgentXMasterEvent(this, AgentXMasterEvent.SESSION_ADDED, session); 1296 fireMasterChanged(event); 1297 return event.getVetoReason(); 1298 } 1299 1300 protected synchronized void addSession(AgentXMasterSession session) { 1301 sessions.put(new Integer (session.getSessionID()), session); 1302 fireMasterChanged(new AgentXMasterEvent(this, 1303 AgentXMasterEvent.SESSION_ADDED, 1304 session)); 1305 } 1306 1307 protected synchronized AgentXMasterSession removeSession(int sessionID) { 1308 AgentXMasterSession session = 1309 (AgentXMasterSession) sessions.remove(new Integer (sessionID)); 1310 if (session != null) { 1311 fireMasterChanged(new AgentXMasterEvent(this, 1312 AgentXMasterEvent.SESSION_REMOVED, 1313 session)); 1314 } 1315 return session; 1316 } 1317 1318 protected AgentXResponsePDU createResponse(AgentXPDU request, 1319 AgentXSession session) { 1320 OctetString context = DEFAULT_CONTEXT; 1321 if (request instanceof AgentXContextPDU) { 1322 OctetString reqContext = ((AgentXContextPDU) request).getContext(); 1323 if (server.isContextSupported(reqContext)) { 1324 context = reqContext; 1325 } 1326 } 1327 AgentXResponsePDU response = 1328 new AgentXResponsePDU(getContextSysUpTime(context).toInt(), 1329 (short)0, (short)0); 1330 if (session == null) { 1331 response.setSessionID(request.getSessionID()); 1332 response.setErrorStatus(AgentXProtocol.AGENTX_NOT_OPEN); 1333 } 1334 else { 1335 response.setSessionID(session.getSessionID()); 1336 } 1337 response.setPacketID(request.getPacketID()); 1338 response.setTransactionID(request.getTransactionID()); 1339 response.setByteOrder(request.getByteOrder()); 1340 return response; 1341 } 1342 1343 protected void sendResponse(AgentXPDU response, AgentXSession session) { 1344 if (LOGGER.isDebugEnabled()) { 1345 LOGGER.debug("Sending AgentX response "+response+" to session "+session); 1346 } 1347 try { 1348 agentX.send(response, 1349 session.createAgentXTarget(), session.getPeer().getTransport()); 1350 } 1351 catch (IOException ex) { 1352 if (LOGGER.isDebugEnabled()) { 1353 ex.printStackTrace(); 1354 } 1355 LOGGER.error("Failed to send AgentX response "+response+" to session "+ 1356 session+" because: "+ex.getMessage(), ex); 1357 } 1358 } 1359 1360 public synchronized void connectionStateChanged(TransportStateEvent change) { 1361 Address peerAddress = change.getPeerAddress(); 1362 switch (change.getNewState()) { 1363 case TransportStateEvent.STATE_CLOSED: 1364 case TransportStateEvent.STATE_DISCONNECTED_REMOTELY: 1365 case TransportStateEvent.STATE_DISCONNECTED_TIMEOUT: { 1366 AgentXPeer removedPeer = removePeer(peerAddress); 1367 fireMasterChanged(new AgentXMasterEvent(this, 1368 AgentXMasterEvent.PEER_REMOVED, 1369 removedPeer)); 1370 break; 1371 } 1372 default: { 1373 AgentXPeer newPeer = 1374 new AgentXPeer((TransportMapping)change.getSource(), peerAddress); 1375 addPeer(newPeer); 1376 } 1377 } 1378 } 1379 1380 protected synchronized AgentXPeer removePeer(Address peerAddress) { 1381 AgentXPeer peer = (AgentXPeer) peers.remove(peerAddress); 1382 if (peer != null) { 1383 peer.setClosing(true); 1384 for (Iterator it = sessions.values().iterator(); it.hasNext(); ) { 1385 AgentXMasterSession session = (AgentXMasterSession) it.next(); 1386 if (session.getPeer().equals(peer)) { 1387 it.remove(); 1388 fireMasterChanged(new AgentXMasterEvent(this, 1389 AgentXMasterEvent.SESSION_REMOVED, 1390 session)); 1391 indexRegistry.release(session.getSessionID()); 1392 removeAllRegistrations(session); 1393 session.setClosed(true); 1394 if (peer.getTransport() instanceof ConnectionOrientedTransportMapping) { 1395 try { 1396 ((ConnectionOrientedTransportMapping)peer.getTransport()). 1397 close(peer.getAddress()); 1398 } 1399 catch (IOException iox) { 1400 LOGGER.warn("Caught exception while closing transport: " + 1401 iox.getMessage()); 1402 } 1403 } 1404 } 1405 } 1406 1414 } 1415 else { 1416 LOGGER.warn("Tried to remove peer with address "+peerAddress+ 1417 " which is not part of peer list: "+peers); 1418 } 1419 return peer; 1420 } 1421 1422 protected synchronized AgentXPeer closePeer(Address peerAddress, byte reason) { 1423 AgentXPeer peer = (AgentXPeer) peers.remove(peerAddress); 1424 if (peer != null) { 1425 peer.setClosing(true); 1426 Map s = new HashMap(sessions); 1427 for (Iterator it = s.values().iterator(); it.hasNext(); ) { 1428 AgentXMasterSession session = (AgentXMasterSession) it.next(); 1429 if (session.getPeer().equals(peer)) { 1430 closeSession(session, reason); 1431 if (peer.getTransport() instanceof ConnectionOrientedTransportMapping) { 1432 try { 1433 ((ConnectionOrientedTransportMapping)peer.getTransport()). 1434 close(peer.getAddress()); 1435 } 1436 catch (IOException iox) { 1437 LOGGER.warn("Caught exception while closing transport: " + 1438 iox.getMessage()); 1439 } 1440 } 1441 } 1442 } 1443 } 1444 else { 1445 LOGGER.warn("Tried to remove peer with address "+peerAddress+ 1446 " which is not part of peer list: "+peers); 1447 } 1448 return peer; 1449 } 1450 1451 public byte getAgentXVersion() { 1452 return AgentXProtocol.VERSION_1_0; 1453 } 1454 1455 public synchronized void addAgentXMasterListener(AgentXMasterListener l) { 1456 if (agentXMasterListeners == null) { 1457 agentXMasterListeners = new Vector(2); 1458 } 1459 agentXMasterListeners.add(l); 1460 } 1461 1462 public synchronized void removeAgentXMasterListener(AgentXMasterListener l) { 1463 if (agentXMasterListeners != null) { 1464 agentXMasterListeners.remove(l); 1465 } 1466 } 1467 1468 protected void fireMasterChanged(AgentXMasterEvent event) { 1469 if (agentXMasterListeners != null) { 1470 Vector listeners = agentXMasterListeners; 1471 int count = listeners.size(); 1472 for (int i = 0; i < count; i++) { 1473 try { 1474 ((AgentXMasterListener) listeners.get(i)).masterChanged(event); 1475 } 1476 catch (Exception ex) { 1477 LOGGER.error("AgentXMasterListener "+listeners.get(i)+ 1478 " threw exception on "+event+": "+ex.getMessage(), ex); 1479 } 1480 } 1481 } 1482 } 1483 1484 protected static class AgentXRegEntryComparator implements Comparator { 1485 1486 public int compare(Object o1, Object o2) { 1487 AgentXRegEntry a = (AgentXRegEntry)o1; 1488 AgentXRegEntry b = (AgentXRegEntry)o2; 1489 int c = a.getRegion().compareTo(b.getRegion()); 1490 if (c == 0) { 1491 c = a.getContext().compareTo(b.getContext()); 1492 } 1493 return c; 1494 } 1495 } 1496 1497 public void onResponse(AgentXResponseEvent event) { 1498 AgentXResponsePDU pdu = event.getResponse(); 1499 AgentXPending pending = (AgentXPending) event.getUserObject(); 1500 if (LOGGER.isDebugEnabled()) { 1501 LOGGER.debug("Processing AgentX response "+pdu+" for request "+pending); 1502 } 1503 if (pending.getRequest() != null) { 1504 AgentXPending p = 1505 agentXQueue.remove(pending.getAgentXPDU().getSessionID(), 1506 pending.getRequest().getTransactionID()); 1507 if (p == null) { 1508 LOGGER.warn("Pending AgentX request not found (may be timed out already): " + 1509 "Received AgentX response from " + event.getPeerAddress() + 1510 " for request " + event.getUserObject() + 1511 " does not match any pending request:" + pdu); 1512 return; 1513 } 1514 } 1515 if ((pdu == null) && 1516 (pending.getAgentXPDU().getType() != AgentXPDU.AGENTX_CLOSE_PDU)) { 1517 pending.getSession().incConsecutiveTimeouts(); 1518 pending.getReferences(). 1519 nextSubRequest().getStatus().setErrorStatus(PDU.genErr); 1520 if (pending.getSession().getConsecutiveTimeouts() > 1521 maxConsecutiveTimeouts) { 1522 closeSession(pending.getSession(), AgentXProtocol.REASON_TIMEOUTS); 1523 } 1524 } 1525 if (pdu != null) { 1526 pending.getSession().clearConsecutiveTimeouts(); 1527 } 1528 if (requestList.contains(pending.getRequest())) { 1529 if (pdu != null) { 1530 if (checkAgentXResponse(pdu, pending)) { 1531 switch (pending.getAgentXPDU().getType()) { 1532 case AgentXPDU.AGENTX_GET_PDU: { 1533 processAgentXGetResponse(pending, pdu); 1534 break; 1535 } 1536 case AgentXPDU.AGENTX_GETNEXT_PDU: { 1537 processAgentXGetNextResponse(pending, pdu); 1538 break; 1539 } 1540 case AgentXPDU.AGENTX_GETBULK_PDU: { 1541 processAgentXBulkResponse(pending, pdu); 1542 break; 1543 } 1544 case AgentXPDU.AGENTX_CLEANUPSET_PDU: 1545 case AgentXPDU.AGENTX_UNDOSET_PDU: 1546 case AgentXPDU.AGENTX_COMMITSET_PDU: 1547 case AgentXPDU.AGENTX_TESTSET_PDU: { 1548 processAgentXSetResponse(pending, pdu); 1549 break; 1550 } 1551 default: { 1552 LOGGER.warn("Unhandled AgentX response "+pdu); 1553 } 1554 } 1555 } 1556 else { 1557 LOGGER.warn("Invalid AgentX response " + pdu + 1558 " on request " + pending); 1559 } 1560 } 1561 if (!pending.getRequest().isComplete()) { 1563 reprocessRequest(server, pending.getRequest()); 1564 } 1565 finalizeRequest((CommandResponderEvent) 1566 pending.getRequest().getSource(), pending.getRequest(), 1567 server); 1568 } 1569 else { 1570 if (pending.getAgentXPDU().getType() == AgentXPDU.AGENTX_CLOSE_PDU) { 1571 if (pdu != null) { 1572 LOGGER.info("Subagent " + event.getPeerAddress() + 1573 " confirmed close, disconnection transport now"); 1574 } 1575 else { 1576 LOGGER.info("Subagent "+event.getPeerAddress()+ 1577 " did not answered on session close, "+ 1578 "disconnection now"); 1579 } 1580 AgentXPeer peer = pending.getSession().getPeer(); 1581 if (peer != null) { 1582 closePeer(peer); 1583 } 1584 } 1585 else { 1586 LOGGER.info("Received late response " + pdu + " on AgentX request: " + 1587 pending); 1588 super.release(server, pending.getRequest()); 1589 } 1590 } 1591 } 1592 1593 protected void processAgentXGetResponse(AgentXPending pending, 1594 AgentXResponsePDU pdu) { 1595 if (pdu.getErrorStatus() != PDU.noError) { 1596 processsErrorResponse(pending, pdu); 1597 } 1598 else { 1599 VariableBinding[] vbs = pdu.getVariableBindings(); 1600 SubRequestIterator subRequests = pending.getReferences(); 1601 for (int i=0; (i<pending.getRequest().size()) && 1602 subRequests.hasNext(); i++) { 1603 SnmpSubRequest sreq = (SnmpSubRequest) subRequests.nextSubRequest(); 1604 sreq.getVariableBinding().setVariable(vbs[i].getVariable()); 1605 sreq.getStatus().setPhaseComplete(true); 1606 } 1607 } 1608 } 1609 1610 protected void processAgentXGetNextResponse(AgentXPending pending, 1611 AgentXResponsePDU pdu) { 1612 if (pdu.getErrorStatus() != PDU.noError) { 1613 processsErrorResponse(pending, pdu); 1614 } 1615 else { 1616 processAgentXNextResponse(pending, pdu, pending.getRequest().size()); 1617 } 1618 } 1619 1620 protected void processAgentXSetResponse(AgentXPending pending, 1621 AgentXResponsePDU pdu) { 1622 if (pdu.getErrorStatus() != PDU.noError) { 1623 processsErrorResponse(pending, pdu); 1624 } 1625 else { 1626 SubRequestIterator it = pending.getReferences(); 1627 while (it.hasNext()) { 1628 SubRequest sreq = it.nextSubRequest(); 1629 sreq.getStatus().setPhaseComplete(true); 1630 } 1631 } 1632 } 1633 1634} 1635 | Popular Tags |