package com.quikj.server.framework;

import java.io.*;
import java.lang.ref.WeakReference;
import java.util.*;

/**
 * A {@link Thread} with an optional in-memory message queue and a process-wide
 * registry that maps a positive integer id to the thread object.
 *
 * <p>Other threads post messages with {@link #sendMessage(AceMessageInterface)};
 * the code running in this thread retrieves them with {@link #waitMessage()}.
 * Every instance registers itself on construction and is removed from the
 * registry by {@link #dispose()}.
 *
 * <p>The code is intentionally kept free of generics and autoboxing to match
 * the source level used by the rest of this file.
 */
public class AceThread extends Thread
{
    /** Error text reported when a queue operation is used on a thread without a queue. */
    private static final String NO_QUEUE_ERROR = "The thread has not enabled message queue";

    /**
     * Registry of id (Integer) -> WeakReference(AceThread). Guarded by its own monitor.
     * The ids are strongly held keys; only the thread objects are weakly referenced,
     * so an entry cannot disappear while its thread object is still reachable.
     */
    private static final Map aceThreadList = new HashMap();

    private static final Random randomNumberGenerator = new Random(System.currentTimeMillis());

    /**
     * The message queue, or null when the thread was created without one or has
     * been disposed. The LinkedList instance is also the monitor guarding its content.
     * Volatile because dispose() may clear the field while other threads read it.
     */
    private volatile LinkedList messageQueue = null;

    private final Object errorLock = new Object();
    private String errorMessage = ""; // guarded by errorLock
    private AceOperationContextInterface operationContext = null;
    private final int aceThreadListId;

    /**
     * Creates a thread with a message queue.
     *
     * @throws IOException declared for compatibility with existing callers
     */
    public AceThread()
        throws IOException
    {
        this(false);
    }

    /**
     * Creates a named thread with a message queue.
     *
     * @param name the thread name
     * @throws IOException declared for compatibility with existing callers
     */
    public AceThread(String name)
        throws IOException
    {
        this(name, false);
    }

    /**
     * Creates a named thread with a message queue in the given thread group.
     *
     * @param group the thread group
     * @param name the thread name
     * @throws IOException declared for compatibility with existing callers
     */
    public AceThread(ThreadGroup group, String name)
        throws IOException
    {
        this(group, name, false);
    }

    /**
     * Creates a thread, optionally without a message queue.
     *
     * @param nomsgq true to skip creation of the message queue
     * @throws IOException declared for compatibility with existing callers
     */
    public AceThread(boolean nomsgq)
        throws IOException
    {
        super();
        aceThreadListId = registerThread(nomsgq);
    }

    /**
     * Creates a named thread, optionally without a message queue.
     *
     * @param name the thread name
     * @param nomsgq true to skip creation of the message queue
     * @throws IOException declared for compatibility with existing callers
     */
    public AceThread(String name, boolean nomsgq)
        throws IOException
    {
        super(name);
        aceThreadListId = registerThread(nomsgq);
    }

    /**
     * Creates a named thread in the given group, optionally without a message queue.
     *
     * @param group the thread group
     * @param name the thread name
     * @param nomsgq true to skip creation of the message queue
     * @throws IOException declared for compatibility with existing callers
     */
    public AceThread(ThreadGroup group, String name, boolean nomsgq)
        throws IOException
    {
        super(group, name);
        aceThreadListId = registerThread(nomsgq);
    }

    /**
     * Shared constructor logic: creates the queue when requested and registers
     * this object under a fresh id.
     *
     * @param nomsgq true to skip creation of the message queue
     * @return the id under which this thread has been registered
     */
    private int registerThread(boolean nomsgq)
    {
        if (!nomsgq)
        {
            messageQueue = new LinkedList();
        }

        // Pick the id and store the entry under one lock so that two threads
        // being constructed concurrently can never claim the same id.
        synchronized (aceThreadList)
        {
            purgeClearedEntries();
            int id = getUniqueAceThreadId();
            aceThreadList.put(new Integer(id), new WeakReference(this));
            return id;
        }
    }

    /**
     * Drops registry entries whose thread object has been garbage collected
     * without dispose() having been called. Caller must hold the registry lock.
     */
    private static void purgeClearedEntries()
    {
        for (Iterator it = aceThreadList.values().iterator(); it.hasNext();)
        {
            WeakReference ref = (WeakReference) it.next();
            if (ref.get() == null)
            {
                it.remove();
            }
        }
    }

    /**
     * Appends a message to this thread's queue and wakes up one waiter.
     *
     * @param msg the message to enqueue
     * @return true if the message was queued, false if this thread has no queue
     */
    public final boolean sendMessage(AceMessageInterface msg)
    {
        LinkedList queue = messageQueue; // snapshot: dispose() may null the field
        if (queue == null)
        {
            writeErrorMessage(NO_QUEUE_ERROR);
            return false;
        }

        synchronized (queue)
        {
            queue.addLast(msg);
            queue.notify();
        }

        return true;
    }

