1 6 7 package com.quikj.application.web.oamp.plugin; 8 9 import org.xml.sax.*; 11 import org.w3c.dom.*; 12 13 import java.net.*; 14 import java.io.*; 15 import java.util.*; 16 17 import com.quikj.server.framework.*; 18 import com.quikj.server.web.*; 19 import com.quikj.application.web.oamp.messaging.*; 20 21 25 public class LogMessageMonitor extends AceThread 26 { 27 28 29 public LogMessageMonitor() 30 throws IOException, UnknownHostException, AceException 31 { 32 super("LogMessageMonitor"); 33 } 34 35 public void dispose() 36 { 37 if (interruptWait(AceSignalMessage.SIGNAL_TERM, 40 "Request to kill the thread received") == false) 41 { 42 AceLogger.Instance().log(AceLogger.ERROR, 43 AceLogger.SYSTEM_LOG, 44 "LogMessageMonitor.dispose() -- Error occured while sending signal : " 45 + getErrorMessage()); 46 } 47 48 if (readerThread != null) 49 { 50 readerThread.dispose(); 51 readerThread = null; 52 } 53 54 if (timerId != -1) 55 { 56 try 57 { 58 AceTimer.Instance().cancelTimer(timerId); 59 timerId = -1; 60 } 61 catch (IOException ex) 62 { 63 ; 64 } 65 } 66 67 super.dispose(); 68 } 69 70 public void run() 71 { 72 try 73 { 74 establishCommunications(); 75 } 76 catch (Exception ex) 77 { 78 dispose(); 79 return; 80 } 81 82 while (true) 83 { 84 AceMessageInterface message = waitMessage(); 85 if (message == null) 86 { 87 AceLogger.Instance().log(AceLogger.FATAL, AceLogger.SYSTEM_LOG, 89 "LogMessageMonitor.run() -- An event is received with no event message : " 90 + getErrorMessage()); 91 dispose(); 92 return; 93 } 94 95 96 if ((message instanceof AceTimerMessage) == true) 97 { 98 timerId = -1; 99 100 try 101 { 102 establishCommunications(); 104 } 105 catch (Exception ex) 106 { 107 AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG, 108 "LogMessageMonitor.run() -- Exception " + ex.getClass().getName() 109 + " occured while establishing communications. " 110 + ex.getMessage()); 111 } 112 } 113 114 else if ((message instanceof AceInputSocketStreamMessage) == true) 115 { 116 AceInputSocketStreamMessage socket_message = (AceInputSocketStreamMessage)message; 117 int status = socket_message.getStatus(); 118 switch (status) 119 { 120 case AceInputSocketStreamMessage.EOF_REACHED: 121 communicationUp = false; 122 123 if (readerThread != null) 124 { 125 readerThread.dispose(); 126 readerThread = null; 127 } 128 129 try 131 { 132 establishCommunications(); 133 } 134 catch (Exception ex) 135 { 136 AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG, 137 "LogMessageMonitor.run() -- Exception " + ex.getClass().getName() 138 + " occured while establishing communications. " 139 + ex.getMessage()); 140 } 141 break; 142 143 case AceInputSocketStreamMessage.READ_COMPLETED: 144 try 145 { 146 Document doc = null; 147 try 148 { 149 InputSource is = new InputSource(new StringReader(socket_message.getLines())); 150 doc = XMLBuilder.Instance().getDocumentBuilder().parse(is); 151 } 152 catch (IOException ex) 153 { 154 AceLogger.Instance().log(AceLogger.WARNING, AceLogger.SYSTEM_LOG, 155 "LogMessageMonitor.run() -- Could not parse the received log message, ignoring: " 156 + ex.getMessage()); 157 continue; 158 } 159 160 try 162 { 163 AceLogMessageParser parser = new AceLogMessageParser(0, doc); 164 165 AceLogMessageInterface msg = parser.getMessageElement(); 167 168 if ((msg instanceof AceLogMessage) == false) 170 { 171 AceLogger.Instance().log(AceLogger.WARNING, 172 AceLogger.SYSTEM_LOG, 173 "AceLogger.run() -- A message other than log_message is received, ignoring"); 174 continue; 175 } 176 177 formatAndBroadcastLog((AceLogMessage)msg); 178 179 } 180 catch (AceException ex) 181 { 182 AceLogger.Instance().log(AceLogger.WARNING, 183 AceLogger.SYSTEM_LOG, 184 "LogMessageMonitor.run() -- Error parsing received message" 185 + '\n' 186 + ex.getMessage()); 187 continue; 188 } 189 } 190 catch (SAXException ex1) 191 { 192 194 AceLogger.Instance().log(AceLogger.WARNING, 196 AceLogger.SYSTEM_LOG, 197 "LogMessageMontor.run() -- Error parsing trace information message" 198 + '\n' 199 + ex1.getMessage()); 200 continue; 201 } 202 break; 203 } 204 } 205 else if ((message instanceof AceSignalMessage) == true) 206 { 207 return; 208 } 209 else 210 { 211 AceLogger.Instance().log(AceLogger.WARNING, AceLogger.SYSTEM_LOG, 213 "LogProcessor.run() -- An unexpected event is received : " 214 + message.messageType()); 215 return; 216 217 } 218 } 219 } 220 221 private void formatAndBroadcastLog(AceLogMessage message) 222 { 223 String formatted = format(message.getTimeStamp(), 224 message.getHostName(), 225 message.getProcessName(), 226 message.getProcessInstance(), 227 message.getSeverity(), 228 message.getMessage(), 229 message.getMessageId()); 230 231 233 OAMPClient[] clients = OAMPClientList.Instance().getElements(); 235 236 LogRequestMessage req = new LogRequestMessage(); 237 req.setSeverity(message.getSeverity()); 238 req.setMessage(formatted); 239 for (int i = 0; i < clients.length; i++) 240 { 241 if (clients[i].isAdminLevel() == true) 242 { 243 if (clients[i].getParent().sendEvent(new OAMPMessageEvent(null, 244 req, 0, null, 245 OAMPSystemMessageParser.OAMP_SYSTEM_FEATURE_NAME)) == false) 246 { 247 System.err.println("Error sending log message to endpoint " 248 + clients[i].getParent()); 249 } 250 } 251 } 252 } 253 254 private String format(long time_stamp, 255 String host, 256 String process_name, 257 int process_instance, 258 int severity, 259 String message, 260 int msg_id) 261 { 262 StringBuffer buffer = new StringBuffer (); 263 264 Date date = new Date(time_stamp); 265 266 if (msg_id != -1) 267 { 268 buffer.append(pad0(process_instance, MSG_ID_LEN - 1)); 269 } 270 271 pad(buffer, MSG_ID_LEN); 272 273 buffer.append(AceLogger.SEVERITY_S[severity]); 274 pad(buffer, MSG_ID_LEN + AceLogger.MAX_SEVERITY_LENGTH+1); 275 276 buffer.append(host); 277 pad(buffer, MSG_ID_LEN + AceLogger.MAX_SEVERITY_LENGTH+1 + HOST_NAME_LEN); 278 279 buffer.append(process_name); 280 pad(buffer, MSG_ID_LEN + AceLogger.MAX_SEVERITY_LENGTH+1 + HOST_NAME_LEN + PROCESS_NAME_LEN); 281 282 buffer.append(pad0(process_instance, PROCESS_INSTANCE_LEN - 1)); 283 pad(buffer, MSG_ID_LEN + AceLogger.MAX_SEVERITY_LENGTH+1 + HOST_NAME_LEN + PROCESS_NAME_LEN + PROCESS_INSTANCE_LEN); 284 285 buffer.append(message); 286 287 return date.toString() 288 + " " 289 + buffer.toString() 290 + "\n"; 291 } 292 293 private void pad(StringBuffer buffer, int length) 294 { 295 int num_pad = length - buffer.length(); 296 297 if (num_pad > 0) 298 { 299 char[] char_pad = new char[num_pad]; 300 for (int i = 0; i < num_pad; i++) 301 { 302 char_pad[i] = ' '; 303 } 304 buffer.append(char_pad); 305 } 306 } 307 308 private String pad0(int num, int size) 309 { 310 String pad_str = ""; 311 String num_str = new String ((new Integer (num)).toString()); 312 313 int num_pad = size - num_str.length(); 314 315 if (num_pad > 0) 316 { 317 char[] char_pad = new char[num_pad]; 318 for (int i = 0; i < num_pad; i++) 319 { 320 char_pad[i] = '0'; 321 } 322 pad_str = new String (char_pad); 323 } 324 325 return new String (pad_str + num_str); 326 } 327 328 329 private void establishCommunications() 330 throws IOException, UnknownHostException, AceException 331 { 332 createRxSocket(); 333 334 if (communicationUp == false) 335 { 336 try 337 { 338 timerId = AceTimer.Instance().startTimer(60 * 1000L, 0L); 340 if (timerId == -1) 341 { 342 AceLogger.Instance().log(AceLogger.ERROR, 343 AceLogger.SYSTEM_LOG, 344 "LogMessageMonitor.establishCommunications() -- The timer could not be restarted"); 345 } 346 } 347 catch (IOException ex) 348 { 349 ; 350 } 351 } 352 } 353 354 private boolean createRxSocket() 355 throws IOException, UnknownHostException, AceException 356 { 357 if (communicationUp == false) 358 { 359 try 361 { 362 rxSocket = new Socket(HTTPApplicationConfiguration.Instance().getRxHostName(), 363 HTTPApplicationConfiguration.Instance().getRxPort()); 364 } 365 catch (IOException ex) 366 { 367 return false; 368 } 369 370 rxSocket.setKeepAlive(true); 371 rxSocket.setTcpNoDelay(true); 372 373 readerThread = new AceInputSocketStream(0L, "AceLogMonitor", 374 rxSocket, true); 375 readerThread.start(); 376 communicationUp = true; 377 } 378 379 return true; 380 } 381 382 private boolean communicationUp = false; 383 private int timerId = -1; 384 private Socket rxSocket = null; 385 private AceInputSocketStream readerThread = null; 386 387 private static final int MSG_ID_LEN = 5 + 1; private static final int HOST_NAME_LEN = 25 + 1; 389 private static final int PROCESS_NAME_LEN = 15 + 1; 390 private static final int PROCESS_INSTANCE_LEN = 3 + 1; 391 } 392 | Popular Tags |