1 24 25 package org.objectweb.dream.protocol.utobcast; 26 27 import java.util.Map ; 28 29 import org.objectweb.dream.AbstractComponent; 30 import org.objectweb.dream.IOPushException; 31 import org.objectweb.dream.InterruptedPushException; 32 import org.objectweb.dream.Push; 33 import org.objectweb.dream.PushException; 34 import org.objectweb.dream.PushWithReturn; 35 import org.objectweb.dream.message.ChunkAlreadyExistsException; 36 import org.objectweb.dream.message.ExtensibleMessage; 37 import org.objectweb.dream.message.Message; 38 import org.objectweb.dream.message.manager.MessageManager; 39 import org.objectweb.dream.protocol.Process; 40 import org.objectweb.dream.protocol.utobcast.message.CrashedLeaderChunk; 41 import org.objectweb.dream.protocol.utobcast.message.UTOBcastChunk; 42 import org.objectweb.dream.util.Error; 43 import org.objectweb.fractal.api.NoSuchInterfaceException; 44 import org.objectweb.fractal.api.control.IllegalBindingException; 45 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 46 import org.objectweb.util.monolog.api.BasicLevel; 47 48 53 public class BroadcastImpl extends AbstractComponent implements Push 54 { 55 56 60 61 protected Push outPushItf; 62 63 64 protected PushWithReturn outPushWithReturnItf; 65 66 67 protected ProcessMembership processMembershipItf; 68 69 70 protected MessageManager messageManagerItf; 71 72 Process myProcess; 73 boolean configured = false; 74 75 79 82 public BroadcastImpl() 83 { 84 } 85 86 90 94 public void push(Message message, Map context) throws PushException 95 { 96 if (logger.isLoggable(BasicLevel.DEBUG)) 97 { 98 logger.log(BasicLevel.DEBUG, "Received message " + message 99 + " to be broadcast"); 100 } 101 if (!configured) 102 { 103 try 104 { 105 configure(); 106 } 107 catch (InterruptedException e1) 108 { 109 throw new InterruptedPushException(e1); 110 } 111 } 112 UTOBcastChunk utobcastChunk = (UTOBcastChunk) message 114 .getChunk(UTOBcastChunk.DEFAULT_NAME); 115 if (utobcastChunk == null) 116 { 117 if (message instanceof ExtensibleMessage) 119 { 120 utobcastChunk = (UTOBcastChunk) messageManagerItf 121 .createChunk(UTOBcastChunk.TYPE); 122 try 123 { 124 ((ExtensibleMessage) message).addChunk(UTOBcastChunk.DEFAULT_NAME, 125 UTOBcastChunk.TYPE, utobcastChunk); 126 } 127 catch (ChunkAlreadyExistsException e2) 128 { 129 Error.bug(logger, e2); 131 } 132 } 133 else 134 { 135 Error.error("Message is not extensible: unable to add UTOBcast chunk", 137 logger); 138 } 139 } 140 141 utobcastChunk.setUTOBcastMessageType(UTOBcastChunk.DAT); 143 utobcastChunk.setProcessFrom(myProcess); 144 145 Message returnMessage = null; 146 147 while (returnMessage == null) 148 { 149 Process leader; 151 try 152 { 153 leader = processMembershipItf.getLeader(); 154 } 155 catch (InterruptedException e1) 156 { 157 throw new InterruptedPushException(e1); 158 } 159 utobcastChunk.setProcessTo(leader); 160 try 161 { 162 if (logger.isLoggable(BasicLevel.DEBUG)) 164 { 165 logger.log(BasicLevel.DEBUG, "Send DAT message " + message 166 + " to leader"); 167 } 168 returnMessage = outPushWithReturnItf.pushWithReturn(message, null); 169 if (returnMessage.getChunk(CrashedLeaderChunk.DEFAULT_NAME) != null) 170 { 171 if (logger.isLoggable(BasicLevel.DEBUG)) 174 { 175 logger.log(BasicLevel.DEBUG, 176 "Received AllKey -> the leader has crahsed"); 177 } 178 returnMessage = null; 179 continue; 180 } 181 else 182 { 183 if (logger.isLoggable(BasicLevel.DEBUG)) 185 { 186 logger.log(BasicLevel.DEBUG, "Received message to be delivered " 187 + returnMessage); 188 } 189 outPushItf.push(returnMessage, null); 190 } 191 } 192 catch (IOPushException e) 193 { 194 processMembershipItf.electBackupAsLeader(leader); 196 } 197 } 198 } 199 200 204 void configure() throws InterruptedException 205 { 206 myProcess = processMembershipItf.getMyself(); 207 configured = true; 208 } 209 210 214 218 public void bindFc(String clientItfName, Object serverItf) 219 throws NoSuchInterfaceException, IllegalBindingException, 220 IllegalLifeCycleException 221 { 222 super.bindFc(clientItfName, serverItf); 223 if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME)) 224 { 225 outPushItf = (Push) serverItf; 226 } 227 else if (clientItfName.equals(PushWithReturn.OUT_PUSH_WITH_RETURN_ITF_NAME)) 228 { 229 outPushWithReturnItf = (PushWithReturn) serverItf; 230 } 231 else if (clientItfName.equals(ProcessMembership.ITF_NAME)) 232 { 233 processMembershipItf = (ProcessMembership) serverItf; 234 } 235 else if (clientItfName.equals(MessageManager.ITF_NAME)) 236 { 237 messageManagerItf = (MessageManager) serverItf; 238 } 239 } 240 241 244 public String [] listFc() 245 { 246 return new String []{Push.OUT_PUSH_ITF_NAME, 247 PushWithReturn.OUT_PUSH_ITF_NAME, ProcessMembership.ITF_NAME, 248 MessageManager.ITF_NAME}; 249 } 250 251 } | Popular Tags |