1 24 25 package org.objectweb.dream.protocol.causality; 26 27 import java.util.Iterator ; 28 import java.util.LinkedList ; 29 import java.util.List ; 30 import java.util.Map ; 31 32 import org.objectweb.dream.AbstractComponent; 33 import org.objectweb.dream.Push; 34 import org.objectweb.dream.PushException; 35 import org.objectweb.dream.message.ExtensibleMessage; 36 import org.objectweb.dream.message.Message; 37 import org.objectweb.dream.message.manager.MessageManager; 38 import org.objectweb.dream.protocol.ArrowChunk; 39 import org.objectweb.dream.util.Error; 40 import org.objectweb.fractal.api.NoSuchInterfaceException; 41 import org.objectweb.fractal.api.control.IllegalBindingException; 42 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 43 import org.objectweb.util.monolog.api.BasicLevel; 44 45 49 public class InMessageSorterImpl extends AbstractComponent 50 implements 51 Push, 52 CausalityTransformerAttributeController 53 { 54 55 private MatrixClock matrixClockItf; 56 private Push outPushItf; 57 private MessageManager messageManagerItf; 58 59 private List waitingToDeliver; 60 private String arrowChunkName; 61 private String causalityChunkName; 62 63 66 public InMessageSorterImpl() 67 { 68 waitingToDeliver = new LinkedList (); 69 } 70 71 74 public synchronized void push(Message message, Map context) 75 throws PushException 76 { 77 ArrowChunk arrowChunk = (ArrowChunk) message.getChunk(arrowChunkName); 78 79 if (arrowChunk == null) 80 { 81 logger.log(BasicLevel.ERROR, "Unable to find arrow chunk named " 82 + arrowChunkName + ". The message is dropped"); 83 return; 84 } 85 86 CausalityChunk csltChunk = (CausalityChunk) message 87 .getChunk(causalityChunkName); 88 89 if (csltChunk == null) 90 { 91 throw new PushException("Unable to find causality chunk named " 92 + causalityChunkName); 93 } 94 95 int todo = matrixClockItf.testRecvMatrix(csltChunk.getCausalityStamp(), 96 arrowChunk.getProcessIdFrom(), arrowChunk.getProcessIdTo()); 97 if (todo == MatrixClock.DELIVER) 98 { 99 deliverMessage(message); 100 101 boolean loop = true; 103 while (loop) 104 { 105 loop = false; 106 Iterator iter = waitingToDeliver.iterator(); 107 while (iter.hasNext()) 108 { 109 Message msg = (Message) iter.next(); 110 ArrowChunk arrowChunk1 = (ArrowChunk) msg.getChunk(arrowChunkName); 111 CausalityChunk csltChunk1 = (CausalityChunk) msg 112 .getChunk(causalityChunkName); 113 114 if (logger.isLoggable(BasicLevel.DEBUG)) 115 { 116 logger.log(BasicLevel.DEBUG, "Try to deliver message : " + msg); 117 } 118 todo = matrixClockItf.testRecvMatrix(csltChunk1.getCausalityStamp(), 119 arrowChunk1.getProcessIdFrom(), arrowChunk1.getProcessIdTo()); 120 if (todo == MatrixClock.DELIVER) 121 { 122 iter.remove(); 123 deliverMessage(msg); 124 loop = true; 126 break; 127 } 128 else if (todo == MatrixClock.ALREADY_DELIVERED) 129 { 130 Error.bug(logger); 131 } 132 logger.log(BasicLevel.DEBUG, "Message can't be delivered yet"); 133 } 134 } 135 } 136 else if (todo == MatrixClock.WAIT_TO_DELIVER) 137 { 138 waitingToDeliver.add(message); 140 if (logger.isLoggable(BasicLevel.DEBUG)) 141 { 142 logger.log(BasicLevel.DEBUG, "block message " + message); 143 } 144 } 145 else 146 { 147 if (logger.isLoggable(BasicLevel.DEBUG)) 149 { 150 logger.log(BasicLevel.DEBUG, "message already delivred " + message); 151 } 152 messageManagerItf.deleteMessage(message); 153 } 154 } 155 156 private void deliverMessage(Message message) 157 { 158 if (message instanceof ExtensibleMessage) 160 { 161 Object causalityChunk = ((ExtensibleMessage) message) 162 .removeChunk(causalityChunkName); 163 messageManagerItf.deleteChunk(causalityChunk); 164 } 165 166 if (logger.isLoggable(BasicLevel.DEBUG)) 167 { 168 logger.log(BasicLevel.DEBUG, "deliver message " + message); 169 } 170 try 171 { 172 outPushItf.push(message, null); 173 } 174 catch (PushException e) 175 { 176 logger.log(BasicLevel.ERROR, "A push exception occurs, drop message", e); 177 messageManagerItf.deleteMessage(message); 178 } 179 } 180 181 185 188 public String getArrowChunkName() 189 { 190 return arrowChunkName; 191 } 192 193 196 public void setArrowChunkName(String arrowChunkName) 197 { 198 this.arrowChunkName = arrowChunkName; 199 } 200 201 204 public String getCausalityChunkName() 205 { 206 return causalityChunkName; 207 } 208 209 212 public void setCausalityChunkName(String causalityChunkName) 213 { 214 this.causalityChunkName = causalityChunkName; 215 } 216 217 221 224 public String [] listFc() 225 { 226 return new String []{Push.OUT_PUSH_ITF_NAME, 227 MatrixClock.MATRIX_CLOCK_ITF_NAME, MessageManager.ITF_NAME}; 228 } 229 230 234 public synchronized void bindFc(String clientItfName, Object serverItf) 235 throws NoSuchInterfaceException, IllegalBindingException, 236 IllegalLifeCycleException 237 { 238 super.bindFc(clientItfName, serverItf); 239 if (clientItfName.equals(MatrixClock.MATRIX_CLOCK_ITF_NAME)) 240 { 241 matrixClockItf = (MatrixClock) serverItf; 242 } 243 else if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME)) 244 { 245 outPushItf = (Push) serverItf; 246 } 247 else if (clientItfName.equals(MessageManager.ITF_NAME)) 248 { 249 messageManagerItf = (MessageManager) serverItf; 250 } 251 } 252 253 } | Popular Tags |