1 24 25 package org.objectweb.dream.protocol.utobcast; 26 27 import java.util.Map ; 28 29 import org.objectweb.dream.AbstractComponent; 30 import org.objectweb.dream.IOPushException; 31 import org.objectweb.dream.InterruptedPushException; 32 import org.objectweb.dream.Push; 33 import org.objectweb.dream.PushException; 34 import org.objectweb.dream.message.Message; 35 import org.objectweb.dream.message.manager.MessageManager; 36 import org.objectweb.dream.protocol.utobcast.message.UTOBcastChunk; 37 import org.objectweb.fractal.api.NoSuchInterfaceException; 38 import org.objectweb.fractal.api.control.IllegalBindingException; 39 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 40 import org.objectweb.util.monolog.api.BasicLevel; 41 42 49 public class DATImpl extends AbstractComponent implements Push 50 { 51 52 56 60 public static final String PENDING_MESSAGES_IN_ITF_NAME = "pending-messages-in"; 61 62 63 protected Push pendingMessagesInItf; 64 65 66 protected Push outPushItf; 67 68 69 protected ProcessMembership processMembershipItf; 70 71 72 protected SequenceNumber sequenceNumberItf; 73 74 75 protected BackupElection backupElectionItf; 76 77 78 protected MessageManager messageManagerItf; 79 80 final Object lock = new Object (); 81 82 86 89 public DATImpl() 90 { 91 } 92 93 97 101 public void push(Message message, Map context) throws PushException 102 { 103 UTOBcastChunk chunk = (UTOBcastChunk) message 104 .getChunk(UTOBcastChunk.DEFAULT_NAME); 105 synchronized (lock) 106 { 107 if (logger.isLoggable(BasicLevel.DEBUG)) 108 { 109 logger.log(BasicLevel.DEBUG, "Received DAT message " + message); 110 } 111 chunk.setSequenceNumber(sequenceNumberItf.getSequenceNumber()); 113 if (logger.isLoggable(BasicLevel.DEBUG)) 115 { 116 logger.log(BasicLevel.DEBUG, "Add message " + message 117 + " to pending messages"); 118 } 119 messageManagerItf.duplicateMessage(message, false); 120 pendingMessagesInItf.push(message, null); 121 chunk.setUTOBcastMessageType(UTOBcastChunk.REP); 123 try 125 { 126 chunk.setProcessTo(processMembershipItf.getBackup()); 127 } 128 catch (InterruptedException e) 129 { 130 messageManagerItf.deleteMessage(message); 132 throw new InterruptedPushException(e); 133 } 134 try 136 { 137 if (logger.isLoggable(BasicLevel.DEBUG)) 138 { 139 logger.log(BasicLevel.DEBUG, "Send REP message " + message 140 + " to backup"); 141 } 142 outPushItf.push(message, null); 143 } 144 catch (InterruptedPushException e) 145 { 146 messageManagerItf.deleteMessage(message); 148 throw e; 149 } 150 catch (IOPushException e) 151 { 152 backupElectionItf.elect(); 154 return; 155 } 156 if (logger.isLoggable(BasicLevel.DEBUG)) 158 { 159 logger.log(BasicLevel.DEBUG, "Increment sequence number"); 160 } 161 sequenceNumberItf.incrementSequenceNumber(); 162 } 163 } 164 165 169 173 public void bindFc(String clientItfName, Object serverItf) 174 throws NoSuchInterfaceException, IllegalBindingException, 175 IllegalLifeCycleException 176 { 177 super.bindFc(clientItfName, serverItf); 178 if (clientItfName.equals(PENDING_MESSAGES_IN_ITF_NAME)) 179 { 180 pendingMessagesInItf = (Push) serverItf; 181 } 182 else if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME)) 183 { 184 outPushItf = (Push) serverItf; 185 } 186 else if (clientItfName.equals(SequenceNumber.ITF_NAME)) 187 { 188 sequenceNumberItf = (SequenceNumber) serverItf; 189 } 190 else if (clientItfName.equals(ProcessMembership.ITF_NAME)) 191 { 192 processMembershipItf = (ProcessMembership) serverItf; 193 } 194 else if (clientItfName.equals(BackupElection.ITF_NAME)) 195 { 196 backupElectionItf = (BackupElection) serverItf; 197 } 198 else if (clientItfName.equals(MessageManager.ITF_NAME)) 199 { 200 messageManagerItf = (MessageManager) serverItf; 201 } 202 203 } 204 205 208 public String [] listFc() 209 { 210 return new String []{PENDING_MESSAGES_IN_ITF_NAME, SequenceNumber.ITF_NAME, 211 Push.OUT_PUSH_ITF_NAME, ProcessMembership.ITF_NAME, 212 BackupElection.ITF_NAME, MessageManager.ITF_NAME}; 213 } 214 215 } | Popular Tags |