1 23 package fr.dyade.aaa.agent; 24 25 import java.io.*; 26 import java.net.*; 27 import java.util.Vector ; 28 import java.util.Enumeration ; 29 30 import org.objectweb.util.monolog.api.BasicLevel; 31 import org.objectweb.util.monolog.api.Logger; 32 33 import fr.dyade.aaa.util.*; 34 35 40 public class SimpleNetwork extends StreamNetwork { 41 42 MessageVector sendList; 43 44 private JGroups jgroups = null; 45 46 public void setJGroups(JGroups jgroups) { 47 this.jgroups = jgroups; 48 } 49 50 void ackMsg(JGroupsAckMsg ack) { 51 try { 52 AgentServer.getTransaction().begin(); 53 qout.remove(ack.getStamp()); 55 ack.delete(); 56 AgentServer.getTransaction().commit(); 57 AgentServer.getTransaction().release(); 58 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 59 this.logmon.log(BasicLevel.DEBUG, 60 this.getName() + ", ackMsg(...) done."); 61 } catch (Exception exc) { 62 this.logmon.log(BasicLevel.FATAL, 63 this.getName() + ", ackMsg unrecoverable exception", 64 exc); 65 } 66 } 67 68 71 public SimpleNetwork() { 72 super(); 73 } 74 75 76 NetServerIn netServerIn = null; 77 78 NetServerOut netServerOut = null; 79 80 83 public void start() throws IOException { 84 logmon.log(BasicLevel.DEBUG, getName() + ", starting"); 85 try { 86 if (sendList == null) 87 sendList = new MessageVector(getName(), 88 AgentServer.getTransaction().isPersistent()); 89 90 if (netServerIn == null) 91 netServerIn = new NetServerIn(getName(), logmon); 92 if (netServerOut == null) 93 netServerOut = new NetServerOut(getName(), logmon); 94 95 if (! netServerIn.isRunning()) netServerIn.start(); 96 if (! netServerOut.isRunning()) netServerOut.start(); 97 } catch (IOException exc) { 98 logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc); 99 throw exc; 100 } 101 logmon.log(BasicLevel.DEBUG, getName() + ", started"); 102 } 103 104 107 public void stop() { 108 if (netServerIn != null) netServerIn.stop(); 109 if (netServerOut != null) netServerOut.stop(); 110 logmon.log(BasicLevel.DEBUG, getName() + ", stopped"); 111 } 112 113 119 public boolean isRunning() { 120 if ((netServerIn != null) && netServerIn.isRunning() && 121 (netServerOut != null) && netServerOut.isRunning()) 122 return true; 123 else 124 return false; 125 } 126 127 133 public String toString() { 134 StringBuffer strbuf = new StringBuffer (); 135 136 strbuf.append(super.toString()).append("\n\t"); 137 if (netServerIn != null) 138 strbuf.append(netServerIn.toString()).append("\n\t"); 139 if (netServerOut != null) 140 strbuf.append(netServerOut.toString()).append("\n\t"); 141 142 return strbuf.toString(); 143 } 144 145 163 final class NetServerOut extends Daemon { 164 MessageOutputStream nos = null; 165 166 NetServerOut(String name, Logger logmon) { 167 super(name + ".NetServerOut"); 168 this.logmon = logmon; 170 this.setThreadGroup(AgentServer.getThreadGroup()); 171 } 172 173 protected void close() {} 174 175 protected void shutdown() {} 176 177 public void run() { 178 int ret; 179 Message msg = null; 180 short msgto; 181 ServerDesc server = null; 182 InputStream is = null; 183 184 try { 185 try { 186 nos = new MessageOutputStream(); 187 } catch (IOException exc) { 188 logmon.log(BasicLevel.FATAL, 189 getName() + ", cannot start."); 190 return; 191 } 192 193 while (running) { 194 canStop = true; 195 try { 196 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 197 this.logmon.log(BasicLevel.DEBUG, 198 this.getName() + ", waiting message"); 199 msg = qout.get(WDActivationPeriod); 200 } catch (InterruptedException exc) { 201 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 202 this.logmon.log(BasicLevel.DEBUG, 203 this.getName() + ", interrupted"); 204 continue; 205 } 206 canStop = false; 207 if (! running) break; 208 209 long currentTimeMillis = System.currentTimeMillis(); 210 watchdog(currentTimeMillis); 212 213 if (msg != null) { 214 msgto = msg.getDest(); 215 216 Socket socket = null; 217 try { 218 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 219 this.logmon.log(BasicLevel.DEBUG, 220 this.getName() + ", try to send message -> " + 221 msg + "/" + msgto); 222 223 if ((msg.not.expiration > 0) && 224 (msg.not.expiration < currentTimeMillis)) { 225 throw new ExpirationExceededException(); 226 } 227 228 server = AgentServer.getServerDesc(msgto); 230 try { 231 if ((! server.active) || 232 (server.last > currentTimeMillis)) { 233 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 234 this.logmon.log(BasicLevel.DEBUG, 235 this.getName() + ", AgentServer#" + msgto + " is down"); 236 throw new ConnectException("AgentServer#" + msgto + " is down"); 237 } 238 239 try { 241 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 242 this.logmon.log(BasicLevel.DEBUG, this.getName() + ", try to connect"); 243 244 for (Enumeration e = server.getSockAddrs(); e.hasMoreElements();) { 245 fr.dyade.aaa.util.SocketAddress sa = 246 (fr.dyade.aaa.util.SocketAddress) e.nextElement(); 247 try { 248 server.moveToFirst(sa); 249 socket = createSocket(server); 250 } catch (IOException ioexc) { 251 this.logmon.log(BasicLevel.DEBUG, 252 this.getName() + ", connection refused with addr=" + server.getAddr()+ 253 " port=" + server.getPort() +", try next element"); 254 continue; 255 } 256 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 257 this.logmon.log(BasicLevel.DEBUG, this.getName() + ", connected"); 258 break; 259 } 260 261 if (socket == null) 262 socket = createSocket(server); 263 } catch (IOException exc) { 264 this.logmon.log(BasicLevel.WARN, 265 this.getName() + ", connection refused", exc); 266 server.active = false; 267 server.last = System.currentTimeMillis(); 268 server.retry += 1; 269 throw exc; 270 } 271 setSocketOption(socket); 272 } catch (IOException exc) { 273 this.logmon.log(BasicLevel.WARN, 274 this.getName() + ", move msg in watchdog list", exc); 275 sendList.addMessage(msg); 278 qout.pop(); 279 continue; 280 } 281 282 try { 283 send(socket, msg, currentTimeMillis); 284 } catch (IOException exc) { 285 this.logmon.log(BasicLevel.WARN, 286 this.getName() + ", move msg in watchdog list", exc); 287 sendList.addMessage(msg); 290 qout.pop(); 291 continue; 292 } 293 } catch (UnknownServerException exc) { 294 this.logmon.log(BasicLevel.ERROR, 295 this.getName() + ", can't send message: " + msg, 296 exc); 297 } catch (ExpirationExceededException exc) { 300 if (logmon.isLoggable(BasicLevel.DEBUG)) 301 logmon.log(BasicLevel.DEBUG, 302 getName() + ": removes expired notification " + 303 msg.from + ", " + msg.not); 304 } 305 306 AgentServer.getTransaction().begin(); 307 qout.pop(); 310 if (jgroups != null) 312 jgroups.send(new JGroupsAckMsg(msg)); 313 msg.delete(); 314 msg.free(); 315 AgentServer.getTransaction().commit(); 316 AgentServer.getTransaction().release(); 317 } 318 } 319 } catch (Exception exc) { 320 this.logmon.log(BasicLevel.FATAL, 321 this.getName() + ", unrecoverable exception", exc); 322 AgentServer.stop(false); 325 } finally { 326 finish(); 327 } 328 } 329 330 333 337 void watchdog(long currentTimeMillis) throws IOException { 338 341 345 ServerDesc server = null; 346 347 for (int i=0; i<sendList.size(); i++) { 348 Message msg = (Message) sendList.getMessageAt(i); 349 short msgto = msg.getDest(); 350 351 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 352 this.logmon.log(BasicLevel.DEBUG, 353 this.getName() + 354 ", check msg#" + msg.getStamp() + 355 " from " + msg.from + 356 " to " + msg.to); 357 358 if ((msg.not.expiration > 0) && 359 (msg.not.expiration < currentTimeMillis)) { 360 if (logmon.isLoggable(BasicLevel.DEBUG)) 361 logmon.log(BasicLevel.DEBUG, 362 getName() + ": removes expired notification " + 363 msg.from + ", " + msg.not); 364 365 AgentServer.getTransaction().begin(); 367 sendList.removeMessageAt(i); i--; 369 msg.delete(); 374 msg.free(); 375 AgentServer.getTransaction().commit(); 376 AgentServer.getTransaction().release(); 377 } 378 379 try { 380 server = AgentServer.getServerDesc(msgto); 381 } catch (UnknownServerException exc) { 382 this.logmon.log(BasicLevel.ERROR, 383 this.getName() + ", can't send message: " + msg, 384 exc); 385 AgentServer.getTransaction().begin(); 388 sendList.removeMessageAt(i); i--; 390 msg.delete(); 395 msg.free(); 396 AgentServer.getTransaction().commit(); 397 AgentServer.getTransaction().release(); 398 399 continue; 400 } 401 402 if (server.last > currentTimeMillis) { 403 continue; 405 } 406 407 if ((server.active) || 408 ((server.retry < WDNbRetryLevel1) && 409 ((server.last + WDRetryPeriod1) < currentTimeMillis)) || 410 ((server.retry < WDNbRetryLevel2) && 411 ((server.last + WDRetryPeriod2) < currentTimeMillis)) || 412 ((server.last + WDRetryPeriod3) < currentTimeMillis)) { 413 try { 414 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 415 this.logmon.log(BasicLevel.DEBUG, 416 this.getName() + 417 ", send msg#" + msg.getStamp()); 418 419 Socket socket = createSocket(server); 421 server.active = true; 423 server.retry = 0; 424 server.last = currentTimeMillis; 427 428 setSocketOption(socket); 429 430 send(socket, msg, currentTimeMillis); 431 } catch (SocketException exc) { 432 if (this.logmon.isLoggable(BasicLevel.WARN)) 433 this.logmon.log(BasicLevel.WARN, 434 this.getName() + ", let msg in watchdog list", 435 exc); 436 server.active = false; 437 server.retry += 1; 438 server.last = currentTimeMillis +1; 441 continue; 444 } catch (Exception exc) { 445 this.logmon.log(BasicLevel.ERROR, 446 this.getName() + ", error", exc); 447 } 448 449 AgentServer.getTransaction().begin(); 450 sendList.removeMessageAt(i); i--; 452 msg.delete(); 457 msg.free(); 458 AgentServer.getTransaction().commit(); 459 AgentServer.getTransaction().release(); 460 } else { 461 server.last = currentTimeMillis +1; 464 } 465 } 466 } 467 468 public void send(Socket socket, 469 Message msg, 470 long currentTimeMillis) throws IOException { 471 int ret; 472 InputStream is = null; 473 474 try { 475 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 477 this.logmon.log(BasicLevel.DEBUG, 478 this.getName() + ", write message"); 479 nos.writeMessage(socket, msg, currentTimeMillis); 480 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 482 this.logmon.log(BasicLevel.DEBUG, 483 this.getName() + ", wait ack"); 484 is = socket.getInputStream(); 485 if ((ret = is.read()) == -1) 486 throw new ConnectException("Connection broken"); 487 488 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 489 this.logmon.log(BasicLevel.DEBUG, 490 this.getName() + ", receive ack"); 491 } finally { 492 try { 493 socket.getOutputStream().close(); 494 } catch (Exception exc) {} 495 try { 496 is.close(); 497 } catch (Exception exc) {} 498 try { 499 socket.close(); 500 } catch (Exception exc) {} 501 } 502 } 503 } 504 505 final class NetServerIn extends Daemon { 506 ServerSocket listen = null; 507 508 NetServerIn(String name, Logger logmon) throws IOException { 509 super(name + ".NetServerIn"); 510 listen = createServerSocket(); 511 this.logmon = logmon; 513 this.setThreadGroup(AgentServer.getThreadGroup()); 514 } 515 516 protected void close() { 517 try { 518 listen.close(); 519 } catch (Exception exc) {} 520 } 521 522 protected void shutdown() { 523 close(); 524 } 525 526 public void run() { 527 Socket socket = null; 528 OutputStream os = null; 529 ObjectInputStream ois = null; 530 byte[] iobuf = new byte[29]; 531 532 try { 533 while (running) { 534 try { 535 canStop = true; 536 537 try { 539 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 540 this.logmon.log(BasicLevel.DEBUG, 541 this.getName() + ", waiting connection"); 542 socket = listen.accept(); 543 } catch (IOException exc) { 544 continue; 545 } 546 canStop = false; 547 548 setSocketOption(socket); 549 550 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 551 this.logmon.log(BasicLevel.DEBUG, 552 this.getName() + ", connected"); 553 554 os = socket.getOutputStream(); 556 InputStream is = socket.getInputStream(); 557 558 Message msg = Message.alloc(); 559 int n = 0; 560 do { 561 int count = is.read(iobuf, n, Message.LENGTH +4 - n); 562 if (count < 0) throw new EOFException(); 563 n += count; 564 } while (n < (Message.LENGTH +4)); 565 566 int boot = ((iobuf[0] & 0xFF) << 24) + 568 ((iobuf[1] & 0xFF) << 16) + 569 ((iobuf[2] & 0xFF) << 8) + 570 ((iobuf[3] & 0xFF) << 0); 571 572 int idx = msg.readFromBuf(iobuf, 4); 573 574 boolean persistent = ((iobuf[idx] & Message.PERSISTENT) == 0)?false:true; 576 boolean detachable = ((iobuf[idx] & Message.DETACHABLE) == 0)?false:true; 577 578 ois = new ObjectInputStream(is); 580 msg.not = (Notification) ois.readObject(); 581 if (msg.not.expiration > 0) 582 msg.not.expiration += System.currentTimeMillis(); 583 msg.not.persistent = persistent; 584 msg.not.detachable = detachable; 585 msg.not.detached = false; 586 587 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 588 this.logmon.log(BasicLevel.DEBUG, 589 this.getName() + ", msg received"); 590 591 testBootTS(msg.getSource(), boot); 592 deliver(msg); 593 594 if (this.logmon.isLoggable(BasicLevel.DEBUG)) 595 this.logmon.log(BasicLevel.DEBUG, this.getName() + ", send ack"); 596 597 os.write(0); 599 os.flush(); 600 } catch (Exception exc) { 601 this.logmon.log(BasicLevel.ERROR, 602 this.getName() + ", closed", exc); 603 } finally { 604 try { 605 os.close(); 606 } catch (Exception exc) {} 607 os = null; 608 try { 609 ois.close(); 610 } catch (Exception exc) {} 611 ois = null; 612 try { 613 socket.close(); 614 } catch (Exception exc) {} 615 socket = null; 616 } 617 } 618 } finally { 619 finish(); 620 } 621 } 622 } 623 624 627 final class MessageOutputStream extends ByteArrayOutputStream { 628 private ObjectOutputStream oos = null; 629 private OutputStream os = null; 630 631 MessageOutputStream() throws IOException { 632 super(256); 633 oos = new ObjectOutputStream(this); 634 count = 0; 635 buf[Message.LENGTH +4] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF); 636 buf[Message.LENGTH +5] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF); 637 buf[Message.LENGTH +6] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF); 638 buf[Message.LENGTH +7] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF); 639 } 640 641 void writeMessage(Socket sock, 642 Message msg, 643 long currentTimeMillis) throws IOException { 644 os = sock.getOutputStream(); 645 646 buf[0] = (byte) (getBootTS() >>> 24); 648 buf[1] = (byte) (getBootTS() >>> 16); 649 buf[2] = (byte) (getBootTS() >>> 8); 650 buf[3] = (byte) (getBootTS() >>> 0); 651 652 int idx = msg.writeToBuf(buf, 4); 653 buf[idx++] = (byte) ((msg.not.persistent?Message.PERSISTENT:0) | 655 (msg.not.detachable?Message.DETACHABLE:0)); 656 657 count = Message.LENGTH +8; 659 660 try { 661 if (msg.not.expiration > 0) 662 msg.not.expiration -= currentTimeMillis; 663 oos.writeObject(msg.not); 664 oos.reset(); 665 oos.flush(); 666 os.write(buf, 0, count);; 667 os.flush(); 668 } finally { 669 if (msg.not.expiration > 0) 670 msg.not.expiration += currentTimeMillis; 671 count = 0; 672 } 673 } 674 } 675 } 676 | Popular Tags |