1 22 23 package org.snmp4j.agent.agentx; 24 25 import java.io.IOException ; 26 import java.util.*; 27 28 import org.snmp4j.TransportMapping; 29 import org.snmp4j.log.LogAdapter; 30 import org.snmp4j.log.LogFactory; 31 import org.snmp4j.mp.PduHandle; 32 import org.snmp4j.mp.PduHandleCallback; 33 34 41 public class AgentX implements AgentXCommandListener { 42 43 private static final LogAdapter logger = LogFactory.getLogger(AgentX.class); 44 45 private AgentXMessageDispatcher messageDispatcher; 46 47 51 private Map pendingRequests = Collections.synchronizedMap(new HashMap(50)); 52 53 57 private Map asyncRequests = Collections.synchronizedMap(new HashMap(50)); 58 59 private Timer timer = new Timer(true); 61 private Vector commandListeners; 62 63 public AgentX(AgentXMessageDispatcher dispatcher) { 64 this.messageDispatcher = dispatcher; 65 this.messageDispatcher.addCommandListener(this); 66 } 67 68 public void addTransportMapping(TransportMapping transport) { 69 messageDispatcher.addTransportMapping(transport); 70 transport.addTransportListener(messageDispatcher); 71 } 72 73 public void removeTransportMapping(TransportMapping transport) { 74 messageDispatcher.removeTransportMapping(transport); 75 transport.removeTransportListener(messageDispatcher); 76 } 77 78 83 public synchronized void removeCommandResponder(AgentXCommandListener listener) { 84 if (commandListeners != null && 85 commandListeners.contains(listener)) { 86 Vector v = (Vector) commandListeners.clone(); 87 v.removeElement(listener); 88 commandListeners = v; 89 } 90 } 91 92 101 public synchronized void addCommandResponder(AgentXCommandListener listener) { 102 Vector v = (commandListeners == null) ? 103 new Vector(2) : (Vector) commandListeners.clone(); 104 if (!v.contains(listener)) { 105 v.addElement(listener); 106 commandListeners = v; 107 } 108 } 109 110 131 public AgentXResponseEvent send(AgentXPDU pdu, AgentXTarget target, 132 TransportMapping transport) throws IOException { 133 SyncResponseListener syncResponse = new SyncResponseListener(); 134 if (!pdu.isConfirmedPDU()) { 135 sendMessage(pdu, target, transport, null); 136 return null; 137 } 138 synchronized (syncResponse) { 139 PendingRequest request = 140 new PendingRequest(null, syncResponse, target, pdu, target); 141 sendMessage(pdu, target, transport, request); 142 try { 143 syncResponse.wait(); 144 } 145 catch (InterruptedException iex) { 146 logger.warn(iex); 147 } 149 } 150 return syncResponse.response; 151 } 152 153 174 public void send(AgentXPDU pdu, AgentXTarget target, 175 TransportMapping transport, 176 Object userHandle, 177 AgentXResponseListener listener) throws IOException { 178 PendingRequest request = 179 new PendingRequest(null, listener, userHandle, 180 pdu, target); 181 sendMessage(pdu, target, transport, request); 182 } 183 184 204 protected PduHandle sendMessage(AgentXPDU pdu, AgentXTarget target, 205 TransportMapping transport, 206 PduHandleCallback pduHandleCallback) 207 throws IOException 208 { 209 PduHandle handle = 210 messageDispatcher.send(transport, target.getAddress(), pdu, 211 pduHandleCallback); 212 return handle; 213 } 214 215 public void processCommand(AgentXCommandEvent event) { 216 AgentXPDU pdu = event.getCommand(); 217 PduHandle handle = new PduHandle(pdu.getPacketID()); 218 if (pdu.getType() == AgentXPDU.AGENTX_RESPONSE_PDU) { 219 event.setProcessed(true); 220 PendingRequest request; 221 if (logger.isDebugEnabled()) { 222 logger.debug("Removing pending request with handle " + handle); 223 } 224 request = (PendingRequest) pendingRequests.remove(handle); 225 if (request == null) { 226 if (logger.isWarnEnabled()) { 227 logger.warn("Received response that cannot be matched to any " + 228 "outstanding request, address=" + 229 event.getPeerAddress() + 230 ", packetID=" + pdu.getPacketID()); 231 } 232 } 233 else { 234 request.finished = true; 236 request.listener.onResponse(new AgentXResponseEvent(this, 237 request.target, 238 event.getPeerAddress(), 239 request.pdu, 240 (AgentXResponsePDU)pdu, 241 request.userObject)); 242 } 243 } 244 else { 245 if (logger.isDebugEnabled()) { 246 logger.debug("Fire process PDU event: " + event.toString()); 247 } 248 fireProcessPdu(event); 249 } 250 } 251 252 259 protected void fireProcessPdu(AgentXCommandEvent event) { 260 if (commandListeners != null) { 261 Vector listeners = commandListeners; 262 int count = listeners.size(); 263 for (int i = 0; i < count; i++) { 264 ((AgentXCommandListener) listeners.elementAt(i)).processCommand(event); 265 if (event.isProcessed()) { 268 return; 269 } 270 } 271 } 272 } 273 274 275 class AsyncRequestKey { 276 private AgentXPDU request; 277 private AgentXResponseListener listener; 278 279 public AsyncRequestKey(AgentXPDU request, AgentXResponseListener listener) { 280 this.request = request; 281 this.listener = listener; 282 } 283 284 291 public boolean equals(Object obj) { 292 if (obj instanceof AsyncRequestKey) { 293 AsyncRequestKey other = (AsyncRequestKey) obj; 294 return (request.equals(other.request) && listener.equals(other.listener)); 295 } 296 return false; 297 } 298 299 public int hashCode() { 300 return request.hashCode(); 301 } 302 } 303 304 class PendingRequest extends TimerTask implements PduHandleCallback { 305 306 protected PduHandle key; 307 protected AgentXResponseListener listener; 308 protected Object userObject; 309 310 protected AgentXPDU pdu; 311 protected AgentXTarget target; 312 313 private volatile boolean finished = false; 314 315 316 public PendingRequest(PduHandle key, 317 AgentXResponseListener listener, 318 Object userObject, 319 AgentXPDU pdu, 320 AgentXTarget target) { 321 this.key = key; 322 this.userObject = userObject; 323 this.listener = listener; 324 this.pdu = pdu; 325 this.target = target; 326 } 327 328 protected void registerRequest(PduHandle handle) { 329 } 330 331 public synchronized void run() { 332 pendingRequests.remove(key); 333 if (!finished) { 335 if (logger.isDebugEnabled()) { 336 logger.debug("AgentX request timed out: " + key.getTransactionID()); 337 } 338 finished = true; 339 listener.onResponse(new AgentXResponseEvent(AgentX.this, target, null, 340 pdu, null, userObject)); 341 } 342 } 343 344 public synchronized boolean setFinished() { 345 boolean currentState = finished; 346 this.finished = true; 347 return currentState; 348 } 349 350 public synchronized void pduHandleAssigned(PduHandle handle, Object pdu) { 351 if (key == null) { 352 key = handle; 353 if (logger.isDebugEnabled()) { 354 logger.debug("New pending request "+pdu+" with handle " + handle); 355 } 356 registerRequest(handle); 357 pendingRequests.put(handle, this); 358 long delay = target.getTimeout(); 359 timer.schedule(this, delay); 360 } 361 } 362 } 363 364 class AsyncPendingRequest extends PendingRequest { 365 366 public AsyncPendingRequest(PduHandle key, 367 AgentXResponseListener listener, 368 Object userObject, 369 AgentXPDU pdu, 370 AgentXTarget target) { 371 super(key, listener, userObject, pdu, target); 372 } 373 374 protected void registerRequest(PduHandle handle) { 375 asyncRequests.put(new AsyncRequestKey(pdu, listener), handle); 376 } 377 } 378 379 class SyncResponseListener implements AgentXResponseListener { 380 381 private AgentXResponseEvent response = null; 382 383 public synchronized void onResponse(AgentXResponseEvent event) { 384 this.response = event; 385 this.notify(); 386 } 387 388 public AgentXResponseEvent getResponse() { 389 return response; 390 } 391 392 } 393 394 public AgentXMessageDispatcher getMessageDispatcher() { 395 return messageDispatcher; 396 } 397 } 398 | Popular Tags |