1 6 7 package SOFA.SOFAnet.Transport.RMI; 8 9 import SOFA.SOFAnet.Repository.NodeInfo; 10 import SOFA.SOFAnet.Core.NetOps; 11 import SOFA.SOFAnet.Core.CoreException; 12 import SOFA.SOFAnet.Core.Reporter; 13 import SOFA.SOFAnet.Transport.*; 14 import java.rmi.RemoteException ; 15 import java.util.*; 16 17 27 public class RMITransportNodeConnection 28 { 29 private RMITransportClient parent; 30 private NetOps netOps; 31 private RMITransportServerHolder serverHolder; 32 private NodeInfo nodeInfo; 33 private Thread asyncThread; 34 private Thread stopThread; 35 private Object stopEvent; 36 private boolean stopped; 37 private long lastUseTime; 38 private List asyncOps; 39 private String myNodeName; 40 41 final static private int OP_STOP = 0; 42 final static private int OP_PUSH = 1; 43 final static private int OP_PULL = 2; 44 45 private static class AsyncOperation 46 { 47 public int op; 48 public IOParams params; 49 50 AsyncOperation(int op, IOParams params) 51 { 52 this.op = op; 53 this.params = params; 54 } 55 } 56 57 58 public RMITransportNodeConnection(NodeInfo nodeInfo, RMITransportClient parent) 59 { 60 this.parent = parent; 61 62 serverHolder = new RMITransportServerHolder(); 63 serverHolder.setNode(nodeInfo); 64 this.nodeInfo = nodeInfo; 65 66 stopEvent = new Object (); 67 68 stopped = false; 69 lastUseTime = System.currentTimeMillis(); 70 asyncOps = new LinkedList(); 71 72 myNodeName = NodeInfo.getLocalNodeName(); 73 74 75 asyncThread = new Thread () 76 { 77 public void run() 78 { 79 asyncThreadProcedure(); 80 } 81 }; 82 83 stopThread = new Thread () 84 { 85 public void run() 86 { 87 stopThreadProcedure(); 88 } 89 }; 90 91 asyncThread.start(); 92 stopThread.start(); 93 94 } 95 96 private void asyncThreadProcedure() 97 { 98 for (;;) 99 { 100 AsyncOperation operation; 101 102 synchronized (asyncOps) 103 { 104 while (asyncOps.isEmpty()) 105 { 106 try 107 { 108 asyncOps.wait(); 109 } 110 catch (InterruptedException e) 111 { 112 } 113 } 114 115 touch(); 116 117 119 int index = -1; 120 int priority = -1; 122 Iterator it = asyncOps.iterator(); 123 int i = 0; 124 while (it.hasNext()) 125 { 126 AsyncOperation asyncOp = (AsyncOperation)it.next(); 127 128 int p; 129 if (asyncOp.op == OP_STOP) p = 0; 130 else if (asyncOp.params.getPriority() == IOParams.PRIORITY_INTERACTIVE || 131 asyncOp.params.getPriority() == IOParams.PRIORITY_BLOCKING) p = 2; 132 else p = 1; 133 134 if (index == -1) 135 { 136 priority = p; 137 index = i; 138 } 139 else if (p > priority) 140 { 141 priority = p; 142 index = i; 143 } 144 145 i++; 146 } 147 148 operation = (AsyncOperation)asyncOps.remove(index); 149 } 150 151 153 switch (operation.op) 154 { 155 case OP_STOP: 156 return; 157 158 case OP_PUSH: 159 { 160 try 161 { 162 RMITransportInterface server = serverHolder.getServer(); 163 LicenceRMI licenceRMI = null; 164 if (operation.params.getLicence() != null) licenceRMI = new LicenceRMI(operation.params.getLicence()); 165 byte[] bundle = operation.params.getBundleData().getData(); 166 if (bundle != null) 167 { 168 server.push(myNodeName, operation.params.getBundleName(), bundle, licenceRMI, operation.params.isOffer()); 169 bundle = null; 170 } 171 else 172 { 173 Reporter.error("Asynchronous push of '" + operation.params.getBundleName() + "' failed: Cannot load bundle from BundleData"); 174 } 175 } 176 catch (RemoteException e) 177 { 178 serverHolder.releaseServer(); 179 Reporter.error("Asynchronous push of '" + operation.params.getBundleName() + "' failed", e); 180 } 181 catch (RMITransportException e) 182 { 183 Reporter.error("Asynchronous push of '" + operation.params.getBundleName() + "' failed", e); 184 } 185 186 operation.params.getBundleData().delete(); 187 } 188 break; 189 190 191 case OP_PULL: 192 { 193 RMIPullOutputHolder outputHolder = null; 194 try 195 { 196 RMITransportInterface server = serverHolder.getServer(); 197 outputHolder = server.pull(myNodeName, operation.params.getBundleName(), false, false, operation.params.getContractID()); 198 } 199 catch (RemoteException e) 200 { 201 serverHolder.releaseServer(); 202 Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed", e); 203 } 204 catch (RMITransportException e) 205 { 206 Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed", e); 207 } 208 209 touch(); 210 211 if (outputHolder != null) 212 { 213 if (outputHolder.errCode == 0) 214 { 215 if (outputHolder.licenceRMI != null) operation.params.setLicence(outputHolder.licenceRMI.getLicence()); 216 else operation.params.setLicence(null); 217 operation.params.setBundleData(new BundleData(outputHolder.bundle)); 218 operation.params.setOffer(false); 219 operation.params.setSourceNodeName(nodeInfo.getName()); 220 221 try 222 { 223 parent.getNetOps().deliverBundlePull(operation.params); 224 } 225 catch (CoreException e) 226 { 227 Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed", e); 228 } 229 230 operation.params.getBundleData().delete(); } 232 else 233 { 234 switch (outputHolder.errCode) 235 { 236 case 1: Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed: Cannot find binary bundle on remote node"); break; 237 case 2: Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed: Cannot create BundleData for bundle"); break; 238 case 3: Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed: Pull not allowed by remote node"); break; 239 default: Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed: Transport layer failed"); break; 240 } 241 } 242 243 outputHolder = null; 244 } 245 246 } 247 break; 248 } 249 250 touch(); 251 } 252 } 253 254 private void stopThreadProcedure() 255 { 256 synchronized (stopEvent) 257 { 258 do 259 { 260 try 261 { 262 stopEvent.wait(1000 * 60); 263 } 264 catch (InterruptedException e) 265 { 266 } 267 } 268 while (System.currentTimeMillis() - lastUseTime < 1000 * 60); 269 } 270 271 parent.removeFromMap(nodeInfo.getAddressAndPort(), this); 272 273 synchronized (asyncOps) 274 { 275 asyncOps.add(new AsyncOperation(OP_STOP, null)); 276 asyncOps.notify(); 277 stopped = true; 278 } 279 280 } 281 282 private void touch() 283 { 284 lastUseTime = System.currentTimeMillis(); 285 synchronized (stopEvent) 286 { 287 stopEvent.notify(); 288 } 289 } 290 291 309 public void testPush(IOParams params) throws TransportException 310 { 311 touch(); 312 313 boolean result = false; 314 try 315 { 316 RMITransportInterface server = serverHolder.getServer(); 317 result = server.testPush(myNodeName, params.getBundleName(), params.isOffer()); 318 } 319 catch (RemoteException e) 320 { 321 touch(); 322 serverHolder.releaseServer(); 323 throw new TransportException(e.getMessage(), e); 324 } 325 catch (RMITransportException e) 326 { 327 touch(); 328 throw new TransportException(e.getMessage(), e); 329 } 330 331 if (result) params.setErrCode(0); 332 else params.setErrCode(1); 333 334 touch(); 335 } 336 337 356 public boolean push(IOParams params) 357 { 358 touch(); 359 synchronized (asyncOps) 360 { 361 if (stopped) return false; 362 363 asyncOps.add(new AsyncOperation(OP_PUSH, params)); 364 asyncOps.notify(); 365 } 366 367 return true; 368 } 369 370 388 public void testPull(IOParams params) throws TransportException 389 { 390 touch(); 391 392 boolean result = false; 393 try 394 { 395 RMITransportInterface server = serverHolder.getServer(); 396 result = server.testPull(myNodeName, params.getBundleName(), false, params.getContractID()); 397 } 398 catch (RemoteException e) 399 { 400 touch(); 401 serverHolder.releaseServer(); 402 throw new TransportException(e.getMessage(), e); 403 } 404 catch (RMITransportException e) 405 { 406 touch(); 407 throw new TransportException(e.getMessage(), e); 408 } 409 410 if (result) params.setErrCode(0); 411 else params.setErrCode(1); 412 413 touch(); 414 } 415 416 434 public boolean pull(IOParams params) 435 { 436 touch(); 437 synchronized (asyncOps) 438 { 439 if (stopped) return false; 440 441 asyncOps.add(new AsyncOperation(OP_PULL, params)); 442 asyncOps.notify(); 443 } 444 445 return true; 446 } 447 448 476 public void synchroPull(IOParams params) throws TransportException 477 { 478 touch(); 479 480 RMIPullOutputHolder outputHolder = null; 481 try 482 { 483 RMITransportInterface server = serverHolder.getServer(); 484 outputHolder = server.pull(myNodeName, params.getBundleName(), false, false, params.getContractID()); 485 } 486 catch (RemoteException e) 487 { 488 touch(); 489 serverHolder.releaseServer(); 490 throw new TransportException(e.getMessage(), e); 491 } 492 catch (RMITransportException e) 493 { 494 touch(); 495 throw new TransportException(e.getMessage(), e); 496 } 497 498 params.setErrCode(outputHolder.errCode); 499 if (outputHolder.licenceRMI != null) params.setLicence(outputHolder.licenceRMI.getLicence()); 500 else params.setLicence(null); 501 params.setBundleData(new BundleData(outputHolder.bundle)); 502 503 touch(); 504 } 505 506 524 public void testAcquireShared(IOParams params) throws TransportException 525 { 526 touch(); 527 528 boolean result = false; 529 try 530 { 531 RMITransportInterface server = serverHolder.getServer(); 532 result = server.testPull(myNodeName, params.getBundleName(), true, ""); 533 } 534 catch (RemoteException e) 535 { 536 touch(); 537 serverHolder.releaseServer(); 538 throw new TransportException(e.getMessage(), e); 539 } 540 catch (RMITransportException e) 541 { 542 touch(); 543 throw new TransportException(e.getMessage(), e); 544 } 545 546 if (result) params.setErrCode(0); 547 else params.setErrCode(1); 548 549 touch(); 550 } 551 552 583 public void acquireShared(IOParams params) throws TransportException 584 { 585 touch(); 586 587 RMIPullOutputHolder outputHolder = null; 588 try 589 { 590 RMITransportInterface server = serverHolder.getServer(); 591 outputHolder = server.pull(myNodeName, params.getBundleName(), true, params.isLicenceOnly(), ""); 592 } 593 catch (RemoteException e) 594 { 595 touch(); 596 serverHolder.releaseServer(); 597 throw new TransportException(e.getMessage(), e); 598 } 599 catch (RMITransportException e) 600 { 601 touch(); 602 throw new TransportException(e.getMessage(), e); 603 } 604 605 params.setErrCode(outputHolder.errCode); 606 if (outputHolder.licenceRMI != null) params.setLicence(outputHolder.licenceRMI.getLicence()); 607 else params.setLicence(null); 608 if (params.isLicenceOnly()) 609 { 610 params.setAddress(false); 612 params.setAddressNodeName(""); 613 params.setBundleData(null); 614 } 615 else 616 { 617 if (outputHolder.address == null || outputHolder.address.length() == 0) 618 { 619 params.setAddress(false); 620 params.setAddressNodeName(""); 621 params.setBundleData(new BundleData(outputHolder.bundle)); 622 } 623 else 624 { 625 params.setAddress(true); 626 params.setAddressNodeName(outputHolder.address); 627 params.setBundleData(null); 628 } 629 } 630 631 touch(); 632 } 633 634 655 public void canReturnShared(IOParams params) throws TransportException 656 { 657 touch(); 658 659 boolean result = false; 660 try 661 { 662 RMITransportInterface server = serverHolder.getServer(); 663 result = server.canReturnShared(myNodeName, params.getBundleName()); 664 } 665 catch (RemoteException e) 666 { 667 touch(); 668 serverHolder.releaseServer(); 669 throw new TransportException(e.getMessage(), e); 670 } 671 catch (RMITransportException e) 672 { 673 touch(); 674 throw new TransportException(e.getMessage(), e); 675 } 676 677 if (result) params.setErrCode(0); 678 else params.setErrCode(1); 679 touch(); 680 } 681 682 707 public void returnShared(IOParams params) throws TransportException 708 { 709 touch(); 710 711 int errCode = 0; 712 try 713 { 714 RMITransportInterface server = serverHolder.getServer(); 715 errCode = server.returnShared(myNodeName, params.getBundleName(), params.isAddress(), params.getAddressNodeName()); 716 } 717 catch (RemoteException e) 718 { 719 touch(); 720 serverHolder.releaseServer(); 721 throw new TransportException(e.getMessage(), e); 722 } 723 catch (RMITransportException e) 724 { 725 touch(); 726 throw new TransportException(e.getMessage(), e); 727 } 728 729 params.setErrCode(errCode); 730 touch(); 731 } 732 733 755 public void manualReturnShared(IOParams params) throws TransportException 756 { 757 touch(); 758 759 int errCode = 0; 760 try 761 { 762 RMITransportInterface server = serverHolder.getServer(); 763 errCode = server.manualReturnShared(myNodeName, params.getBundleName()); 764 } 765 catch (RemoteException e) 766 { 767 touch(); 768 serverHolder.releaseServer(); 769 throw new TransportException(e.getMessage(), e); 770 } 771 catch (RMITransportException e) 772 { 773 touch(); 774 throw new TransportException(e.getMessage(), e); 775 } 776 777 params.setErrCode(errCode); 778 touch(); 779 } 780 781 } 782 | Popular Tags |