KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > snmp4j > agent > agentx > subagent > AgentXSubagent


1 /*_############################################################################
2   _##
3   _## SNMP4J-AgentX - AgentXSubagent.java
4   _##
5   _## Copyright (C) 2005-2007 Frank Fock (SNMP4J.org)
6   _##
7   _## This program is free software; you can redistribute it and/or modify
8   _## it under the terms of the GNU General Public License version 2 as
9   _## published by the Free Software Foundation.
10   _##
11   _## This program is distributed in the hope that it will be useful,
12   _## but WITHOUT ANY WARRANTY; without even the implied warranty of
13   _## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14   _## GNU General Public License for more details.
15   _##
16   _## You should have received a copy of the GNU General Public License
17   _## along with this program; if not, write to the Free Software
18   _## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19   _## MA 02110-1301 USA
20   _##
21   _##########################################################################*/

22
23 package org.snmp4j.agent.agentx.subagent;
24
25 import java.io.IOException JavaDoc;
26 import java.util.*;
27
28 import org.snmp4j.PDU;
29 import org.snmp4j.TransportMapping;
30 import org.snmp4j.agent.*;
31 import org.snmp4j.agent.agentx.*;
32 import org.snmp4j.agent.agentx.event.PingEvent;
33 import org.snmp4j.agent.agentx.event.PingListener;
34 import org.snmp4j.agent.mo.MOScalar;
35 import org.snmp4j.agent.mo.snmp.CoexistenceInfo;
36 import org.snmp4j.agent.request.*;
37 import org.snmp4j.log.LogAdapter;
38 import org.snmp4j.log.LogFactory;
39 import org.snmp4j.mp.SnmpConstants;
40 import org.snmp4j.smi.*;
41 import org.snmp4j.transport.ConnectionOrientedTransportMapping;
42 import org.snmp4j.transport.TransportMappings;
43 import org.snmp4j.util.ThreadPool;
44 import org.snmp4j.agent.mo.MOTableRow;
45 import org.snmp4j.agent.agentx.subagent.index.AnyNewIndexOID;
46 import org.snmp4j.agent.agentx.subagent.index.NewIndexOID;
47 import org.snmp4j.agent.mo.snmp.SysUpTime;
48
49 /**
50  * The <code>AgentXSubagent</code> class implements the AgentX communication
51  * for an AgentX subagent implementation.
52  *
53  * @author Frank Fock
54  * @version 1.0
55  */

