1 23 24 package org.objectweb.clif.datacollector.lib; 25 26 import org.objectweb.fractal.api.control.BindingController; 27 import org.objectweb.fractal.api.control.LifeCycleController; 28 import org.objectweb.clif.storage.api.StorageWrite; 29 import org.objectweb.clif.storage.api.LifeCycleEvent; 30 import org.objectweb.clif.storage.api.AlarmEvent; 31 import org.objectweb.clif.storage.api.BladeEvent; 32 import org.objectweb.clif.storage.api.ActionEvent; 33 import org.objectweb.clif.storage.api.ProbeEvent; 34 import org.objectweb.clif.datacollector.api.DataCollectorWrite; 35 import org.objectweb.clif.datacollector.api.DataCollectorAdmin; 36 import java.io.Serializable ; 37 import java.util.SortedSet ; 38 import java.util.TreeSet ; 39 import java.util.Collections ; 40 41 42 46 abstract public class AbstractDataCollector 47 implements 48 DataCollectorWrite, 49 DataCollectorAdmin, 50 BindingController, 51 LifeCycleController 52 { 53 static protected long DELAY_MS = 10000; 54 static final String [] interfaceNames = new String [] { StorageWrite.STORAGE_WRITE }; 55 56 57 protected StorageWrite sws; 58 SortedSet eventQ = Collections.synchronizedSortedSet(new TreeSet ()); 59 DelayedWriter writer; 60 boolean stopped; 61 Object state_lock = new Object (); 62 63 64 public AbstractDataCollector() 65 { 66 } 67 68 69 73 74 public void startFc() 75 { 76 synchronized (state_lock) 77 { 78 stopped = false; 79 } 80 } 81 82 83 public void stopFc() 84 { 85 terminate(); 86 } 87 88 89 public String getFcState() 90 { 91 synchronized (state_lock) 92 { 93 return stopped ? LifeCycleController.STOPPED : LifeCycleController.STARTED; 94 } 95 } 96 97 98 102 103 public Object lookupFc(String clientItfName) 104 { 105 if (clientItfName.equals(StorageWrite.STORAGE_WRITE)) 106 { 107 return sws; 108 } 109 else 110 { 111 return null; 112 } 113 } 114 115 116 public void bindFc(String clientItfName, Object serverItf) 117 { 118 if (clientItfName.equals(StorageWrite.STORAGE_WRITE)) 119 { 120 sws = (StorageWrite) serverItf; 121 } 122 } 123 124 125 public void unbindFc(String clientItfName) 126 { 127 if (clientItfName.equals(StorageWrite.STORAGE_WRITE)) 128 { 129 sws = null; 130 } 131 } 132 133 134 public String [] listFc() 135 { 136 return sws == null ? new String [0] : interfaceNames; 137 } 138 139 140 144 145 149 public void init(Serializable testId, String bladeId) 150 { 151 stopped = false; 152 writer = new DelayedWriter(); 153 writer.start(); 154 } 155 156 157 161 public void terminate() 162 { 163 synchronized (state_lock) 164 { 165 if (writer != null) 166 { 167 stopped = true; 168 try 169 { 170 state_lock.wait(); 171 } 172 catch (InterruptedException ex) 173 { 174 ex.printStackTrace(System.err); 175 } 176 } 177 } 178 } 179 180 181 185 public void add(LifeCycleEvent event) 186 { 187 eventQ.add(event); 188 } 189 190 191 195 public void add(ActionEvent action) 196 { 197 eventQ.add(action); 198 } 199 200 201 205 public void add(AlarmEvent alarm) 206 { 207 eventQ.add(alarm); 208 } 209 210 211 215 public void add(ProbeEvent measure) 216 { 217 if (measure != null) 218 { 219 eventQ.add(measure); 220 } 221 } 222 223 224 228 229 class DelayedWriter extends Thread 230 { 231 public DelayedWriter() 232 { 233 super("Delayed writer"); 234 } 235 236 237 public void run() 238 { 239 BladeEvent firstEvent; 240 while (! (stopped && eventQ.isEmpty())) 241 { 242 if (eventQ.isEmpty()) 243 { 244 try 245 { 246 sleep(DELAY_MS); 247 } 248 catch (InterruptedException ex) 249 { 250 } 251 } 252 else if ((firstEvent = (BladeEvent)eventQ.first()).getDate() + DELAY_MS < System.currentTimeMillis()) 253 { 254 eventQ.remove(firstEvent); 255 if (sws != null) 256 { 257 sws.write(firstEvent); 258 } 259 } 260 else 261 { 262 try 263 { 264 sleep(DELAY_MS - System.currentTimeMillis() + firstEvent.getDate()); 265 } 266 catch (Exception ex) 267 { 268 } 269 } 270 } 271 synchronized (state_lock) 272 { 273 writer = null; 274 state_lock.notify(); 275 } 276 } 277 } 278 } 279 | Popular Tags |