package org.jgroups.mux;

import org.jgroups.*;
import org.jgroups.util.Util;
import org.jgroups.stack.ProtocolStack;

import java.io.Serializable;
import java.util.Map;

/**
 * A {@link JChannel} subclass that is identified by a service ID and forwards its work to
 * three collaborators: an underlying {@link JChannel}, a {@link Multiplexer} and the
 * {@link JChannelFactory} that created it.
 * <p>
 * Every message leaving through this channel is tagged with a {@link MuxHeader} built from
 * the service ID before it is handed to the underlying channel. Lifecycle operations
 * (connect, disconnect, open, close, shutdown) are delegated to the factory, while this
 * class keeps its own {@code closed}/{@code connected} flags.
 */
public class MuxChannel extends JChannel {

    /** Key under which {@link #hdr} is attached to outgoing messages. */
    static final String name = "MUX";

    /** The underlying channel that actually carries the traffic. */
    final JChannel ch;

    /** The service ID of this channel. */
    final String id;

    /** The factory to which lifecycle operations are delegated. */
    final JChannelFactory factory;

    /** The name of the stack this channel was created for. */
    final String stack_name;

    /** The header (carrying {@link #id}) that is added to each outgoing message. */
    final MuxHeader hdr;

    /** The multiplexer consulted for service views and state transfer. */
    final Multiplexer mux;


    /**
     * Creates a channel for service {@code id} on top of {@code ch}. The initial
     * {@code closed} flag is the negation of {@code ch.isOpen()}.
     *
     * @param f          the factory handling lifecycle operations for this channel
     * @param ch         the underlying channel; must not be {@code null}
     * @param id         the service ID, also used to build the {@link MuxHeader}
     * @param stack_name the name of the stack
     * @param mux        the multiplexer associated with this channel
     */
    public MuxChannel(JChannelFactory f, JChannel ch, String id, String stack_name, Multiplexer mux) {
        super(false);
        this.factory = f;
        this.ch = ch;
        this.stack_name = stack_name;
        this.id = id;
        this.hdr = new MuxHeader(id);
        this.mux = mux;
        this.closed = !ch.isOpen();
    }

    /** @return the stack name passed to the constructor */
    public String getStackName() {
        return stack_name;
    }

    /** @return the service ID passed to the constructor */
    public String getId() {
        return id;
    }

    /** @return the multiplexer passed to the constructor */
    public Multiplexer getMultiplexer() {
        return mux;
    }

    /** @return the cluster name reported by the underlying channel */
    public String getChannelName() {
        return ch.getClusterName();
    }

    /** @return the cluster name reported by the underlying channel */
    public String getClusterName() {
        return ch.getClusterName();
    }

    /** @return the local address of the underlying channel, or {@code null} if there is no underlying channel */
    public Address getLocalAddress() {
        if (ch == null) {
            return null;
        }
        return ch.getLocalAddress();
    }

    /** @return the underlying channel */
    public JChannel getChannel() {
        return ch;
    }

    /**
     * Returns the view of this service as reported by the multiplexer.
     *
     * @return the service view for {@link #getId()}, or {@code null} when this channel is
     *         closed or not connected
     */
    public View getView() {
        if (!closed && connected) {
            return mux.getServiceView(id);
        }
        return null;
    }

    /**
     * Returns the view of the underlying channel, as opposed to the service view
     * returned by {@link #getView()}.
     *
     * @return the underlying channel's view, or {@code null} if there is no underlying channel
     */
    public View getClusterView() {
        if (ch == null) {
            return null;
        }
        return ch.getView();
    }

    /** @return the protocol stack of the underlying channel, or {@code null} if there is no underlying channel */
    public ProtocolStack getProtocolStack() {
        if (ch == null) {
            return null;
        }
        return ch.getProtocolStack();
    }

    /** @return {@code true} unless this channel's own {@code closed} flag is set */
    public boolean isOpen() {
        return !closed;
    }

    /** @return the value of this channel's own {@code connected} flag */
    public boolean isConnected() {
        return connected;
    }

    /** @return the statistics dumped by the underlying channel */
    public Map dumpStats() {
        return ch.dumpStats();
    }

    /** Sets this channel's {@code closed} flag; the underlying channel is not touched. */
    public void setClosed(boolean f) {
        this.closed = f;
    }

    /** Sets this channel's {@code connected} flag; the underlying channel is not touched. */
    public void setConnected(boolean f) {
        this.connected = f;
    }

    /** @return the option value as reported by the underlying channel */
    public Object getOpt(int option) {
        return ch.getOpt(option);
    }

    /** Applies the option to the underlying channel first and then to this channel. */
    public void setOpt(int option, Object value) {
        ch.setOpt(option, value);
        super.setOpt(option, value);
    }

    /**
     * Connects this channel through the factory and notifies channel listeners.
     * Does nothing (apart from a trace message) when already connected.
     *
     * @param channel_name only used in the trace message emitted when already connected
     * @throws ChannelClosedException presumably raised by {@code checkClosed()} when this
     *                                channel is closed (inherited helper, not visible here)
     * @throws ChannelException       declared by the signature; may originate from the factory
     */
    public synchronized void connect(String channel_name) throws ChannelException, ChannelClosedException {
        checkClosed();

        if (!connected) {
            factory.connect(this);
            notifyChannelConnected(this);
            return;
        }

        if (log.isTraceEnabled()) {
            log.trace("already connected to " + channel_name);
        }
    }