56 public class AgentXSubagent
57     implements AgentXCommandListener, NotificationOriginator {
58
59   private static final LogAdapter LOGGER =
60       LogFactory.getLogger(AgentXSubagent.class);
61
62   private ArrayList moServers = new ArrayList();
63   private ThreadPool threadPool;
64   private RequestFactory factory;
65   private AgentX agentX;
66   protected Map requestList;
67
68   protected Map peers = new LinkedHashMap(2);
69   protected Map sessions = new Hashtable(2);
70
71
72   protected RequestHandler requestHandlerGet;
73   protected RequestHandler requestHandlerGetNext;
74   protected RequestHandler requestHandlerGetBulk;
75   protected RequestHandler requestHandlerTestSet;
76   protected RequestHandler requestHandlerCommitSet;
77   protected RequestHandler requestHandlerUndoSet;
78   protected RequestHandler requestHandlerCleanupSet;
79
80   protected int nextTransactionID = 0;
81
82   private OID subagentID;
83   private OctetString subagentDescr;
84
85   private long timeout = AgentXProtocol.DEFAULT_TIMEOUT_SECONDS * 1000;
86   private byte defaultPriority = AgentXProtocol.DEFAULT_PRIORITY;
87
88   private Timer pingTimer;
89   private transient Vector pingListeners;
90
91
92   public AgentXSubagent(AgentX agentX,
93                         OID subagentID, OctetString subagentDescr) {
94     this.requestList = Collections.synchronizedMap(new HashMap(10));
95     this.agentX = agentX;
96     this.subagentID = subagentID;
97     this.subagentDescr = subagentDescr;
98     this.factory = new DefaultAgentXRequestFactory();
99     requestHandlerGet = new GetRequestHandler();
100     requestHandlerCleanupSet = new CleanupSetHandler();
101     requestHandlerCommitSet = new CommitSetHandler();
102     requestHandlerTestSet = new TestSetHandler();
103     requestHandlerUndoSet = new UndoSetHandler();
104     requestHandlerGetNext = new GetNextHandler();
105     requestHandlerGetBulk = new GetBulkHandler();
106     agentX.addCommandResponder(this);
107   }
108
109   /**
110    * Sets the ping delay in seconds. If greater than zero, for each session
111    * a ping PDU is sent to the master to validate the session regularly with
112    * the specified delay. To monitor the ping requests, it is necessary to
113    * add a {@link PingListener} with {@link #addPingListener}.
114    *
115    * @param seconds
116    * the delay. If zero or a negative value is supplied, no pings are sent
117    */

118   public void setPingDelay(int seconds) {
119     if (pingTimer != null) {
120       pingTimer.cancel();
121       pingTimer = null;
122     }
123     if (seconds > 0) {
124       pingTimer = new Timer();
125       pingTimer.schedule(new PingTask(), seconds * 1000, seconds * 1000);
126     }
127   }
128
129   public void processCommand(AgentXCommandEvent event) {
130     if (event.getCommand() != null) {
131       event.setProcessed(true);
132       Command command = new Command(event);
133       if (threadPool != null) {
134         threadPool.execute(command);
135       }
136       else {
137         command.run();
138       }
139     }
140   }
141
142   protected synchronized int getNextTransactionID() {
143     return nextTransactionID++;
144   }
145
146   protected synchronized int closeSession(int sessionID, byte reason) throws
147       IOException JavaDoc {
148     AgentXSession session = removeSession(sessionID);
149     if ((session == null) || (session.isClosed())) {
150       return AgentXProtocol.AGENTX_NOT_OPEN;
151     }
152     session.setClosed(true);
153     AgentXClosePDU closePDU =
154         new AgentXClosePDU(AgentXProtocol.REASON_SHUTDOWN);
155     AgentXTarget target = session.createAgentXTarget();
156     AgentXResponseEvent resp =
157         agentX.send(closePDU, target, session.getPeer().getTransport());
158     if (resp == null) {
159       return AgentXProtocol.AGENTX_TIMEOUT;
160     }
161     return ((AgentXResponsePDU)resp.getResponse()).getErrorStatus();
162   }
163
164   protected int openSession(TransportMapping transport,
165                             Address masterAddress,
166                             AgentXSession session) throws IOException JavaDoc {
167     AgentXOpenPDU openPDU = new AgentXOpenPDU(0, getNextTransactionID(),
168                                               0, session.getTimeout(),
169                                               subagentID, subagentDescr);
170     AgentXResponseEvent responseEvent =
171         agentX.send(openPDU, session.createAgentXTarget(), transport);
172     if (responseEvent.getResponse() == null) {
173       LOGGER.error("Timeout on connection to master "+masterAddress);
174     }
175     else if (responseEvent.getResponse() instanceof AgentXResponsePDU) {
176       AgentXResponsePDU response = responseEvent.getResponse();
177       if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) {
178         session.setSessionID(response.getSessionID());
179       }
180       return response.getErrorStatus();
181     }
182     else {
183       LOGGER.error("Received packet on open PDU is not a response AgentX PDU: "+
184                    responseEvent);
185     }
186     return AgentXProtocol.AGENTX_TIMEOUT;
187   }
188
189   private static int getResponseStatus(AgentXResponseEvent responseEvent) {
190     if (responseEvent.getResponse() == null) {
191       LOGGER.error("Timeout on connection to master "+
192                    responseEvent.getTarget());
193       return AgentXProtocol.AGENTX_TIMEOUT;
194     }
195     else if (responseEvent.getResponse() instanceof AgentXResponsePDU) {
196       AgentXResponsePDU response = responseEvent.getResponse();
197       return response.getErrorStatus();
198     }
199     else {
200       LOGGER.error("Received packet on open PDU is not a response AgentX PDU: "+
201                    responseEvent);
202     }
203     return AgentXProtocol.AGENTX_ERROR;
204   }
205
206   public void disconnect(Address masterAddress) throws IOException JavaDoc {
207     AgentXPeer peer = (AgentXPeer) peers.remove(masterAddress);
208     if (peer != null) {
209       TransportMapping transport = peer.getTransport();
210       if (transport instanceof ConnectionOrientedTransportMapping) {
211         ((ConnectionOrientedTransportMapping)transport).close(masterAddress);
212       }
213     }
214   }
215
216   public int connect(Address masterAddress, Address localAddress,
217                      AgentXSession session) throws IOException JavaDoc {
218     AgentXPeer peer = (AgentXPeer) peers.get(masterAddress);
219     TransportMapping transport;
220     if (peer == null) {
221       transport = addMaster(localAddress);
222       peer = new AgentXPeer(transport, masterAddress);
223     }
224     else {
225       transport = peer.getTransport();
226     }
227     peer.setTimeout(session.getTimeout());
228     session.setPeer(peer);
229     int status = AgentXProtocol.AGENTX_TIMEOUT;
230     try {
231       status = openSession(transport, masterAddress, session);
232       if (status != AgentXProtocol.AGENTX_TIMEOUT) {
233         peers.put(masterAddress, peer);
234         LOGGER.info("Added new peer address="+masterAddress+",peer="+peer);
235       }
236     }
237     catch (IOException JavaDoc ex) {
238       LOGGER.error(ex);
239       removeMaster(transport);
240       return AgentXProtocol.AGENTX_ERROR;
241     }
242     if (status == AgentXProtocol.AGENTX_SUCCESS) {
243       sessions.put(new Integer JavaDoc(session.getSessionID()), session);
244       LOGGER.info("Opened subagent session successfully: "+session);
245     }
246     else {
247       removeMaster(transport);
248     }
249     return status;
250   }
251
252   public int close(AgentXSession session, byte reason) throws IOException JavaDoc {
253     return closeSession(session.getSessionID(), reason);
254   }
255
256   private synchronized AgentXSession getSession(int sessionID) {
257     return (AgentXSession) sessions.get(new Integer JavaDoc(sessionID));
258   }
259
260   private synchronized AgentXSession removeSession(int sessionID) {
261     return (AgentXSession) sessions.remove(new Integer JavaDoc(sessionID));
262   }
263
264   public void setDefaultPriority(byte priority) {
265     this.defaultPriority = priority;
266   }
267
268   public byte getDefaultPriority() {
269     return defaultPriority;
270   }
271
272   /**
273    * Gets the priority with which the supplied managed object and
274    * region should be registered at the master agent. Overwrite
275    * this method to use individual priorites depending on the registered
276    * region/managed object. The default implementation returns
277    * {@link #getDefaultPriority()}.
278    *
279    * @param mo ManagedObject
280    * a managed object instance that manages <code>region</code>.
281    * @param region
282    * the region to be registered.
283    * @return
284    * the priority between 0 and 255 (lower value results in higher priority).
285    */

286   protected byte getPriority(ManagedObject mo, AgentXRegion region) {
287     return defaultPriority;
288   }
289
290   /**
291    * Registers the subagent regions at the master agent.
292    * @param session
293    * the session on whose behalf regions are registered.
294    * @param context
295    * the context to use for registration.
296    * @return
297    * a List of the managed objects which failed to register.
298    */

299   public List registerRegions(AgentXSession session, OctetString context) {
300     return registerRegions(session, context, null);
301   }
302
303   /**
304    * Registers the subagent regions at the master agent.
305    * @param session
306    * the session on whose behalf regions are registered.
307    * @param context
308    * the context to use for registration.
309    * @param sysUpTime
310    * if not <code>null</code>, the master agent's notion of the sysUpTime
311    * for the registered context is returned. The input value is always
312    * ignored!
313    * @return
314    * a List of the managed objects which failed to register.
315    */

