KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > snmp4j > agent > agentx > master > AgentXCommandProcessor


1 /*_############################################################################
2   _##
3   _## SNMP4J-AgentX - AgentXCommandProcessor.java
4   _##
5   _## Copyright (C) 2005-2007 Frank Fock (SNMP4J.org)
6   _##
7   _## This program is free software; you can redistribute it and/or modify
8   _## it under the terms of the GNU General Public License version 2 as
9   _## published by the Free Software Foundation.
10   _##
11   _## This program is distributed in the hope that it will be useful,
12   _## but WITHOUT ANY WARRANTY; without even the implied warranty of
13   _## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14   _## GNU General Public License for more details.
15   _##
16   _## You should have received a copy of the GNU General Public License
17   _## along with this program; if not, write to the Free Software
18   _## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19   _## MA 02110-1301 USA
20   _##
21   _##########################################################################*/

22
23 package org.snmp4j.agent.agentx.master;
24
25 import java.io.IOException JavaDoc;
26 import java.util.*;
27
28 import org.snmp4j.CommandResponderEvent;
29 import org.snmp4j.PDU;
30 import org.snmp4j.TransportMapping;
31 import org.snmp4j.agent.*;
32 import org.snmp4j.agent.agentx.*;
33 import org.snmp4j.agent.agentx.master.AgentXQueue.AgentXQueueEntry;
34 import org.snmp4j.agent.agentx.master.index.AgentXIndexRegistry;
35 import org.snmp4j.agent.mo.snmp.AgentCapabilityList;
36 import org.snmp4j.agent.mo.snmp.SNMPv2MIB.SysUpTimeImpl;
37 import org.snmp4j.agent.mo.snmp.SysUpTime;
38 import org.snmp4j.agent.request.*;
39 import org.snmp4j.agent.security.VACM;
40 import org.snmp4j.log.LogAdapter;
41 import org.snmp4j.log.LogFactory;
42 import org.snmp4j.mp.SnmpConstants;
43 import org.snmp4j.smi.*;
44 import org.snmp4j.transport.ConnectionOrientedTransportMapping;
45 import org.snmp4j.transport.TransportStateEvent;
46 import org.snmp4j.transport.TransportStateListener;
47
48 public class AgentXCommandProcessor extends CommandProcessor implements
49     AgentXCommandListener, TransportStateListener,
50     AgentXResponseListener {
51
52   public static final int MAX_REPROCESSING_DEFAULT = 100;
53
54   private static final LogAdapter LOGGER =
55       LogFactory.getLogger(AgentXCommandProcessor.class);
56
57   private static final OctetString DEFAULT_CONTEXT = new OctetString();
58
59   private AgentXQueue agentXQueue;
60   private AgentX agentX;
61   private Map sessions = new HashMap();
62   private Map peers = new HashMap(10);
63   private Set registrations = new TreeSet(new AgentXRegEntryComparator());
64   private MOServer server;
65   private int nextSessionID = 1;
66   private byte defaultTimeout = AgentXProtocol.DEFAULT_TIMEOUT_SECONDS;
67   private int maxConsecutiveTimeouts =
68       AgentXProtocol.DEFAULT_MAX_CONSECUTIVE_TIMEOUTS;
69   private int maxParseErrors =
70       AgentXProtocol.DEFAULT_MAX_PARSE_ERRORS;
71   private Map contextInfo = new HashMap(10);
72   private boolean acceptNewContexts = false;
73
74   private int nextPacketID = 0;
75
76   protected AgentXIndexRegistry indexRegistry = new AgentXIndexRegistry();
77
78   private transient Vector agentXMasterListeners;
79
80   private int maxReprocessing = MAX_REPROCESSING_DEFAULT;
81
82   public AgentXCommandProcessor(OctetString contextEngineID,
83                                 AgentXQueue queue,
84                                 AgentX agentX,
85                                 MOServer server) {
86     super(contextEngineID);
87     this.agentXQueue = queue;
88     this.agentX = agentX;
89     this.server = server;
90     if (this.agentXQueue.getServer4BulkOptimization() == null) {
91       this.agentXQueue.setServer4BulkOptimization(server);
92     }
93   }
94
95   private synchronized int createNextPacketID() {
96     return nextPacketID++;
97   }
98
99   public void setMaxReprocessing(int maxReprocessing) {
100     this.maxReprocessing = maxReprocessing;
101   }
102
103   public int getMaxReprocessing() {
104     return maxReprocessing;
105   }
106
107   /**
108    * Sets the maximum number of parse errors allowed per peer. If this number
109    * is exceeded then the peer will be closed with reason
110    * {@link AgentXProtocol#REASON_PARSE_ERROR}.
111    *
112    * @param maxParseErrors
113    * a positive value (including zero) sets the upper limit of parse errors
114    * tolerated per peer. If the number of parse errors exceeds this limit,
115    * all sessions with that peer will be closed. A negative value deactivates
116    * any limit.
117    * @since 1.0.1
118    */

119   public void setMaxParseErrors(int maxParseErrors) {
120     this.maxParseErrors = maxParseErrors;
121   }
122
123   /**
124    * Gets the upper limit for parse errors for an AgentX peer.
125    * @return
126    * a positive value (including zero) indicates the upper limit of parse
127    * errors tolerated per peer. A negative value indicates that there is no
128    * limit.
129    * @since 1.0.1
130    */

131   public int getMaxParseErrors() {
132     return maxParseErrors;
133   }
134
135   protected void finalizeRequest(CommandResponderEvent command,
136                                  Request req,
137                                  MOServer server) {
138     boolean complete = req.isComplete();
139     AgentXQueueEntry entry = agentXQueue.get(req.getTransactionID());
140     if (entry != null) {
141       Collection pending = entry.getPending();
142       entry.updateTimestamp();
143       for (Iterator it = pending.iterator(); it.hasNext(); ) {
144         AgentXPending p = (AgentXPending) it.next();
145         if (pending != null) {
146           AgentXPDU agentXPDU = p.getAgentXPDU();
147           AgentXMasterSession session = p.getSession();
148           agentXPDU.setSessionID(session.getSessionID());
149           agentXPDU.setTransactionID(req.getTransactionID());
150           agentXPDU.setPacketID(createNextPacketID());
151           p.updateTimestamp();
152           try {
153             agentX.send(agentXPDU,
154                         session.createAgentXTarget(),
155                         session.getPeer().getTransport(),
156                         p, this);
157           }
158           catch (IOException JavaDoc ex) {
159             LOGGER.error("Failed to send AgentX subrequest: " +
160                          ex.getMessage());
161             ((SubRequest) p.getReferences().next()).
162                 getStatus().setErrorStatus(PDU.genErr);
163             break;
164           }
165         }
166       }
167     }
168     else {
169       if (complete) {
170         agentXQueue.removeAll(req.getTransactionID());
171       }
172       else {
173         // there are still incomplete sub-requests -> reprocess them
174
if (req.getReprocessCounter() < this.maxReprocessing) {
175           reprocessRequest(server, (SnmpRequest)req);
176         }
177         else {
178           req.setErrorStatus(PDU.genErr);
179           LOGGER.warn("The following request has been repeocessed "+
180                       req.getReprocessCounter()+" which exceeds the agent's "+
181                       "upper limit of "+this.maxReprocessing+": "+
182                       req);
183         }
184       }
185       super.finalizeRequest(command, req, server);
186     }
187   }
188
189   protected synchronized int getNextSessionID() {
190     return nextSessionID++;
191   }
192
193   public MOServer getServer() {
194     return server;
195   }
196
197   public byte getDefaultTimeout() {
198     return defaultTimeout;
199   }
200
201   /**
202    * Gets the maximum number of consecutive timeouts allowed per session.
203    * @return
204    * the maximum number of consecutive timeouts allowed per session
205    */

206   public int getMaxConsecutiveTimeouts() {
207     return maxConsecutiveTimeouts;
208   }
209
210   /**
211    * Indicates whether subagents can register contexts that are not yet
212    * supported by this master agent.
213    * @return
214    * <code>true</code> if subagents can register objects for new contexts.
215    */

216   public boolean isAcceptNewContexts() {
217     return acceptNewContexts;
218   }
219
220   public void setDefaultTimeout(byte defaultTimeout) {
221     this.defaultTimeout = defaultTimeout;
222   }
223
224   /**
225    * Sets the maximum number of timeouts allowed per session. If the number
226    * is exceeded then the session will be closed with reason
227    * {@link AgentXProtocol#REASON_TIMEOUTS}.
228    * @param maxConsecutiveTimeouts
229    * the maximum number of timeouts (should be greater than zero).
230    */

231   public void setMaxConsecutiveTimeouts(int maxConsecutiveTimeouts) {
232     this.maxConsecutiveTimeouts = maxConsecutiveTimeouts;
233   }
234
235   /**
236    * Enables or disables accepting new contexts from subagents.
237    * @param acceptNewContexts
238    * <code>true</code> if subagents are allowed to register objects for new
239    * contexts, <code>false</code> otherwise. Default is <code>false</code>.
240    */

241   public void setAcceptNewContexts(boolean acceptNewContexts) {
242     this.acceptNewContexts = acceptNewContexts;
243   }
244
245   public void processCommand(AgentXCommandEvent event) {
246     boolean pendingClose = false;
247     if (event.isException()) {
248       AgentXPeer peer = getPeer(event.getPeerAddress());
249       if (peer != null) {
250         peer.incParseErrors();
251         LOGGER.warn("AgentX parse exception from peer '"+peer+
252                     "' : " + event.getException());
253         if ((maxParseErrors >= 0) && (peer.getParseErrors() > maxParseErrors)) {
254           LOGGER.warn("Removing peer due to excessive parse errors: " +peer);
255           closePeer(peer.getAddress(), AgentXProtocol.REASON_PARSE_ERROR);
256         }
257       }
258       else {
259         LOGGER.error("AgentX parse exception from unknown peer '"+
260                      event.getPeerAddress()+
261                      "' : " + event.getException());
262       }
263     }
264     else {
265       AgentXPDU pdu = event.getCommand();
266       AgentXMasterSession session = getSession(pdu);
267       AgentXResponsePDU response = null;
268       if (LOGGER.isDebugEnabled()) {
269         LOGGER.debug("Processing AgentX PDU "+pdu+" for session "+session);
270       }
271       switch (pdu.getType()) {
272         case AgentXPDU.AGENTX_RESPONSE_PDU: {
273           LOGGER.error(
274               "Internal error: received AgentX response without request");
275           return;
276         }
277         case AgentXPDU.AGENTX_OPEN_PDU: {
278           response = openSession((AgentXOpenPDU) pdu, event);
279           session = getSession(response.getSessionID());
280           break;
281         }
282         case AgentXPDU.AGENTX_CLOSE_PDU: {
283           response = closeSession((AgentXClosePDU)pdu, session);
284           pendingClose = true;
285           break;
286         }
287         case AgentXPDU.AGENTX_REGISTER_PDU: {
288           response = register((AgentXRegisterPDU)pdu, event, session);
289           break;
290         }
291         case AgentXPDU.AGENTX_UNREGISTER_PDU: {
292           response = unregister((AgentXUnregisterPDU)pdu, event, session);
293           break;
294         }
295         case AgentXPDU.AGENTX_ADDAGENTCAPS_PDU: {
296           response = addAgentCaps((AgentXAddAgentCapsPDU)pdu, session);
297           break;
298         }
299         case AgentXPDU.AGENTX_REMOVEAGENTCAPS_PDU: {
300           response = removeAgentCaps((AgentXRemoveAgentCapsPDU)pdu, session);
301           break;
302         }
303         case AgentXPDU.AGENTX_NOTIFY_PDU: {
304           response = notify((AgentXNotifyPDU)pdu, session);
305           break;
306         }
307         case AgentXPDU.AGENTX_PING_PDU: {
308           response = ping((AgentXPingPDU)pdu, session);
309           break;
310         }
311         case AgentXPDU.AGENTX_INDEXALLOCATE_PDU: {
312           response = indexAllocate((AgentXIndexAllocatePDU)pdu, session);
313           break;
314         }
315         case AgentXPDU.AGENTX_INDEXDEALLOCATE_PDU: {
316           response = indexDeallocate((AgentXIndexDeallocatePDU)pdu, session);
317           break;
318         }
319         default:
320           LOGGER.warn("Unknown AgentX PDU type received: " + pdu);
321       }
322       if ((response != null) && (session != null)) {
323         sendResponse(response, session);
324       }
325       if (pendingClose) {
326         if (session != null) {
327           closePeer(session.getPeer());
328         }
329       }
330     }
331     event.setProcessed(true);
332   }
333
334   private void closePeer(AgentXPeer peer) {
335     TransportMapping transport = peer.getTransport();
336     if (transport instanceof ConnectionOrientedTransportMapping) {
337       try {
338         if (((ConnectionOrientedTransportMapping)
339              transport).close(peer.getAddress())) {
340           if (LOGGER.isInfoEnabled()) {
341             LOGGER.info("Closed sub-agent connection to " +
342                         peer.getAddress());
343           }
344         }
345         else {
346           LOGGER.warn("Failed to close sub-agent connection to " +
347                       peer.getAddress());
348         }
349       }
350       catch (IOException JavaDoc ex) {
351         LOGGER.error("Failed to close transport mapping "+
352                      peer.getTransport()+" because: "+
353                      ex.getMessage(), ex);
354       }
355     }
356   }
357
358   public AgentXResponsePDU indexDeallocate(AgentXIndexDeallocatePDU pdu,
359                                            AgentXMasterSession session) {
360     AgentXResponsePDU response = createResponse(pdu, session);
361     boolean contextSupported = server.isContextSupported(pdu.getContext());
362     if (contextSupported) {
363       VariableBinding[] vbs = pdu.getVariableBindings();
364       // test index allocation
365
deallocateIndexes(response, pdu, session, vbs, true);
366       if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) {
367         // do it on success
368
deallocateIndexes(response, pdu, session, vbs, false);
369         response.setVariableBindings(vbs);
370       }
371     }
372     else {
373       response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
374     }
375     return response;
376   }
377
378   private boolean checkIfContextIsSupported(OctetString context) {
379     boolean contextSupported = server.isContextSupported(context);
380     if (LOGGER.isDebugEnabled()) {
381       LOGGER.debug("Checking context '"+context+"' is supported");
382     }
383     if (isAcceptNewContexts() && !contextSupported) {
384       server.addContext(context);
385       contextSupported = server.isContextSupported(context);
386       if (LOGGER.isInfoEnabled()) {
387         LOGGER.info("Adding new context '"+context+
388                     "' on subagent request returned: "+contextSupported);
389       }
390     }
391     return contextSupported;
392   }
393
394   public AgentXResponsePDU indexAllocate(AgentXIndexAllocatePDU pdu,
395                                          AgentXMasterSession session) {
396     AgentXResponsePDU response = createResponse(pdu, session);
397     response.setVariableBindings(pdu.getVariableBindings());
398     boolean contextSupported = checkIfContextIsSupported(pdu.getContext());
399     if (contextSupported) {
400       VariableBinding[] vbs = pdu.getVariableBindings();
401       // test index allocation
402
allocateIndexes(response, pdu, session, vbs, true);
403       if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) {
404         // do it on success
405
allocateIndexes(response, pdu, session, vbs, false);
406         response.setVariableBindings(vbs);
407       }
408     }
409     else {
410       response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
411     }
412     return response;
413   }
414
415   private int allocateIndexes(AgentXResponsePDU response,
416                               AgentXIndexAllocatePDU pdu,
417                               AgentXMasterSession session,
418                               VariableBinding[] vbs,
419                               boolean testOnly) {
420     int status = AgentXProtocol.AGENTX_SUCCESS;
421     int i=0;
422     for (; (i<vbs.length) && (status == AgentXProtocol.AGENTX_SUCCESS); i++) {
423       VariableBinding vb = vbs[i];
424       if (pdu.isFlagSet(AgentXProtocol.FLAG_ANY_INDEX)) {
425         status = indexRegistry.anyIndex(session.getSessionID(),
426                                         pdu.getContext(), vb, testOnly);
427       }
428       else if (pdu.isFlagSet(AgentXProtocol.FLAG_NEW_INDEX)) {
429         status = indexRegistry.newIndex(session.getSessionID(),
430                                         pdu.getContext(), vb, testOnly);
431       }
432       else {
433         status = indexRegistry.allocate(session.getSessionID(),
434                                         pdu.getContext(), vb, testOnly);
435       }
436     }
437     response.setErrorStatus(status);
438     if (status != AgentXProtocol.AGENTX_SUCCESS) {
439       response.setErrorIndex(i);
440     }
441     return status;
442   }
443
444   private int deallocateIndexes(AgentXResponsePDU response,
445                                 AgentXIndexDeallocatePDU pdu,
446                                 AgentXMasterSession session,
447                                 VariableBinding[] vbs,
448                                 boolean testOnly) {
449     int status = AgentXProtocol.AGENTX_SUCCESS;
450     int i=0;
451     for (; (i<vbs.length) && (status == AgentXProtocol.AGENTX_SUCCESS); i++) {
452       VariableBinding vb = vbs[i];
453       status = indexRegistry.release(session.getSessionID(),
454                                      pdu.getContext(), vb, testOnly);
455     }
456     response.setErrorStatus(status);
457     if (status != AgentXProtocol.AGENTX_SUCCESS) {
458       response.setErrorIndex(i);
459     }
460     return status;
461   }
462
463   protected void processAgentXSearchResponse(AgentXPending pending,
464                                              AgentXResponsePDU pdu) {
465     if (pdu.getErrorStatus() != PDU.noError) {
466       processsErrorResponse(pending, pdu);
467     }
468     else {
469       // no error -> normal processing
470
if (pending.getAgentXPDU().getType() == AgentXPDU.AGENTX_GETBULK_PDU) {
471         processAgentXNextResponse(pending, pdu, Integer.MAX_VALUE);
472       }
473       else {
474         processAgentXNextResponse(pending, pdu,
475                                   ((AgentXRequestPDU)pending.getAgentXPDU()).
476                                   getRanges().length);
477       }
478     }
479   }
480
481   private SubRequestIterator
482       processAgentXNextResponse(AgentXPending pending,
483                                 AgentXResponsePDU pdu,
484                                 int subRequestIndexUpperBound) throws
485       NoSuchElementException
486   {
487     VariableBinding[] vbs = pdu.getVariableBindings();
488     AgentXRequestPDU axReqPDU = (AgentXRequestPDU) pending.getAgentXPDU();
489     SubRequestIterator subRequests = pending.getReferences();
490     for (int i=0; (i<subRequestIndexUpperBound) && subRequests.hasNext(); i++) {
491       SnmpSubRequest sreq = (SnmpSubRequest) subRequests.nextSubRequest();
492       processNextSubRequest(vbs, axReqPDU, i, i, sreq);
493     }
494     return subRequests;
495   }
496
497   private void processNextSubRequest(VariableBinding[] vbs,
498                                      AgentXRequestPDU axReqPDU,
499                                      int vbIndex,
500                                      int rangeIndex,
501                                      SnmpSubRequest sreq) {
502     MOScope srange = axReqPDU.getRanges()[rangeIndex];
503     if (vbIndex < vbs.length) {
504       VariableBinding vb = vbs[vbIndex];
505       if (vb.getSyntax() == SMIConstants.EXCEPTION_END_OF_MIB_VIEW) {
506         processEndOfMibView(sreq, srange, vb.getOid());
507       }
508       else if (!srange.covers(vb.getOid())) {
509         processEndOfMibView(sreq, srange, null);
510       }
511       else if ((vb.isException()) ||
512                (super.vacm.isAccessAllowed(sreq.getSnmpRequest().
513                                            getViewName(),
514                                            vb.getOid()) != VACM.VACM_OK)) {
515         DefaultMOContextScope nscope = (DefaultMOContextScope) sreq.getScope();
516         nscope.substractScope(srange);
517         nscope.setUpperBound(null);
518         nscope.setUpperIncluded(true);
519         // reset query because scope changed!
520
sreq.setQuery(null);
521         sreq.getStatus().setProcessed(false);
522       }
523       else {
524         sreq.getVariableBinding().setOid(vb.getOid());
525         sreq.getVariableBinding().setVariable(vb.getVariable());
526         sreq.getStatus().setPhaseComplete(true);
527         if (LOGGER.isDebugEnabled()) {
528           LOGGER.debug("Assigned next subrequest "+sreq);
529         }
530         // Not needed here because bulk processing does it anyway:
531
sreq.updateNextRepetition();
532       }
533     }
534     else {
535       // less VBs than expected
536
processEndOfMibView(sreq, srange, null);
537     }
538   }
539
540   private static void processEndOfMibView(SnmpSubRequest sreq, MOScope srange,
541                                           OID oid) {
542     if (srange.getUpperBound() == null) {
543       // unbounded
544
// set also all following repetitions to endOfMibView
545
SubRequestIterator tail = sreq.repetitions();
546       while (tail.hasNext()) {
547         SubRequest sr = tail.nextSubRequest();
548         if (oid == null) {
549           sr.getVariableBinding().setOid(srange.getLowerBound());
550         }
551         else {
552           sreq.getVariableBinding().setOid(oid);
553         }
554         sr.getVariableBinding().setVariable(Null.endOfMibView);
555         sr.getStatus().setPhaseComplete(true);
556       }
557       return;
558     }
559     else {
560       sreq.getStatus().setProcessed(false);
561     }
562     DefaultMOContextScope nscope = (DefaultMOContextScope) sreq.getScope();
563     nscope.substractScope(srange);
564     nscope.setUpperBound(null);
565     nscope.setUpperIncluded(true);
566     // reset query because scope changed!
567
sreq.setQuery(null);
568   }
569
570   protected void processAgentXBulkResponse(AgentXPending pending,
571                                            AgentXResponsePDU pdu) {
572     if (pdu.getErrorStatus() != PDU.noError) {
573       processsErrorResponse(pending, pdu);
574     }
575     else {
576       AgentXGetBulkPDU requestPDU = (AgentXGetBulkPDU) pending.getAgentXPDU();
577       VariableBinding[] vbs = pdu.getVariableBindings();
578       int numBindings = vbs.length;
579       int repeaters =
580           requestPDU.getRanges().length - requestPDU.getNonRepeaters();
581       if (numBindings - requestPDU.getNonRepeaters() >
582           requestPDU.getMaxRepetitions() * repeaters) {
583         LOGGER.warn("Bulk response with more repetitions ("+
584                     ((numBindings - requestPDU.getNonRepeaters())/ repeaters)+
585                     ") than max rep. "+requestPDU.getMaxRepetitions());
586         numBindings = requestPDU.getMaxRepetitions() * repeaters
587             + requestPDU.getNonRepeaters();
588       }
589       if (numBindings == 0) {
590         // this is IMHO outside the AgentX/SNMP spec but it is in fact
591
// needed to be interoperable with NET-SNMP sub-agent
592
AgentXRequestPDU axReqPDU = (AgentXRequestPDU) pending.getAgentXPDU();
593         SubRequestIterator subRequests = pending.getReferences();
594         for (int i=0; subRequests.hasNext(); i++) {
595           SnmpSubRequest sreq = (SnmpSubRequest) subRequests.nextSubRequest();
596           MOScope srange = axReqPDU.getRanges()[i];
597           processEndOfMibView(sreq, srange, null);
598         }
599       }
600       else {
601         // process non repeaters first
602
SubRequestIterator it =
603             processAgentXNextResponse(pending, pdu, requestPDU.getNonRepeaters());
604         int nonRep = requestPDU.getNonRepeaters();
605         for (int c = 0;
606              (c+nonRep < requestPDU.getRanges().length) && it.hasNext(); c++) {
607           int rangeIndex = c + nonRep;
608           SnmpSubRequest sreq = (SnmpSubRequest) it.nextSubRequest();
609           SubRequestIterator rsreq = sreq.repetitions();
610           for (int r = 0; (nonRep + (r * repeaters) + c < numBindings) &&
611                rsreq.hasNext(); r++) {
612             SnmpSubRequest repetition = (SnmpSubRequest) rsreq.nextSubRequest();
613 /*
614             System.err.println("nr="+nonRep+",r="+r+",repeaters="+repeaters+
615                 ",c="+c+",rangeIndex="+rangeIndex+",rep="+repetition);
616 */

617             processNextSubRequest(vbs, requestPDU, nonRep + (r * repeaters) + c,
618                                   rangeIndex, repetition);
619           }
620         }
621       }
622     }
623   }
624
625   protected static void processsErrorResponse(AgentXPending pending,
626                                               AgentXResponsePDU pdu) throws
627       NoSuchElementException
628   {
629     SubRequestIterator subRequests = pending.getReferences();
630     for (int i=1; i<pdu.getErrorIndex(); i++) {
631       if (subRequests.hasNext()) {
632         subRequests.next();
633       }
634       else {
635         pending.getRequest().setErrorStatus(PDU.genErr);
636         return;
637       }
638     }
639     if (subRequests.hasNext()) {
640       SubRequest sreq = subRequests.nextSubRequest();
641       RequestStatus status = sreq.getStatus();
642       status.setErrorStatus(pdu.getErrorStatus());
643     }
644     else {
645       pending.getRequest().setErrorStatus(PDU.genErr);
646     }
647   }
648
649
650   private static boolean checkAgentXResponse(AgentXResponsePDU pdu,
651                                              AgentXPending pending) {
652     switch (pending.getAgentXPDU().getType()) {
653       case AgentXPDU.AGENTX_GET_PDU:
654       case AgentXPDU.AGENTX_GETNEXT_PDU: {
655         if (((AgentXRequestPDU) pending.getAgentXPDU()).getRanges().length !=
656             pdu.size()) {
657           pending.getRequest().setErrorStatus(PDU.genErr);
658           return false;
659         }
660         break;
661       }
662       default: {
663         // no check?
664
}
665     }
666     return true;
667   }
668
669   protected AgentXResponsePDU ping(AgentXPingPDU pdu,
670                                    AgentXMasterSession session) {
671     AgentXResponsePDU response = createResponse(pdu, session);
672     if (!checkIfContextIsSupported(pdu.getContext())) {
673       response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
674       return response;
675     }
676     return response;
677   }
678
679   protected AgentXResponsePDU notify(AgentXNotifyPDU pdu,
680                                      AgentXMasterSession session) {
681     AgentXResponsePDU response = createResponse(pdu, session);
682     if (session != null) {
683       if (!checkIfContextIsSupported(pdu.getContext())) {
684         response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
685         return response;
686       }
687       VariableBinding[] vbs = pdu.getVariableBindings();
688       response.setVariableBindings(vbs);
689       int payloadIndex = 1;
690       OID trapoid = null;
691       TimeTicks timestamp = new TimeTicks(getContextSysUpTime(DEFAULT_CONTEXT));
692
693       if (vbs.length >= 1) {
694         if (SnmpConstants.sysUpTime.equals(vbs[0].getOid())) {
695           payloadIndex++;
696           if ((vbs.length < 2) ||
697               (!SnmpConstants.snmpTrapOID.equals(vbs[1].getOid()))) {
698             response.setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
699             response.setErrorIndex(2);
700           }
701           else {
702             timestamp = (TimeTicks) vbs[0].getVariable();
703             trapoid = (OID) vbs[1].getVariable();
704           }
705         }
706         else if (SnmpConstants.snmpTrapOID.equals(vbs[0].getOid())) {
707           trapoid = (OID) vbs[0].getVariable();
708         }
709         else {
710           response.setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
711           response.setErrorIndex(1);
712         }
713       }
714       if (trapoid != null) {
715         VariableBinding[] pvbs = new VariableBinding[vbs.length - payloadIndex];
716         System.arraycopy(vbs, payloadIndex, pvbs, 0, pvbs.length);
717         notify(pdu.getContext(), trapoid, timestamp, pvbs);
718       }
719     }
720     return response;
721   }
722
723   protected TimeTicks getContextSysUpTime(OctetString context) {
724     MasterContextInfo info = (MasterContextInfo) contextInfo.get(context);
725     SysUpTime contextSysUpTime;
726     if (info == null) {
727       MOContextScope scope =
728           new DefaultMOContextScope(context,
729                                     SnmpConstants.sysUpTime, true,
730                                     SnmpConstants.sysUpTime, true);
731       ManagedObject mo = server.lookup(new DefaultMOQuery(scope));
732       if (mo instanceof SysUpTime) {
733         contextSysUpTime = (SysUpTime) mo;
734       }
735       else {
736         /**@todo May be we can use an integer of the found object to
737          * initialize the time?
738          */

739         LOGGER.warn("SysUpTime could not be found in '"+context+
740                     "' context, using a new instance instead");
741         contextSysUpTime = new SysUpTimeImpl();
742       }
743       contextInfo.put(context,
744                       new MasterContextInfo(context, contextSysUpTime));
745     }
746     else {
747       contextSysUpTime = info.getUpTime();
748     }
749     if (contextSysUpTime != null) {
750       return contextSysUpTime.get();
751     }
752     return null;
753   }
754
755   public AgentXResponsePDU addAgentCaps(AgentXAddAgentCapsPDU pdu,
756                                         AgentXMasterSession session) {
757     AgentXResponsePDU response = createResponse(pdu, session);
758     if (session != null) {
759       if (!checkIfContextIsSupported(pdu.getContext())) {
760         response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
761         return response;
762       }
763       AgentCapabilityList agentCaps = getAgentCaps(pdu.getContext());
764       if (agentCaps != null) {
765         OID index = agentCaps.addSysOREntry(pdu.getId(), pdu.getDescr());
766         session.addAgentCaps(pdu.getId(), index);
767       }
768     }
769     return response;
770   }
771
772   protected AgentCapabilityList getAgentCaps(OctetString contextName) {
773     MOContextScope scope =
774         new DefaultMOContextScope(contextName,
775                                   SnmpConstants.sysOREntry, true,
776                                   SnmpConstants.sysOREntry, true);
777     ManagedObject mo = server.lookup(new DefaultMOQuery(scope));
778     if (mo instanceof AgentCapabilityList) {
779       return (AgentCapabilityList)mo;
780     }
781     else {
782       LOGGER.warn("SysOREntry managed object for context "+contextName+
783                   " not found, instead found: "+mo);
784     }
785     return null;
786   }
787
788   public AgentXResponsePDU removeAgentCaps(AgentXRemoveAgentCapsPDU pdu,
789                                            AgentXMasterSession session) {
790     AgentXResponsePDU response = createResponse(pdu, session);
791     if (session != null) {
792       OID index = session.removeAgentCaps(pdu.getId());
793       AgentCapabilityList agentCaps = getAgentCaps(pdu.getContext());
794       if (agentCaps != null) {
795         Object JavaDoc ac = agentCaps.removeSysOREntry(index);
796         if (ac == null) {
797           response.setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_AGENTCAPS);
798         }
799       }
800       else {
801         response.setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_AGENTCAPS);
802       }
803     }
804     return response;
805   }
806
807   public AgentXResponsePDU closeSession(AgentXClosePDU pdu,
808                                         AgentXMasterSession session) {
809     if (LOGGER.isInfoEnabled()) {
810       LOGGER.info("Subagent is closing session "+session+
811                   " because "+pdu.getReason());
812     }
813     AgentXResponsePDU response = createResponse(pdu, session);
814     if (session != null) {
815       removeSession(session.getSessionID());
816       removeAllRegistrations(session);
817       session.setClosed(true);
818     }
819     return response;
820   }
821
822   public void closeSession(AgentXMasterSession session,
823                            byte reason) {
824     if (LOGGER.isInfoEnabled()) {
825       LOGGER.info("Closing sub-agent session "+session+" because "+reason);
826     }
827     AgentXClosePDU closePDU = new AgentXClosePDU(reason);
828     try {
829       agentX.send(closePDU,
830                   session.createAgentXTarget(),
831                   session.getPeer().getTransport(),
832                   new AgentXPendingClose(session, closePDU), this);
833     }
834     catch (IOException JavaDoc ex) {
835       LOGGER.error("Failed to send CloseSessionPDU to close session "+session+
836                    ": "+ex.getMessage(), ex);
837     }
838     removeSession(session.getSessionID());
839     removeAllRegistrations(session);
840     session.setClosed(true);
841   }
842
843
844   protected synchronized void removeAllRegistrations(AgentXMasterSession session) {
845     if (LOGGER.isDebugEnabled()) {
846       LOGGER.debug("Removing all registrations (out of "+registrations.size()+
847                    ") of session "+session);
848     }
849     for (Iterator it = registrations.iterator(); it.hasNext(); ) {
850       AgentXRegEntry r = (AgentXRegEntry) it.next();
851       if (r.getSession().equals(session)) {
852         removeRegistration(r, it);
853       }
854     }
855   }
856
857   protected AgentXMasterSession getSession(int sessionID) {
858     return (AgentXMasterSession) sessions.get(new Integer JavaDoc(sessionID));
859   }
860
861   protected synchronized AgentXMasterSession getSession(AgentXPDU pdu) {
862     int sessionID = pdu.getSessionID();
863     return getSession(sessionID);
864   }
865
866   protected AgentXResponsePDU register(AgentXRegisterPDU pdu,
867                                        AgentXCommandEvent command,
868                                        AgentXMasterSession session) {
869     AgentXResponsePDU response = createResponse(pdu, session);
870     if (session != null) {
871       if (!checkIfContextIsSupported(pdu.getContext())) {
872         response.setErrorStatus(AgentXProtocol.AGENTX_UNSUPPORTED_CONTEXT);
873         return response;
874       }
875       AgentXRegEntry regEntry =
876           new AgentXRegEntry(session,
877                              pdu.getRegion(),
878                              pdu.getPriority(),
879                              pdu.getContext(),
880                              pdu.getTimeout());
881       if (isDuplicate(regEntry)) {
882         response.setErrorStatus(AgentXProtocol.AGENTX_DUPLICATE_REGISTRATION);
883         return response;
884       }
885       AgentXMasterEvent event =
886           new AgentXMasterEvent(this, AgentXMasterEvent.REGISTRATION_TO_ADD,
887                                 regEntry);
888       fireMasterChanged(event);
889       if (event.getVetoReason() == AgentXProtocol.AGENTX_SUCCESS) {
890         try {
891           addRegistration(regEntry);
892         }
893         catch (DuplicateRegistrationException drex) {
894           if (LOGGER.isDebugEnabled()) {
895             drex.printStackTrace();
896           }
897           response.setErrorStatus(AgentXProtocol.AGENTX_DUPLICATE_REGISTRATION);
898           return response;
899         }
900       }
901       else {
902         response.setErrorStatus(event.getVetoReason());
903       }
904     }
905     return response;
906   }
907
908   protected AgentXResponsePDU unregister(AgentXUnregisterPDU pdu,
909                                          AgentXCommandEvent event,
910                                          AgentXMasterSession session) {
911     AgentXResponsePDU response = createResponse(pdu, session);
912     if (session != null) {
913       AgentXRegEntry regEntry =
914           new AgentXRegEntry(session,
915                              pdu.getRegion(),
916                              pdu.getPriority(),
917                              pdu.getContext(),
918                              pdu.getTimeout());
919       boolean found = false;
920       for (Iterator it = registrations.iterator(); it.hasNext(); ) {
921         AgentXRegEntry r = (AgentXRegEntry) it.next();
922         if (r.equals(regEntry)) {
923           found = true;
924           if (!removeRegistration(r, it)) {
925             response.setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_REGISTRATION);
926           }
927           break;
928         }
929       }
930       if (!found) {
931         response.setErrorStatus(AgentXProtocol.AGENTX_UNKNOWN_REGISTRATION);
932       }
933     }
934     return response;
935   }
936
937   protected synchronized boolean isDuplicate(AgentXRegEntry registration) {
938     if (registrations.contains(registration)) {
939       if (LOGGER.isDebugEnabled()) {
940         LOGGER.debug("Identical registration attempt for "+registration);
941       }
942       return true;
943     }
944     AgentXNodeQuery query =
945         new AgentXNodeQuery(registration.getContext(),
946                             registration.getRegion(),
947                             AgentXNodeQuery.QUERY_NON_AGENTX_NODES);
948     ManagedObject mo = server.lookup(query);
949     if (mo != null) {
950       // overlaps non AgentX --> return duplicate region error
951
if (LOGGER.isDebugEnabled()) {
952         LOGGER.debug("New registration is rejected as duplicate because it "+
953                      "overlaps with non AgentX managed object: "+mo);
954       }
955       return true;
956     }
957     return false;
958   }
959
960   protected synchronized void addRegistration(AgentXRegEntry registration)
961       throws DuplicateRegistrationException
962   {
963     registrations.add(registration);
964     if (registration.getRegion().isRange()) {
965       AgentXRegion r = registration.getRegion();
966       long start = r.getLowerBoundSubID() & 0xFFFFFFFFL;
967       long stop = r.getUpperBoundSubID() & 0xFFFFFFFFL;
968       if (start > stop) {
969         LOGGER.warn("Empty range registration "+registration);
970       }
971       else {
972         for (long s = start; s <= stop; s++) {
973           OID root = new OID(r.getLowerBound());
974           root.set(r.getRangeSubID()-1, (int)s);
975           AgentXRegion sr = new AgentXRegion(root, root.nextPeer());
976           addRegion(registration, sr);
977         }
978       }
979     }
980     else {
981       addRegion(registration, registration.getRegion());
982     }
983     AgentXMasterEvent e =
984         new AgentXMasterEvent(this, AgentXMasterEvent.REGISTRATION_ADDED,
985                               registration);
986     fireMasterChanged(e);
987   }
988
989   private static AgentXNodeQuery nextQuery(AgentXNodeQuery lastQuery,
990                                            AgentXNode lastNode) {
991     if (lastNode != null) {
992       lastQuery.getMutableScope().setLowerBound(
993           lastNode.getScope().getUpperBound());
994       lastQuery.getMutableScope().setLowerIncluded(
995           !lastNode.getScope().isUpperIncluded());
996     }
997     return lastQuery;
998   }
999
1000  protected synchronized void addRegion(AgentXRegEntry registration,
1001                                        AgentXRegion region) throws
1002      DuplicateRegistrationException
1003  {
1004    if (region.isRange()) {
1005      String JavaDoc errText = "Regions with range cannot be added";
1006      LOGGER.error(errText);
1007      throw new IllegalArgumentException JavaDoc(errText);
1008    }
1009    AgentXNodeQuery query =
1010        new AgentXNodeQuery(registration.getContext(),
1011                            region,
1012                            AgentXNodeQuery.QUERY_AGENTX_NODES);
1013    AgentXNode lastNode = null;
1014    AgentXNode node = (AgentXNode)server.lookup(query);
1015    if (node != null) {
1016      LinkedList splitted = new LinkedList();
1017      AgentXRegion r1 = new AgentXRegion(region);
1018      for (; (node != null);
1019           node = (AgentXNode) server.lookup(nextQuery(query, lastNode))) {
1020        AgentXRegion r2 = new AgentXRegion(node.getScope().getLowerBound(),
1021                                           node.getScope().getUpperBound());
1022        if (LOGGER.isDebugEnabled()) {
1023          LOGGER.debug("Affected region r2="+r2+
1024                       " from registered region r1="+r1);
1025        }
1026        if (r2.covers(r1)) {
1027          if (LOGGER.isDebugEnabled()) {
1028            LOGGER.debug("Region r2 covers r1 (r1="+r1+",r2="+r2+")");
1029          }
1030          oldRegionCoversNew(registration, node, splitted, r1, r2);
1031          r1 = null;
1032        }
1033        else if (r1.covers(r2)) {
1034          if (LOGGER.isDebugEnabled()) {
1035            LOGGER.debug("Region r1 covers r2 (r1="+r1+",r2="+r2+")");
1036          }
1037          r1 = newRegionCoversOld(registration, lastNode,
1038                                  node, splitted, r1, r2);
1039        }
1040        else if ((r1.isOverlapping(r2)) &&
1041                 (r2.getLowerBound().compareTo(r1.getLowerBound()) < 0)) {
1042          if (LOGGER.isDebugEnabled()) {
1043            LOGGER.debug("Region r1 ovelaps r2 and r2 < r1 (r1="+r1+
1044                         ",r2="+r2+")");
1045          }
1046          if (LOGGER.isDebugEnabled()) {
1047            LOGGER.debug("Shrinking node "+node+
1048                         " to "+r1.getLowerBound());
1049          }
1050          node.shrink(r1.getLowerBound());
1051          AgentXNode r2b =
1052              node.getClone(new AgentXRegion(r1.getLowerBound(),
1053                                             r2.getUpperBound()));
1054          r2b.addRegistration(registration);
1055          splitted.add(r2b);
1056          r1 = new AgentXRegion(r2.getUpperBound(), r1.getLowerBound());
1057        }
1058        // r1.overlaps(r2) and (r1.get_lower() < r2.get_lower())
1059
else {
1060          if (LOGGER.isDebugEnabled()) {
1061            LOGGER.debug("Region r1 ovelaps r2 and r1 < r2 (r1="+
1062                         r1+",r2="+r2+")");
1063          }
1064          if (LOGGER.isDebugEnabled()) {
1065            LOGGER.debug("Shrinking node "+node+
1066                        " to "+r1.getUpperBound());
1067          }
1068          node.shrink(r1.getUpperBound());
1069          AgentXNode r2b =
1070              node.getClone(new AgentXRegion(r1.getUpperBound(),
1071                                             r2.getUpperBound()));
1072          node.addRegistration(registration);
1073          splitted.add(r2b);
1074          AgentXNode r1a =
1075            new AgentXNode(new AgentXRegion(r1.getLowerBound(),
1076                                            r2.getLowerBound()), registration);
1077          splitted.add(r1a);
1078          r1 = null;
1079        }
1080        if (r1 != null) {
1081          if (r1.isEmpty()) {
1082            splitted.add(new AgentXNode(region, registration));
1083          }
1084          else {
1085            splitted.add(new AgentXNode(r1, registration));
1086          }
1087        }
1088        lastNode = node;
1089      }
1090      for (Iterator it = splitted.iterator(); it.hasNext(); ) {
1091        AgentXNode n = (AgentXNode) it.next();
1092        server.register(n, registration.getContext());
1093        if (LOGGER.isDebugEnabled()) {
1094          LOGGER.debug("Registered splitted AgentX node: "+n);
1095        }
1096      }
1097    }
1098    else {
1099      node = new AgentXNode(region, registration);
1100      server.register(node, registration.getContext());
1101      if (LOGGER.isDebugEnabled()) {
1102        LOGGER.debug("Registered AgentX node: "+node);
1103      }
1104    }
1105  }
1106
1107  protected boolean removeRegistration(AgentXRegEntry registration,
1108                                       Iterator regIterator) {
1109    LinkedList remove = new LinkedList();
1110    AgentXRegion queryRegion = new AgentXRegion(registration.getRegion());
1111    queryRegion.setUpperIncluded(true);
1112    AgentXNodeQuery query =
1113        new AgentXNodeQuery(registration.getContext(),
1114                            queryRegion,
1115                            AgentXNodeQuery.QUERY_AGENTX_NODES);
1116    AgentXNode lastNode = null;
1117    AgentXNode node = (AgentXNode)server.lookup(query);
1118    if (node != null) {
1119      for (; (node != null);
1120           node = (AgentXNode) server.lookup(nextQuery(query, lastNode))) {
1121        if (node == lastNode) {
1122          break;
1123        }
1124        if ((node.removeRegistration(registration)) &&
1125            (node.getRegistrationCount() == 0)) {
1126          remove.add(node);
1127        }
1128        else {
1129          if ((lastNode != null) &&
1130              (lastNode.getRegistrationCount() == 1) &&
1131              (node.getRegistrationCount() == 1) &&
1132              (lastNode.getScope().getUpperBound().equals(
1133                  node.getScope().getLowerBound())) &&
1134              (node.getActiveRegistration().equals(
1135                   lastNode.getActiveRegistration()))) {
1136            AgentXRegion r =
1137                new AgentXRegion(node.getScope().getLowerBound(),
1138                                 lastNode.getScope().getUpperBound());
1139            if (node.getActiveRegistration().getRegion().covers(r)) {
1140              remove.add(node);
1141              lastNode.expand(node.getScope().getUpperBound(), false);
1142            }
1143          }
1144        }
1145        lastNode = node;
1146      }
1147    }
1148    else {
1149      LOGGER.warn("A registration is removed with not associated subtree: "+
1150                  registration);
1151    }
1152    for (Iterator it = remove.iterator(); it.hasNext(); ) {
1153      AgentXNode rnode = (AgentXNode) it.next();
1154      server.unregister(rnode, registration.getContext());
1155    }
1156    if (regIterator != null) {
1157      regIterator.remove();
1158      if (LOGGER.isDebugEnabled()) {
1159        LOGGER.debug("Removed registration "+registration+
1160                     " by session close, "+registrations.size()+" left.");
1161      }
1162      fireMasterChanged(new AgentXMasterEvent(this,
1163                                              AgentXMasterEvent.REGISTRATION_REMOVED,
1164                                              registration));
1165      return true;
1166    }
1167    else if (registrations.remove(registration)) {
1168      if (LOGGER.isDebugEnabled()) {
1169        LOGGER.debug("Removed registration "+registration+
1170                     ", "+registrations.size()+" left.");
1171      }
1172      fireMasterChanged(new AgentXMasterEvent(this,
1173                                              AgentXMasterEvent.REGISTRATION_REMOVED,
1174                                              registration));
1175      return true;
1176    }
1177    return false;
1178  }
1179
1180  private static AgentXRegion newRegionCoversOld(AgentXRegEntry registration,
1181                                                 AgentXNode lastNode,
1182                                                 AgentXNode node,
1183                                                 LinkedList splitted,
1184                                                 AgentXRegion r1,
1185                                                 AgentXRegion r2) {
1186    AgentXNode r1a = null;
1187    if (lastNode != null) {
1188      AgentXRegion r =
1189          new AgentXRegion(lastNode.getScope().getUpperBound(),
1190                           r2.getLowerBound());
1191      r1a = new AgentXNode(r, registration);
1192    }
1193    else {
1194      AgentXRegion r =
1195          new AgentXRegion(r1.getLowerBound(), r2.getLowerBound());
1196      r1a = new AgentXNode(r, registration);
1197    }
1198    if (!splitted.isEmpty()) {
1199      if (LOGGER.isDebugEnabled()) {
1200        LOGGER.debug("Shrinking node "+splitted.getLast()+
1201                     " to "+r2.getLowerBound());
1202      }
1203      ((AgentXNode)splitted.getLast()).shrink(r2.getLowerBound());
1204    }
1205    node.addRegistration(registration);
1206    if ((r1.getLowerBound().equals(r2.getLowerBound())) ||
1207        ((!splitted.isEmpty()) &&
1208         (((AgentXNode)splitted.getLast()).
1209          getScope().equals(r1a.getScope())))) {
1210      r1a = null;
1211    }
1212    else {
1213      splitted.add(r1a);
1214    }
1215    return new AgentXRegion(r2.getUpperBound(), r1.getUpperBound());
1216  }
1217
1218  private static void oldRegionCoversNew(AgentXRegEntry registration,
1219                                         AgentXNode node,
1220                                         List splitted,
1221                                         AgentXRegion r1,
1222                                         AgentXRegion r2) {
1223    AgentXRegion r = new AgentXRegion(r1.getUpperBound(),
1224                                      node.getScope().getUpperBound());
1225    AgentXNode r2c = node.getClone(r);
1226    if (r2.getLowerBound().equals(r1.getLowerBound())) {
1227      if (LOGGER.isDebugEnabled()) {
1228        LOGGER.debug("Shrinking node "+node+" to "+r1.getUpperBound());
1229      }
1230      node.shrink(r1.getUpperBound());
1231      node.addRegistration(registration);
1232    }
1233    else {
1234      if (LOGGER.isDebugEnabled()) {
1235        LOGGER.debug("Shrinking node "+node+" to "+r1.getLowerBound());
1236      }
1237      node.shrink(r1.getLowerBound());
1238      AgentXNode r2b = node.getClone(r1);
1239      r2b.addRegistration(registration);
1240      splitted.add(r2b);
1241    }
1242    splitted.add(r2c);
1243  }
1244
1245  public AgentXResponsePDU openSession(AgentXOpenPDU pdu,
1246                                       AgentXCommandEvent event) {
1247    AgentXMasterSession session =
1248        new AgentXMasterSession(getNextSessionID(), agentXQueue,
1249                                pdu.getSubagentID(), pdu.getSubagentDescr());
1250    AgentXPeer peer = getPeer(event.getPeerAddress());
1251    if (peer == null) {
1252      peer = new AgentXPeer(event.getPeerTransport(), event.getPeerAddress());
1253      addPeer(peer);
1254      LOGGER.warn("Added peer during session opening: "+peer+
1255                  " (peer should have been there already due "+
1256                  "to connection setup)");
1257    }
1258    session.setPeer(peer);
1259    session.setAgentXVersion(pdu.getVersion() & 0xFF);
1260    if (pdu.getTimeout() != 0) {
1261      session.setTimeout(pdu.getTimeout());
1262    }
1263    else {
1264      session.setTimeout(defaultTimeout);
1265    }
1266    int sessionAccepted = acceptSession(session);
1267    if (sessionAccepted == AgentXProtocol.AGENTX_SUCCESS) {
1268      addSession(session);
1269      if (LOGGER.isInfoEnabled()) {
1270        LOGGER.info("Session " + session + " opened from "+peer.getAddress());
1271      }
1272    }
1273    else {
1274      LOGGER.warn("Session open rejected because "+sessionAccepted+" for "+
1275                  session+" from "+event.getPeerAddress());
1276    }
1277    AgentXResponsePDU response = createResponse(pdu, session);
1278    response.setErrorStatus((short)sessionAccepted);
1279    return response;
1280  }
1281
1282  protected synchronized void addPeer(AgentXPeer peer) {
1283    peers.put(peer.getAddress(), peer);
1284    fireMasterChanged(new AgentXMasterEvent(this,
1285                                            AgentXMasterEvent.PEER_ADDED,
1286                                            peer));
1287  }
1288
1289  protected synchronized AgentXPeer getPeer(Address address) {
1290    return (AgentXPeer) peers.get(address);
1291  }
1292
1293  protected int acceptSession(AgentXMasterSession session) {
1294    AgentXMasterEvent event =
1295        new AgentXMasterEvent(this, AgentXMasterEvent.SESSION_ADDED, session);
1296    fireMasterChanged(event);
1297    return event.getVetoReason();
1298  }
1299
1300  protected synchronized void addSession(AgentXMasterSession session) {
1301    sessions.put(new Integer JavaDoc(session.getSessionID()), session);
1302    fireMasterChanged(new AgentXMasterEvent(this,
1303                                            AgentXMasterEvent.SESSION_ADDED,
1304                                            session));
1305  }
1306
1307  protected synchronized AgentXMasterSession removeSession(int sessionID) {
1308    AgentXMasterSession session =
1309        (AgentXMasterSession) sessions.remove(new Integer JavaDoc(sessionID));
1310    if (session != null) {
1311      fireMasterChanged(new AgentXMasterEvent(this,
1312                                              AgentXMasterEvent.SESSION_REMOVED,
1313                                              session));
1314    }
1315    return session;
1316  }
1317
1318  protected AgentXResponsePDU createResponse(AgentXPDU request,
1319                                             AgentXSession session) {
1320    OctetString context = DEFAULT_CONTEXT;
1321    if (request instanceof AgentXContextPDU) {
1322      OctetString reqContext = ((AgentXContextPDU) request).getContext();
1323      if (server.isContextSupported(reqContext)) {
1324        context = reqContext;
1325      }
1326    }
1327    AgentXResponsePDU response =
1328        new AgentXResponsePDU(getContextSysUpTime(context).toInt(),
1329                              (short)0, (short)0);
1330    if (session == null) {
1331      response.setSessionID(request.getSessionID());
1332      response.setErrorStatus(AgentXProtocol.AGENTX_NOT_OPEN);
1333    }
1334    else {
1335      response.setSessionID(session.getSessionID());
1336    }
1337    response.setPacketID(request.getPacketID());
1338    response.setTransactionID(request.getTransactionID());
1339    response.setByteOrder(request.getByteOrder());
1340    return response;
1341  }
1342
1343  protected void sendResponse(AgentXPDU response, AgentXSession session) {
1344    if (LOGGER.isDebugEnabled()) {
1345      LOGGER.debug("Sending AgentX response "+response+" to session "+session);
1346    }
1347    try {
1348      agentX.send(response,
1349                  session.createAgentXTarget(), session.getPeer().getTransport());
1350    }
1351    catch (IOException JavaDoc ex) {
1352      if (LOGGER.isDebugEnabled()) {
1353        ex.printStackTrace();
1354      }
1355      LOGGER.error("Failed to send AgentX response "+response+" to session "+
1356                   session+" because: "+ex.getMessage(), ex);
1357    }
1358  }
1359
1360  public synchronized void connectionStateChanged(TransportStateEvent change) {
1361    Address peerAddress = change.getPeerAddress();
1362    switch (change.getNewState()) {
1363      case TransportStateEvent.STATE_CLOSED:
1364      case TransportStateEvent.STATE_DISCONNECTED_REMOTELY:
1365      case TransportStateEvent.STATE_DISCONNECTED_TIMEOUT: {
1366        AgentXPeer removedPeer = removePeer(peerAddress);
1367        fireMasterChanged(new AgentXMasterEvent(this,
1368                                                AgentXMasterEvent.PEER_REMOVED,
1369                                                removedPeer));
1370        break;
1371      }
1372      default: {
1373        AgentXPeer newPeer =
1374            new AgentXPeer((TransportMapping)change.getSource(), peerAddress);
1375        addPeer(newPeer);
1376      }
1377    }
1378  }
1379
1380  protected synchronized AgentXPeer removePeer(Address peerAddress) {
1381    AgentXPeer peer = (AgentXPeer) peers.remove(peerAddress);
1382    if (peer != null) {
1383      peer.setClosing(true);
1384      for (Iterator it = sessions.values().iterator(); it.hasNext(); ) {
1385        AgentXMasterSession session = (AgentXMasterSession) it.next();
1386        if (session.getPeer().equals(peer)) {
1387          it.remove();
1388          fireMasterChanged(new AgentXMasterEvent(this,
1389                                                  AgentXMasterEvent.SESSION_REMOVED,
1390                                                  session));
1391          indexRegistry.release(session.getSessionID());
1392          removeAllRegistrations(session);
1393          session.setClosed(true);
1394          if (peer.getTransport() instanceof ConnectionOrientedTransportMapping) {
1395            try {
1396              ((ConnectionOrientedTransportMapping)peer.getTransport()).
1397                  close(peer.getAddress());
1398            }
1399            catch (IOException JavaDoc iox) {
1400              LOGGER.warn("Caught exception while closing transport: " +
1401                          iox.getMessage());
1402            }
1403          }
1404        }
1405      }
1406/* Optional code for debugging of registry issues:
1407      if (LOGGER.isDebugEnabled()) {
1408        if (server instanceof DefaultMOServer) {
1409          SortedMap registry = ((DefaultMOServer) server).getRegistry();
1410          System.err.println(registry.toString());
1411        }
1412      }
1413*/

1414    }
1415    else {
1416      LOGGER.warn("Tried to remove peer with address "+peerAddress+
1417                  " which is not part of peer list: "+peers);
1418    }
1419    return peer;
1420  }
1421
1422  protected synchronized AgentXPeer closePeer(Address peerAddress, byte reason) {
1423    AgentXPeer peer = (AgentXPeer) peers.remove(peerAddress);
1424    if (peer != null) {
1425      peer.setClosing(true);
1426      Map s = new HashMap(sessions);
1427      for (Iterator it = s.values().iterator(); it.hasNext(); ) {
1428        AgentXMasterSession session = (AgentXMasterSession) it.next();
1429        if (session.getPeer().equals(peer)) {
1430          closeSession(session, reason);
1431          if (peer.getTransport() instanceof ConnectionOrientedTransportMapping) {
1432            try {
1433              ((ConnectionOrientedTransportMapping)peer.getTransport()).
1434                  close(peer.getAddress());
1435            }
1436            catch (IOException JavaDoc iox) {
1437              LOGGER.warn("Caught exception while closing transport: " +
1438                          iox.getMessage());
1439            }
1440          }
1441        }
1442      }
1443    }
1444    else {
1445      LOGGER.warn("Tried to remove peer with address "+peerAddress+
1446                  " which is not part of peer list: "+peers);
1447    }
1448    return peer;
1449  }
1450
1451  public byte getAgentXVersion() {
1452    return AgentXProtocol.VERSION_1_0;
1453  }
1454
1455  public synchronized void addAgentXMasterListener(AgentXMasterListener l) {
1456    if (agentXMasterListeners == null) {
1457      agentXMasterListeners = new Vector(2);
1458    }
1459    agentXMasterListeners.add(l);
1460  }
1461
1462  public synchronized void removeAgentXMasterListener(AgentXMasterListener l) {
1463    if (agentXMasterListeners != null) {
1464      agentXMasterListeners.remove(l);
1465    }
1466  }
1467
1468  protected void fireMasterChanged(AgentXMasterEvent event) {
1469    if (agentXMasterListeners != null) {
1470      Vector listeners = agentXMasterListeners;
1471      int count = listeners.size();
1472      for (int i = 0; i < count; i++) {
1473        try {
1474          ((AgentXMasterListener) listeners.get(i)).masterChanged(event);
1475        }
1476        catch (Exception JavaDoc ex) {
1477          LOGGER.error("AgentXMasterListener "+listeners.get(i)+
1478                       " threw exception on "+event+": "+ex.getMessage(), ex);
1479        }
1480      }
1481    }
1482  }
1483
1484  protected static class AgentXRegEntryComparator implements Comparator {
1485
1486    public int compare(Object JavaDoc o1, Object JavaDoc o2) {
1487      AgentXRegEntry a = (AgentXRegEntry)o1;
1488      AgentXRegEntry b = (AgentXRegEntry)o2;
1489      int c = a.getRegion().compareTo(b.getRegion());
1490      if (c == 0) {
1491        c = a.getContext().compareTo(b.getContext());
1492      }
1493      return c;
1494    }
1495  }
1496
1497  public void onResponse(AgentXResponseEvent event) {
1498    AgentXResponsePDU pdu = event.getResponse();
1499    AgentXPending pending = (AgentXPending) event.getUserObject();
1500    if (LOGGER.isDebugEnabled()) {
1501      LOGGER.debug("Processing AgentX response "+pdu+" for request "+pending);
1502    }
1503    if (pending.getRequest() != null) {
1504      AgentXPending p =
1505          agentXQueue.remove(pending.getAgentXPDU().getSessionID(),
1506                             pending.getRequest().getTransactionID());
1507      if (p == null) {
1508        LOGGER.warn("Pending AgentX request not found (may be timed out already): " +
1509                    "Received AgentX response from " + event.getPeerAddress() +
1510                    " for request " + event.getUserObject() +
1511                    " does not match any pending request:" + pdu);
1512        return;
1513      }
1514    }
1515    if ((pdu == null) &&
1516        (pending.getAgentXPDU().getType() != AgentXPDU.AGENTX_CLOSE_PDU)) {
1517      pending.getSession().incConsecutiveTimeouts();
1518      pending.getReferences().
1519          nextSubRequest().getStatus().setErrorStatus(PDU.genErr);
1520      if (pending.getSession().getConsecutiveTimeouts() >
1521          maxConsecutiveTimeouts) {
1522        closeSession(pending.getSession(), AgentXProtocol.REASON_TIMEOUTS);
1523      }
1524    }
1525    if (pdu != null) {
1526      pending.getSession().clearConsecutiveTimeouts();
1527    }
1528    if (requestList.contains(pending.getRequest())) {
1529      if (pdu != null) {
1530        if (checkAgentXResponse(pdu, pending)) {
1531          switch (pending.getAgentXPDU().getType()) {
1532            case AgentXPDU.AGENTX_GET_PDU: {
1533              processAgentXGetResponse(pending, pdu);
1534              break;
1535            }
1536            case AgentXPDU.AGENTX_GETNEXT_PDU: {
1537              processAgentXGetNextResponse(pending, pdu);
1538              break;
1539            }
1540            case AgentXPDU.AGENTX_GETBULK_PDU: {
1541              processAgentXBulkResponse(pending, pdu);
1542              break;
1543            }
1544            case AgentXPDU.AGENTX_CLEANUPSET_PDU:
1545            case AgentXPDU.AGENTX_UNDOSET_PDU:
1546            case AgentXPDU.AGENTX_COMMITSET_PDU:
1547            case AgentXPDU.AGENTX_TESTSET_PDU: {
1548              processAgentXSetResponse(pending, pdu);
1549              break;
1550            }
1551            default: {
1552              LOGGER.warn("Unhandled AgentX response "+pdu);
1553            }
1554          }
1555        }
1556        else {
1557          LOGGER.warn("Invalid AgentX response " + pdu +
1558                      " on request " + pending);
1559        }
1560      }
1561      // reprocess SNMP request
1562
if (!pending.getRequest().isComplete()) {
1563        reprocessRequest(server, pending.getRequest());
1564      }
1565      finalizeRequest((CommandResponderEvent)
1566                      pending.getRequest().getSource(), pending.getRequest(),
1567                      server);
1568    }
1569    else {
1570      if (pending.getAgentXPDU().getType() == AgentXPDU.AGENTX_CLOSE_PDU) {
1571        if (pdu != null) {
1572          LOGGER.info("Subagent " + event.getPeerAddress() +
1573                      " confirmed close, disconnection transport now");
1574        }
1575        else {
1576          LOGGER.info("Subagent "+event.getPeerAddress()+
1577                      " did not answered on session close, "+
1578                      "disconnection now");
1579        }
1580        AgentXPeer peer = pending.getSession().getPeer();
1581        if (peer != null) {
1582          closePeer(peer);
1583        }
1584      }
1585      else {
1586        LOGGER.info("Received late response " + pdu + " on AgentX request: " +
1587                    pending);
1588        super.release(server, pending.getRequest());
1589      }
1590    }
1591  }
1592
1593  protected void processAgentXGetResponse(AgentXPending pending,
1594                                          AgentXResponsePDU pdu) {
1595    if (pdu.getErrorStatus() != PDU.noError) {
1596      processsErrorResponse(pending, pdu);
1597    }
1598    else {
1599      VariableBinding[] vbs = pdu.getVariableBindings();
1600      SubRequestIterator subRequests = pending.getReferences();
1601      for (int i=0; (i<pending.getRequest().size()) &&
1602           subRequests.hasNext(); i++) {
1603        SnmpSubRequest sreq = (SnmpSubRequest) subRequests.nextSubRequest();
1604        sreq.getVariableBinding().setVariable(vbs[i].getVariable());
1605        sreq.getStatus().setPhaseComplete(true);
1606      }
1607    }
1608  }
1609
1610  protected void processAgentXGetNextResponse(AgentXPending pending,
1611                                              AgentXResponsePDU pdu) {
1612    if (pdu.getErrorStatus() != PDU.noError) {
1613      processsErrorResponse(pending, pdu);
1614    }
1615    else {
1616      processAgentXNextResponse(pending, pdu, pending.getRequest().size());
1617    }
1618  }
1619
1620  protected void processAgentXSetResponse(AgentXPending pending,
1621                                          AgentXResponsePDU pdu) {
1622    if (pdu.getErrorStatus() != PDU.noError) {
1623      processsErrorResponse(pending, pdu);
1624    }
1625    else {
1626      SubRequestIterator it = pending.getReferences();
1627      while (it.hasNext()) {
1628        SubRequest sreq = it.nextSubRequest();
1629        sreq.getStatus().setPhaseComplete(true);
1630      }
1631    }
1632  }
1633
1634}
1635
Popular Tags