KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > dream > protocol > atomicity > AtomicReactorImpl


1 /**
2  * Dream
3  * Copyright (C) 2003-2004 INRIA Rhone-Alpes
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Contact : dream@objectweb.org
20  *
21  * Initial developer(s): Matthieu Leclercq
22  * Contributor(s):
23  */

24
25 package org.objectweb.dream.protocol.atomicity;
26
27 import java.util.Map JavaDoc;
28
29 import org.objectweb.dream.AbstractComponent;
30 import org.objectweb.dream.Pull;
31 import org.objectweb.dream.PullException;
32 import org.objectweb.dream.Push;
33 import org.objectweb.dream.PushException;
34 import org.objectweb.dream.message.Message;
35 import org.objectweb.dream.message.manager.MessageManager;
36 import org.objectweb.fractal.api.NoSuchInterfaceException;
37 import org.objectweb.fractal.api.control.IllegalBindingException;
38 import org.objectweb.fractal.api.control.IllegalLifeCycleException;
39 import org.objectweb.util.monolog.api.BasicLevel;
40
41 /**
42  * Transient impelmentation of the Reactor component. This implementation
43  * guarantees that notifications emited by the reacting agent are effectuively
44  * sent if the reaction doesn't throw an exception.
45  */

46 public class AtomicReactorImpl extends AbstractComponent implements Push
47 {
48
49   /**
50    * The name of the client interface used to pass message received on the
51    * incoming-in-push interface.
52    */

53   public static final String JavaDoc INCOMING_OUT_PUSH_ITF_NAME = "incoming-out-push";
54
55   /**
56    * The name of the client interface used to send message pulled from the
57    * waiting list.
58    */

59   public static final String JavaDoc OUTGOING_OUT_PUSH_ITF_NAME = "outgoing-out-push";
60
61   /**
62    * The name of the client interface used to retrieve notifications emitted
63    * during reaction. <B>This interface MUST be in non blocking mode. </B>
64    *
65    * @see org.objectweb.dream.queue.PullQueueAttributeController
66    */

67   public static final String JavaDoc WAITING_PULL_ITF_NAME = "waitingPull";
68
69   private Push incomingOutPushItf;
70   private Push outgoingOutPushItf;
71   private Pull waitingPullItf;
72   private SetReactorThread setReactorThreadItf;
73   private MessageManager messageManagerItf;
74
75   /** The previous thread that was executing a reaction. */
76   private Thread JavaDoc previousReactorThread = null;
77
78   /**
79    * @see Push#push(Message, Map)
80    */

81   public synchronized void push(Message message, Map JavaDoc context)
82       throws PushException
83   {
84
85     Thread JavaDoc currentThread = Thread.currentThread();
86     if (currentThread != previousReactorThread)
87     {
88       previousReactorThread = currentThread;
89       setReactorThreadItf.setReactorThread(currentThread);
90     }
91
92     try
93     {
94       try
95       {
96         if (logger.isLoggable(BasicLevel.DEBUG))
97         {
98           logger.log(BasicLevel.DEBUG, "Forward incoming message : " + message);
99         }
100         incomingOutPushItf.push(message, context);
101       }
102       catch (PushException e)
103       {
104         logger.log(BasicLevel.INFO,
105             "Exception catched during reaction, emited messages are dropped. (message="
106                 + message + ")", e.getCause());
107
108         Message msg = waitingPullItf.pull(null);
109         while (msg != null)
110         {
111           messageManagerItf.deleteMessage(msg);
112           msg = waitingPullItf.pull(null);
113         }
114       }
115
116       Message msg = waitingPullItf.pull(null);
117       while (msg != null)
118       {
119         if (logger.isLoggable(BasicLevel.DEBUG))
120         {
121           logger.log(BasicLevel.DEBUG, "Push outgoing message : " + message);
122         }
123         outgoingOutPushItf.push(msg, null);
124         msg = waitingPullItf.pull(null);
125       }
126     }
127     catch (PullException e)
128     {
129       throw new PushException("An error occurs while pulling waiting messages",
130           e);
131     }
132   }
133
134   // ---------------------------------------------------------------------------
135
// Implementation of BindingController interface
136
// ---------------------------------------------------------------------------
137

138   /**
139    * @see org.objectweb.fractal.api.control.BindingController#listFc()
140    */

141   public String JavaDoc[] listFc()
142   {
143     return new String JavaDoc[]{OUTGOING_OUT_PUSH_ITF_NAME, INCOMING_OUT_PUSH_ITF_NAME,
144         WAITING_PULL_ITF_NAME, SetReactorThread.SET_REACTOR_THREAD_ITF_NAME,
145         MessageManager.ITF_NAME};
146   }
147
148   /**
149    * @see org.objectweb.fractal.api.control.BindingController#bindFc(String,
150    * Object)
151    */

152   public synchronized void bindFc(String JavaDoc clientItfName, Object JavaDoc serverItf)
153       throws NoSuchInterfaceException, IllegalBindingException,
154       IllegalLifeCycleException
155   {
156     super.bindFc(clientItfName, serverItf);
157     if (clientItfName.equals(OUTGOING_OUT_PUSH_ITF_NAME))
158     {
159       outgoingOutPushItf = (Push) serverItf;
160     }
161     else if (clientItfName.equals(INCOMING_OUT_PUSH_ITF_NAME))
162     {
163       incomingOutPushItf = (Push) serverItf;
164     }
165     else if (clientItfName.equals(WAITING_PULL_ITF_NAME))
166     {
167       waitingPullItf = (Pull) serverItf;
168     }
169     else if (clientItfName.equals(SetReactorThread.SET_REACTOR_THREAD_ITF_NAME))
170     {
171       setReactorThreadItf = (SetReactorThread) serverItf;
172     }
173     else if (clientItfName.equals(MessageManager.ITF_NAME))
174     {
175       messageManagerItf = (MessageManager) serverItf;
176     }
177   }
178
179 }
Popular Tags