    /**
     * Connect-and-fetch-state variant; not supported by this class.
     *
     * @throws UnsupportedOperationException always
     */
    public synchronized boolean connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException {
        throw new UnsupportedOperationException("not yet implemented");
    }

    /**
     * Disconnects this channel through the factory. Any failure raised while doing so is
     * logged rather than propagated. Afterwards the channel is flagged open and not
     * connected, and channel listeners are notified of the disconnect.
     */
    public synchronized void disconnect() {
        try {
            closed = false;
            setConnected(false);
            factory.disconnect(this);
        }
        catch (Throwable failure) {
            log.error("disconnecting channel failed", failure);
        }

        // Re-assert the flags after the factory call, exactly as before it.
        closed = false;
        setConnected(false);
        notifyChannelDisconnected(this);
    }

    /**
     * Opens this channel through the factory and clears the {@code closed} flag.
     *
     * @throws ChannelException declared by the signature; may originate from the factory
     */
    public synchronized void open() throws ChannelException {
        factory.open(this);
        closed = false;
    }

    /**
     * Closes this channel through the factory. Whether or not the factory call succeeds,
     * the channel ends up flagged closed and not connected and its message queue is
     * closed. Listeners are notified only if the factory call completed normally.
     */
    public synchronized void close() {
        try {
            flagClosedAndDisconnected();
            factory.close(this);
        }
        finally {
            flagClosedAndDisconnected();
            closeMessageQueue(true);
        }

        notifyChannelClosed(this);
    }

    /**
     * Runs the inherited close logic, then re-synchronises this channel's flags with the
     * state of the underlying channel and notifies channel listeners.
     */
    protected void _close(boolean disconnect, boolean close_mq) {
        super._close(disconnect, close_mq);
        closed = !ch.isOpen();
        setConnected(ch.isConnected());
        notifyChannelClosed(this);
    }

    /**
     * Shuts this channel down through the factory. Whether or not the factory call
     * succeeds, the channel ends up flagged closed and not connected and its message
     * queue is closed.
     */
    public synchronized void shutdown() {
        try {
            factory.shutdown(this);
        }
        finally {
            flagClosedAndDisconnected();
            closeMessageQueue(true);
        }
    }

    /**
     * Tags the message with this service's {@link MuxHeader} and sends it on the
     * underlying channel.
     */
    public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {
        msg.putHeader(name, hdr);
        ch.send(msg);
    }

    /** Wraps the arguments in a new {@link Message} and passes it to {@link #send(Message)}. */
    public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException, ChannelClosedException {
        Message wrapped = new Message(dst, src, obj);
        send(wrapped);
    }

    /**
     * Passes the event down the underlying channel; message events are tagged with this
     * service's {@link MuxHeader} first.
     */
    public void down(Event evt) {
        tagWithMuxHeader(evt);
        ch.down(evt);
    }

    /**
     * Same as {@link #down(Event)} but uses the underlying channel's {@code downcall}.
     *
     * @return whatever the underlying channel's {@code downcall} returns
     */
    public Object downcall(Event evt) {
        tagWithMuxHeader(evt);
        return ch.downcall(evt);
    }

    /** Equivalent to {@code getState(target, null, timeout)}. */
    public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
        return getState(target, null, timeout);
    }

    /**
     * Requests state for this service.
     * <p>
     * The state ID sent out is the service ID, optionally followed by {@code "::"} and
     * {@code state_id}. If the multiplexer names a state provider for this service, that
     * address replaces {@code target}; if the resulting target equals the local address,
     * {@code null} is used instead. The request goes through the multiplexer when it
     * reports state transfer listeners, otherwise straight to the underlying channel.
     *
     * @param target   the preferred member to fetch the state from
     * @param state_id optional sub-state ID, may be {@code null}
     * @param timeout  passed through unchanged to the delegate
     * @return the result reported by the delegate that handled the request
     */
    public boolean getState(Address target, String state_id, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
        final String qualifiedId = qualifyStateId(state_id);
        final Address provider = mux.getStateProvider(target, id);
        final Address self = getLocalAddress();

        Address source = target;
        if (provider != null) {
            source = provider;
        }
        if (self != null && self.equals(source)) {
            source = null;
        }

        if (mux.stateTransferListenersPresent()) {
            return mux.getState(source, qualifiedId, timeout);
        }
        return ch.getState(source, qualifiedId, timeout);
    }

    /** Returns the state to the underlying channel under this service's ID. */
    public void returnState(byte[] state) {
        ch.returnState(state, id);
    }

    /**
     * Returns the state to the underlying channel under the service ID, optionally
     * followed by {@code "::"} and {@code state_id}.
     */
    public void returnState(byte[] state, String state_id) {
        ch.returnState(state, qualifyStateId(state_id));
    }

    /** Adds this service's header to the event's message if the event is of type {@code Event.MSG}. */
    private void tagWithMuxHeader(Event evt) {
        if (evt.getType() != Event.MSG) {
            return;
        }
        Message payload = (Message)evt.getArg();
        payload.putHeader(name, hdr);
    }

    /** Builds the state ID: the service ID alone, or {@code id + "::" + state_id} when a sub-state ID is given. */
    private String qualifyStateId(String state_id) {
        if (state_id == null) {
            return id;
        }
        return id + "::" + state_id;
    }

    /** Flags this channel as closed and not connected. */
    private void flagClosedAndDisconnected() {
        closed = true;
        setConnected(false);
    }
}