1 24 25 package org.objectweb.dream.pushwithreturn; 26 27 import java.util.HashMap ; 28 import java.util.Iterator ; 29 import java.util.Map ; 30 31 import org.objectweb.dream.AbstractComponent; 32 import org.objectweb.dream.InterruptedPushException; 33 import org.objectweb.dream.Push; 34 import org.objectweb.dream.Push1; 35 import org.objectweb.dream.PushException; 36 import org.objectweb.dream.PushWithReturn; 37 import org.objectweb.dream.message.Message; 38 import org.objectweb.dream.message.manager.MessageManager; 39 import org.objectweb.dream.pool.ObjectPool; 40 import org.objectweb.dream.pool.Recyclable; 41 import org.objectweb.fractal.api.NoSuchInterfaceException; 42 import org.objectweb.fractal.api.control.IllegalBindingException; 43 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 44 import org.objectweb.util.monolog.api.BasicLevel; 45 46 61 public class SynchronizerImpl extends AbstractComponent 62 implements 63 PushWithReturn, 64 Push1, 65 SynchronizerAttributeController 66 { 67 68 69 HashMap waitingKeysMap = new HashMap (); 70 71 73 MessageManager messageManagerItf; 74 75 76 ObjectPool waitingKeyPoolItf; 77 78 79 KeyGenerator keyGeneratorItf; 80 81 82 Push outPushItf; 83 84 boolean mustClone; 85 86 90 94 public Message pushWithReturn(Message message, Map context) 95 throws PushException 96 { 97 return doPush(message); 98 } 99 100 103 public void push(Message message, Map context) throws PushException 104 { 105 doPush(message); 106 } 107 108 112 116 public void push1(Message message, Map context) throws PushException 117 { 118 Key key = null; 119 try 120 { 121 key = keyGeneratorItf.generateKey(message); 123 } 124 catch (Exception e) 125 { 126 logger.log(BasicLevel.ERROR, e); 127 throw new PushException(e); 128 } 129 130 if (key.isAll()) 131 { 132 synchronized (waitingKeysMap) 133 { 134 Iterator iter = waitingKeysMap.keySet().iterator(); 135 while (iter.hasNext()) 136 { 137 Object keyTemp = iter.next(); 138 WaitingKey waitingKey = (WaitingKey) waitingKeysMap.get(keyTemp); 139 waitingKey.setReturnMessage(message); 140 synchronized (waitingKey) 141 { 142 waitingKey.setCanPass(true); 143 waitingKey.notifyAll(); 144 } 145 } 146 } 147 } 148 else 149 { 150 WaitingKey waitingKey = (WaitingKey) waitingKeysMap.get(key); 151 if (waitingKey != null) 152 { 153 waitingKey.setReturnMessage(message); 154 synchronized (waitingKey) 155 { 156 waitingKey.setCanPass(true); 157 waitingKey.notifyAll(); 158 } 159 } 160 else 161 { 162 messageManagerItf.deleteMessage(message); 163 } 164 } 165 } 166 167 171 private Message doPush(Message message) throws PushException 172 { 173 Key key = null; 174 try 175 { 176 key = keyGeneratorItf.generateKey(message); 178 } 179 catch (Exception e) 180 { 181 logger.log(BasicLevel.ERROR, e); 182 throw new PushException(e); 183 } 184 185 WaitingKey waitingKey; 187 synchronized (waitingKeysMap) 188 { 189 waitingKey = (WaitingKey) waitingKeysMap.get(key); 190 if (waitingKey == null) 191 { 192 waitingKey = (WaitingKey) waitingKeyPoolItf.newInstance(); 193 waitingKeysMap.put(key, waitingKey); 194 } 195 synchronized (waitingKey) 196 { 197 waitingKey.incrementNbWaitings(); 198 } 199 } 200 201 outPushItf.push(message, null); 203 204 Message returnMessage; 206 synchronized (waitingKey) 207 { 208 if (!waitingKey.canPass()) 209 { 210 try 211 { 212 waitingKey.wait(); 213 } 214 catch (InterruptedException e) 215 { 216 throw new InterruptedPushException(e); 217 } 218 } 219 returnMessage = waitingKey.getReturnMessage(); 220 waitingKey.decrementNbWaitings(); 221 if (waitingKey.getNbWaitings() > 0) 222 { 223 return messageManagerItf.duplicateMessage(returnMessage, mustClone); 225 } 226 } 227 synchronized (waitingKeysMap) 228 { 229 synchronized (waitingKey) 230 { 231 if (waitingKey.getNbWaitings() <= 0) 232 { 233 waitingKeyPoolItf.recycleInstance((Recyclable) waitingKeysMap 234 .remove(key)); 235 } 236 } 237 } 238 return returnMessage; 239 } 240 241 246 249 public void setMustClone(boolean mustClone) 250 { 251 this.mustClone = mustClone; 252 } 253 254 257 public boolean getMustClone() 258 { 259 return mustClone; 260 } 261 262 266 270 public void bindFc(String clientItfName, Object serverItf) 271 throws NoSuchInterfaceException, IllegalBindingException, 272 IllegalLifeCycleException 273 { 274 super.bindFc(clientItfName, serverItf); 275 if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME)) 276 { 277 outPushItf = (Push) serverItf; 278 } 279 else if (clientItfName.equals(KeyGenerator.ITF_NAME)) 280 { 281 keyGeneratorItf = (KeyGenerator) serverItf; 282 } 283 else if (clientItfName.equals(ObjectPool.ITF_NAME)) 284 { 285 waitingKeyPoolItf = (ObjectPool) serverItf; 286 } 287 else if (clientItfName.equals(MessageManager.ITF_NAME)) 288 { 289 messageManagerItf = (MessageManager) serverItf; 290 } 291 } 292 293 296 public String [] listFc() 297 { 298 return new String []{KeyGenerator.ITF_NAME, Push.OUT_PUSH_ITF_NAME, 299 ObjectPool.ITF_NAME, MessageManager.ITF_NAME}; 300 } 301 302 306 309 public static class WaitingKey implements Recyclable 310 { 311 312 private Message returnMessage; 313 314 315 private int nbWaitings = 0; 316 317 private boolean canPass = false; 318 319 324 public Message getReturnMessage() 325 { 326 return returnMessage; 327 } 328 329 334 public void setReturnMessage(Message returnMessage) 335 { 336 this.returnMessage = returnMessage; 337 } 338 339 342 public void incrementNbWaitings() 343 { 344 nbWaitings++; 345 } 346 347 350 public void decrementNbWaitings() 351 { 352 nbWaitings--; 353 } 354 355 360 public int getNbWaitings() 361 { 362 return nbWaitings; 363 } 364 365 370 public void setCanPass(boolean canPass) 371 { 372 this.canPass = canPass; 373 } 374 375 380 public boolean canPass() 381 { 382 return this.canPass; 383 } 384 385 389 392 public void recycle() 393 { 394 this.returnMessage = null; 395 this.nbWaitings = 0; 396 this.canPass = false; 397 } 398 } 399 400 } | Popular Tags |