1 20 21 package org.snmp4j; 22 23 import java.io.IOException ; 24 import java.util.*; 25 26 import org.snmp4j.asn1.*; 27 import org.snmp4j.event.*; 28 import org.snmp4j.log.*; 29 import org.snmp4j.mp.*; 30 import org.snmp4j.smi.*; 31 import java.nio.ByteBuffer ; 32 import org.snmp4j.transport.UnsupportedAddressClassException; 33 34 54 public class MessageDispatcherImpl implements MessageDispatcher { 55 56 private static final LogAdapter logger = 57 LogFactory.getLogger(MessageDispatcherImpl.class); 58 59 private Vector mpm = new Vector(3); 60 private Hashtable transportMappings = new Hashtable(5); 61 62 private int nextTransactionID = new Random().nextInt(Integer.MAX_VALUE-2)+1; 63 transient private Vector commandResponderListeners; 64 private transient Vector counterListeners; 65 private transient Vector authenticationFailureListeners; 66 67 private boolean checkOutgoingMsg = true; 68 69 73 public MessageDispatcherImpl() { 74 } 75 76 84 public synchronized void addMessageProcessingModel(MessageProcessingModel model) { 85 while (mpm.size() <= model.getID()) { 86 mpm.add(null); 87 } 88 if (mpm.get(model.getID()) == null) { 89 mpm.set(model.getID(), model); 90 } 91 } 92 93 98 public synchronized void removeMessageProcessingModel(MessageProcessingModel model) { 99 mpm.set(model.getID(), null); 100 } 101 102 114 public void addTransportMapping(TransportMapping transport) { 115 if (transportMappings.get(transport.getSupportedAddressClass()) == null) { 116 transportMappings.put(transport.getSupportedAddressClass(), transport); 117 } 118 } 119 120 128 public TransportMapping removeTransportMapping(TransportMapping transport) { 129 return (TransportMapping) 130 transportMappings.remove(transport.getSupportedAddressClass()); 131 } 132 133 138 public Collection getTransportMappings() { 139 return transportMappings.values(); 140 } 141 142 public synchronized int getNextRequestID() { 143 int nextID = nextTransactionID++; 144 if (nextID <= 0) { 145 nextID = 1; 146 nextTransactionID = 2; 147 } 148 return nextID; 149 } 150 151 protected PduHandle createPduHandle() { 152 return new PduHandle(getNextRequestID()); 153 } 154 155 171 protected void sendMessage(TransportMapping transport, 172 Address destAddress, byte[] message) 173 throws IOException 174 { 175 if (destAddress instanceof GenericAddress) { 176 destAddress = ((GenericAddress)destAddress).getAddress(); 177 } 178 if (transport != null) { 179 transport.sendMessage(destAddress, message); 180 } 181 else { 182 String txt = "No transport mapping for address class: "+ 183 destAddress.getClass().getName()+"="+destAddress; 184 logger.error(txt); 185 throw new IOException (txt); 186 } 187 } 188 189 199 public TransportMapping getTransport(Address destAddress) { 200 Class addressClass = destAddress.getClass(); 201 TransportMapping transport = 202 (TransportMapping) transportMappings.get(addressClass); 203 return transport; 204 } 205 206 221 protected void dispatchMessage(TransportMapping sourceTransport, 222 MessageProcessingModel mp, 223 Address incomingAddress, 224 BERInputStream wholeMessage) throws IOException { 225 MutablePDU pdu = new MutablePDU(); 226 Integer32 messageProcessingModel = new Integer32(); 227 Integer32 securityModel = new Integer32(); 228 OctetString securityName = new OctetString(); 229 Integer32 securityLevel = new Integer32(); 230 231 PduHandle handle = createPduHandle(); 232 233 Integer32 maxSizeRespPDU = 234 new Integer32(sourceTransport.getMaxInboundMessageSize()); 235 StatusInformation statusInfo = new StatusInformation(); 236 MutableStateReference mutableStateReference = new MutableStateReference(); 237 StateReference stateReference = new StateReference(); 240 stateReference.setTransportMapping(sourceTransport); 241 stateReference.setAddress(incomingAddress); 242 mutableStateReference.setStateReference(stateReference); 243 244 int status = mp.prepareDataElements(this, incomingAddress, wholeMessage, 245 messageProcessingModel, securityModel, 246 securityName, securityLevel, pdu, 247 handle, maxSizeRespPDU, statusInfo, 248 mutableStateReference); 249 if (mutableStateReference.getStateReference() != null) { 250 mutableStateReference. 252 getStateReference().setTransportMapping(sourceTransport); 253 } 254 if (status == SnmpConstants.SNMP_ERROR_SUCCESS) { 255 CommandResponderEvent e = 257 new CommandResponderEvent(this, 258 sourceTransport, 259 incomingAddress, 260 messageProcessingModel.getValue(), 261 securityModel.getValue(), 262 securityName.getValue(), 263 securityLevel.getValue(), 264 handle, 265 pdu.getPdu(), 266 maxSizeRespPDU.getValue(), 267 mutableStateReference.getStateReference()); 268 fireProcessPdu(e); 269 } 270 else { 271 switch (status) { 272 case SnmpConstants.SNMP_MP_UNSUPPORTED_SECURITY_MODEL: 273 case SnmpConstants.SNMP_MP_WRONG_USER_NAME: 274 case SnmpConstants.SNMP_MP_USM_ERROR: { 275 AuthenticationFailureEvent event = 276 new AuthenticationFailureEvent(this, incomingAddress, 277 sourceTransport, status, 278 wholeMessage); 279 fireAuthenticationFailure(event); 280 break; 281 } 282 } 283 logger.warn(statusInfo.toString()); 284 } 285 } 286 287 public void processMessage(TransportMapping sourceTransport, 288 Address incomingAddress, 289 ByteBuffer wholeMessage) { 290 processMessage(sourceTransport, incomingAddress, 291 new BERInputStream(wholeMessage)); 292 } 293 294 public void processMessage(TransportMapping sourceTransport, 295 Address incomingAddress, 296 BERInputStream wholeMessage) { 297 fireIncrementCounter(new CounterEvent(this, SnmpConstants.snmpInPkts)); 298 if (!wholeMessage.markSupported()) { 299 String txt = "Message stream must support marks"; 300 logger.error(txt); 301 throw new IllegalArgumentException (txt); 302 } 303 try { 304 wholeMessage.mark(16); 305 BER.MutableByte type = new BER.MutableByte(); 306 BER.decodeHeader(wholeMessage, type, false); 309 if (type.getValue() != BER.SEQUENCE) { 310 logger.error("ASN.1 parse error (message is not a sequence)"); 311 CounterEvent event = new CounterEvent(this, 312 SnmpConstants.snmpInASNParseErrs); 313 fireIncrementCounter(event); 314 } 315 Integer32 version = new Integer32(); 316 version.decodeBER(wholeMessage); 317 MessageProcessingModel mp = getMessageProcessingModel(version.getValue()); 318 if (mp == null) { 319 logger.warn("SNMP version "+version+" is not supported"); 320 CounterEvent event = new CounterEvent(this, 321 SnmpConstants.snmpInBadVersions); 322 fireIncrementCounter(event); 323 } 324 else { 325 wholeMessage.reset(); 327 dispatchMessage(sourceTransport, mp, incomingAddress, wholeMessage); 329 } 330 } 331 catch (Exception ex) { 332 logger.error(ex); 333 if (logger.isDebugEnabled()) { 334 ex.printStackTrace(); 335 } 336 if (SNMP4JSettings.isFowardRuntimeExceptions()) { 337 throw new RuntimeException (ex); 338 } 339 } 340 catch (OutOfMemoryError oex) { 341 logger.error(oex); 342 if (SNMP4JSettings.isFowardRuntimeExceptions()) { 343 throw oex; 344 } 345 } 346 } 347 348 public PduHandle sendPdu(Address transportAddress, 349 int messageProcessingModel, 350 int securityModel, 351 byte[] securityName, 352 int securityLevel, 353 PDU pdu, 354 boolean expectResponse) throws MessageException { 355 return sendPdu(null, transportAddress, messageProcessingModel, 356 securityModel, securityName, securityLevel, 357 pdu, expectResponse); 358 } 359 360 public PduHandle sendPdu(TransportMapping transport, 361 Address transportAddress, 362 int messageProcessingModel, 363 int securityModel, 364 byte[] securityName, 365 int securityLevel, 366 PDU pdu, 367 boolean expectResponse, 368 PduHandleCallback pduHandleCallback) 369 throws MessageException 370 { 371 try { 372 MessageProcessingModel mp = 373 getMessageProcessingModel(messageProcessingModel); 374 if (mp == null) { 375 throw new MessageException("Unsupported message processing model: " 376 + messageProcessingModel); 377 } 378 if (!mp.isProtocolVersionSupported(messageProcessingModel)) { 379 throw new MessageException("SNMP version "+messageProcessingModel+ 380 " is not supported "+ 381 "by message processing model "+ 382 messageProcessingModel); 383 } 384 if (transport == null) { 385 transport = getTransport(transportAddress); 386 } 387 if (transport == null) { 388 throw new UnsupportedAddressClassException( 389 "Unsupported address class (transport mapping): "+ 390 transportAddress.getClass().getName(), 391 transportAddress.getClass()); 392 } 393 else if (pdu.isConfirmedPdu()) { 394 checkListening4ConfirmedPDU(pdu, transportAddress, transport); 395 } 396 397 checkOutgoingMsg(transportAddress, messageProcessingModel, pdu); 399 400 PduHandle pduHandle; 403 if ((pdu.getRequestID().getValue() == 0) && 404 (pdu.getType() != PDU.RESPONSE)) { 405 pduHandle = createPduHandle(); 406 } 407 else { 408 pduHandle = new PduHandle(pdu.getRequestID().getValue()); 409 } 410 411 pdu.setRequestID(new Integer32(pduHandle.getTransactionID())); 413 414 GenericAddress destAddress = new GenericAddress(); 416 417 BEROutputStream outgoingMessage = new BEROutputStream(); 418 int status = mp.prepareOutgoingMessage(transportAddress, 419 transport.getMaxInboundMessageSize(), 420 messageProcessingModel, 421 securityModel, 422 securityName, 423 securityLevel, 424 pdu, 425 expectResponse, 426 pduHandle, 427 destAddress, 428 outgoingMessage); 429 430 if (status == SnmpConstants.SNMP_ERROR_SUCCESS) { 431 if (pduHandleCallback != null) { 433 pduHandleCallback.pduHandleAssigned(pduHandle, pdu); 434 } 435 byte[] messageBytes = outgoingMessage.getBuffer().array(); 436 sendMessage(transport, transportAddress, messageBytes); 437 } 438 else { 439 throw new MessageException("Message processing model "+ 440 mp.getID()+" returned error: "+status); 441 } 442 return pduHandle; 443 } 444 catch (IndexOutOfBoundsException iobex) { 445 throw new MessageException("Unsupported message processing model: " 446 + messageProcessingModel); 447 } 448 catch (MessageException mex) { 449 if (logger.isDebugEnabled()) { 450 mex.printStackTrace(); 451 } 452 throw mex; 453 } 454 catch (IOException iox) { 455 if (logger.isDebugEnabled()) { 456 iox.printStackTrace(); 457 } 458 throw new MessageException(iox.getMessage()); 459 } 460 } 461 462 private static void checkListening4ConfirmedPDU(PDU pdu, Address target, 463 TransportMapping transport) { 464 if ((transport != null) && (!transport.isListening())) { 465 logger.warn("Sending confirmed PDU "+pdu+" to target "+target+ 466 " although transport mapping "+transport+ 467 " is not listening for a response"); 468 } 469 } 470 471 482 protected void checkOutgoingMsg(Address transportAddress, 483 int messageProcessingModel, PDU pdu) 484 throws MessageException 485 { 486 if (checkOutgoingMsg) { 487 if (messageProcessingModel == MessageProcessingModel.MPv1) { 488 if (pdu.getType() == PDU.GETBULK) { 489 logger.warn("Converting GETBULK PDU to GETNEXT for SNMPv1 target: "+ 490 transportAddress); 491 pdu.setType(PDU.GETNEXT); 492 pdu.setMaxRepetitions(0); 493 } 494 } 495 } 496 } 497 498 public int returnResponsePdu(int messageProcessingModel, 499 int securityModel, 500 byte[] securityName, 501 int securityLevel, 502 PDU pdu, 503 int maxSizeResponseScopedPDU, 504 StateReference stateReference, 505 StatusInformation statusInformation) 506 throws MessageException 507 { 508 try { 509 MessageProcessingModel mp = 510 getMessageProcessingModel(messageProcessingModel); 511 if (mp == null) { 512 throw new MessageException("Unsupported message processing model: " 513 + messageProcessingModel); 514 } 515 TransportMapping transport = stateReference.getTransportMapping(); 516 if (transport == null) { 517 transport = getTransport(stateReference.getAddress()); 518 } 519 if (transport == null) { 520 throw new MessageException("Unsupported address class (transport mapping): "+ 521 stateReference.getAddress().getClass().getName()); 522 } 523 BEROutputStream outgoingMessage = new BEROutputStream(); 524 int status = mp.prepareResponseMessage(messageProcessingModel, 525 transport.getMaxInboundMessageSize(), 526 securityModel, 527 securityName, securityLevel, pdu, 528 maxSizeResponseScopedPDU, 529 stateReference, 530 statusInformation, 531 outgoingMessage); 532 if (status == SnmpConstants.SNMP_MP_OK) { 533 sendMessage(transport, 534 stateReference.getAddress(), 535 outgoingMessage.getBuffer().array()); 536 } 537 return status; 538 } 539 catch (ArrayIndexOutOfBoundsException aex) { 540 throw new MessageException("Unsupported message processing model: " 541 + messageProcessingModel); 542 } 543 catch (IOException iox) { 544 throw new MessageException(iox.getMessage()); 545 } 546 } 547 548 public void releaseStateReference(int messageProcessingModel, 549 PduHandle pduHandle) { 550 MessageProcessingModel mp = getMessageProcessingModel(messageProcessingModel); 551 if (mp == null) { 552 throw new IllegalArgumentException ("Unsupported message processing model: "+ 553 messageProcessingModel); 554 } 555 mp.releaseStateReference(pduHandle); 556 } 557 558 public synchronized void removeCommandResponder(CommandResponder l) { 559 if (commandResponderListeners != null && commandResponderListeners.contains(l)) { 560 Vector v = (Vector) commandResponderListeners.clone(); 561 v.removeElement(l); 562 commandResponderListeners = v; 563 } 564 } 565 566 public synchronized void addCommandResponder(CommandResponder l) { 567 Vector v = (commandResponderListeners == null) ? 568 new Vector(2) : (Vector) commandResponderListeners.clone(); 569 if (!v.contains(l)) { 570 v.addElement(l); 571 commandResponderListeners = v; 572 } 573 } 574 575 582 protected void fireProcessPdu(CommandResponderEvent e) { 583 if (commandResponderListeners != null) { 584 Vector listeners = commandResponderListeners; 585 int count = listeners.size(); 586 for (int i = 0; i < count; i++) { 587 ((CommandResponder) listeners.elementAt(i)).processPdu(e); 588 if (e.isProcessed()) { 591 return; 592 } 593 } 594 } 595 } 596 597 608 public MessageProcessingModel getMessageProcessingModel(int messageProcessingModel) { 609 try { 610 return (MessageProcessingModel) mpm.get(messageProcessingModel); 611 } 612 catch (IndexOutOfBoundsException iobex) { 613 return null; 614 } 615 } 616 617 622 public synchronized void removeCounterListener(CounterListener counterListener) { 623 if (counterListeners != null && counterListeners.contains(counterListener)) { 624 Vector v = (Vector) counterListeners.clone(); 625 v.removeElement(counterListener); 626 counterListeners = v; 627 } 628 } 629 630 636 public synchronized void addCounterListener(CounterListener counterListener) { 637 Vector v = (counterListeners == null) ? 638 new Vector(2) : (Vector) counterListeners.clone(); 639 if (!v.contains(counterListener)) { 640 v.addElement(counterListener); 641 counterListeners = v; 642 } 643 } 644 645 651 protected void fireIncrementCounter(CounterEvent event) { 652 if (counterListeners != null) { 653 Vector listeners = counterListeners; 654 int count = listeners.size(); 655 for (int i = 0; i < count; i++) { 656 ((CounterListener) listeners.elementAt(i)).incrementCounter(event); 657 } 658 } 659 } 660 661 676 public void setCheckOutgoingMsg(boolean checkOutgoingMsg) { 677 this.checkOutgoingMsg = checkOutgoingMsg; 678 } 679 680 686 public boolean isCheckOutgoingMsg() { 687 return checkOutgoingMsg; 688 } 689 690 697 public synchronized void addAuthenticationFailureListener( 698 AuthenticationFailureListener l) { 699 Vector v = (authenticationFailureListeners == null) ? 700 new Vector(2) : (Vector) authenticationFailureListeners.clone(); 701 if (!v.contains(l)) { 702 v.addElement(l); 703 authenticationFailureListeners = v; 704 } 705 } 706 707 712 public synchronized void removeAuthenticationFailureListener( 713 AuthenticationFailureListener l) { 714 if (authenticationFailureListeners != null && 715 authenticationFailureListeners.contains(l)) { 716 Vector v = (Vector) authenticationFailureListeners.clone(); 717 v.removeElement(l); 718 authenticationFailureListeners = v; 719 } 720 } 721 722 728 protected void fireAuthenticationFailure(AuthenticationFailureEvent event) { 729 if (authenticationFailureListeners != null) { 730 Vector listeners = authenticationFailureListeners; 731 int count = listeners.size(); 732 for (int i = 0; i < count; i++) { 733 ((AuthenticationFailureListener) listeners.elementAt(i)). 734 authenticationFailure(event); 735 } 736 } 737 } 738 739 public PduHandle sendPdu(TransportMapping transportMapping, 740 Address transportAddress, 741 int messageProcessingModel, 742 int securityModel, byte[] securityName, 743 int securityLevel, PDU pdu, 744 boolean expectResponse) throws MessageException { 745 return sendPdu(transportMapping, transportAddress, messageProcessingModel, 746 securityModel, 747 securityName, securityLevel, pdu, expectResponse, null); 748 } 749 750 } 751 752 | Popular Tags |