1 11 12 13 package com.sun.jmx.snmp.daemon; 14 15 16 17 import java.io.ObjectInputStream ; 20 import java.io.IOException ; 21 import java.net.InetAddress ; 22 import java.util.Date ; 23 import java.util.Vector ; 24 import java.util.Enumeration ; 25 26 import javax.management.MBeanServer ; 29 import javax.management.MBeanRegistration ; 30 import javax.management.ObjectName ; 31 import javax.management.NotificationListener ; 32 import javax.management.NotificationFilter ; 33 import javax.management.NotificationBroadcaster ; 34 import javax.management.NotificationBroadcasterSupport ; 35 import javax.management.MBeanNotificationInfo ; 36 import javax.management.AttributeChangeNotification ; 37 import javax.management.ListenerNotFoundException ; 38 import javax.management.loading.ClassLoaderRepository ; 39 import javax.management.MBeanServerFactory ; 40 41 import com.sun.jmx.trace.Trace; 44 45 import javax.management.remote.MBeanServerForwarder ; 53 54 103 104 public abstract class CommunicatorServer 105 implements Runnable , MBeanRegistration , NotificationBroadcaster , 106 CommunicatorServerMBean { 107 108 112 115 public static final int ONLINE = 0 ; 116 117 120 public static final int OFFLINE = 1 ; 121 122 125 public static final int STOPPING = 2 ; 126 127 130 public static final int STARTING = 3 ; 131 132 136 139 141 144 146 149 151 154 public static final int SNMP_TYPE = 4 ; 155 156 159 161 165 168 transient volatile int state = OFFLINE ; 169 170 174 ObjectName objectName ; 175 176 MBeanServer topMBS; 177 MBeanServer bottomMBS; 178 179 181 transient String dbgTag = null ; 182 183 188 int maxActiveClientCount = 1 ; 189 190 192 transient int servedClientCount = 0 ; 193 194 198 String host = null ; 199 200 204 int port = -1 ; 205 206 207 211 214 private transient Object stateLock = new Object (); 215 216 private transient Vector clientHandlerVector = new Vector () ; 217 218 private transient Thread fatherThread = Thread.currentThread() ; 219 private transient Thread mainThread = null ; 220 221 private volatile boolean stopRequested = false ; 222 private boolean interrupted = false; 223 private transient Exception startException = null; 224 225 private transient long notifCount = 0; 227 private transient NotificationBroadcasterSupport notifBroadcaster = 228 new NotificationBroadcasterSupport (); 229 private transient MBeanNotificationInfo [] notifInfos = null; 230 231 232 241 public CommunicatorServer(int connectorType) 242 throws IllegalArgumentException { 243 switch (connectorType) { 244 case SNMP_TYPE : 245 infoType = Trace.INFO_ADAPTOR_SNMP ; 246 break; 247 default: 248 throw new IllegalArgumentException ("Invalid connector Type") ; 249 } 250 dbgTag = makeDebugTag() ; 251 } 252 253 protected Thread createMainThread() { 254 return new Thread (this, makeThreadName()); 255 } 256 257 274 public void start(long timeout) 275 throws CommunicationException, InterruptedException { 276 boolean start; 277 278 synchronized (stateLock) { 279 if (state == STOPPING) { 280 waitState(OFFLINE, 60000); 283 } 284 start = (state == OFFLINE); 285 if (start) { 286 changeState(STARTING); 287 stopRequested = false; 288 interrupted = false; 289 startException = null; 290 } 291 } 292 293 if (!start) { 294 if (isTraceOn()) 295 trace("start","Connector is not OFFLINE") ; 296 return; 297 } 298 299 if (isTraceOn()) 300 trace("start","--> Start connector ") ; 301 302 mainThread = createMainThread(); 303 304 mainThread.start() ; 305 306 if (timeout > 0) waitForStart(timeout); 307 } 308 309 315 public void start() { 316 try { 317 start(0); 318 } catch (InterruptedException x) { 319 trace("start","interrupted: " + x); 321 } 322 } 323 324 330 public void stop() { 331 synchronized (stateLock) { 332 if (state == OFFLINE || state == STOPPING) { 333 if (isTraceOn()) 334 trace("stop","Connector is not ONLINE") ; 335 return; 336 } 337 changeState(STOPPING); 338 if (isTraceOn()) 342 trace("stop","Interrupt main thread") ; 343 stopRequested = true ; 344 if (!interrupted) { 345 interrupted = true; 346 mainThread.interrupt(); 347 } 348 } 349 350 if (isTraceOn()) { 354 trace("stop","terminateAllClient") ; 355 } 356 terminateAllClient() ; 357 358 synchronized (stateLock) { 362 if (state == STARTING) 363 changeState(OFFLINE); 364 } 365 } 366 367 372 public boolean isActive() { 373 synchronized (stateLock) { 374 return (state == ONLINE); 375 } 376 } 377 378 410 public boolean waitState(int wantedState, long timeOut) { 411 if (isTraceOn()) 412 trace("waitState", wantedState + "(0on,1off,2st) TO=" + timeOut + 413 " ; current state = " + getStateString()); 414 415 long endTime = 0; 416 if (timeOut > 0) 417 endTime = System.currentTimeMillis() + timeOut; 418 419 synchronized (stateLock) { 420 while (state != wantedState) { 421 if (timeOut < 0) { 422 if (isTraceOn()) 423 trace("waitState", "timeOut < 0, return without wait"); 424 return false; 425 } else { 426 try { 427 if (timeOut > 0) { 428 long toWait = endTime - System.currentTimeMillis(); 429 if (toWait <= 0) { 430 if (isTraceOn()) 431 trace("waitState", "timed out"); 432 return false; 433 } 434 stateLock.wait(toWait); 435 } else { stateLock.wait(); 437 } 438 } catch (InterruptedException e) { 439 if (isTraceOn()) 440 trace("waitState", "wait interrupted"); 441 return (state == wantedState); 442 } 443 } 444 } 445 if (isTraceOn()) 446 trace("waitState", "returning in desired state"); 447 return true; 448 } 449 } 450 451 468 private void waitForStart(long timeout) 469 throws CommunicationException, InterruptedException { 470 if (isTraceOn()) 471 trace("waitForStart", "Timeout=" + timeout + 472 " ; current state = " + getStateString()); 473 474 final long startTime = System.currentTimeMillis(); 475 476 synchronized (stateLock) { 477 while (state == STARTING) { 478 final long elapsed = System.currentTimeMillis() - startTime; 481 482 final long remainingTime = timeout-elapsed; 488 489 if (remainingTime < 0) { 492 if (isTraceOn()) 493 trace("waitForStart", 494 "timeout < 0, return without wait"); 495 throw new InterruptedException ("Timeout expired"); 496 } 497 498 try { 503 stateLock.wait(remainingTime); 504 } catch (InterruptedException e) { 505 if (isTraceOn()) 506 trace("waitForStart", "wait interrupted"); 507 508 if (state != ONLINE) throw e; 513 } 514 } 515 516 if (state == ONLINE) { 519 if (isTraceOn()) trace("waitForStart", "started"); 522 return; 523 } else if (startException instanceof CommunicationException) { 524 throw (CommunicationException)startException; 528 } else if (startException instanceof InterruptedException ) { 529 throw (InterruptedException )startException; 533 } else if (startException != null) { 534 throw new CommunicationException(startException, 538 "Failed to start: "+ 539 startException); 540 } else { 541 throw new CommunicationException("Failed to start: state is "+ 545 getStringForState(state)); 546 } 547 } 548 } 549 550 556 public int getState() { 557 synchronized (stateLock) { 558 return state ; 559 } 560 } 561 562 568 public String getStateString() { 569 return getStringForState(state) ; 570 } 571 572 577 public String getHost() { 578 try { 579 host = InetAddress.getLocalHost().getHostName(); 580 } catch (Exception e) { 581 host = "Unknown host"; 582 } 583 return host ; 584 } 585 586 591 public int getPort() { 592 synchronized (stateLock) { 593 return port ; 594 } 595 } 596 597 606 public void setPort(int port) throws java.lang.IllegalStateException { 607 synchronized (stateLock) { 608 if ((state == ONLINE) || (state == STARTING)) 609 throw new IllegalStateException ("Stop server before " + 610 "carrying out this operation"); 611 this.port = port; 612 dbgTag = makeDebugTag(); 613 } 614 } 615 616 620 public abstract String getProtocol() ; 621 622 631 int getServedClientCount() { 632 return servedClientCount ; 633 } 634 635 642 int getActiveClientCount() { 643 int result = clientHandlerVector.size() ; 644 return result ; 645 } 646 647 655 int getMaxActiveClientCount() { 656 return maxActiveClientCount ; 657 } 658 659 668 void setMaxActiveClientCount(int c) 669 throws java.lang.IllegalStateException { 670 synchronized (stateLock) { 671 if ((state == ONLINE) || (state == STARTING)) { 672 throw new IllegalStateException ( 673 "Stop server before carrying out this operation"); 674 } 675 maxActiveClientCount = c ; 676 } 677 } 678 679 682 void notifyClientHandlerCreated(ClientHandler h) { 683 clientHandlerVector.addElement(h) ; 684 } 685 686 689 synchronized void notifyClientHandlerDeleted(ClientHandler h) { 690 clientHandlerVector.removeElement(h); 691 notifyAll(); 692 } 693 694 698 protected int getBindTries() { 699 return 50; 700 } 701 702 706 protected long getBindSleepTime() { 707 return 100; 708 } 709 710 715 public void run() { 716 717 int i = 0; 723 boolean success = false; 724 725 try { 729 final int bindRetries = getBindTries(); 732 final long sleepTime = getBindSleepTime(); 733 while (i < bindRetries && !success) { 734 try { 735 doBind(); 738 success = true; 739 } catch (CommunicationException ce) { 740 i++; 741 try { 742 Thread.sleep(sleepTime); 743 } catch (InterruptedException ie) { 744 throw ie; 745 } 746 } 747 } 748 if (!success) { 751 doBind(); 754 } 755 756 } catch(Exception x) { 757 if (isDebugOn()) { 758 debug("run","Unexpected exception = "+x) ; 759 } 760 synchronized(stateLock) { 761 startException = x; 762 changeState(OFFLINE); 763 } 764 if (isTraceOn()) { 765 trace("run","State is OFFLINE") ; 766 } 767 doError(x); 768 return; 769 } 770 771 try { 772 changeState(ONLINE) ; 776 if (isTraceOn()) { 777 trace("run","State is ONLINE") ; 778 } 779 780 while (!stopRequested) { 784 servedClientCount++; 785 doReceive() ; 786 waitIfTooManyClients() ; 787 doProcess() ; 788 } 789 if (isTraceOn()) { 790 trace("run","Stop has been requested") ; 791 } 792 793 } catch(InterruptedException x) { 794 if (isTraceOn()) { 795 trace("run","Interrupt caught") ; 796 } 797 changeState(STOPPING); 798 } catch(Exception x) { 799 if (isDebugOn()) { 800 debug("run","Unexpected exception = "+x) ; 801 } 802 changeState(STOPPING); 803 } finally { 804 synchronized (stateLock) { 805 interrupted = true; 806 Thread.currentThread().interrupted(); 807 } 808 809 try { 813 doUnbind() ; 814 waitClientTermination() ; 815 changeState(OFFLINE); 816 if (isTraceOn()) { 817 trace("run","State is OFFLINE") ; 818 } 819 } catch(Exception x) { 820 if (isDebugOn()) { 821 debug("run","Unexpected exception = "+x) ; 822 } 823 changeState(OFFLINE); 824 } 825 826 } 827 } 828 829 831 protected abstract void doError(Exception e) throws CommunicationException; 832 833 853 855 protected abstract void doBind() 856 throws CommunicationException, InterruptedException ; 857 858 864 protected abstract void doReceive() 865 throws CommunicationException, InterruptedException ; 866 867 873 protected abstract void doProcess() 874 throws CommunicationException, InterruptedException ; 875 876 881 protected abstract void doUnbind() 882 throws CommunicationException, InterruptedException ; 883 884 890 public synchronized MBeanServer getMBeanServer() { 891 return topMBS; 892 } 893 894 912 public synchronized void setMBeanServer(MBeanServer newMBS) 913 throws IllegalArgumentException , IllegalStateException { 914 synchronized (stateLock) { 915 if (state == ONLINE || state == STARTING) 916 throw new IllegalStateException ("Stop server before " + 917 "carrying out this operation"); 918 } 919 final String error = 920 "MBeanServer argument must be MBean server where this " + 921 "server is registered, or an MBeanServerForwarder " + 922 "leading to that server"; 923 Vector seenMBS = new Vector (); 924 for (MBeanServer mbs = newMBS; 925 mbs != bottomMBS; 926 mbs = ((MBeanServerForwarder ) mbs).getMBeanServer()) { 927 if (!(mbs instanceof MBeanServerForwarder )) 928 throw new IllegalArgumentException (error); 929 if (seenMBS.contains(mbs)) 930 throw new IllegalArgumentException ("MBeanServerForwarder " + 931 "loop"); 932 seenMBS.addElement(mbs); 933 } 934 topMBS = newMBS; 935 } 936 937 943 ObjectName getObjectName() { 944 return objectName ; 945 } 946 947 950 void changeState(int newState) { 951 int oldState; 952 synchronized (stateLock) { 953 if (state == newState) 954 return; 955 oldState = state; 956 state = newState; 957 stateLock.notifyAll(); 958 } 959 sendStateChangeNotification(oldState, newState); 960 } 961 962 965 String makeDebugTag() { 966 return "CommunicatorServer["+ getProtocol() + ":" + getPort() + "]" ; 967 } 968 969 972 String makeThreadName() { 973 String result ; 974 975 if (objectName == null) 976 result = "CommunicatorServer" ; 977 else 978 result = objectName.toString() ; 979 980 return result ; 981 } 982 983 988 private synchronized void waitIfTooManyClients() 989 throws InterruptedException { 990 while (getActiveClientCount() >= maxActiveClientCount) { 991 if (isTraceOn()) { 992 trace("waitIfTooManyClients", 993 "Waiting for a client to terminate") ; 994 } 995 wait(); 996 } 997 } 998 999 1002 private void waitClientTermination() { 1003 int s = clientHandlerVector.size() ; 1004 if (isTraceOn()) { 1005 if (s >= 1) { 1006 trace("waitClientTermination","waiting for " + 1007 s + " clients to terminate") ; 1008 } 1009 } 1010 1011 for (Enumeration e = clientHandlerVector.elements() ; 1012 e.hasMoreElements();){ 1013 ClientHandler h = (ClientHandler)e.nextElement() ; 1014 h.join() ; 1015 } 1016 1017 if (isTraceOn()) { 1018 if (s >= 1) { 1019 trace("waitClientTermination","Ok, let's go...") ; 1020 } 1021 } 1022 } 1023 1024 1027 private void terminateAllClient() { 1028 int s = clientHandlerVector.size() ; 1029 if (isTraceOn()) { 1030 if (s >= 1) { 1031 trace("terminateAllClient","Interrupting " + s + " clients") ; 1032 } 1033 } 1034 1035 for (Enumeration e = clientHandlerVector.elements() ; 1036 e.hasMoreElements();){ 1037 ClientHandler h = (ClientHandler)e.nextElement() ; 1038 h.interrupt() ; 1039 } 1040 } 1041 1042 1045 private void readObject(ObjectInputStream stream) 1046 throws IOException , ClassNotFoundException { 1047 1048 stream.defaultReadObject(); 1051 1052 stateLock = new Object (); 1057 state = OFFLINE; 1058 stopRequested = false; 1059 servedClientCount = 0; 1060 clientHandlerVector = new Vector (); 1061 fatherThread = Thread.currentThread(); 1062 mainThread = null; 1063 notifCount = 0; 1064 notifInfos = null; 1065 notifBroadcaster = new NotificationBroadcasterSupport (); 1066 dbgTag = makeDebugTag(); 1067 } 1068 1069 1070 1074 1091 public void addNotificationListener(NotificationListener listener, 1092 NotificationFilter filter, 1093 Object handback) 1094 throws java.lang.IllegalArgumentException { 1095 1096 if (isDebugOn()) { 1097 debug("addNotificationListener","Adding listener "+ listener + 1098 " with filter "+ filter + " and handback "+ handback); 1099 } 1100 notifBroadcaster.addNotificationListener(listener, filter, handback); 1101 } 1102 1103 1113 public void removeNotificationListener(NotificationListener listener) 1114 throws ListenerNotFoundException { 1115 1116 if (isDebugOn()) { 1117 debug("removeNotificationListener","Removing listener "+ listener); 1118 } 1119 notifBroadcaster.removeNotificationListener(listener); 1120 } 1121 1122 1130 public MBeanNotificationInfo [] getNotificationInfo() { 1131 1132 if (notifInfos == null) { 1135 notifInfos = new MBeanNotificationInfo [1]; 1136 String [] notifTypes = { 1137 AttributeChangeNotification.ATTRIBUTE_CHANGE}; 1138 notifInfos[0] = new MBeanNotificationInfo ( notifTypes, 1139 AttributeChangeNotification .class.getName(), 1140 "Sent to notify that the value of the State attribute "+ 1141 "of this CommunicatorServer instance has changed."); 1142 } 1143 1144 return notifInfos; 1145 } 1146 1147 1150 private void sendStateChangeNotification(int oldState, int newState) { 1151 1152 String oldStateString = getStringForState(oldState); 1153 String newStateString = getStringForState(newState); 1154 String message = new StringBuffer ().append(dbgTag) 1155 .append(" The value of attribute State has changed from ") 1156 .append(oldState).append(" (").append(oldStateString) 1157 .append(") to ").append(newState).append(" (") 1158 .append(newStateString).append(").").toString(); 1159 1160 notifCount++; 1161 AttributeChangeNotification notif = 1162 new AttributeChangeNotification (this, notifCount, System.currentTimeMillis(), message, "State", "int", new Integer (oldState), new Integer (newState) ); 1171 if (isDebugOn()) { 1172 debug("sendStateChangeNotification", 1173 "Sending AttributeChangeNotification #"+ notifCount + 1174 " with message: "+ message); 1175 } 1176 notifBroadcaster.sendNotification(notif); 1177 } 1178 1179 1182 private static String getStringForState(int s) { 1183 switch (s) { 1184 case ONLINE: return "ONLINE"; 1185 case STARTING: return "STARTING"; 1186 case OFFLINE: return "OFFLINE"; 1187 case STOPPING: return "STOPPING"; 1188 default: return "UNDEFINED"; 1189 } 1190 } 1191 1192 1193 1197 1210 public ObjectName preRegister(MBeanServer server, ObjectName name) 1211 throws java.lang.Exception { 1212 objectName = name; 1213 synchronized (this) { 1214 if (bottomMBS != null) { 1215 throw new IllegalArgumentException ("connector already " + 1216 "registered in an MBean " + 1217 "server"); 1218 } 1219 topMBS = bottomMBS = server; 1220 } 1221 dbgTag = makeDebugTag(); 1222 return name; 1223 } 1224 1225 1231 public void postRegister(Boolean registrationDone) { 1232 if (!registrationDone.booleanValue()) { 1233 synchronized (this) { 1234 topMBS = bottomMBS = null; 1235 } 1236 } 1237 } 1238 1239 1246 public void preDeregister() throws java.lang.Exception { 1247 synchronized (this) { 1248 topMBS = bottomMBS = null; 1249 } 1250 objectName = null ; 1251 final int cstate = getState(); 1252 if ((cstate == ONLINE) || ( cstate == STARTING)) { 1253 stop() ; 1254 } 1255 } 1256 1257 1260 public void postDeregister(){ 1261 } 1262 1263 1266 Class loadClass(String className) 1267 throws ClassNotFoundException { 1268 try { 1269 return Class.forName(className); 1270 } catch (ClassNotFoundException e) { 1271 final ClassLoaderRepository clr = 1272 MBeanServerFactory.getClassLoaderRepository(bottomMBS); 1273 if (clr == null) throw new ClassNotFoundException (className); 1274 return clr.loadClass(className); 1275 } 1276 } 1277 1278 1282 1284 int infoType; 1285 1286 1288 boolean isTraceOn() { 1289 return Trace.isSelected(Trace.LEVEL_TRACE, infoType); 1290 } 1291 1292 1294 void trace(String clz, String func, String info) { 1295 Trace.send(Trace.LEVEL_TRACE, infoType, clz, func, info); 1296 } 1297 1298 1300 boolean isDebugOn() { 1301 return Trace.isSelected(Trace.LEVEL_DEBUG, infoType); 1302 } 1303 1304 1306 void debug(String clz, String func, String info) { 1307 Trace.send(Trace.LEVEL_DEBUG, infoType, clz, func, info); 1308 } 1309 1310 1312 void debug(String clz, String func, Throwable exception) { 1313 Trace.send(Trace.LEVEL_DEBUG, infoType, clz, func, exception); 1314 } 1315 1316 1318 void trace(String func, String info) { 1319 trace(dbgTag, func, info); 1320 } 1321 1322 1324 void debug(String func, String info) { 1325 debug(dbgTag, func, info); 1326 } 1327 1328 1330 void debug(String func, Throwable exception) { 1331 debug(dbgTag, func, exception); 1332 } 1333} 1334 | Popular Tags |