316   public List registerRegions(AgentXSession session, OctetString context,
317                               TimeTicks sysUpTime) {
318     LinkedList failures = new LinkedList();
319     MOServer server = getServer(context);
320     if (server == null) {
321       LOGGER.warn("No MOServer found for context '"+context+"'");
322       return null;
323     }
324     for (Iterator it = server.iterator(); it.hasNext();) {
325       ManagedObject mo = (ManagedObject)it.next();
326       if (mo instanceof AgentXSharedMOTable) {
327         List failedRows = registerSharedTableRows(session, context,
328                                                   (AgentXSharedMOTable)mo);
329         failures.addAll(failedRows);
330       }
331       else {
332         MOScope scope = mo.getScope();
333         AgentXRegion region =
334             new AgentXRegion(scope.getLowerBound(), scope.getUpperBound());
335         if (mo instanceof MOScalar) {
336           region.setSingleOID(true);
337         }
338         region.setUpperIncluded(scope.isUpperIncluded());
339         try {
340           int status = registerRegion(session, context, region,
341                                       getPriority(mo, region), sysUpTime);
342           if (status != AgentXProtocol.AGENTX_SUCCESS) {
343             failures.add(mo);
344             if (LOGGER.isWarnEnabled()) {
345               LOGGER.warn("Failed to registered MO " + scope +
346                           " with status = " +
347                           status);
348             }
349           }
350           else {
351             if (LOGGER.isInfoEnabled()) {
352               LOGGER.info("Registered MO " + scope + " successfully");
353             }
354           }
355
356         }
357         catch (IOException JavaDoc ex) {
358           LOGGER.warn("Failed to register " + mo + " in context '" + context +
359                       "' of session " + session);
360           failures.add(mo);
361         }
362       }
363     }
364     return failures;
365   }
366
367   /**
368    * Registers the indexes and (row) regions of a shared table. This method
369    * is called on behalf of {@link #registerRegions(AgentXSession session,
370    * OctetString context, TimeTicks sysUpTime)} and
371    * {@link #registerRegions(AgentXSession session, OctetString context)}.
372    *
373    * @param session
374    * the session on whose behalf regions are registered.
375    * @param context
376    * the context to use for registration.
377    * @param mo
378    * the <code>AgentXSharedMOTable</code> instance to register.
379    * @return List
380    * a list of failed {@link MOTableRow} instances.
381    */

