1 6 7 package SOFA.SOFAnet.Search.RMI; 8 9 import SOFA.SOFAnet.Search.*; 10 import SOFA.SOFAnet.Repository.NodeInfo; 11 import SOFA.SOFAnet.Repository.ShareGroups; 12 import SOFA.SOFAnet.Core.Reporter; 13 import java.rmi.RemoteException ; 14 import java.util.*; 15 16 26 public class RMISearchConnection 27 { 28 private NodeInfo nodeInfo; 29 private ShareGroups shareGroups; 30 private RMISearchServerHolder serverHolder; 31 private Thread asyncThread; 32 private List asyncRequests; 33 private String myNodeName; 34 public boolean connected; 35 36 final static private int OP_STOP = 0; 37 final static private int OP_REQUEST = 1; 38 final static private int OP_CONNECT = 2; 39 40 private static class AsyncRequest 41 { 42 public int op; 43 public SearchRequestID requestID; 44 public SearchPattern searchPattern; 45 public boolean firstOnly; 46 public String shareGroup; 47 48 AsyncRequest(int op, SearchRequestID requestID, SearchPattern searchPattern, boolean firstOnly, String shareGroup) 49 { 50 this.op = op; 51 this.requestID = requestID; 52 this.searchPattern = searchPattern; 53 this.firstOnly = firstOnly; 54 this.shareGroup = shareGroup; 55 } 56 } 57 58 59 public RMISearchConnection(NodeInfo nodeInfo, ShareGroups shareGroups, boolean forReplyOnly) 60 { 61 this.nodeInfo = nodeInfo; 62 this.shareGroups = shareGroups; 63 64 serverHolder = new RMISearchServerHolder(); 65 serverHolder.setNode(nodeInfo); 66 67 myNodeName = NodeInfo.getLocalNodeName(); 68 69 asyncRequests = new LinkedList(); 70 71 if (!forReplyOnly) 72 { 73 asyncThread = new Thread () 74 { 75 public void run() 76 { 77 asyncThreadProcedure(); 78 } 79 }; 80 81 asyncThread.start(); 82 } 83 84 connected = false; 85 } 86 87 private void asyncThreadProcedure() 88 { 89 for (;;) 90 { 91 AsyncRequest request; 92 93 synchronized (asyncRequests) 94 { 95 while (asyncRequests.isEmpty()) 96 { 97 try 98 { 99 asyncRequests.wait(); 100 } 101 catch (InterruptedException e) 102 { 103 } 104 } 105 106 request = (AsyncRequest)asyncRequests.remove(0); 107 } 108 109 111 switch (request.op) 112 { 113 case OP_STOP: 114 return; 115 116 case OP_CONNECT: 117 { 118 RMISearchInterface server = null; 119 try 120 { 121 server = serverHolder.getServer(true); 122 } 123 catch (RMISearchException e) 124 { 125 Reporter.error("Connection to RMI search server at node '" + nodeInfo.getName() + "' failed", e); 126 } 127 128 if (server != null) 129 { 130 try 131 { 132 server.connect(myNodeName, shareGroups); 133 } 134 catch (RemoteException e) 135 { 136 serverHolder.releaseServer(true); 137 Reporter.error("Connection to RMI search server at node '" + nodeInfo.getName() + "' failed", e); 138 } 139 catch (RMISearchException e) 140 { 141 serverHolder.releaseServer(true); 142 Reporter.error("Connection to RMI search server at node '" + nodeInfo.getName() + "' failed", e); 143 } 144 } 145 } 146 break; 147 148 case OP_REQUEST: 149 if (shareGroups == null || request.shareGroup.length() == 0 || shareGroups.contains(request.shareGroup)) 150 { 151 boolean doConnect = !serverHolder.isServer(); 152 153 RMISearchInterface server = null; 154 try 155 { 156 server = serverHolder.getServer(true); } 158 catch (RMISearchException e) 159 { 160 Reporter.error("Connection to RMI search server at node '" + nodeInfo.getName() + "' failed", e); 161 } 162 163 if (server != null) 164 { 165 if (doConnect) 166 { 167 try 168 { 169 server.connect(myNodeName, shareGroups); 170 } 171 catch (RemoteException e) 172 { 173 server = null; 174 serverHolder.releaseServer(true); 175 Reporter.error("Connection to RMI search server at node '" + nodeInfo.getName() + "' failed", e); 176 } 177 catch (RMISearchException e) 178 { 179 server = null; 180 serverHolder.releaseServer(true); 181 Reporter.error("Connection to RMI search server at node '" + nodeInfo.getName() + "' failed", e); 182 } 183 } 184 } 185 186 if (server != null) 187 { 188 try 189 { 190 server.request(myNodeName, request.requestID, request.searchPattern, request.firstOnly, request.shareGroup); 191 } 192 catch (RemoteException e) 193 { 194 serverHolder.releaseServer(true); 195 Reporter.error("Sending (RMI) search request to node '" + nodeInfo.getName() + "' failed", e); 196 } 197 catch (RMISearchException e) 198 { 199 serverHolder.releaseServer(true); 200 Reporter.error("Sending (RMI) search request to node '" + nodeInfo.getName() + "' failed", e); 201 } 202 } 203 } 204 break; 205 } 206 } 207 } 208 209 public void sendRequest(SearchRequestID requestID, SearchPattern searchPattern, boolean firstOnly, String shareGroup) 210 { 211 synchronized (asyncRequests) 212 { 213 asyncRequests.add(new AsyncRequest(OP_REQUEST, requestID, searchPattern, firstOnly, shareGroup)); 214 asyncRequests.notify(); 215 } 216 } 217 218 public void connect() 219 { 220 synchronized (asyncRequests) 221 { 222 asyncRequests.add(new AsyncRequest(OP_CONNECT, null, null, false, null)); 223 asyncRequests.notify(); 224 } 225 226 connected = true; 227 } 228 229 public void stop() 230 { 231 synchronized (asyncRequests) 232 { 233 asyncRequests.add(new AsyncRequest(OP_STOP, null, null, false, null)); 234 asyncRequests.notify(); 235 } 236 } 237 238 public boolean isNode(String nodeName) 239 { 240 return nodeInfo.getName().compareTo(nodeName) == 0; 241 } 242 243 public boolean isConnected() 244 { 245 return connected; 246 } 247 248 public void sendReply(SearchReply searchReply) throws RMISearchException 249 { 250 RMISearchInterface server = serverHolder.getServer(false); 251 252 try 253 { 254 server.reply(myNodeName, searchReply); 255 } 256 catch (RemoteException e) 257 { 258 serverHolder.releaseServer(true); 259 throw new RMISearchException(e.getMessage(), e); 260 } 261 catch (RMISearchException e) 262 { 263 serverHolder.releaseServer(true); 264 throw e; 265 } 266 } 267 } 268 | Popular Tags |