    /**
     * Removes every queued message that the comparator reports as the same as
     * the given one.
     *
     * @param obj the message to compare the queued messages against
     * @param comp the comparator deciding whether two messages are the same
     * @return true if at least one message was removed; false if nothing
     *         matched or this thread has no queue
     */
    public final boolean removeMessage(AceMessageInterface obj,
                                       AceCompareMessageInterface comp)
    {
        LinkedList queue = messageQueue;
        if (queue == null)
        {
            writeErrorMessage(NO_QUEUE_ERROR);
            return false;
        }

        boolean removed = false;
        synchronized (queue)
        {
            for (Iterator it = queue.iterator(); it.hasNext();)
            {
                AceMessageInterface queued = (AceMessageInterface) it.next();
                if (comp.same(obj, queued))
                {
                    it.remove();
                    removed = true;
                }
            }
        }

        return removed;
    }

    /**
     * Discards all queued messages.
     *
     * @return true on success, false if this thread has no queue
     */
    public final boolean flushMessages()
    {
        LinkedList queue = messageQueue;
        if (queue == null)
        {
            writeErrorMessage(NO_QUEUE_ERROR);
            return false;
        }

        synchronized (queue)
        {
            queue.clear();
        }
        return true;
    }

    /**
     * Blocks until a message is available and returns it.
     *
     * <p>A thread interrupt does not abort the wait; the interrupt status is
     * restored before this method returns so that the caller can still observe it.
     *
     * @return the oldest queued message, or null if this thread has no queue
     */
    protected final AceMessageInterface waitMessage()
    {
        LinkedList queue = messageQueue;
        if (queue == null)
        {
            writeErrorMessage(NO_QUEUE_ERROR);
            return null;
        }

        boolean interrupted = false;
        try
        {
            synchronized (queue)
            {
                // Re-check the condition after every wake-up: the wait can end
                // spuriously, or the message may have been removed or flushed
                // before this thread re-acquired the monitor.
                while (queue.isEmpty())
                {
                    try
                    {
                        queue.wait();
                    }
                    catch (InterruptedException ex)
                    {
                        interrupted = true; // keep waiting, remember the interrupt
                    }
                }

                return (AceMessageInterface) queue.removeFirst();
            }
        }
        finally
        {
            if (interrupted)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Wakes up this thread's waitMessage() call by queueing an AceSignalMessage.
     *
     * @param sig_id the signal id passed to the AceSignalMessage
     * @param message the text passed to the AceSignalMessage
     * @return the result of {@link #sendMessage(AceMessageInterface)}
     */
    public final boolean interruptWait(int sig_id, String message)
    {
        return sendMessage(new AceSignalMessage(sig_id, message));
    }

    /**
     * Same as {@link #interruptWait(int, String)} with an empty message text.
     *
     * @param sig_id the signal id passed to the AceSignalMessage
     * @return the result of {@link #sendMessage(AceMessageInterface)}
     */
    public final boolean interruptWait(int sig_id)
    {
        return interruptWait(sig_id, "");
    }

    /**
     * Releases the message queue (if any) and removes this thread from the
     * registry. Queue operations invoked afterwards report an error.
     */
    public void dispose()
    {
        LinkedList queue = messageQueue;
        if (queue != null)
        {
            messageQueue = null;
            synchronized (queue)
            {
                queue.clear();
            }
        }

        synchronized (aceThreadList)
        {
            aceThreadList.remove(new Integer(aceThreadListId));
        }
    }

    /**
     * Returns the error text most recently stored on this thread object by
     * {@link #dispatchErrorMessage(String)}.
     *
     * @return the stored error text; empty if none has been stored
     */
    public final String getErrorMessage()
    {
        synchronized (errorLock)
        {
            return errorMessage;
        }
    }

    /**
     * Reports an error on behalf of the calling thread: if the caller is an
     * AceThread the text is dispatched to that thread object (which is not
     * necessarily this one), otherwise it is printed to System.err.
     *
     * @param error the error text
     */
    protected void writeErrorMessage(String error)
    {
        Thread cthread = Thread.currentThread();

        if (cthread instanceof AceThread)
        {
            ((AceThread) cthread).dispatchErrorMessage(error);
        }
        else
        {
            System.err.println(error);
        }
    }

    /**
     * Stores the error text on this thread object.
     *
     * @param error the error text; null is stored as an empty string
     */
    protected void dispatchErrorMessage(String error)
    {
        synchronized (errorLock)
        {
            // Exception messages forwarded here may legitimately be null.
            errorMessage = (error == null) ? "" : error;
        }
    }

    /**
     * Stores the operation context associated with this thread.
     *
     * @param context the context to store
     */
    protected final void setOperationContext(AceOperationContextInterface context)
    {
        operationContext = context;
    }

    /**
     * Returns the operation context associated with this thread.
     *
     * @return the stored context, or null if none has been set
     */
    protected final AceOperationContextInterface getOperationContext()
    {
        return operationContext;
    }

    /**
     * Looks up a registered thread by its id.
     *
     * @param code the id returned by getAceThreadId() of the wanted thread
     * @return the thread registered under the id, or null if there is none
     */
    public static AceThread getAceThreadObject(int code)
    {
        synchronized (aceThreadList)
        {
            WeakReference ref = (WeakReference) aceThreadList.get(new Integer(code));
            return (ref == null) ? null : (AceThread) ref.get();
        }
    }

    /**
     * Returns the id under which this thread is registered.
     *
     * @return the registry id of this thread
     */
    protected final int getAceThreadId()
    {
        return aceThreadListId;
    }

    /**
     * Returns a random positive id that is not present in the registry at the
     * time of the call. The id is not reserved by this method.
     *
     * @return an id greater than zero that is currently unused
     */
    public static int getUniqueAceThreadId()
    {
        synchronized (aceThreadList)
        {
            while (true)
            {
                // nextInt(bound) yields 0..bound-1, so the candidate is 1..Integer.MAX_VALUE
                int candidate = randomNumberGenerator.nextInt(Integer.MAX_VALUE) + 1;
                if (!aceThreadList.containsKey(new Integer(candidate)))
                {
                    return candidate;
                }
            }
        }
    }

    /**
     * Interactive demo: starts a number of threads (first argument, default 10)
     * and reads commands of the form
     * "&lt;thread number&gt; &lt;message&gt; &lt;number of times&gt;" or "quit" from standard input.
     *
     * @param args optional: the number of threads to start
     */
    public static void main(String[] args)
    {
        class MyMessage implements AceMessageInterface
        {
            public MyMessage(String message)
            {
                this.message = message;
            }

            public String messageType()
            {
                return message;
            }

            private final String message;
        }

        class MyAceThreadClass extends AceThread
        {
            public MyAceThreadClass(String name) throws IOException
            {
                super(name);
            }

            public void run()
            {
                System.out.println(getName() + " started.");
                while (true)
                {
                    AceMessageInterface message = this.waitMessage();
                    if (message == null)
                    {
                        System.err.println(getName() + ": Null message received, killing thread");
                        this.dispose();
                        break;
                    }

                    if (message.messageType().equals("kill"))
                    {
                        System.out.println(getName() + ": kill received, killing thread");
                        this.dispose();
                        break;
                    }

                    System.out.println(getName() + ": " + message.messageType() + " received");
                }
            }
        }

        try
        {
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

            int num_threads = 10;

            if (args.length > 0)
            {
                try
                {
                    num_threads = Integer.parseInt(args[0]);
                }
                catch (NumberFormatException ex)
                {
                    System.err.println("Command line syntax error - the argument should specify the number of threads to run");
                    System.exit(1);
                }

                if (num_threads < 0)
                {
                    // a negative count cannot be used as an array size
                    System.err.println("Command line syntax error - the argument should specify the number of threads to run");
                    System.exit(1);
                }
            }
            else
            {
                System.out.println("No argument specified, defaulting number of threads to "
                                   + num_threads);
            }

            MyAceThreadClass[] threads = new MyAceThreadClass[num_threads];
            for (int i = 0; i < num_threads; i++)
            {
                threads[i] = new MyAceThreadClass("AceThread-" + i);
                threads[i].start();
            }

            String syntax = "Invalid syntax! Correct syntax : <thread number> <message> <number of times>";
            while (true)
            {
                System.out.print("Input> ");
                System.out.flush();

                // readLine() returns null once standard input is closed; treat that as "quit"
                String rawLine = reader.readLine();
                String line = (rawLine == null) ? "quit" : rawLine.trim();
                if (line.length() == 0)
                {
                    continue;
                }

                StringTokenizer strtok = new StringTokenizer(line, " ");
                int tokenCount = strtok.countTokens();

                if (tokenCount >= 3)
                {
                    int num_times;
                    int thread_index;
                    String message;

                    try
                    {
                        thread_index = Integer.parseInt(strtok.nextToken());
                        message = strtok.nextToken();
                        num_times = Integer.parseInt(strtok.nextToken());
                    }
                    catch (NumberFormatException ex3)
                    {
                        System.err.println(syntax);
                        continue;
                    }

                    if (thread_index < 0 || thread_index >= num_threads)
                    {
                        System.err.println("That thread is not running");
                        continue;
                    }

                    for (int i = 0; i < num_times; i++)
                    {
                        if (!threads[thread_index].sendMessage(new MyMessage(message)))
                        {
                            System.err.println("Error occured while sending message");
                            break;
                        }
                    }
                }
                else if (tokenCount == 1 && strtok.nextToken().equals("quit"))
                {
                    for (int i = 0; i < num_threads; i++)
                    {
                        if (!threads[i].sendMessage(new MyMessage("kill")))
                        {
                            System.err.println("Error occured while sending message");
                        }
                    }
                    System.exit(0);
                }
                else
                {
                    System.err.println(syntax);
                }
            }
        }
        catch (IOException ex)
        {
            System.err.println(ex.getMessage());
            System.exit(1);
        }
    }
}