382   public List registerSharedTableRows(AgentXSession session,
383                                       OctetString context,
384                                       AgentXSharedMOTable mo) {
385     LinkedList failedRows = new LinkedList();
386     AgentXSharedMOTableSupport sharedTableSupport =
387         new AgentXSharedMOTableSupport(agentX, session, context);
388     synchronized (mo) {
389       if (mo instanceof AgentXSharedMutableMOTable) {
390         ((AgentXSharedMutableMOTable)
391          mo).setAgentXSharedMOTableSupport(sharedTableSupport);
392       }
393       for (Iterator it = mo.getModel().iterator(); it.hasNext();) {
394         MOTableRow row = (MOTableRow) it.next();
395         OID newIndex = (OID) row.getIndex().clone();
396         int status = sharedTableSupport.allocateIndex(context, mo.getIndexDef(),
397             (byte)AgentXSharedMOTableSupport.INDEX_MODE_ALLOCATE,
398             newIndex);
399         if (status == AgentXProtocol.AGENTX_SUCCESS) {
400           if ((newIndex instanceof AnyNewIndexOID) ||
401               (newIndex instanceof NewIndexOID)) {
402             if (mo instanceof AgentXSharedMutableMOTable) {
403               ((AgentXSharedMutableMOTable)mo).
404                   changeRowIndex(newIndex, row.getIndex());
405             }
406           }
407           status = sharedTableSupport.registerRow(mo, row);
408           if (status != AgentXProtocol.AGENTX_SUCCESS) {
409             sharedTableSupport.deallocateIndex(context, mo.getIndexDef(),
410                                                row.getIndex());
411             LOGGER.warn("Failed to register row with "+status+" for "+row);
412             failedRows.add(row);
413           }
414         }
415         else {
416           LOGGER.warn("Failed to allocate index with "+status+" for row "+
417                       row);
418           failedRows.add(row);
419         }
420       }
421     }
422     return failedRows;
423   }
424
425   protected int registerRegion(AgentXSession session,
426                                OctetString context, AgentXRegion region,
427                                byte priority,
428                                TimeTicks sysUpTime) throws IOException JavaDoc {
429     if ((session == null) || (session.isClosed())) {
430       return AgentXProtocol.AGENTX_NOT_OPEN;
431     }
432     long t = (this.timeout == 0) ? session.getTimeout()*1000 : this.timeout;
433     AgentXRegisterPDU pdu =
434         new AgentXRegisterPDU(context, region.getLowerBound(), priority,
435                               region.getRangeSubID(),
436                               region.getUpperBoundSubID());
437     pdu.setSessionAttributes(session);
438     AgentXResponseEvent event =
439         agentX.send(pdu, new AgentXTarget(session.getPeer().getAddress(), t),
440                     session.getPeer().getTransport());
441     if ((sysUpTime != null) && (event.getResponse() != null)) {
442       sysUpTime.setValue(event.getResponse().getSysUpTime() & 0xFFFFFFFFL);
443     }
444     return getResponseStatus(event);
445   }
446
447   protected int unregisterRegion(AgentXSession session,
448                                  OctetString context, AgentXRegion region,
449                                  byte timeout) throws IOException JavaDoc {
450     if ((session == null) || (session.isClosed())) {
451       return AgentXProtocol.AGENTX_NOT_OPEN;
452     }
453     byte t = (timeout == 0) ? session.getTimeout() : timeout;
454     AgentXUnregisterPDU pdu =
455         new AgentXUnregisterPDU(context, region.getLowerBound(), t,
456                                 region.getRangeSubID(),
457                                 region.getUpperBoundSubID());
458     pdu.setSessionAttributes(session);
459     AgentXResponseEvent event =
460         agentX.send(pdu, new AgentXTarget(session.getPeer().getAddress(),
461                                           this.timeout),
462                     session.getPeer().getTransport());
463     return getResponseStatus(event);
464   }
465
466
467
468   protected TransportMapping addMaster(Address localAddress)
469       throws IOException JavaDoc
470   {
471 /*
472     if (transport != null) {
473       try {
474         transport.close();
475       }
476       catch (IOException ex) {
477         logger.error(ex);
478       }
479       agentX.removeTransportMapping(transport);
480     }
481 */

482     TransportMapping transport =
483           TransportMappings.getInstance().createTransportMapping(localAddress);
484     if (transport instanceof ConnectionOrientedTransportMapping) {
485       ConnectionOrientedTransportMapping tcpTransport =
486           (ConnectionOrientedTransportMapping)transport;
487       tcpTransport.setConnectionTimeout(0);
488       tcpTransport.setMessageLengthDecoder(new AgentXProtocol());
489     }
490     agentX.addTransportMapping(transport);
491     transport.listen();
492     return transport;
493   }
494
495   protected void removeMaster(TransportMapping transport) {
496     agentX.removeTransportMapping(transport);
497     try {
498       transport.close();
499     }
500     catch (IOException JavaDoc ex) {
501       LOGGER.warn("Closing transport mapping "+transport+" failed with: "+
502                   ex.getMessage());
503     }
504   }
505
506   public synchronized void addMOServer(MOServer server) {
507     moServers.add(server);
508   }
509
510   public synchronized void removeMOServer(MOServer server) {
511     moServers.remove(server);
512   }
513
514   public synchronized MOServer getServer(OctetString context) {
515     for (int i=0; i<moServers.size(); i++) {
516       MOServer s = (MOServer)moServers.get(i);
517       if (s.isContextSupported(context)) {
518         return s;
519       }
520     }
521     return null;
522   }
523
524   public synchronized Collection getContexts() {
525     LinkedList allContexts = new LinkedList();
526     for (int i=0; i<moServers.size(); i++) {
527       MOServer s = (MOServer)moServers.get(i);
528       OctetString[] contexts = s.getContexts();
529       allContexts.addAll(Arrays.asList(contexts));
530     }
531     return allContexts;
532   }
533
534   public ThreadPool getThreadPool() {
535     return threadPool;
536   }
537
538   public void setThreadPool(ThreadPool threadPool) {
539     this.threadPool = threadPool;
540   }
541
542   public void dispatchCommand(AgentXCommandEvent cmd) {
543     boolean pendingSessionClose = false;
544     if (cmd.getCommand().isConfirmedPDU()) {
545       AgentXRequest request = null;
546       MOServer server = null;
547       switch (cmd.getCommand().getType()) {
548         case AgentXPDU.AGENTX_GET_PDU: {
549           request = (AgentXRequest) factory.createRequest(cmd, null);
550           server = getServer(request.getContext());
551           requestHandlerGet.processPdu(request, server);
552           break;
553         }
554         case AgentXPDU.AGENTX_GETNEXT_PDU: {
555           request = (AgentXRequest) factory.createRequest(cmd, null);
556           server = getServer(request.getContext());
557           requestHandlerGetNext.processPdu(request, server);
558           break;
559         }
560         case AgentXPDU.AGENTX_GETBULK_PDU: {
561           request = (AgentXRequest) factory.createRequest(cmd, null);
562           server = getServer(request.getContext());
563           requestHandlerGetBulk.processPdu(request, server);
564           break;
565         }
566         case AgentXPDU.AGENTX_TESTSET_PDU: {
567           request = (AgentXRequest) factory.createRequest(cmd, null);
568           request.setPhase(Request.PHASE_2PC_PREPARE);
569           server = getServer(request.getContext());
570           requestHandlerTestSet.processPdu(request, server);
571           requestList.put(createRequestID(cmd), request);
572           break;
573         }
574         case AgentXPDU.AGENTX_COMMITSET_PDU:
575         case AgentXPDU.AGENTX_UNDOSET_PDU:
576         case AgentXPDU.AGENTX_CLEANUPSET_PDU: {
577           RequestID reqID = createRequestID(cmd);
578           request = (AgentXRequest) requestList.get(reqID);
579           if (request == null) {
580             LOGGER.error("Request with ID "+reqID+" not found in request list");
581             request = new AgentXRequest(cmd);
582             request.setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
583             break;
584           }
585           server = getServer(request.getContext());
586           switch (cmd.getCommand().getType()) {
587             case AgentXPDU.AGENTX_COMMITSET_PDU:
588               request.setPhase(Request.PHASE_2PC_COMMIT);
589               requestHandlerCommitSet.processPdu(request, server);
590               break;
591             case AgentXPDU.AGENTX_UNDOSET_PDU:
592               request.setPhase(Request.PHASE_2PC_UNDO);
593               requestHandlerUndoSet.processPdu(request, server);
594               break;
595             case AgentXPDU.AGENTX_CLEANUPSET_PDU:
596               request.setPhase(Request.PHASE_2PC_CLEANUP);
597               requestHandlerCleanupSet.processPdu(request, server);
598               break;
599             default: {
600               LOGGER.fatal("Internal error");
601             }
602           }
603           if (cmd.getCommand().getType() != AgentXPDU.AGENTX_COMMITSET_PDU) {
604             // remove request from request list
605
requestList.remove(reqID);
606           }
607           break;
608         }
609         case AgentXPDU.AGENTX_CLOSE_PDU: {
610           AgentXSession session =
611               removeSession(cmd.getCommand().getSessionID());
612           if (session != null) {
613             session.setClosed(true);
614             pendingSessionClose = true;
615           }
616           break;
617         }
618         default: {
619           LOGGER.error("Unhandled PDU type: "+cmd.getCommand());
620           request = new AgentXRequest(cmd);
621           request.setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
622         }
623       }
624       if (request != null) {
625         // Since this is an AgentX subagent it only processes a single phase at
626
// once.
627
if (request.isPhaseComplete()) {
628           // send response
629
sendResponse(cmd, request);
630         }
631         if (server != null) {
632           release(server, request);
633         }
634       }
635       if (pendingSessionClose) {
636         try {
637           disconnect(cmd.getPeerAddress());
638         }
639         catch (IOException JavaDoc ex) {
640           LOGGER.error("Failed to disconnect from master at "+
641                        cmd.getPeerAddress()+": "+ex.getMessage(), ex);
642         }
643       }
644     }
645     else {
646       processResponse(cmd);
647     }
648   }
649
650   protected void sendResponse(AgentXCommandEvent cmd, AgentXRequest request) {
651     AgentXMessageDispatcher dispatcher = cmd.getDispatcher();
652     AgentXResponsePDU response = request.getResponsePDU();
653     if (response != null) {
654       AgentXPDU rpdu = cmd.getCommand();
655       response.setSessionID(rpdu.getSessionID());
656       response.setTransactionID(rpdu.getTransactionID());
657       response.setByteOrder(rpdu.getByteOrder());
658       response.setPacketID(rpdu.getPacketID());
659       // only send a response if required
660
try {
661         dispatcher.send(cmd.getPeerTransport(),
662                         cmd.getPeerAddress(), response, null);
663       }
664       catch (IOException JavaDoc ex) {
665         LOGGER.warn("Failed to send AgentX response to '"+
666                     cmd.getPeerAddress()+"' with error: "+ex.getMessage());
667       }
668     }
669   }
670
671   protected void release(MOServer server, Request req) {
672     for (Iterator it = req.iterator(); it.hasNext();) {
673       SubRequest sreq = (SubRequest)it.next();
674       if (sreq.getTargetMO() != null) {
675         server.unlock(req, sreq.getTargetMO());
676       }
677     }
678   }
679
680   private static RequestID createRequestID(AgentXCommandEvent cmd) {
681     return new RequestID(cmd.getPeerAddress(),
682                          cmd.getCommand().getSessionID(),
683                          cmd.getCommand().getTransactionID());
684
685   }
686
687   protected void processResponse(AgentXCommandEvent cmd) {
688     if (LOGGER.isDebugEnabled()) {
689       LOGGER.debug("Received response "+cmd);
690     }
691   }
692
693   protected void processNextSubRequest(Request request, MOServer server,
694                                        OctetString context,
695                                        SubRequest sreq)
696       throws NoSuchElementException
697   {
698     // We can be sure to have a default context scope here because
699
// the inner class AgentXSubRequest creates it!
700
DefaultMOContextScope scope =
701         (DefaultMOContextScope)sreq.getScope();
702     MOQuery query = sreq.getQuery();
703     if (query == null) {
704       query = new DefaultMOQuery(scope, false, request);
705     }
706     while (!sreq.getStatus().isProcessed()) {
707       ManagedObject mo = server.lookup(query);
708       if (mo == null) {
709         if (LOGGER.isDebugEnabled()) {
710           LOGGER.debug("EndOfMibView at scope="+query.getScope()+
711                        " and query "+query);
712         }
713         sreq.getVariableBinding().setVariable(Null.endOfMibView);
714         sreq.getStatus().setPhaseComplete(true);
715         break;
716       }
717       try {
718         if (!mo.next(sreq)) {
719           // We can be sure to have a default context scope here because
720
// the inner class SnmpSubRequest creates it!
721
// don't forget to update query:
722
sreq.getVariableBinding().setVariable(Null.instance);
723           scope.substractScope(mo.getScope());
724           // query is updated automatically because scope is updated.
725
query.substractScope(mo.getScope());
726         }
727       }
728       catch (Exception JavaDoc moex) {
729         if (LOGGER.isDebugEnabled()) {
730           moex.printStackTrace();
731         }
732         LOGGER.warn(moex);
733         if (sreq.getStatus().getErrorStatus() == PDU.noError) {
734           sreq.getStatus().setErrorStatus(PDU.genErr);
735         }
736       }
737     }
738   }
739
740   /**
741    * Sends notifications (traps) to all appropriate notification targets
742    * through the master agent.
743    *
744    * @param context the context name of the context on whose behalf this
745    * notification has been generated.
746    * @param notificationID the object ID that uniquely identifies this
747    * notification. For SNMPv1 traps, the notification ID has to be build
748    * using the rules provided by RFC 2576.
749    * @param vbs an array of <code>VariableBinding</code> instances
750    * representing the payload of the notification.
751    * @return
752    * an {@link AgentXResponseEvent} instance or <code>null</code> if the
753    * notification request timed out.
754    */

