1 24 25 package org.objectweb.dream.protocol.atomicity; 26 27 import java.util.Map ; 28 29 import org.objectweb.dream.AbstractComponent; 30 import org.objectweb.dream.Pull; 31 import org.objectweb.dream.PullException; 32 import org.objectweb.dream.Push; 33 import org.objectweb.dream.PushException; 34 import org.objectweb.dream.message.Message; 35 import org.objectweb.dream.message.manager.MessageManager; 36 import org.objectweb.fractal.api.NoSuchInterfaceException; 37 import org.objectweb.fractal.api.control.IllegalBindingException; 38 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 39 import org.objectweb.util.monolog.api.BasicLevel; 40 41 46 public class AtomicReactorImpl extends AbstractComponent implements Push 47 { 48 49 53 public static final String INCOMING_OUT_PUSH_ITF_NAME = "incoming-out-push"; 54 55 59 public static final String OUTGOING_OUT_PUSH_ITF_NAME = "outgoing-out-push"; 60 61 67 public static final String WAITING_PULL_ITF_NAME = "waitingPull"; 68 69 private Push incomingOutPushItf; 70 private Push outgoingOutPushItf; 71 private Pull waitingPullItf; 72 private SetReactorThread setReactorThreadItf; 73 private MessageManager messageManagerItf; 74 75 76 private Thread previousReactorThread = null; 77 78 81 public synchronized void push(Message message, Map context) 82 throws PushException 83 { 84 85 Thread currentThread = Thread.currentThread(); 86 if (currentThread != previousReactorThread) 87 { 88 previousReactorThread = currentThread; 89 setReactorThreadItf.setReactorThread(currentThread); 90 } 91 92 try 93 { 94 try 95 { 96 if (logger.isLoggable(BasicLevel.DEBUG)) 97 { 98 logger.log(BasicLevel.DEBUG, "Forward incoming message : " + message); 99 } 100 incomingOutPushItf.push(message, context); 101 } 102 catch (PushException e) 103 { 104 logger.log(BasicLevel.INFO, 105 "Exception catched during reaction, emited messages are dropped. (message=" 106 + message + ")", e.getCause()); 107 108 Message msg = waitingPullItf.pull(null); 109 while (msg != null) 110 { 111 messageManagerItf.deleteMessage(msg); 112 msg = waitingPullItf.pull(null); 113 } 114 } 115 116 Message msg = waitingPullItf.pull(null); 117 while (msg != null) 118 { 119 if (logger.isLoggable(BasicLevel.DEBUG)) 120 { 121 logger.log(BasicLevel.DEBUG, "Push outgoing message : " + message); 122 } 123 outgoingOutPushItf.push(msg, null); 124 msg = waitingPullItf.pull(null); 125 } 126 } 127 catch (PullException e) 128 { 129 throw new PushException("An error occurs while pulling waiting messages", 130 e); 131 } 132 } 133 134 138 141 public String [] listFc() 142 { 143 return new String []{OUTGOING_OUT_PUSH_ITF_NAME, INCOMING_OUT_PUSH_ITF_NAME, 144 WAITING_PULL_ITF_NAME, SetReactorThread.SET_REACTOR_THREAD_ITF_NAME, 145 MessageManager.ITF_NAME}; 146 } 147 148 152 public synchronized void bindFc(String clientItfName, Object serverItf) 153 throws NoSuchInterfaceException, IllegalBindingException, 154 IllegalLifeCycleException 155 { 156 super.bindFc(clientItfName, serverItf); 157 if (clientItfName.equals(OUTGOING_OUT_PUSH_ITF_NAME)) 158 { 159 outgoingOutPushItf = (Push) serverItf; 160 } 161 else if (clientItfName.equals(INCOMING_OUT_PUSH_ITF_NAME)) 162 { 163 incomingOutPushItf = (Push) serverItf; 164 } 165 else if (clientItfName.equals(WAITING_PULL_ITF_NAME)) 166 { 167 waitingPullItf = (Pull) serverItf; 168 } 169 else if (clientItfName.equals(SetReactorThread.SET_REACTOR_THREAD_ITF_NAME)) 170 { 171 setReactorThreadItf = (SetReactorThread) serverItf; 172 } 173 else if (clientItfName.equals(MessageManager.ITF_NAME)) 174 { 175 messageManagerItf = (MessageManager) serverItf; 176 } 177 } 178 179 } | Popular Tags |