1 2 package com.ca.directory.jxplorer.broker; 3 4 import com.ca.directory.jxplorer.*; 5 6 import com.ca.commons.naming.DN; 7 import com.ca.commons.naming.DXEntry; 8 import com.ca.commons.naming.DXNamingEnumeration; 9 import com.ca.commons.jndi.SchemaOps; 10 11 import javax.naming.directory.DirContext ; 12 import javax.naming.NamingException ; 13 import java.util.Vector ; 14 import java.util.ArrayList ; 15 16 public abstract class Broker implements Runnable , DataSource 17 { 18 protected Vector requestQueue = new Vector (10); protected Vector listeners = new Vector (); 21 22 private static int noBrokers = 0; 24 public int id; 25 26 protected DataQuery current = null; 27 28 StopMonitor stopMonitor = null; 29 30 public Broker() { id = (noBrokers++); } 31 32 private static boolean debug = false; 33 34 38 39 public void registerStopMonitor(StopMonitor monitor) { stopMonitor = monitor; } 40 41 46 47 public DataQuery push(DataQuery request) 48 { 49 for (int i=0; i<listeners.size(); i++) 50 request.addDataListener((DataListener)listeners.get(i)); 51 52 synchronized(requestQueue) 53 { 54 requestQueue.add(request); 55 } 56 57 if (stopMonitor != null) stopMonitor.updateWatchers(); 58 59 synchronized(requestQueue) 60 { 61 requestQueue.notifyAll(); 62 } 63 64 65 return request; } 67 68 72 73 public DataQuery pop() 74 { 75 DataQuery request = null; 76 77 synchronized(requestQueue) 78 { 79 if (requestQueue.isEmpty()) return null; 80 request = (DataQuery)requestQueue.firstElement(); 81 requestQueue.removeElementAt(0); 82 request.setRunning(); } 84 85 return request; 86 } 87 88 91 92 public void removeQuery(DataQuery query) 93 { 94 synchronized(requestQueue) 95 { 96 requestQueue.remove(query); 97 } 98 } 99 100 103 104 public boolean hasRequests() 105 { 106 synchronized(requestQueue) 107 { 108 return !requestQueue.isEmpty(); 109 } 110 } 111 112 118 119 public void run() 120 { 121 while (true) 122 { 123 if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " processing Queue of length: " + requestQueue.size() + " in broker " + id); 124 if (processQueue()==false) 125 { 126 if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " ending." + requestQueue.size()); 127 return; 128 } 129 130 try 131 { 132 if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " waiting in run() loop"); 133 synchronized(requestQueue) 134 { 135 requestQueue.wait(); 136 } 137 if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " notified in run() loop"); 138 } 139 catch (Exception e) 140 { 141 if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " interrupted in run() loop \n " + e); 142 } 143 } 144 } 145 146 149 150 protected boolean processQueue() 151 { 152 while (hasRequests()) 153 { 154 current = pop(); 156 if (current == null) return true; 158 processRequest(current); 159 160 if (stopMonitor != null) stopMonitor.updateWatchers(); 161 162 if (current != null && current.isCancelled()) { return false; } 166 else 167 { 168 current = null; 169 } 170 } 171 172 return true; } 174 175 178 179 protected void processRequest(DataQuery request) 180 { 181 if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " process request " + request.id ); 182 183 if (request.isCancelled() == true) 184 { 185 request.finish(); 186 return; 187 } 188 189 try 190 { 191 if (!isActive()) 192 request.setException(new Exception ("No Data Connection Enabled")); 193 194 if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " process request " + request.id + " of type " + request.getTypeString()); 195 196 switch(request.getType()) 197 { 198 case DataQuery.EXISTS: doExistsQuery(request); break; 199 200 case DataQuery.READENTRY: doEntryQuery(request); break; 201 202 case DataQuery.LIST: doListQuery(request); break; 203 204 case DataQuery.SEARCH: doSearchQuery(request); break; 205 206 case DataQuery.MODIFY: doModifyQuery(request); break; 207 208 case DataQuery.COPY: doCopyQuery(request); break; 209 210 212 case DataQuery.GETRECOC: doGetRecOCsQuery(request); break; 213 214 case DataQuery.EXTENDED: doExtendedQuery(request); break; 215 216 case DataQuery.UNKNOWN: 217 218 default: throw new NamingException ("JX Internal Error: Unknown Data Broker Request type: " + request.getType()); 219 } 220 } 221 catch (Exception e) 222 { 223 request.setException(e); 224 } 225 226 request.finish(); 228 229 } 230 231 232 237 238 public DataQuery getChildren(DN nodeDN) 239 { 240 return push(new DataQuery(DataQuery.LIST, nodeDN)); 241 } 242 243 public DataQuery getEntry(DN nodeDN) 244 { 245 return push(new DataQuery(DataQuery.READENTRY, nodeDN)); 246 } 247 248 public DataQuery exists(DN nodeDN) 249 { 250 return push(new DataQuery(DataQuery.EXISTS, nodeDN)); 251 } 252 258 public DataQuery getRecommendedObjectClasses(DN dn) 259 { 260 return push(new DataQuery(DataQuery.GETRECOC, dn)); 261 } 262 263 public DataQuery modifyEntry(DXEntry oldEntry, DXEntry newEntry) 264 { 265 return push(new DataQuery(DataQuery.MODIFY, oldEntry, newEntry)); 266 } 267 268 public DataQuery copyTree(DN oldNodeDN, DN newNodeDN) 269 { 270 return push(new DataQuery(DataQuery.COPY, oldNodeDN, newNodeDN)); 271 } 272 273 public DataQuery search(DN nodeDN, String filter, int searchLevel, String [] returnAttributes) 274 { 275 return push(new DataQuery(DataQuery.SEARCH, nodeDN, filter, searchLevel, returnAttributes)); 276 } 277 278 public DataQuery extendedRequest(DataQuery query) 279 { 280 return push(query); 281 } 282 283 290 public void addDataListener(DataListener l) 291 { 292 if (listeners.contains(l) == false) 293 { 294 listeners.add(l); 295 } 296 } 297 298 299 300 304 305 public void removeDataListener(DataListener l) 306 { 307 if (listeners.contains(l)==true) 308 { 309 listeners.remove(l); 310 } 311 } 312 313 314 315 317 public abstract boolean isModifiable(); 318 319 public abstract DirContext getDirContext(); 320 321 public abstract boolean isActive(); 322 323 public abstract SchemaOps getSchemaOps(); 324 325 329 330 protected DataQuery finish(DataQuery request) 331 { 332 if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " request " + request.id + " finished "); 333 request.finish(); 334 return request; 335 } 336 337 338 339 340 342 346 public DataQuery getCurrent() { return current; } 347 348 352 public synchronized Vector getRequestQueue() 353 { 354 return requestQueue; 355 } 356 357 359 protected DataQuery doExtendedQuery(DataQuery request) 360 throws NamingException 361 { 362 request.doExtendedRequest(this); 363 return finish(request); 364 } 365 366 367 371 372 protected DataQuery doExistsQuery(DataQuery request) 373 throws NamingException 374 { 375 unthreadedExists(request.requestDN()); 376 request.setStatus(true); 377 return finish(request); 378 } 379 380 384 385 protected DataQuery doListQuery(DataQuery request) 386 throws NamingException 387 { 388 request.setEnum(unthreadedList(request.requestDN())); 389 return finish(request); 390 } 391 392 396 397 protected DataQuery doEntryQuery(DataQuery request) 398 throws NamingException 399 { 400 request.setEntry(unthreadedReadEntry(request.requestDN(), null)); 401 return finish(request); 402 } 403 404 408 409 protected DataQuery doSearchQuery(DataQuery request) 410 throws NamingException 411 { 412 DXNamingEnumeration en = unthreadedSearch(request.requestDN(), request.filter(), request.searchLevel(), request.returnAttributes()); 413 request.setEnum(en); 414 return finish(request); 415 } 416 417 421 422 protected DataQuery doModifyQuery(DataQuery request) 423 throws NamingException 424 { 425 unthreadedModify(request.oldEntry(), request.newEntry()); 426 request.setStatus(true); 427 return finish(request); 428 } 429 430 434 435 protected DataQuery doCopyQuery(DataQuery request) 436 throws NamingException 437 { 438 unthreadedCopy(request.oldDN(), request.requestDN()); 439 request.setStatus(true); 440 return finish(request); 441 } 442 443 447 448 protected DataQuery doGetRecOCsQuery(DataQuery request) 449 throws NamingException 450 { 451 request.setArrayList(unthreadedGetRecOCs(request.requestDN())); 452 return finish(request); 453 } 454 455 459 466 473 474 public abstract DXNamingEnumeration unthreadedList(DN searchbase) throws NamingException ; 475 476 486 487 public abstract DXNamingEnumeration unthreadedSearch(DN dn, String filter, int search_level, String [] returnAttributes) throws NamingException ; 488 489 497 public abstract void unthreadedCopy(DN oldNodeDN, DN newNodeDN) throws NamingException ; 498 499 502 503 public abstract boolean unthreadedExists(DN checkMe) throws NamingException ; 504 505 508 509 511 518 519 public abstract DXEntry unthreadedReadEntry(DN entryDN, String [] returnAttributes) throws NamingException ; 520 521 522 527 528 public abstract void unthreadedModify(DXEntry oldEntry, DXEntry newEntry) throws NamingException ; 529 530 537 538 public abstract ArrayList unthreadedGetRecOCs(DN dn) throws NamingException ; 539 540 541 542 547 548 public Exception getException() 549 { 550 return null; 551 } 552 553 557 558 public void clearException() 559 { 560 } 561 562 567 568 public Broker getBroker() { return this; } 569 570 } | Popular Tags |