1 22 23 package org.snmp4j.agent.agentx.subagent; 24 25 import java.io.IOException ; 26 import java.util.*; 27 28 import org.snmp4j.PDU; 29 import org.snmp4j.TransportMapping; 30 import org.snmp4j.agent.*; 31 import org.snmp4j.agent.agentx.*; 32 import org.snmp4j.agent.agentx.event.PingEvent; 33 import org.snmp4j.agent.agentx.event.PingListener; 34 import org.snmp4j.agent.mo.MOScalar; 35 import org.snmp4j.agent.mo.snmp.CoexistenceInfo; 36 import org.snmp4j.agent.request.*; 37 import org.snmp4j.log.LogAdapter; 38 import org.snmp4j.log.LogFactory; 39 import org.snmp4j.mp.SnmpConstants; 40 import org.snmp4j.smi.*; 41 import org.snmp4j.transport.ConnectionOrientedTransportMapping; 42 import org.snmp4j.transport.TransportMappings; 43 import org.snmp4j.util.ThreadPool; 44 import org.snmp4j.agent.mo.MOTableRow; 45 import org.snmp4j.agent.agentx.subagent.index.AnyNewIndexOID; 46 import org.snmp4j.agent.agentx.subagent.index.NewIndexOID; 47 import org.snmp4j.agent.mo.snmp.SysUpTime; 48 49 56 public class AgentXSubagent 57 implements AgentXCommandListener, NotificationOriginator { 58 59 private static final LogAdapter LOGGER = 60 LogFactory.getLogger(AgentXSubagent.class); 61 62 private ArrayList moServers = new ArrayList(); 63 private ThreadPool threadPool; 64 private RequestFactory factory; 65 private AgentX agentX; 66 protected Map requestList; 67 68 protected Map peers = new LinkedHashMap(2); 69 protected Map sessions = new Hashtable(2); 70 71 72 protected RequestHandler requestHandlerGet; 73 protected RequestHandler requestHandlerGetNext; 74 protected RequestHandler requestHandlerGetBulk; 75 protected RequestHandler requestHandlerTestSet; 76 protected RequestHandler requestHandlerCommitSet; 77 protected RequestHandler requestHandlerUndoSet; 78 protected RequestHandler requestHandlerCleanupSet; 79 80 protected int nextTransactionID = 0; 81 82 private OID subagentID; 83 private OctetString subagentDescr; 84 85 private long timeout = AgentXProtocol.DEFAULT_TIMEOUT_SECONDS * 1000; 86 private byte defaultPriority = AgentXProtocol.DEFAULT_PRIORITY; 87 88 private Timer pingTimer; 89 private transient Vector pingListeners; 90 91 92 public AgentXSubagent(AgentX agentX, 93 OID subagentID, OctetString subagentDescr) { 94 this.requestList = Collections.synchronizedMap(new HashMap(10)); 95 this.agentX = agentX; 96 this.subagentID = subagentID; 97 this.subagentDescr = subagentDescr; 98 this.factory = new DefaultAgentXRequestFactory(); 99 requestHandlerGet = new GetRequestHandler(); 100 requestHandlerCleanupSet = new CleanupSetHandler(); 101 requestHandlerCommitSet = new CommitSetHandler(); 102 requestHandlerTestSet = new TestSetHandler(); 103 requestHandlerUndoSet = new UndoSetHandler(); 104 requestHandlerGetNext = new GetNextHandler(); 105 requestHandlerGetBulk = new GetBulkHandler(); 106 agentX.addCommandResponder(this); 107 } 108 109 118 public void setPingDelay(int seconds) { 119 if (pingTimer != null) { 120 pingTimer.cancel(); 121 pingTimer = null; 122 } 123 if (seconds > 0) { 124 pingTimer = new Timer(); 125 pingTimer.schedule(new PingTask(), seconds * 1000, seconds * 1000); 126 } 127 } 128 129 public void processCommand(AgentXCommandEvent event) { 130 if (event.getCommand() != null) { 131 event.setProcessed(true); 132 Command command = new Command(event); 133 if (threadPool != null) { 134 threadPool.execute(command); 135 } 136 else { 137 command.run(); 138 } 139 } 140 } 141 142 protected synchronized int getNextTransactionID() { 143 return nextTransactionID++; 144 } 145 146 protected synchronized int closeSession(int sessionID, byte reason) throws 147 IOException { 148 AgentXSession session = removeSession(sessionID); 149 if ((session == null) || (session.isClosed())) { 150 return AgentXProtocol.AGENTX_NOT_OPEN; 151 } 152 session.setClosed(true); 153 AgentXClosePDU closePDU = 154 new AgentXClosePDU(AgentXProtocol.REASON_SHUTDOWN); 155 AgentXTarget target = session.createAgentXTarget(); 156 AgentXResponseEvent resp = 157 agentX.send(closePDU, target, session.getPeer().getTransport()); 158 if (resp == null) { 159 return AgentXProtocol.AGENTX_TIMEOUT; 160 } 161 return ((AgentXResponsePDU)resp.getResponse()).getErrorStatus(); 162 } 163 164 protected int openSession(TransportMapping transport, 165 Address masterAddress, 166 AgentXSession session) throws IOException { 167 AgentXOpenPDU openPDU = new AgentXOpenPDU(0, getNextTransactionID(), 168 0, session.getTimeout(), 169 subagentID, subagentDescr); 170 AgentXResponseEvent responseEvent = 171 agentX.send(openPDU, session.createAgentXTarget(), transport); 172 if (responseEvent.getResponse() == null) { 173 LOGGER.error("Timeout on connection to master "+masterAddress); 174 } 175 else if (responseEvent.getResponse() instanceof AgentXResponsePDU) { 176 AgentXResponsePDU response = responseEvent.getResponse(); 177 if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) { 178 session.setSessionID(response.getSessionID()); 179 } 180 return response.getErrorStatus(); 181 } 182 else { 183 LOGGER.error("Received packet on open PDU is not a response AgentX PDU: "+ 184 responseEvent); 185 } 186 return AgentXProtocol.AGENTX_TIMEOUT; 187 } 188 189 private static int getResponseStatus(AgentXResponseEvent responseEvent) { 190 if (responseEvent.getResponse() == null) { 191 LOGGER.error("Timeout on connection to master "+ 192 responseEvent.getTarget()); 193 return AgentXProtocol.AGENTX_TIMEOUT; 194 } 195 else if (responseEvent.getResponse() instanceof AgentXResponsePDU) { 196 AgentXResponsePDU response = responseEvent.getResponse(); 197 return response.getErrorStatus(); 198 } 199 else { 200 LOGGER.error("Received packet on open PDU is not a response AgentX PDU: "+ 201 responseEvent); 202 } 203 return AgentXProtocol.AGENTX_ERROR; 204 } 205 206 public void disconnect(Address masterAddress) throws IOException { 207 AgentXPeer peer = (AgentXPeer) peers.remove(masterAddress); 208 if (peer != null) { 209 TransportMapping transport = peer.getTransport(); 210 if (transport instanceof ConnectionOrientedTransportMapping) { 211 ((ConnectionOrientedTransportMapping)transport).close(masterAddress); 212 } 213 } 214 } 215 216 public int connect(Address masterAddress, Address localAddress, 217 AgentXSession session) throws IOException { 218 AgentXPeer peer = (AgentXPeer) peers.get(masterAddress); 219 TransportMapping transport; 220 if (peer == null) { 221 transport = addMaster(localAddress); 222 peer = new AgentXPeer(transport, masterAddress); 223 } 224 else { 225 transport = peer.getTransport(); 226 } 227 peer.setTimeout(session.getTimeout()); 228 session.setPeer(peer); 229 int status = AgentXProtocol.AGENTX_TIMEOUT; 230 try { 231 status = openSession(transport, masterAddress, session); 232 if (status != AgentXProtocol.AGENTX_TIMEOUT) { 233 peers.put(masterAddress, peer); 234 LOGGER.info("Added new peer address="+masterAddress+",peer="+peer); 235 } 236 } 237 catch (IOException ex) { 238 LOGGER.error(ex); 239 removeMaster(transport); 240 return AgentXProtocol.AGENTX_ERROR; 241 } 242 if (status == AgentXProtocol.AGENTX_SUCCESS) { 243 sessions.put(new Integer (session.getSessionID()), session); 244 LOGGER.info("Opened subagent session successfully: "+session); 245 } 246 else { 247 removeMaster(transport); 248 } 249 return status; 250 } 251 252 public int close(AgentXSession session, byte reason) throws IOException { 253 return closeSession(session.getSessionID(), reason); 254 } 255 256 private synchronized AgentXSession getSession(int sessionID) { 257 return (AgentXSession) sessions.get(new Integer (sessionID)); 258 } 259 260 private synchronized AgentXSession removeSession(int sessionID) { 261 return (AgentXSession) sessions.remove(new Integer (sessionID)); 262 } 263 264 public void setDefaultPriority(byte priority) { 265 this.defaultPriority = priority; 266 } 267 268 public byte getDefaultPriority() { 269 return defaultPriority; 270 } 271 272 286 protected byte getPriority(ManagedObject mo, AgentXRegion region) { 287 return defaultPriority; 288 } 289 290 299 public List registerRegions(AgentXSession session, OctetString context) { 300 return registerRegions(session, context, null); 301 } 302 303 316 public List registerRegions(AgentXSession session, OctetString context, 317 TimeTicks sysUpTime) { 318 LinkedList failures = new LinkedList(); 319 MOServer server = getServer(context); 320 if (server == null) { 321 LOGGER.warn("No MOServer found for context '"+context+"'"); 322 return null; 323 } 324 for (Iterator it = server.iterator(); it.hasNext();) { 325 ManagedObject mo = (ManagedObject)it.next(); 326 if (mo instanceof AgentXSharedMOTable) { 327 List failedRows = registerSharedTableRows(session, context, 328 (AgentXSharedMOTable)mo); 329 failures.addAll(failedRows); 330 } 331 else { 332 MOScope scope = mo.getScope(); 333 AgentXRegion region = 334 new AgentXRegion(scope.getLowerBound(), scope.getUpperBound()); 335 if (mo instanceof MOScalar) { 336 region.setSingleOID(true); 337 } 338 region.setUpperIncluded(scope.isUpperIncluded()); 339 try { 340 int status = registerRegion(session, context, region, 341 getPriority(mo, region), sysUpTime); 342 if (status != AgentXProtocol.AGENTX_SUCCESS) { 343 failures.add(mo); 344 if (LOGGER.isWarnEnabled()) { 345 LOGGER.warn("Failed to registered MO " + scope + 346 " with status = " + 347 status); 348 } 349 } 350 else { 351 if (LOGGER.isInfoEnabled()) { 352 LOGGER.info("Registered MO " + scope + " successfully"); 353 } 354 } 355 356 } 357 catch (IOException ex) { 358 LOGGER.warn("Failed to register " + mo + " in context '" + context + 359 "' of session " + session); 360 failures.add(mo); 361 } 362 } 363 } 364 return failures; 365 } 366 367 382 public List registerSharedTableRows(AgentXSession session, 383 OctetString context, 384 AgentXSharedMOTable mo) { 385 LinkedList failedRows = new LinkedList(); 386 AgentXSharedMOTableSupport sharedTableSupport = 387 new AgentXSharedMOTableSupport(agentX, session, context); 388 synchronized (mo) { 389 if (mo instanceof AgentXSharedMutableMOTable) { 390 ((AgentXSharedMutableMOTable) 391 mo).setAgentXSharedMOTableSupport(sharedTableSupport); 392 } 393 for (Iterator it = mo.getModel().iterator(); it.hasNext();) { 394 MOTableRow row = (MOTableRow) it.next(); 395 OID newIndex = (OID) row.getIndex().clone(); 396 int status = sharedTableSupport.allocateIndex(context, mo.getIndexDef(), 397 (byte)AgentXSharedMOTableSupport.INDEX_MODE_ALLOCATE, 398 newIndex); 399 if (status == AgentXProtocol.AGENTX_SUCCESS) { 400 if ((newIndex instanceof AnyNewIndexOID) || 401 (newIndex instanceof NewIndexOID)) { 402 if (mo instanceof AgentXSharedMutableMOTable) { 403 ((AgentXSharedMutableMOTable)mo). 404 changeRowIndex(newIndex, row.getIndex()); 405 } 406 } 407 status = sharedTableSupport.registerRow(mo, row); 408 if (status != AgentXProtocol.AGENTX_SUCCESS) { 409 sharedTableSupport.deallocateIndex(context, mo.getIndexDef(), 410 row.getIndex()); 411 LOGGER.warn("Failed to register row with "+status+" for "+row); 412 failedRows.add(row); 413 } 414 } 415 else { 416 LOGGER.warn("Failed to allocate index with "+status+" for row "+ 417 row); 418 failedRows.add(row); 419 } 420 } 421 } 422 return failedRows; 423 } 424 425 protected int registerRegion(AgentXSession session, 426 OctetString context, AgentXRegion region, 427 byte priority, 428 TimeTicks sysUpTime) throws IOException { 429 if ((session == null) || (session.isClosed())) { 430 return AgentXProtocol.AGENTX_NOT_OPEN; 431 } 432 long t = (this.timeout == 0) ? session.getTimeout()*1000 : this.timeout; 433 AgentXRegisterPDU pdu = 434 new AgentXRegisterPDU(context, region.getLowerBound(), priority, 435 region.getRangeSubID(), 436 region.getUpperBoundSubID()); 437 pdu.setSessionAttributes(session); 438 AgentXResponseEvent event = 439 agentX.send(pdu, new AgentXTarget(session.getPeer().getAddress(), t), 440 session.getPeer().getTransport()); 441 if ((sysUpTime != null) && (event.getResponse() != null)) { 442 sysUpTime.setValue(event.getResponse().getSysUpTime() & 0xFFFFFFFFL); 443 } 444 return getResponseStatus(event); 445 } 446 447 protected int unregisterRegion(AgentXSession session, 448 OctetString context, AgentXRegion region, 449 byte timeout) throws IOException { 450 if ((session == null) || (session.isClosed())) { 451 return AgentXProtocol.AGENTX_NOT_OPEN; 452 } 453 byte t = (timeout == 0) ? session.getTimeout() : timeout; 454 AgentXUnregisterPDU pdu = 455 new AgentXUnregisterPDU(context, region.getLowerBound(), t, 456 region.getRangeSubID(), 457 region.getUpperBoundSubID()); 458 pdu.setSessionAttributes(session); 459 AgentXResponseEvent event = 460 agentX.send(pdu, new AgentXTarget(session.getPeer().getAddress(), 461 this.timeout), 462 session.getPeer().getTransport()); 463 return getResponseStatus(event); 464 } 465 466 467 468 protected TransportMapping addMaster(Address localAddress) 469 throws IOException 470 { 471 482 TransportMapping transport = 483 TransportMappings.getInstance().createTransportMapping(localAddress); 484 if (transport instanceof ConnectionOrientedTransportMapping) { 485 ConnectionOrientedTransportMapping tcpTransport = 486 (ConnectionOrientedTransportMapping)transport; 487 tcpTransport.setConnectionTimeout(0); 488 tcpTransport.setMessageLengthDecoder(new AgentXProtocol()); 489 } 490 agentX.addTransportMapping(transport); 491 transport.listen(); 492 return transport; 493 } 494 495 protected void removeMaster(TransportMapping transport) { 496 agentX.removeTransportMapping(transport); 497 try { 498 transport.close(); 499 } 500 catch (IOException ex) { 501 LOGGER.warn("Closing transport mapping "+transport+" failed with: "+ 502 ex.getMessage()); 503 } 504 } 505 506 public synchronized void addMOServer(MOServer server) { 507 moServers.add(server); 508 } 509 510 public synchronized void removeMOServer(MOServer server) { 511 moServers.remove(server); 512 } 513 514 public synchronized MOServer getServer(OctetString context) { 515 for (int i=0; i<moServers.size(); i++) { 516 MOServer s = (MOServer)moServers.get(i); 517 if (s.isContextSupported(context)) { 518 return s; 519 } 520 } 521 return null; 522 } 523 524 public synchronized Collection getContexts() { 525 LinkedList allContexts = new LinkedList(); 526 for (int i=0; i<moServers.size(); i++) { 527 MOServer s = (MOServer)moServers.get(i); 528 OctetString[] contexts = s.getContexts(); 529 allContexts.addAll(Arrays.asList(contexts)); 530 } 531 return allContexts; 532 } 533 534 public ThreadPool getThreadPool() { 535 return threadPool; 536 } 537 538 public void setThreadPool(ThreadPool threadPool) { 539 this.threadPool = threadPool; 540 } 541 542 public void dispatchCommand(AgentXCommandEvent cmd) { 543 boolean pendingSessionClose = false; 544 if (cmd.getCommand().isConfirmedPDU()) { 545 AgentXRequest request = null; 546 MOServer server = null; 547 switch (cmd.getCommand().getType()) { 548 case AgentXPDU.AGENTX_GET_PDU: { 549 request = (AgentXRequest) factory.createRequest(cmd, null); 550 server = getServer(request.getContext()); 551 requestHandlerGet.processPdu(request, server); 552 break; 553 } 554 case AgentXPDU.AGENTX_GETNEXT_PDU: { 555 request = (AgentXRequest) factory.createRequest(cmd, null); 556 server = getServer(request.getContext()); 557 requestHandlerGetNext.processPdu(request, server); 558 break; 559 } 560 case AgentXPDU.AGENTX_GETBULK_PDU: { 561 request = (AgentXRequest) factory.createRequest(cmd, null); 562 server = getServer(request.getContext()); 563 requestHandlerGetBulk.processPdu(request, server); 564 break; 565 } 566 case AgentXPDU.AGENTX_TESTSET_PDU: { 567 request = (AgentXRequest) factory.createRequest(cmd, null); 568 request.setPhase(Request.PHASE_2PC_PREPARE); 569 server = getServer(request.getContext()); 570 requestHandlerTestSet.processPdu(request, server); 571 requestList.put(createRequestID(cmd), request); 572 break; 573 } 574 case AgentXPDU.AGENTX_COMMITSET_PDU: 575 case AgentXPDU.AGENTX_UNDOSET_PDU: 576 case AgentXPDU.AGENTX_CLEANUPSET_PDU: { 577 RequestID reqID = createRequestID(cmd); 578 request = (AgentXRequest) requestList.get(reqID); 579 if (request == null) { 580 LOGGER.error("Request with ID "+reqID+" not found in request list"); 581 request = new AgentXRequest(cmd); 582 request.setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR); 583 break; 584 } 585 server = getServer(request.getContext()); 586 switch (cmd.getCommand().getType()) { 587 case AgentXPDU.AGENTX_COMMITSET_PDU: 588 request.setPhase(Request.PHASE_2PC_COMMIT); 589 requestHandlerCommitSet.processPdu(request, server); 590 break; 591 case AgentXPDU.AGENTX_UNDOSET_PDU: 592 request.setPhase(Request.PHASE_2PC_UNDO); 593 requestHandlerUndoSet.processPdu(request, server); 594 break; 595 case AgentXPDU.AGENTX_CLEANUPSET_PDU: 596 request.setPhase(Request.PHASE_2PC_CLEANUP); 597 requestHandlerCleanupSet.processPdu(request, server); 598 break; 599 default: { 600 LOGGER.fatal("Internal error"); 601 } 602 } 603 if (cmd.getCommand().getType() != AgentXPDU.AGENTX_COMMITSET_PDU) { 604 requestList.remove(reqID); 606 } 607 break; 608 } 609 case AgentXPDU.AGENTX_CLOSE_PDU: { 610 AgentXSession session = 611 removeSession(cmd.getCommand().getSessionID()); 612 if (session != null) { 613 session.setClosed(true); 614 pendingSessionClose = true; 615 } 616 break; 617 } 618 default: { 619 LOGGER.error("Unhandled PDU type: "+cmd.getCommand()); 620 request = new AgentXRequest(cmd); 621 request.setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR); 622 } 623 } 624 if (request != null) { 625 if (request.isPhaseComplete()) { 628 sendResponse(cmd, request); 630 } 631 if (server != null) { 632 release(server, request); 633 } 634 } 635 if (pendingSessionClose) { 636 try { 637 disconnect(cmd.getPeerAddress()); 638 } 639 catch (IOException ex) { 640 LOGGER.error("Failed to disconnect from master at "+ 641 cmd.getPeerAddress()+": "+ex.getMessage(), ex); 642 } 643 } 644 } 645 else { 646 processResponse(cmd); 647 } 648 } 649 650 protected void sendResponse(AgentXCommandEvent cmd, AgentXRequest request) { 651 AgentXMessageDispatcher dispatcher = cmd.getDispatcher(); 652 AgentXResponsePDU response = request.getResponsePDU(); 653 if (response != null) { 654 AgentXPDU rpdu = cmd.getCommand(); 655 response.setSessionID(rpdu.getSessionID()); 656 response.setTransactionID(rpdu.getTransactionID()); 657 response.setByteOrder(rpdu.getByteOrder()); 658 response.setPacketID(rpdu.getPacketID()); 659 try { 661 dispatcher.send(cmd.getPeerTransport(), 662 cmd.getPeerAddress(), response, null); 663 } 664 catch (IOException ex) { 665 LOGGER.warn("Failed to send AgentX response to '"+ 666 cmd.getPeerAddress()+"' with error: "+ex.getMessage()); 667 } 668 } 669 } 670 671 protected void release(MOServer server, Request req) { 672 for (Iterator it = req.iterator(); it.hasNext();) { 673 SubRequest sreq = (SubRequest)it.next(); 674 if (sreq.getTargetMO() != null) { 675 server.unlock(req, sreq.getTargetMO()); 676 } 677 } 678 } 679 680 private static RequestID createRequestID(AgentXCommandEvent cmd) { 681 return new RequestID(cmd.getPeerAddress(), 682 cmd.getCommand().getSessionID(), 683 cmd.getCommand().getTransactionID()); 684 685 } 686 687 protected void processResponse(AgentXCommandEvent cmd) { 688 if (LOGGER.isDebugEnabled()) { 689 LOGGER.debug("Received response "+cmd); 690 } 691 } 692 693 protected void processNextSubRequest(Request request, MOServer server, 694 OctetString context, 695 SubRequest sreq) 696 throws NoSuchElementException 697 { 698 DefaultMOContextScope scope = 701 (DefaultMOContextScope)sreq.getScope(); 702 MOQuery query = sreq.getQuery(); 703 if (query == null) { 704 query = new DefaultMOQuery(scope, false, request); 705 } 706 while (!sreq.getStatus().isProcessed()) { 707 ManagedObject mo = server.lookup(query); 708 if (mo == null) { 709 if (LOGGER.isDebugEnabled()) { 710 LOGGER.debug("EndOfMibView at scope="+query.getScope()+ 711 " and query "+query); 712 } 713 sreq.getVariableBinding().setVariable(Null.endOfMibView); 714 sreq.getStatus().setPhaseComplete(true); 715 break; 716 } 717 try { 718 if (!mo.next(sreq)) { 719 sreq.getVariableBinding().setVariable(Null.instance); 723 scope.substractScope(mo.getScope()); 724 query.substractScope(mo.getScope()); 726 } 727 } 728 catch (Exception moex) { 729 if (LOGGER.isDebugEnabled()) { 730 moex.printStackTrace(); 731 } 732 LOGGER.warn(moex); 733 if (sreq.getStatus().getErrorStatus() == PDU.noError) { 734 sreq.getStatus().setErrorStatus(PDU.genErr); 735 } 736 } 737 } 738 } 739 740 755 public Object notify(OctetString context, 756 OID notificationID, 757 VariableBinding[] vbs) { 758 return notify(context, notificationID, null, vbs); 759 } 760 761 public Object notify(OctetString context, OID notificationID, 762 TimeTicks sysUpTime, VariableBinding[] vbs) { 763 AgentXSession session = firstSession(); 764 AgentXResponseEvent agentXResponse = null; 765 try { 766 agentXResponse = 767 notify(session, context, notificationID, sysUpTime, vbs); 768 if ((agentXResponse == null) || (agentXResponse.getResponse() == null)) { 769 LOGGER.warn("Timeout on sending notification in context '"+context+ 770 "' with ID '"+notificationID+"' and payload "+ 771 Arrays.asList(vbs)); 772 return null; 773 } 774 return agentXResponse; 775 } 776 catch (IOException ex) { 777 LOGGER.error("Failed to send notification in context '"+context+ 778 "' with ID '"+notificationID+"' and payload "+ 779 Arrays.asList(vbs)+", reason is: "+ex.getMessage()); 780 return null; 781 } 782 } 783 784 791 public synchronized final AgentXSession firstSession() { 792 if (sessions.size() > 0) { 793 return (AgentXSession) sessions.values().iterator().next(); 794 } 795 return null; 796 } 797 798 public AgentXResponseEvent notify(AgentXSession session, 799 OctetString context, 800 OID notificationID, 801 TimeTicks sysUpTime, 802 VariableBinding[] vbs) throws IOException { 803 int offset = 1; 804 if (sysUpTime != null) { 805 offset = 2; 806 } 807 VariableBinding[] notifyVBs = new VariableBinding[vbs.length+offset]; 808 if (sysUpTime != null) { 809 notifyVBs[0] = new VariableBinding(SnmpConstants.sysUpTime, sysUpTime); 810 } 811 notifyVBs[offset-1] = 812 new VariableBinding(SnmpConstants.snmpTrapOID, notificationID); 813 System.arraycopy(vbs, 0, notifyVBs, offset, vbs.length); 814 AgentXNotifyPDU notifyPDU = new AgentXNotifyPDU(context, notifyVBs); 815 notifyPDU.setSessionAttributes(session); 816 notifyPDU.setTransactionID(getNextTransactionID()); 817 AgentXResponseEvent response = 818 agentX.send(notifyPDU, session.createAgentXTarget(), 819 session.getPeer().getTransport()); 820 return response; 821 } 822 823 public int addAgentCaps(AgentXSession session, 824 OctetString context, OID id, OctetString descr) { 825 AgentXAddAgentCapsPDU pdu = new AgentXAddAgentCapsPDU(context, id, descr); 826 pdu.setSessionAttributes(session); 827 try { 828 AgentXResponseEvent resp = agentX.send(pdu, session.createAgentXTarget(), 829 session.getPeer().getTransport()); 830 if (resp.getResponse() == null) { 831 return AgentXProtocol.AGENTX_TIMEOUT; 832 } 833 return resp.getResponse().getErrorStatus(); 834 } 835 catch (IOException ex) { 836 LOGGER.error("Failed to send AgentX AddAgentCaps PDU "+pdu+ 837 " because: "+ex.getMessage(), ex); 838 return AgentXProtocol.AGENTX_NOT_OPEN; 839 } 840 } 841 842 public int removeAgentCaps(AgentXSession session, 843 OctetString context, OID id) { 844 AgentXRemoveAgentCapsPDU pdu = new AgentXRemoveAgentCapsPDU(context, id); 845 pdu.setSessionAttributes(session); 846 try { 847 AgentXResponseEvent resp = agentX.send(pdu, session.createAgentXTarget(), 848 session.getPeer().getTransport()); 849 return resp.getResponse().getErrorStatus(); 850 } 851 catch (IOException ex) { 852 LOGGER.error("Failed to send AgentX RemoveAgentCaps PDU "+pdu+ 853 " because: "+ex.getMessage(), ex); 854 return AgentXProtocol.AGENTX_NOT_OPEN; 855 } 856 } 857 858 public void addPingListener(PingListener l) { 859 if (pingListeners == null) { 860 pingListeners = new Vector(); 861 } 862 pingListeners.add(l); 863 } 864 865 public void removePingListener(PingListener l) { 866 if (pingListeners != null) { 867 synchronized (pingListeners) { 868 pingListeners.remove(l); 869 } 870 } 871 } 872 873 protected void firePinged(PingEvent event) { 874 if (pingListeners != null) { 875 synchronized (pingListeners) { 876 Vector listeners = pingListeners; 877 int count = listeners.size(); 878 for (int i = 0; i < count; i++) { 879 ((PingListener) listeners.elementAt(i)).pinged(event); 880 } 881 } 882 } 883 } 884 885 private static void initRequestPhase(Request request) { 886 if (request.getPhase() == Request.PHASE_INIT) { 887 request.nextPhase(); 888 } 889 } 890 891 static class GetRequestHandler implements RequestHandler { 892 893 public boolean isSupported(int pduType) { 894 return pduType == AgentXPDU.AGENTX_GET_PDU; 895 } 896 897 public void processPdu(Request request, MOServer server) { 898 initRequestPhase(request); 899 try { 900 SubRequestIterator it = (SubRequestIterator) request.iterator(); 901 while (it.hasNext()) { 902 SubRequest sreq = it.nextSubRequest(); 903 DefaultMOQuery query = 904 new DefaultMOQuery((MOContextScope)sreq.getScope(), 905 false, request); 906 ManagedObject mo = server.lookup(query); 907 if (mo == null) { 908 sreq.getVariableBinding().setVariable(Null.noSuchObject); 909 sreq.getStatus().setPhaseComplete(true); 910 continue; 911 } 912 try { 913 mo.get(sreq); 914 } 915 catch (Exception moex) { 916 if (LOGGER.isDebugEnabled()) { 917 moex.printStackTrace(); 918 } 919 LOGGER.warn(moex); 920 if (sreq.getStatus().getErrorStatus() == PDU.noError) { 921 sreq.getStatus().setErrorStatus(PDU.genErr); 922 } 923 } 924 } 925 } 926 catch (NoSuchElementException nsex) { 927 if (LOGGER.isDebugEnabled()) { 928 nsex.printStackTrace(); 929 } 930 LOGGER.error("SubRequest not found"); 931 request.setErrorStatus(PDU.genErr); 932 } 933 } 934 } 935 936 class GetNextHandler implements RequestHandler { 937 938 public void processPdu(Request request, MOServer server) { 939 initRequestPhase(request); 940 OctetString context = request.getContext(); 941 try { 942 SubRequestIterator it = (SubRequestIterator) request.iterator(); 943 while (it.hasNext()) { 944 SubRequest sreq = it.nextSubRequest(); 945 processNextSubRequest(request, server, context, sreq); 946 } 947 } 948 catch (NoSuchElementException nsex) { 949 if (LOGGER.isDebugEnabled()) { 950 nsex.printStackTrace(); 951 } 952 LOGGER.error("SubRequest not found"); 953 request.setErrorStatus(PDU.genErr); 954 } 955 } 956 957 958 public boolean isSupported(int pduType) { 959 return (pduType == PDU.GETNEXT); 960 } 961 962 } 963 964 class GetBulkHandler implements RequestHandler { 965 966 public void processPdu(Request request, MOServer server) { 967 initRequestPhase(request); 968 OctetString context = request.getContext(); 969 AgentXRequest req = (AgentXRequest)request; 970 int nonRep = req.getNonRepeaters(); 971 try { 972 SubRequestIterator it = (SubRequestIterator) request.iterator(); 973 int i = 0; 974 for (; ((i < nonRep) && it.hasNext()); i++) { 976 SubRequest sreq = it.nextSubRequest(); 977 processNextSubRequest(request, server, context, sreq); 978 } 979 for (; it.hasNext(); i++) { 981 SubRequest sreq = it.nextSubRequest(); 982 processNextSubRequest(request, server, context, sreq); 983 } 984 } 985 catch (NoSuchElementException nsex) { 986 if (LOGGER.isDebugEnabled()) { 987 nsex.printStackTrace(); 988 } 989 LOGGER.error("SubRequest not found"); 990 request.setErrorStatus(PDU.genErr); 991 } 992 993 } 994 995 public boolean isSupported(int pduType) { 996 return (pduType == PDU.GETBULK); 997 } 998 999 } 1000 1001 1002 static class TestSetHandler implements RequestHandler { 1003 1004 public void processPdu(Request request, MOServer server) { 1005 try { 1006 SubRequestIterator it = (SubRequestIterator) request.iterator(); 1007 while ((!request.isPhaseComplete()) && (it.hasNext())) { 1008 SubRequest sreq = it.nextSubRequest(); 1009 if (sreq.isComplete()) { 1010 continue; 1011 } 1012 DefaultMOQuery query = 1013 new DefaultMOQuery((MOContextScope)sreq.getScope(), false, 1014 request); 1015 ManagedObject mo = server.lookup(query); 1016 if (mo == null) { 1017 sreq.getStatus().setErrorStatus(PDU.notWritable); 1018 break; 1019 } 1020 sreq.setTargetMO(mo); 1021 server.lock(sreq.getRequest(), mo); 1022 try { 1023 mo.prepare(sreq); 1024 sreq.getStatus().setPhaseComplete(true); 1025 } 1026 catch (Exception moex) { 1027 if (sreq.getStatus().getErrorStatus() == PDU.noError) { 1028 sreq.getStatus().setErrorStatus(PDU.genErr); 1029 } 1030 LOGGER.error("Exception occurred while preparing SET request, "+ 1031 "returning genErr: "+moex.getMessage(), moex); 1032 } 1033 } 1034 } 1035 catch (NoSuchElementException nsex) { 1036 if (LOGGER.isDebugEnabled()) { 1037 nsex.printStackTrace(); 1038 } 1039 LOGGER.error("Cannot find sub-request: ", nsex); 1040 request.setErrorStatus(PDU.genErr); 1041 } 1042 } 1043 1044 public boolean isSupported(int pduType) { 1045 return (pduType == AgentXPDU.AGENTX_TESTSET_PDU); 1046 } 1047 } 1048 1049 class UndoSetHandler implements RequestHandler { 1050 1051 public void processPdu(Request request, MOServer server) { 1052 try { 1053 SubRequestIterator it = (SubRequestIterator) request.iterator(); 1054 while (it.hasNext()) { 1055 SubRequest sreq = it.nextSubRequest(); 1056 if (sreq.isComplete()) { 1057 continue; 1058 } 1059 ManagedObject mo = sreq.getTargetMO(); 1060 if (mo == null) { 1061 DefaultMOQuery query = 1062 new DefaultMOQuery((MOContextScope)sreq.getScope(), true); 1063 mo = server.lookup(query); 1064 } 1065 if (mo == null) { 1066 sreq.getStatus().setErrorStatus(PDU.undoFailed); 1067 continue; 1068 } 1069 try { 1070 mo.undo(sreq); 1071 sreq.getStatus().setPhaseComplete(true); 1072 } 1073 catch (Exception moex) { 1074 if (LOGGER.isDebugEnabled()) { 1075 moex.printStackTrace(); 1076 } 1077 LOGGER.error(moex); 1078 if (sreq.getStatus().getErrorStatus() == PDU.noError) { 1079 sreq.getStatus().setErrorStatus(PDU.undoFailed); 1080 } 1081 } 1082 } 1083 } 1084 catch (NoSuchElementException nsex) { 1085 if (LOGGER.isDebugEnabled()) { 1086 nsex.printStackTrace(); 1087 } 1088 LOGGER.error("Cannot find sub-request: ", nsex); 1089 request.setErrorStatus(PDU.genErr); 1090 } 1091 } 1092 1093 public boolean isSupported(int pduType) { 1094 return (pduType == AgentXPDU.AGENTX_UNDOSET_PDU); 1095 } 1096 } 1097 1098 class CommitSetHandler implements RequestHandler { 1099 1100 public void processPdu(Request request, MOServer server) { 1101 try { 1102 SubRequestIterator it = (SubRequestIterator) request.iterator(); 1103 while ((!request.isPhaseComplete()) && (it.hasNext())) { 1104 SubRequest sreq = it.nextSubRequest(); 1105 if (sreq.isComplete()) { 1106 continue; 1107 } 1108 ManagedObject mo = sreq.getTargetMO(); 1109 if (mo == null) { 1110 DefaultMOQuery query = 1111 new DefaultMOQuery((MOContextScope)sreq.getScope(), true); 1112 mo = server.lookup(query); 1113 } 1114 if (mo == null) { 1115 sreq.getStatus().setErrorStatus(PDU.commitFailed); 1116 continue; 1117 } 1118 try { 1119 mo.commit(sreq); 1120 sreq.getStatus().setPhaseComplete(true); 1121 } 1122 catch (Exception moex) { 1123 if (LOGGER.isDebugEnabled()) { 1124 moex.printStackTrace(); 1125 } 1126 LOGGER.error(moex); 1127 if (sreq.getStatus().getErrorStatus() == PDU.noError) { 1128 sreq.getStatus().setErrorStatus(PDU.commitFailed); 1129 } 1130 } 1131 } 1132 } 1133 catch (NoSuchElementException nsex) { 1134 if (LOGGER.isDebugEnabled()) { 1135 nsex.printStackTrace(); 1136 } 1137 LOGGER.error("Cannot find sub-request: ", nsex); 1138 request.setErrorStatus(PDU.genErr); 1139 } 1140 } 1141 1142 public boolean isSupported(int pduType) { 1143 return (pduType == AgentXPDU.AGENTX_COMMITSET_PDU); 1144 } 1145 1146 } 1147 1148 class CleanupSetHandler implements RequestHandler { 1149 1150 public void processPdu(Request request, MOServer server) { 1151 try { 1152 SubRequestIterator it = (SubRequestIterator) request.iterator(); 1153 while (it.hasNext()) { 1154 SubRequest sreq = it.nextSubRequest(); 1155 if (sreq.isComplete()) { 1156 continue; 1157 } 1158 ManagedObject mo = sreq.getTargetMO(); 1159 if (mo == null) { 1160 DefaultMOQuery query = 1161 new DefaultMOQuery((MOContextScope)sreq.getScope(), false); 1162 mo = server.lookup(query); 1163 } 1164 if (mo == null) { 1165 sreq.completed(); 1166 continue; 1167 } 1168 server.unlock(sreq.getRequest(), mo); 1169 try { 1170 mo.cleanup(sreq); 1171 sreq.getStatus().setPhaseComplete(true); 1172 } 1173 catch (Exception moex) { 1174 if (LOGGER.isDebugEnabled()) { 1175 moex.printStackTrace(); 1176 } 1177 LOGGER.error(moex); 1178 } 1179 } 1180 } 1181 catch (NoSuchElementException nsex) { 1182 if (LOGGER.isDebugEnabled()) { 1183 nsex.printStackTrace(); 1184 } 1185 LOGGER.warn("Cannot find sub-request: "+ nsex.getMessage()); 1186 } 1187 } 1188 1189 public boolean isSupported(int pduType) { 1190 return (pduType == AgentXPDU.AGENTX_CLEANUPSET_PDU); 1191 } 1192 1193 } 1194 1195 1196 static class DefaultAgentXRequestFactory implements RequestFactory { 1197 1198 public Request createRequest(EventObject initiatingEvent, 1199 CoexistenceInfo cinfo) { 1200 Request request = new AgentXRequest((AgentXCommandEvent)initiatingEvent); 1201 if (LOGGER.isDebugEnabled()) { 1202 LOGGER.debug("Creating AgentX request "+request+ 1203 " from "+initiatingEvent); 1204 } 1205 return request; 1206 } 1207 1208 } 1209 1210 class Command implements Runnable { 1211 1212 private AgentXCommandEvent request; 1213 1214 public Command(AgentXCommandEvent event) { 1215 this.request = event; 1216 } 1217 1218 public void run() { 1219 dispatchCommand(request); 1220 } 1221 1222 } 1223 1224 1225 static class RequestID implements Comparable { 1226 private Address masterAddress; 1227 private int sessionID; 1228 private int transactionID; 1229 1230 public RequestID(Address masterAddress, int sessionID, int transactionID) { 1231 this.masterAddress = masterAddress; 1232 this.sessionID = sessionID; 1233 this.transactionID = transactionID; 1234 } 1235 1236 public int compareTo(Object o) { 1237 RequestID other = (RequestID)o; 1238 int c = masterAddress.compareTo(other.masterAddress); 1239 if (c == 0) { 1240 c = sessionID - other.sessionID; 1241 if (c == 0) { 1242 c = transactionID - other.transactionID; 1243 } 1244 } 1245 return c; 1246 } 1247 1248 public boolean equals(Object obj) { 1249 if (obj instanceof RequestID) { 1250 return (compareTo(obj) == 0); 1251 } 1252 return false; 1253 } 1254 1255 public int hashCode() { 1256 return transactionID; 1257 } 1258 1259 } 1260 1261 class PingTask extends TimerTask { 1262 1263 public void run() { 1264 List l; 1265 synchronized (sessions) { 1266 l = new LinkedList(sessions.values()); 1267 } 1268 for (Iterator it = l.iterator(); it.hasNext();) { 1269 AgentXSession session = (AgentXSession) it.next(); 1270 for (Iterator cit = getContexts().iterator(); cit.hasNext(); ) { 1271 OctetString context = (OctetString) cit.next(); 1272 AgentXPingPDU ping = new AgentXPingPDU(context); 1273 ping.setSessionAttributes(session); 1274 ping.setTransactionID(getNextTransactionID()); 1275 PingEvent pingEvent; 1276 try { 1277 AgentXResponseEvent resp = 1278 agentX.send(ping, session.createAgentXTarget(), 1279 session.getPeer().getTransport()); 1280 pingEvent = new PingEvent(this, session, 1281 resp.getResponse()); 1282 } 1283 catch (IOException ex) { 1284 pingEvent = new PingEvent(this, session, ex); 1285 } 1286 firePinged(pingEvent); 1287 if (LOGGER.isDebugEnabled()) { 1288 LOGGER.debug("Fired ping event "+pingEvent); 1289 } 1290 if (pingEvent.isCloseSession() || pingEvent.isResetSession()) { 1291 try { 1292 closeSession(session.getSessionID(), 1293 AgentXProtocol.REASON_TIMEOUTS); 1294 if (pingEvent.isResetSession()) { 1295 reopenSession(session); 1296 } 1297 } 1298 catch (IOException ex1) { 1299 } 1300 } 1301 } 1302 } 1303 } 1304 1305 1316 public int reopenSession(AgentXSession session) throws IOException { 1317 return openSession(session.getPeer().getTransport(), 1318 session.getPeer().getAddress(), 1319 session); 1320 } 1321 1322 } 1323} 1324 | Popular Tags |