755   public Object JavaDoc notify(OctetString context,
756                        OID notificationID,
757                        VariableBinding[] vbs) {
758     return notify(context, notificationID, null, vbs);
759   }
760
761   public Object JavaDoc notify(OctetString context, OID notificationID,
762                        TimeTicks sysUpTime, VariableBinding[] vbs) {
763     AgentXSession session = firstSession();
764     AgentXResponseEvent agentXResponse = null;
765     try {
766       agentXResponse =
767           notify(session, context, notificationID, sysUpTime, vbs);
768       if ((agentXResponse == null) || (agentXResponse.getResponse() == null)) {
769         LOGGER.warn("Timeout on sending notification in context '"+context+
770                    "' with ID '"+notificationID+"' and payload "+
771                    Arrays.asList(vbs));
772         return null;
773       }
774       return agentXResponse;
775     }
776     catch (IOException JavaDoc ex) {
777       LOGGER.error("Failed to send notification in context '"+context+
778                    "' with ID '"+notificationID+"' and payload "+
779                    Arrays.asList(vbs)+", reason is: "+ex.getMessage());
780       return null;
781     }
782   }
783
784   /**
785    * Returns the first session that have been opened by this subagent and is
786    * still open. If no open session exists, <code>null</code> is returned.
787    *
788    * @return
789    * an <code>AgentXSession</code>.
790    */

