1 4 package demo.sharedqueue; 5 6 import java.net.InetAddress ; 7 import java.net.UnknownHostException ; 8 import java.util.Collections ; 9 import java.util.HashMap ; 10 import java.util.LinkedList ; 11 import java.util.List ; 12 import java.util.ListIterator ; 13 import java.util.Random ; 14 15 public class Queue { 16 private List queue = Collections.synchronizedList(new LinkedList ()); 17 private List workers = Collections.synchronizedList(new LinkedList ()); 18 private List completedJobs = Collections.synchronizedList(new LinkedList ()); 19 private int nextJobId; 20 private int port; 21 private static final int MAX_HISTORY_LENGTH = 15; 22 private static final int MAX_QUEUE_LENGTH = 150; 23 24 public Queue(int port) { 25 this.port = port; 26 this.nextJobId = 1; 27 } 28 29 public Job getJob() { 30 synchronized (queue) { 31 while (queue.size() == 0) { 32 try { 33 queue.wait(); 34 } 35 catch (InterruptedException e) { 36 throw new RuntimeException (e); 37 } 38 } 39 return (Job) queue.remove(0); 40 } 41 } 42 43 public String getXmlData() { 44 String data = "<workqueue>"; 46 synchronized (queue) { 47 ListIterator i = queue.listIterator(); 48 while (i.hasNext()) { 49 Job job = (Job) i.next(); 50 data += job.toXml(); 51 } 52 } 53 data += "</workqueue>"; 54 55 data += "<completed>"; 57 synchronized (completedJobs) { 58 ListIterator i = completedJobs.listIterator(); 59 while (i.hasNext()) { 60 Job job = (Job) i.next(); 61 data += job.toXml(); 62 } 63 } 64 data += "</completed>"; 65 66 data += "<consumers>"; 68 synchronized (workers) { 69 ListIterator i = workers.listIterator(); 70 while (i.hasNext()) { 71 Worker worker = (Worker) i.next(); 72 data += worker.toXml(); 73 } 74 } 75 data += "</consumers>"; 76 return data; 77 } 78 79 public Worker createWorker(String nodeId) { 80 synchronized (workers) { 81 Worker worker = new Worker(this, port, nodeId); 82 workers.add(worker); 83 Thread t = new Thread (worker); 84 t.setDaemon(true); 85 t.start(); 86 return worker; 87 } 88 } 89 90 public Worker getWorker(String nodeId) { 91 synchronized (workers) { 92 ListIterator i = workers.listIterator(); 93 while (i.hasNext()) { 94 Worker worker = (Worker) i.next(); 95 if (worker.getNodeId().equals(nodeId)) { 96 return worker; 97 } 98 } 99 } 100 return null; 101 } 102 103 public void log(Job job) { 104 synchronized (completedJobs) { 105 completedJobs.add(0, job); 106 if (completedJobs.size() > MAX_HISTORY_LENGTH) { 107 completedJobs.remove(completedJobs.size() - 1); 108 } 109 } 110 } 111 112 public void reap() { 113 synchronized (workers) { 114 ListIterator i = workers.listIterator(); 115 while (i.hasNext()) { 116 Worker worker = (Worker) i.next(); 117 if (worker.expire()) { 118 i.remove(); 119 } 120 } 121 } 122 } 123 124 public void addJob() { 125 synchronized (queue) { 126 if (queue.size() >= MAX_QUEUE_LENGTH) { 127 return; 128 } 129 130 Job job = new Job(Queue.getHostName() + " " + this.port, this.nextJobId); 131 this.nextJobId = this.nextJobId < 999 ? this.nextJobId + 1 : 1; 132 queue.add(job); 133 queue.notifyAll(); 134 } 135 } 136 137 public void addJob(Job job) { 138 synchronized (queue) { 139 queue.add(job); 140 queue.notifyAll(); 141 } 142 } 143 144 public static String getHostName() { 145 try { 146 InetAddress addr = InetAddress.getLocalHost(); 147 byte[] ipAddr = addr.getAddress(); 148 return addr.getHostName(); 149 } 150 catch (UnknownHostException e) { 151 return "Unknown"; 152 } 153 } 154 } 155 | Popular Tags |