KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > mux > MuxChannel


1 package org.jgroups.mux;
2
3 import org.jgroups.*;
4 import org.jgroups.util.Util;
5 import org.jgroups.stack.ProtocolStack;
6
7 import java.io.Serializable JavaDoc;
8 import java.util.Map JavaDoc;
9
10 /**
11  * Multiplexer channel. This is returned as result of calling
12  * {@link org.jgroups.ChannelFactory#createMultiplexerChannel(String,String,boolean,String)}. Maintains the multiplexer
13  * ID, which is used to add a header to each message, so that the message can be demultiplexed at the receiver
14  * @author Bela Ban
15  * @version $Id: MuxChannel.java,v 1.34 2007/06/26 17:38:26 vlada Exp $
16  */

17 public class MuxChannel extends JChannel {
18
19     /** the real channel to delegate to */
20     final JChannel ch;
21
22     /** The service ID */
23     final String JavaDoc id;
24
25     /** a reference back to the factory that created us */
26     final JChannelFactory factory;
27
28     /** The name of the JGroups stack, e.g. as defined in stacks.xml */
29     final String JavaDoc stack_name;
30
31     /** will be added to each message sent */
32     final MuxHeader hdr;
33
34     static final String JavaDoc name="MUX";
35     final Multiplexer mux;
36
37
38     public MuxChannel(JChannelFactory f, JChannel ch, String JavaDoc id, String JavaDoc stack_name, Multiplexer mux) {
39         super(false); // don't create protocol stack, queues and threads
40
factory=f;
41         this.ch=ch;
42         this.stack_name=stack_name;
43         this.id=id;
44         hdr=new MuxHeader(id);
45         this.mux=mux;
46         closed=!ch.isOpen();
47         // connected=ch.isConnected();
48
}
49
50     public String JavaDoc getStackName() {return stack_name;}
51
52     public String JavaDoc getId() {return id;}
53
54     public Multiplexer getMultiplexer() {return mux;}
55
56     public String JavaDoc getChannelName() {
57         return ch.getClusterName();
58     }
59
60     public String JavaDoc getClusterName() {
61         return ch.getClusterName();
62     }
63
64     public Address getLocalAddress() {
65         return ch != null? ch.getLocalAddress() : null;
66     }
67
68     /** This should never be used (just for testing) ! */
69     public JChannel getChannel() {
70         return ch;
71     }
72
73
74     /**
75      * Returns the <em>service</em> view, ie. the cluster view (see {@link #getView()}) <em>minus</em> the nodes on
76      * which this service is not running, e.g. if S1 runs on A and C, and the cluster view is {A,B,C}, then the service
77      * view is {A,C}
78      * @return The service view (list of nodes on which this service is running)
79      */

80     public View getView() {
81         return closed || !connected ? null : mux.getServiceView(id);
82     }
83
84     /** Returns the JGroups view of a cluster, e.g. if we have nodes A, B and C, then the view will
85      * be {A,B,C}
86      * @return The JGroups view
87      */

88     public View getClusterView() {
89         return ch != null? ch.getView() : null;
90     }
91
92     public ProtocolStack getProtocolStack() {
93         return ch != null? ch.getProtocolStack() : null;
94     }
95
96     public boolean isOpen() {
97         return !closed;
98     }
99
100     public boolean isConnected() {
101         return connected;
102     }
103
104     public Map JavaDoc dumpStats() {
105         return ch.dumpStats();
106     }
107
108
109     public void setClosed(boolean f) {
110         closed=f;
111     }
112
113     public void setConnected(boolean f) {
114         connected=f;
115     }
116
117     public Object JavaDoc getOpt(int option) {
118         return ch.getOpt(option);
119     }
120
121     public void setOpt(int option, Object JavaDoc value) {
122         ch.setOpt(option, value);
123         super.setOpt(option, value);
124     }
125
126     public synchronized void connect(String JavaDoc channel_name) throws ChannelException, ChannelClosedException {
127         /*make sure the channel is not closed*/
128         checkClosed();
129
130         /*if we already are connected, then ignore this*/
131         if(connected) {
132             if(log.isTraceEnabled()) log.trace("already connected to " + channel_name);
133             return;
134         }
135         
136         factory.connect(this);
137         notifyChannelConnected(this);
138     }
139
140
141     public synchronized boolean connect(String JavaDoc cluster_name, Address target, String JavaDoc state_id, long timeout) throws ChannelException {
142         throw new UnsupportedOperationException JavaDoc("not yet implemented");
143     }
144
145     public synchronized void disconnect() {
146         try {
147             closed=false;
148             setConnected(false);
149             factory.disconnect(this);
150         }
151         catch(Throwable JavaDoc t) {
152             log.error("disconnecting channel failed", t);
153         }
154         closed=false;
155         setConnected(false);
156         notifyChannelDisconnected(this);
157     }
158
159
160
161     public synchronized void open() throws ChannelException {
162         factory.open(this);
163         closed=false;
164     }
165
166     public synchronized void close() {
167         try {
168             closed=true;
169             setConnected(false);
170             factory.close(this);
171         }
172         finally {
173             closed=true;
174             setConnected(false);
175             closeMessageQueue(true);
176         }
177
178         notifyChannelClosed(this);
179     }
180
181     protected void _close(boolean disconnect, boolean close_mq) {
182         super._close(disconnect, close_mq);
183         closed=!ch.isOpen();
184         setConnected(ch.isConnected());
185         notifyChannelClosed(this);
186     }
187
188     public synchronized void shutdown() {
189         try {
190             factory.shutdown(this);
191         }
192         finally {
193             closed=true;
194             setConnected(false);
195             closeMessageQueue(true);
196         }
197     }
198
199
200     public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {
201         msg.putHeader(name, hdr);
202         ch.send(msg);
203     }
204
205     public void send(Address dst, Address src, Serializable JavaDoc obj) throws ChannelNotConnectedException, ChannelClosedException {
206         send(new Message(dst, src, obj));
207     }
208
209
210     public void down(Event evt) {
211         if(evt.getType() == Event.MSG) {
212             Message msg=(Message)evt.getArg();
213             msg.putHeader(name, hdr);
214         }
215         ch.down(evt);
216     }
217
218     public Object JavaDoc downcall(Event evt) {
219         if(evt.getType() == Event.MSG) {
220             Message msg=(Message)evt.getArg();
221             msg.putHeader(name, hdr);
222         }
223         return ch.downcall(evt);
224     }
225     
226
227     public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
228         return getState(target, null, timeout);
229     }
230
231     public boolean getState(Address target, String JavaDoc state_id, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
232         String JavaDoc my_id=id;
233
234         if(state_id != null)
235             my_id += "::" + state_id;
236
237         // we're usig service views, so we need to find the first host in the cluster on which our service runs
238
// http://jira.jboss.com/jira/browse/JGRP-247
239
//
240
// unless service runs on a specified target node
241
// http://jira.jboss.com/jira/browse/JGRP-401
242
Address service_view_coordinator=mux.getStateProvider(target,id);
243         Address tmp=getLocalAddress();
244
245         if(service_view_coordinator != null)
246             target=service_view_coordinator;
247
248         if(tmp != null && tmp.equals(target)) // this will avoid the "cannot get state from myself" error
249
target=null;
250
251         if(!mux.stateTransferListenersPresent())
252             return ch.getState(target, my_id, timeout);
253         else {
254             return mux.getState(target, my_id, timeout);
255         }
256     }
257
258     public void returnState(byte[] state) {
259         ch.returnState(state, id);
260     }
261
262     public void returnState(byte[] state, String JavaDoc state_id) {
263         String JavaDoc my_id=id;
264         if(state_id != null)
265             my_id+="::" + state_id;
266         ch.returnState(state, my_id);
267     }
268 }
269
Popular Tags