1 24 25 package org.objectweb.dream.protocol.utobcast; 26 27 import java.util.Map ; 28 29 import org.objectweb.dream.AbstractComponent; 30 import org.objectweb.dream.IOPushException; 31 import org.objectweb.dream.InterruptedPullException; 32 import org.objectweb.dream.InterruptedPushException; 33 import org.objectweb.dream.Pull; 34 import org.objectweb.dream.PullException; 35 import org.objectweb.dream.Push; 36 import org.objectweb.dream.PushException; 37 import org.objectweb.dream.message.Message; 38 import org.objectweb.dream.message.manager.MessageManager; 39 import org.objectweb.dream.protocol.utobcast.message.UTOBcastChunk; 40 import org.objectweb.dream.util.Error; 41 import org.objectweb.fractal.api.NoSuchInterfaceException; 42 import org.objectweb.fractal.api.control.IllegalBindingException; 43 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 44 import org.objectweb.util.monolog.api.BasicLevel; 45 46 56 public class UTOImpl extends AbstractComponent implements Push 57 { 58 59 63 67 public static final String DELIVERY_ITF_NAME = "delivery"; 68 69 73 public static final String TO_BE_DELIVERED_IN_ITF_NAME = "to-be-delivered-in"; 74 75 79 public static final String TO_BE_DELIVERED_OUT_ITF_NAME = "to-be-delivered-out"; 80 81 82 protected Push deliveryItf; 83 84 85 protected Push toBeDeliveredInItf; 86 87 91 protected Pull toBeDeliveredOutItf; 92 93 94 protected Push outPushItf; 95 96 97 protected ProcessMembership processMembershipItf; 98 99 100 protected MessageManager messageManagerItf; 101 102 106 109 public UTOImpl() 110 { 111 } 112 113 117 121 public void push(Message message, Map context) throws PushException 122 { 123 if (logger.isLoggable(BasicLevel.DEBUG)) 124 { 125 logger.log(BasicLevel.DEBUG, "Received UTO message " + message); 126 } 127 131 if (logger.isLoggable(BasicLevel.DEBUG)) 132 { 133 logger.log(BasicLevel.DEBUG, "Add UTO message " + message 134 + " to the toBeDelivered queue"); 135 } 136 Message msg = messageManagerItf.duplicateMessage(message, true); 137 toBeDeliveredInItf.push(msg, null); 138 139 try 144 { 145 while ((msg = toBeDeliveredOutItf.pull(null)) != null) 146 { 147 if (logger.isLoggable(BasicLevel.DEBUG)) 148 { 149 logger.log(BasicLevel.DEBUG, "Deliver UTO message " + msg); 150 } 151 deliveryItf.push(msg, null); 152 } 153 } 154 catch (InterruptedPullException e) 155 { 156 throw new InterruptedPushException("Interrupted while pulling a message"); 158 } 159 catch (PullException e) 160 { 161 Error.bug(logger, e); 163 } 164 168 UTOBcastChunk chunk = (UTOBcastChunk) message 169 .getChunk(UTOBcastChunk.DEFAULT_NAME); 170 chunk.setUTOBcastMessageType(UTOBcastChunk.ACK); 171 172 try 174 { 175 chunk.setProcessTo(processMembershipItf.getLeader()); 176 } 177 catch (InterruptedException e1) 178 { 179 throw new InterruptedPushException(e1); 180 } 181 try 182 { 183 messageManagerItf.duplicateMessage(message, false); 185 if (logger.isLoggable(BasicLevel.DEBUG)) 186 { 187 logger.log(BasicLevel.DEBUG, "Send ACK message " + message 188 + " to leader"); 189 } 190 outPushItf.push(message, null); 191 } 192 catch (IOPushException e) 193 { 194 e.printStackTrace(); 197 } 198 199 try 201 { 202 chunk.setProcessTo(processMembershipItf.getBackup()); 203 } 204 catch (InterruptedException e1) 205 { 206 throw new InterruptedPushException(e1); 207 } 208 try 209 { 210 if (logger.isLoggable(BasicLevel.DEBUG)) 212 { 213 logger.log(BasicLevel.DEBUG, "Send ACK message " + message 214 + " to backup"); 215 } 216 outPushItf.push(message, null); 217 } 218 catch (IOPushException e) 219 { 220 e.printStackTrace(); 223 } 224 225 } 226 227 231 235 public void bindFc(String clientItfName, Object serverItf) 236 throws NoSuchInterfaceException, IllegalBindingException, 237 IllegalLifeCycleException 238 { 239 super.bindFc(clientItfName, serverItf); 240 if (clientItfName.equals(DELIVERY_ITF_NAME)) 241 { 242 deliveryItf = (Push) serverItf; 243 } 244 else if (clientItfName.equals(TO_BE_DELIVERED_IN_ITF_NAME)) 245 { 246 toBeDeliveredInItf = (Push) serverItf; 247 } 248 else if (clientItfName.equals(TO_BE_DELIVERED_OUT_ITF_NAME)) 249 { 250 toBeDeliveredOutItf = (Pull) serverItf; 251 } 252 else if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME)) 253 { 254 outPushItf = (Push) serverItf; 255 } 256 else if (clientItfName.equals(ProcessMembership.ITF_NAME)) 257 { 258 processMembershipItf = (ProcessMembership) serverItf; 259 } 260 else if (clientItfName.equals(MessageManager.ITF_NAME)) 261 { 262 messageManagerItf = (MessageManager) serverItf; 263 } 264 } 265 266 269 public String [] listFc() 270 { 271 return new String []{DELIVERY_ITF_NAME, TO_BE_DELIVERED_IN_ITF_NAME, 272 TO_BE_DELIVERED_OUT_ITF_NAME, Push.OUT_PUSH_ITF_NAME, 273 ProcessMembership.ITF_NAME, MessageManager.ITF_NAME}; 274 } 275 276 } | Popular Tags |