791   public synchronized final AgentXSession firstSession() {
792     if (sessions.size() > 0) {
793       return (AgentXSession) sessions.values().iterator().next();
794     }
795     return null;
796   }
797
798   public AgentXResponseEvent notify(AgentXSession session,
799                                     OctetString context,
800                                     OID notificationID,
801                                     TimeTicks sysUpTime,
802                                     VariableBinding[] vbs) throws IOException JavaDoc {
803     int offset = 1;
804     if (sysUpTime != null) {
805       offset = 2;
806     }
807     VariableBinding[] notifyVBs = new VariableBinding[vbs.length+offset];
808     if (sysUpTime != null) {
809       notifyVBs[0] = new VariableBinding(SnmpConstants.sysUpTime, sysUpTime);
810     }
811     notifyVBs[offset-1] =
812         new VariableBinding(SnmpConstants.snmpTrapOID, notificationID);
813     System.arraycopy(vbs, 0, notifyVBs, offset, vbs.length);
814     AgentXNotifyPDU notifyPDU = new AgentXNotifyPDU(context, notifyVBs);
815     notifyPDU.setSessionAttributes(session);
816     notifyPDU.setTransactionID(getNextTransactionID());
817     AgentXResponseEvent response =
818         agentX.send(notifyPDU, session.createAgentXTarget(),
819                     session.getPeer().getTransport());
820     return response;
821   }
822
823   public int addAgentCaps(AgentXSession session,
824                           OctetString context, OID id, OctetString descr) {
825     AgentXAddAgentCapsPDU pdu = new AgentXAddAgentCapsPDU(context, id, descr);
826     pdu.setSessionAttributes(session);
827     try {
828       AgentXResponseEvent resp = agentX.send(pdu, session.createAgentXTarget(),
829                                              session.getPeer().getTransport());
830       if (resp.getResponse() == null) {
831         return AgentXProtocol.AGENTX_TIMEOUT;
832       }
833       return resp.getResponse().getErrorStatus();
834     }
835     catch (IOException JavaDoc ex) {
836       LOGGER.error("Failed to send AgentX AddAgentCaps PDU "+pdu+
837                    " because: "+ex.getMessage(), ex);
838       return AgentXProtocol.AGENTX_NOT_OPEN;
839     }
840   }
841
842   public int removeAgentCaps(AgentXSession session,
843                              OctetString context, OID id) {
844     AgentXRemoveAgentCapsPDU pdu = new AgentXRemoveAgentCapsPDU(context, id);
845     pdu.setSessionAttributes(session);
846     try {
847       AgentXResponseEvent resp = agentX.send(pdu, session.createAgentXTarget(),
848                                              session.getPeer().getTransport());
849       return resp.getResponse().getErrorStatus();
850     }
851     catch (IOException JavaDoc ex) {
852       LOGGER.error("Failed to send AgentX RemoveAgentCaps PDU "+pdu+
853                    " because: "+ex.getMessage(), ex);
854       return AgentXProtocol.AGENTX_NOT_OPEN;
855     }
856   }
857
858   public void addPingListener(PingListener l) {
859     if (pingListeners == null) {
860       pingListeners = new Vector();
861     }
862     pingListeners.add(l);
863   }
864
865   public void removePingListener(PingListener l) {
866     if (pingListeners != null) {
867       synchronized (pingListeners) {
868         pingListeners.remove(l);
869       }
870     }
871   }
872
873   protected void firePinged(PingEvent event) {
874     if (pingListeners != null) {
875       synchronized (pingListeners) {
876         Vector listeners = pingListeners;
877         int count = listeners.size();
878         for (int i = 0; i < count; i++) {
879           ((PingListener) listeners.elementAt(i)).pinged(event);
880         }
881       }
882     }
883   }
884
885   private static void initRequestPhase(Request request) {
886     if (request.getPhase() == Request.PHASE_INIT) {
887       request.nextPhase();
888     }
889   }
890
891   static class GetRequestHandler implements RequestHandler {
892
893     public boolean isSupported(int pduType) {
894       return pduType == AgentXPDU.AGENTX_GET_PDU;
895     }
896
897     public void processPdu(Request request, MOServer server) {
898       initRequestPhase(request);
899       try {
900         SubRequestIterator it = (SubRequestIterator) request.iterator();
901         while (it.hasNext()) {
902           SubRequest sreq = it.nextSubRequest();
903           DefaultMOQuery query =
904               new DefaultMOQuery((MOContextScope)sreq.getScope(),
905                                  false, request);
906           ManagedObject mo = server.lookup(query);
907           if (mo == null) {
908             sreq.getVariableBinding().setVariable(Null.noSuchObject);
909             sreq.getStatus().setPhaseComplete(true);
910             continue;
911           }
912           try {
913             mo.get(sreq);
914           }
915           catch (Exception JavaDoc moex) {
916             if (LOGGER.isDebugEnabled()) {
917               moex.printStackTrace();
918             }
919             LOGGER.warn(moex);
920             if (sreq.getStatus().getErrorStatus() == PDU.noError) {
921               sreq.getStatus().setErrorStatus(PDU.genErr);
922             }
923           }
924         }
925       }
926       catch (NoSuchElementException nsex) {
927         if (LOGGER.isDebugEnabled()) {
928           nsex.printStackTrace();
929         }
930         LOGGER.error("SubRequest not found");
931         request.setErrorStatus(PDU.genErr);
932       }
933     }
934   }
935
936   class GetNextHandler implements RequestHandler {
937
938     public void processPdu(Request request, MOServer server) {
939       initRequestPhase(request);
940       OctetString context = request.getContext();
941       try {
942         SubRequestIterator it = (SubRequestIterator) request.iterator();
943         while (it.hasNext()) {
944           SubRequest sreq = it.nextSubRequest();
945           processNextSubRequest(request, server, context, sreq);
946         }
947       }
948       catch (NoSuchElementException nsex) {
949         if (LOGGER.isDebugEnabled()) {
950           nsex.printStackTrace();
951         }
952         LOGGER.error("SubRequest not found");
953         request.setErrorStatus(PDU.genErr);
954       }
955     }
956
957
958     public boolean isSupported(int pduType) {
959       return (pduType == PDU.GETNEXT);
960     }
961
962   }
963
964   class GetBulkHandler implements RequestHandler {
965
966     public void processPdu(Request request, MOServer server) {
967       initRequestPhase(request);
968       OctetString context = request.getContext();
969       AgentXRequest req = (AgentXRequest)request;
970       int nonRep = req.getNonRepeaters();
971       try {
972         SubRequestIterator it = (SubRequestIterator) request.iterator();
973         int i = 0;
974         // non repeaters
975
for (; ((i < nonRep) && it.hasNext()); i++) {
976           SubRequest sreq = it.nextSubRequest();
977           processNextSubRequest(request, server, context, sreq);
978         }
979         // repetitions
980
for (; it.hasNext(); i++) {
981           SubRequest sreq = it.nextSubRequest();
982           processNextSubRequest(request, server, context, sreq);
983         }
984       }
985       catch (NoSuchElementException nsex) {
986         if (LOGGER.isDebugEnabled()) {
987           nsex.printStackTrace();
988         }
989         LOGGER.error("SubRequest not found");
990         request.setErrorStatus(PDU.genErr);
991       }
992
993     }
994
995     public boolean isSupported(int pduType) {
996       return (pduType == PDU.GETBULK);
997     }
998
999   }
1000
1001
1002  static class TestSetHandler implements RequestHandler {
1003
1004    public void processPdu(Request request, MOServer server) {
1005      try {
1006        SubRequestIterator it = (SubRequestIterator) request.iterator();
1007        while ((!request.isPhaseComplete()) && (it.hasNext())) {
1008          SubRequest sreq = it.nextSubRequest();
1009          if (sreq.isComplete()) {
1010            continue;
1011          }
1012          DefaultMOQuery query =
1013              new DefaultMOQuery((MOContextScope)sreq.getScope(), false,
1014                                 request);
1015          ManagedObject mo = server.lookup(query);
1016          if (mo == null) {
1017            sreq.getStatus().setErrorStatus(PDU.notWritable);
1018            break;
1019          }
1020          sreq.setTargetMO(mo);
1021          server.lock(sreq.getRequest(), mo);
1022          try {
1023            mo.prepare(sreq);
1024            sreq.getStatus().setPhaseComplete(true);
1025          }
1026          catch (Exception JavaDoc moex) {
1027            if (sreq.getStatus().getErrorStatus() == PDU.noError) {
1028              sreq.getStatus().setErrorStatus(PDU.genErr);
1029            }
1030            LOGGER.error("Exception occurred while preparing SET request, "+
1031                         "returning genErr: "+moex.getMessage(), moex);
1032          }
1033        }
1034      }
1035      catch (NoSuchElementException nsex) {
1036        if (LOGGER.isDebugEnabled()) {
1037          nsex.printStackTrace();
1038        }
1039        LOGGER.error("Cannot find sub-request: ", nsex);
1040        request.setErrorStatus(PDU.genErr);
1041      }
1042    }
1043
1044    public boolean isSupported(int pduType) {
1045      return (pduType == AgentXPDU.AGENTX_TESTSET_PDU);
1046    }
1047  }
1048
1049  class UndoSetHandler implements RequestHandler {
1050
1051    public void processPdu(Request request, MOServer server) {
1052      try {
1053        SubRequestIterator it = (SubRequestIterator) request.iterator();
1054        while (it.hasNext()) {
1055          SubRequest sreq = it.nextSubRequest();
1056          if (sreq.isComplete()) {
1057            continue;
1058          }
1059          ManagedObject mo = sreq.getTargetMO();
1060          if (mo == null) {
1061            DefaultMOQuery query =
1062                new DefaultMOQuery((MOContextScope)sreq.getScope(), true);
1063            mo = server.lookup(query);
1064          }
1065          if (mo == null) {
1066            sreq.getStatus().setErrorStatus(PDU.undoFailed);
1067            continue;
1068          }
1069          try {
1070            mo.undo(sreq);
1071            sreq.getStatus().setPhaseComplete(true);
1072          }
1073          catch (Exception JavaDoc moex) {
1074            if (LOGGER.isDebugEnabled()) {
1075              moex.printStackTrace();
1076            }
1077            LOGGER.error(moex);
1078            if (sreq.getStatus().getErrorStatus() == PDU.noError) {
1079              sreq.getStatus().setErrorStatus(PDU.undoFailed);
1080            }
1081          }
1082        }
1083      }
1084      catch (NoSuchElementException nsex) {
1085        if (LOGGER.isDebugEnabled()) {
1086          nsex.printStackTrace();
1087        }
1088        LOGGER.error("Cannot find sub-request: ", nsex);
1089        request.setErrorStatus(PDU.genErr);
1090      }
1091    }
1092
1093    public boolean isSupported(int pduType) {
1094      return (pduType == AgentXPDU.AGENTX_UNDOSET_PDU);
1095    }
1096  }
1097
1098  class CommitSetHandler implements RequestHandler {
1099
1100    public void processPdu(Request request, MOServer server) {
1101      try {
1102        SubRequestIterator it = (SubRequestIterator) request.iterator();
1103        while ((!request.isPhaseComplete()) && (it.hasNext())) {
1104          SubRequest sreq = it.nextSubRequest();
1105          if (sreq.isComplete()) {
1106            continue;
1107          }
1108          ManagedObject mo = sreq.getTargetMO();
1109          if (mo == null) {
1110            DefaultMOQuery query =
1111                new DefaultMOQuery((MOContextScope)sreq.getScope(), true);
1112            mo = server.lookup(query);
1113          }
1114          if (mo == null) {
1115            sreq.getStatus().setErrorStatus(PDU.commitFailed);
1116            continue;
1117          }
1118          try {
1119            mo.commit(sreq);
1120            sreq.getStatus().setPhaseComplete(true);
1121          }
1122          catch (Exception JavaDoc moex) {
1123            if (LOGGER.isDebugEnabled()) {
1124              moex.printStackTrace();
1125            }
1126            LOGGER.error(moex);
1127            if (sreq.getStatus().getErrorStatus() == PDU.noError) {
1128              sreq.getStatus().setErrorStatus(PDU.commitFailed);
1129            }
1130          }
1131        }
1132      }
1133      catch (NoSuchElementException nsex) {
1134        if (LOGGER.isDebugEnabled()) {
1135          nsex.printStackTrace();
1136        }
1137        LOGGER.error("Cannot find sub-request: ", nsex);
1138        request.setErrorStatus(PDU.genErr);
1139      }
1140    }
1141
1142    public boolean isSupported(int pduType) {
1143      return (pduType == AgentXPDU.AGENTX_COMMITSET_PDU);
1144    }
1145
1146  }
1147
1148  class CleanupSetHandler implements RequestHandler {
1149
1150    public void processPdu(Request request, MOServer server) {
1151      try {
1152        SubRequestIterator it = (SubRequestIterator) request.iterator();
1153        while (it.hasNext()) {
1154          SubRequest sreq = it.nextSubRequest();
1155          if (sreq.isComplete()) {
1156            continue;
1157          }
1158          ManagedObject mo = sreq.getTargetMO();
1159          if (mo == null) {
1160            DefaultMOQuery query =
1161                new DefaultMOQuery((MOContextScope)sreq.getScope(), false);
1162            mo = server.lookup(query);
1163          }
1164          if (mo == null) {
1165            sreq.completed();
1166            continue;
1167          }
1168          server.unlock(sreq.getRequest(), mo);
1169          try {
1170            mo.cleanup(sreq);
1171            sreq.getStatus().setPhaseComplete(true);
1172          }
1173          catch (Exception JavaDoc moex) {
1174            if (LOGGER.isDebugEnabled()) {
1175              moex.printStackTrace();
1176            }
1177            LOGGER.error(moex);
1178          }
1179        }
1180      }
1181      catch (NoSuchElementException nsex) {
1182        if (LOGGER.isDebugEnabled()) {
1183          nsex.printStackTrace();
1184        }
1185        LOGGER.warn("Cannot find sub-request: "+ nsex.getMessage());
1186      }
1187    }
1188
1189    public boolean isSupported(int pduType) {
1190      return (pduType == AgentXPDU.AGENTX_CLEANUPSET_PDU);
1191    }
1192
1193  }
1194
1195
1196  static class DefaultAgentXRequestFactory implements RequestFactory {
1197
1198    public Request createRequest(EventObject initiatingEvent,
1199                                 CoexistenceInfo cinfo) {
1200      Request request = new AgentXRequest((AgentXCommandEvent)initiatingEvent);
1201      if (LOGGER.isDebugEnabled()) {
1202        LOGGER.debug("Creating AgentX request "+request+
1203                     " from "+initiatingEvent);
1204      }
1205      return request;
1206    }
1207
1208  }
1209
1210  class Command implements Runnable JavaDoc {
1211
1212    private AgentXCommandEvent request;
1213
1214    public Command(AgentXCommandEvent event) {
1215      this.request = event;
1216    }
1217
1218    public void run() {
1219      dispatchCommand(request);
1220    }
1221
1222  }
1223
1224
1225  static class RequestID implements Comparable JavaDoc {
1226    private Address masterAddress;
1227    private int sessionID;
1228    private int transactionID;
1229
1230    public RequestID(Address masterAddress, int sessionID, int transactionID) {
1231      this.masterAddress = masterAddress;
1232      this.sessionID = sessionID;
1233      this.transactionID = transactionID;
1234    }
1235
1236    public int compareTo(Object JavaDoc o) {
1237      RequestID other = (RequestID)o;
1238      int c = masterAddress.compareTo(other.masterAddress);
1239      if (c == 0) {
1240        c = sessionID - other.sessionID;
1241        if (c == 0) {
1242          c = transactionID - other.transactionID;
1243        }
1244      }
1245      return c;
1246    }
1247
1248    public boolean equals(Object JavaDoc obj) {
1249      if (obj instanceof RequestID) {
1250        return (compareTo(obj) == 0);
1251      }
1252      return false;
1253    }
1254
1255    public int hashCode() {
1256      return transactionID;
1257    }
1258
1259  }
1260
1261  class PingTask extends TimerTask {
1262
1263    public void run() {
1264      List l;
1265      synchronized (sessions) {
1266        l = new LinkedList(sessions.values());
1267      }
1268      for (Iterator it = l.iterator(); it.hasNext();) {
1269        AgentXSession session = (AgentXSession) it.next();
1270        for (Iterator cit = getContexts().iterator(); cit.hasNext(); ) {
1271          OctetString context = (OctetString) cit.next();
1272          AgentXPingPDU ping = new AgentXPingPDU(context);
1273          ping.setSessionAttributes(session);
1274          ping.setTransactionID(getNextTransactionID());
1275          PingEvent pingEvent;
1276          try {
1277            AgentXResponseEvent resp =
1278                agentX.send(ping, session.createAgentXTarget(),
1279                            session.getPeer().getTransport());
1280            pingEvent = new PingEvent(this, session,
1281                                      resp.getResponse());
1282          }
1283          catch (IOException JavaDoc ex) {
1284            pingEvent = new PingEvent(this, session, ex);
1285          }
1286          firePinged(pingEvent);
1287          if (LOGGER.isDebugEnabled()) {
1288            LOGGER.debug("Fired ping event "+pingEvent);
1289          }
1290          if (pingEvent.isCloseSession() || pingEvent.isResetSession()) {
1291            try {
1292              closeSession(session.getSessionID(),
1293                           AgentXProtocol.REASON_TIMEOUTS);
1294              if (pingEvent.isResetSession()) {
1295                reopenSession(session);
1296              }
1297            }
1298            catch (IOException JavaDoc ex1) {
1299            }
1300          }
1301        }
1302      }
1303    }
1304
1305    /**
1306     * Reopens a closed session.
1307     *
1308     * @param session
1309     * a closed AgentXSession instance.
1310     * @return
1311     * {@link AgentXProtocol#AGENTX_SUCCESS} if the session could be opened
1312     * sucessfully. Otherwise the AgentX error status is returned.
1313     * @throws IOException
1314     * if the session cannot be reopened due to an IO exception.
1315     */

1316    public int reopenSession(AgentXSession session) throws IOException JavaDoc {
1317      return openSession(session.getPeer().getTransport(),
1318                         session.getPeer().getAddress(),
1319                         session);
1320    }
1321
1322  }
1323}
1324
Popular Tags