1 23 24 package com.sun.enterprise.admin.jmx.remote.server.notification; 25 26 import java.io.*; 27 import java.nio.channels.ClosedChannelException ; 28 import java.net.SocketException ; 29 import java.rmi.ConnectException ; 30 import java.rmi.ConnectIOException ; 31 import java.util.logging.Logger ; 32 33 import javax.management.*; 34 35 import com.sun.enterprise.admin.jmx.remote.DefaultConfiguration; 36 import com.sun.enterprise.admin.jmx.remote.notification.SimpleQueue; 37 import com.sun.enterprise.admin.jmx.remote.notification.NotificationWrapper; 38 39 45 public class NotificationConnection implements Runnable { 46 private SimpleQueue que = new SimpleQueue(); 47 private int bufsiz = 0; 48 private Thread dispatchThr = null; 49 private long lastNotifTime = 0; 50 51 private OutputStream out = null; 52 53 private boolean exiting = false; 54 private boolean dispatching = false; 55 private boolean isIOException = false; 56 57 private static final Logger logger = Logger.getLogger( 58 DefaultConfiguration.JMXCONNECTOR_LOGGER); 60 61 public NotificationConnection(OutputStream out, int bufsiz) { 62 this.out = out; 63 if (bufsiz <= DefaultConfiguration.NOTIF_MIN_BUFSIZ) 64 this.bufsiz = DefaultConfiguration.NOTIF_MAX_BUFSIZ; 65 else 66 this.bufsiz = bufsiz; 67 dispatchThr = new Thread (this); 68 dispatchThr.start(); 69 } 70 71 76 public void reinit(OutputStream out) { 77 this.out = out; 78 isIOException = false; 79 dispatchThr = new Thread (this); 80 dispatchThr.start(); 81 } 82 83 90 public boolean hasIOExceptionOccurred() { 91 return isIOException; 92 } 93 94 private boolean isIdle() { 95 boolean ret = ((System.currentTimeMillis() - lastNotifTime) >= DefaultConfiguration.NOTIF_WAIT_INTERVAL); 96 return ret; 97 } 98 99 103 public void fireWaitNotif() { 104 if (!hasIOExceptionOccurred() && (que.size() < bufsiz) && !dispatching && isIdle()) { 105 synchronized (que) { 106 que.add(new NotificationWrapper(NotificationWrapper.WAIT, null, null)); 107 que.notify(); 108 } 109 } 110 } 111 112 116 public void fireNotification(NotificationWrapper wrapr) { 117 if (que.size() < bufsiz) { 118 synchronized (que) { 119 que.add(wrapr); 120 que.notify(); 121 } 122 } 123 } 124 125 130 public void close() { 131 exiting = true; 132 synchronized (que) { 133 que.notify(); 134 } 135 try { 136 dispatchThr.join(); 137 } catch (InterruptedException intre) { 138 } 139 try { 140 out.close(); 141 } catch (IOException ioe) { 142 } 144 } 145 146 public boolean isExiting() { 147 return exiting; 148 } 149 150 156 public void run() { 157 162 while (!isExiting() && !hasIOExceptionOccurred()) { 163 synchronized (que) { 164 while (que.isEmpty() && !isExiting() && !hasIOExceptionOccurred()) { 165 try { 166 que.wait(); 167 } catch (InterruptedException intre) { 168 } 169 } 170 } 171 if (isExiting() || hasIOExceptionOccurred()) 172 break; 173 dispatching = true; 174 while (!que.isEmpty() && !isExiting() && !hasIOExceptionOccurred()) { 175 NotificationWrapper wrapr = (NotificationWrapper) que.remove(); 176 try { 177 sendNotificationMsg(wrapr); 178 } catch (IOException ioe) { 179 if (isExiting()) 180 break; 181 if (!isDisconnected(ioe)) 183 break; 184 isIOException = true; 185 synchronized (this) { 186 this.notify(); 187 } 188 break; 189 } 190 } 191 lastNotifTime = System.currentTimeMillis(); 192 dispatching = false; 193 } 194 } 195 196 private boolean isDisconnected(IOException ioe) { 197 if (ioe instanceof ClosedChannelException || 198 ioe instanceof SocketException || 199 ioe instanceof ConnectException || 200 ioe instanceof ConnectIOException ) 201 return true; 202 return false; 203 } 204 205 private void sendNotificationMsg(NotificationWrapper wrapr) throws IOException { 206 ObjectOutputStream objout = new ObjectOutputStream(out); 207 objout.writeObject(wrapr); 208 objout.flush(); 209 } 210 } 211 212 | Popular Tags |