KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > demo > sharedqueue > Worker


1 /*
2  @COPYRIGHT@
3  */

4 package demo.sharedqueue;
5
6 import java.util.Date JavaDoc;
7 import java.util.Collections JavaDoc;
8 import java.util.ListIterator JavaDoc;
9 import java.util.LinkedList JavaDoc;
10 import java.util.List JavaDoc;
11 import java.util.Random JavaDoc;
12
13 class Worker implements Runnable JavaDoc {
14
15     private String JavaDoc name;
16
17     private int port;
18
19     private Queue queue;
20
21     private List JavaDoc jobs;
22
23     private String JavaDoc nodeId;
24
25     private int health = HEALTH_ALIVE;
26
27     private static int HEALTH_ALIVE = 0;
28
29     private static int HEALTH_DYING = 1;
30
31     private static int HEALTH_DEAD = 2;
32
33     private static final int MAX_LOAD = 10;
34
35     public Worker(Queue queue, int port, String JavaDoc nodeId) {
36         this.name = Queue.getHostName();
37         this.port = port;
38         this.queue = queue;
39         this.nodeId = nodeId;
40         jobs = Collections.synchronizedList(new LinkedList JavaDoc());
41     }
42
43     public String JavaDoc getNodeId() {
44         return this.nodeId;
45     }
46
47     public String JavaDoc getName() {
48         return "node: " + nodeId + " (" + name + ":" + port + ")";
49     }
50
51     public String JavaDoc toXml() {
52         synchronized (jobs) {
53             String JavaDoc data = "<worker><name>" + getName() + "</name><jobs>";
54             ListIterator JavaDoc i = jobs.listIterator();
55             while (i.hasNext()) {
56                 data += ((Job) i.next()).toXml();
57             }
58             data += "</jobs></worker>";
59             return data;
60         }
61     }
62
63     /**
64      * Attempt to mark the Worker as dead (if it's already dying); Note that we
65      * synchronize this method since it's mutating a shared object (this class)
66      *
67      * @return True if the Worker is dead.
68      */

69     public synchronized boolean expire() {
70         if (HEALTH_DYING == health) {
71             // a dying Worker wont die until it has
72
// consumed all of it's jobs
73
if (jobs.size() > 0) {
74                 queue.addJob((Job) jobs.remove(0));
75             } else {
76                 setHealth(HEALTH_DEAD);
77             }
78         }
79         return (HEALTH_DEAD == health);
80     }
81
82     /**
83      * Set the state of the Worker's health; Note that we synchronize this
84      * method since it's mutating a shared object (this class)
85      *
86      * @param health
87      */

88     private synchronized void setHealth(int health) {
89         this.health = health;
90     }
91
92     /**
93      * Set the state of the Worker's health to dying; Note that we synchronize
94      * this method since it's mutating a shared object (this class)
95      *
96      * @param health
97      */

98     public synchronized void markForExpiration() {
99         setHealth(HEALTH_DYING);
100     }
101
102     public void run() {
103         while (HEALTH_DEAD != health) {
104             if ((HEALTH_ALIVE == health) && (jobs.size() < MAX_LOAD)) {
105                 final Job job = queue.getJob();
106
107                 try {
108                     Thread.sleep(500);
109                 } catch (InterruptedException JavaDoc ie) {
110                     System.err.println(ie.getMessage());
111                 }
112
113                 synchronized (jobs) {
114                     jobs.add(job);
115                 }
116
117                 Thread JavaDoc processor = new Thread JavaDoc(new Runnable JavaDoc() {
118                     public void run() {
119                         job.run(Worker.this);
120                         synchronized (jobs) {
121                             jobs.remove(job);
122                         }
123                         queue.log(job);
124                     }
125                 });
126                 processor.start();
127             }
128         }
129     }
130 }
131
Popular Tags