1 24 33 34 package org.datashare; 35 36 import java.util.LinkedList ; 37 38 45 public class FifoQueue implements Runnable 46 { 47 private LinkedList list = new LinkedList (); 49 private boolean closed = false; 51 private FifoConsumer consumer = null; 53 private Thread myThread = null; 54 55 58 public FifoQueue() 59 { 60 } 61 62 66 public void 67 run() 68 { 69 while(!closed) 70 { 71 try{ 72 Object object = read(); 73 if(consumer != null) 74 consumer.newFifoDataAvailable(object); 75 } 76 catch(Exception e) 77 { 78 SessionUtilities.getLoggingInterface().debugMsg(SessionUtilities.getLoggingInterface().ERROR, 79 SessionUtilities.getLoggingInterface().GENERALSTATUS, 80 "Problems in FifoQueue, thread named " + this.myThread.getName()); 81 e.printStackTrace(); 82 } 83 } 84 } 85 86 91 public void 92 setConsumer(FifoConsumer consumer) 93 { 94 if(this.consumer == null) 95 { 96 this.consumer = consumer; 97 myThread = new Thread (this, "FifoQueueThread-for-"+consumer); 98 myThread.setDaemon(true); 99 myThread.start(); 100 } 101 else 102 { 103 SessionUtilities.getLoggingInterface().debugMsg(SessionUtilities.getLoggingInterface().DEBUG, 104 SessionUtilities.getLoggingInterface().GENERALSTATUS, 105 "FifoQueue.setConsumer, queue already has a consumer-> " + this.consumer); 106 Thread.dumpStack(); 107 } 108 } 109 110 114 public void write(Object object) 115 { 116 synchronized(list) 120 { 121 list.add(object); 122 list.notifyAll(); 123 } 124 } 125 126 129 public Object read() 130 { 131 Object object = null; 132 if(!closed) 133 { 134 synchronized(list) 135 { 136 if(list.isEmpty()) 138 { 139 try{ 140 list.wait(); 144 } 145 catch(InterruptedException ie) 146 { 147 SessionUtilities.getLoggingInterface().debugMsg(SessionUtilities.getLoggingInterface().WARNING, 148 SessionUtilities.getLoggingInterface().GENERALSTATUS, 149 "FifoQueue had interrupted exception..."); 150 ie.printStackTrace(); 151 } 152 } 153 try{ 154 if(!closed) object = list.remove(0); } 157 catch(Exception e) 158 { 159 SessionUtilities.getLoggingInterface().debugMsg(SessionUtilities.getLoggingInterface().ERROR, 160 SessionUtilities.getLoggingInterface().GENERALSTATUS, 161 "Problems getting object from pipe for reader " + Thread.currentThread().getName()); 162 e.printStackTrace(); 163 object = null; 164 } 165 } 166 } 167 168 if(object == null) 169 SessionUtilities.getLoggingInterface().debugMsg(SessionUtilities.getLoggingInterface().DEBUG, 170 SessionUtilities.getLoggingInterface().GENERALSTATUS, 171 "FifoQueue issuing null object to thread " + Thread.currentThread().getName() +" (to close queue)"); 172 173 return object; 174 } 175 176 179 public int 180 size() 181 { 182 return list.size(); 183 } 184 185 190 public void 191 close() 192 { 193 SessionUtilities.getLoggingInterface().debugMsg(SessionUtilities.getLoggingInterface().DEBUG, 194 SessionUtilities.getLoggingInterface().GENERALSTATUS, 195 "Closing FifoQueue"); 196 closed = true; 197 consumer = null; 198 reset(); 199 synchronized(list) 200 { 201 list.notifyAll(); } 203 } 204 205 208 public void 209 reset() 210 { 211 synchronized(list) 212 { 213 if(!list.isEmpty()) 214 { 215 list.clear(); 216 } 217 } 218 } 219 220 } 221 | Popular Tags |