1 4 package demo.sharedqueue; 5 6 import java.util.Date ; 7 import java.util.Collections ; 8 import java.util.ListIterator ; 9 import java.util.LinkedList ; 10 import java.util.List ; 11 import java.util.Random ; 12 13 class Worker implements Runnable { 14 15 private String name; 16 17 private int port; 18 19 private Queue queue; 20 21 private List jobs; 22 23 private String nodeId; 24 25 private int health = HEALTH_ALIVE; 26 27 private static int HEALTH_ALIVE = 0; 28 29 private static int HEALTH_DYING = 1; 30 31 private static int HEALTH_DEAD = 2; 32 33 private static final int MAX_LOAD = 10; 34 35 public Worker(Queue queue, int port, String nodeId) { 36 this.name = Queue.getHostName(); 37 this.port = port; 38 this.queue = queue; 39 this.nodeId = nodeId; 40 jobs = Collections.synchronizedList(new LinkedList ()); 41 } 42 43 public String getNodeId() { 44 return this.nodeId; 45 } 46 47 public String getName() { 48 return "node: " + nodeId + " (" + name + ":" + port + ")"; 49 } 50 51 public String toXml() { 52 synchronized (jobs) { 53 String data = "<worker><name>" + getName() + "</name><jobs>"; 54 ListIterator i = jobs.listIterator(); 55 while (i.hasNext()) { 56 data += ((Job) i.next()).toXml(); 57 } 58 data += "</jobs></worker>"; 59 return data; 60 } 61 } 62 63 69 public synchronized boolean expire() { 70 if (HEALTH_DYING == health) { 71 if (jobs.size() > 0) { 74 queue.addJob((Job) jobs.remove(0)); 75 } else { 76 setHealth(HEALTH_DEAD); 77 } 78 } 79 return (HEALTH_DEAD == health); 80 } 81 82 88 private synchronized void setHealth(int health) { 89 this.health = health; 90 } 91 92 98 public synchronized void markForExpiration() { 99 setHealth(HEALTH_DYING); 100 } 101 102 public void run() { 103 while (HEALTH_DEAD != health) { 104 if ((HEALTH_ALIVE == health) && (jobs.size() < MAX_LOAD)) { 105 final Job job = queue.getJob(); 106 107 try { 108 Thread.sleep(500); 109 } catch (InterruptedException ie) { 110 System.err.println(ie.getMessage()); 111 } 112 113 synchronized (jobs) { 114 jobs.add(job); 115 } 116 117 Thread processor = new Thread (new Runnable () { 118 public void run() { 119 job.run(Worker.this); 120 synchronized (jobs) { 121 jobs.remove(job); 122 } 123 queue.log(job); 124 } 125 }); 126 processor.start(); 127 } 128 } 129 } 130 } 131 | Popular Tags |