1 3 package org.jgroups.blocks; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.*; 9 import org.jgroups.util.Util; 10 11 import java.io.IOException ; 12 import java.io.ObjectInput ; 13 import java.io.ObjectOutput ; 14 import java.io.Serializable ; 15 import java.util.ArrayList ; 16 import java.util.HashMap ; 17 import java.util.Iterator ; 18 import java.util.List ; 19 20 21 36 public class PullPushAdapter implements Runnable , ChannelListener { 37 protected Transport transport=null; 38 protected MessageListener listener=null; protected final List membership_listeners=new ArrayList (); 40 protected Thread receiver_thread=null; 41 protected final HashMap listeners=new HashMap (); protected final Log log=LogFactory.getLog(getClass()); 43 static final String PULL_HEADER="PULL_HEADER"; 44 45 46 public PullPushAdapter(Transport transport) { 47 this.transport=transport; 48 start(); 49 } 50 51 public PullPushAdapter(Transport transport, MessageListener l) { 52 this.transport=transport; 53 setListener(l); 54 start(); 55 } 56 57 58 public PullPushAdapter(Transport transport, MembershipListener ml) { 59 this.transport=transport; 60 addMembershipListener(ml); 61 start(); 62 } 63 64 65 public PullPushAdapter(Transport transport, MessageListener l, MembershipListener ml) { 66 this.transport=transport; 67 setListener(l); 68 addMembershipListener(ml); 69 start(); 70 } 71 72 73 public Transport getTransport() { 74 return transport; 75 } 76 77 78 public void start() { 79 if(receiver_thread == null || !receiver_thread.isAlive()) { 80 receiver_thread=new Thread (this, "PullPushAdapterThread"); 81 receiver_thread.setDaemon(true); 82 receiver_thread.start(); 83 } 84 if(transport instanceof JChannel) 85 ((JChannel)transport).setChannelListener(this); 86 } 87 88 public void stop() { 89 Thread tmp=null; 90 if(receiver_thread != null && receiver_thread.isAlive()) { 91 tmp=receiver_thread; 92 receiver_thread=null; 93 tmp.interrupt(); 94 try { 95 tmp.join(1000); 96 } 97 catch(Exception ex) { 98 } 99 } 100 receiver_thread=null; 101 } 102 103 109 public void send(Serializable identifier, Message msg) throws Exception { 110 if(msg == null) { 111 if(log.isErrorEnabled()) log.error("msg is null"); 112 return; 113 } 114 if(identifier == null) 115 transport.send(msg); 116 else { 117 msg.putHeader(PULL_HEADER, new PullHeader(identifier)); 118 transport.send(msg); 119 } 120 } 121 122 127 public void send(Message msg) throws Exception { 128 send(null, msg); 129 } 130 131 132 public void setListener(MessageListener l) { 133 listener=l; 134 } 135 136 137 138 144 public void registerListener(Serializable identifier, MessageListener l) { 145 if(l == null || identifier == null) { 146 if(log.isErrorEnabled()) log.error("message listener or identifier is null"); 147 return; 148 } 149 if(listeners.containsKey(identifier)) { 150 if(log.isErrorEnabled()) log.error("listener with identifier=" + identifier + 151 " already exists, choose a different identifier or unregister current listener"); 152 return; 154 } 155 listeners.put(identifier, l); 156 } 157 158 162 public void unregisterListener(Serializable identifier) { 163 listeners.remove(identifier); 164 } 165 166 167 168 public void setMembershipListener(MembershipListener ml) { 169 addMembershipListener(ml); 170 } 171 172 public void addMembershipListener(MembershipListener l) { 173 if(l != null && !membership_listeners.contains(l)) 174 membership_listeners.add(l); 175 } 176 177 public void removeMembershipListener(MembershipListener l) { 178 if(l != null && membership_listeners.contains(l)) 179 membership_listeners.remove(l); 180 } 181 182 183 187 public void run() { 188 Object obj; 189 190 while(receiver_thread != null && Thread.currentThread().equals(receiver_thread)) { 191 try { 192 obj=transport.receive(0); 193 if(obj == null) 194 continue; 195 196 if(obj instanceof Message) { 197 handleMessage((Message)obj); 198 } 199 else if(obj instanceof GetStateEvent) { 200 byte[] retval=null; 201 if(listener != null) { 202 try { 203 retval=listener.getState(); 204 } 205 catch(Throwable t) { 206 log.error("getState() from application failed, will return empty state", t); 207 } 208 } 209 else { 210 log.warn("no listener registered, returning empty state"); 211 } 212 213 if(transport instanceof Channel) { 214 ((Channel)transport).returnState(retval); 215 } 216 else { 217 if(log.isErrorEnabled()) 218 log.error("underlying transport is not a Channel, but a " + 219 transport.getClass().getName() + ": cannot return state using returnState()"); 220 continue; 221 } 222 } 223 else if(obj instanceof SetStateEvent) { 224 if(listener != null) { 225 try { 226 listener.setState(((SetStateEvent)obj).getArg()); 227 } 228 catch(ClassCastException cast_ex) { 229 if(log.isErrorEnabled()) log.error("received SetStateEvent, but argument " + 230 ((SetStateEvent)obj).getArg() + " is not serializable ! Discarding message."); 231 continue; 232 } 233 } 234 } 235 else if(obj instanceof View) { 236 notifyViewChange((View)obj); 237 } 238 else if(obj instanceof SuspectEvent) { 239 notifySuspect((Address)((SuspectEvent)obj).getMember()); 240 } 241 else if(obj instanceof BlockEvent) { 242 notifyBlock(); 243 } 244 } 245 catch(ChannelNotConnectedException conn) { 246 Address local_addr=((Channel)transport).getLocalAddress(); 247 if(log.isWarnEnabled()) log.warn('[' + (local_addr == null ? "<null>" : local_addr.toString()) + 248 "] channel not connected, exception is " + conn); 249 Util.sleep(1000); 250 receiver_thread=null; 251 break; 252 } 253 catch(ChannelClosedException closed_ex) { 254 Address local_addr=((Channel)transport).getLocalAddress(); 255 if(log.isWarnEnabled()) log.warn('[' + (local_addr == null ? "<null>" : local_addr.toString()) + 256 "] channel closed, exception is " + closed_ex); 257 receiver_thread=null; 259 break; 260 } 261 catch(Throwable e) { 262 } 263 } 264 } 265 266 267 272 protected void handleMessage(Message msg) { 273 PullHeader hdr=(PullHeader)msg.getHeader(PULL_HEADER); 274 Serializable identifier; 275 MessageListener l; 276 277 if(hdr != null && (identifier=hdr.getIdentifier()) != null) { 278 l=(MessageListener)listeners.get(identifier); 279 if(l == null) { 280 if(log.isErrorEnabled()) log.error("received a messages tagged with identifier=" + 281 identifier + ", but there is no registration for that identifier. Will drop message"); 282 } 283 else 284 l.receive(msg); 285 } 286 else { 287 if(listener != null) 288 listener.receive(msg); 289 } 290 } 291 292 293 protected void notifyViewChange(View v) { 294 MembershipListener l; 295 296 if(v == null) return; 297 for(Iterator it=membership_listeners.iterator(); it.hasNext();) { 298 l=(MembershipListener)it.next(); 299 try { 300 l.viewAccepted(v); 301 } 302 catch(Throwable ex) { 303 if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex); 304 } 305 } 306 } 307 308 protected void notifySuspect(Address suspected_mbr) { 309 MembershipListener l; 310 311 if(suspected_mbr == null) return; 312 for(Iterator it=membership_listeners.iterator(); it.hasNext();) { 313 l=(MembershipListener)it.next(); 314 try { 315 l.suspect(suspected_mbr); 316 } 317 catch(Throwable ex) { 318 if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex); 319 } 320 } 321 } 322 323 protected void notifyBlock() { 324 MembershipListener l; 325 326 for(Iterator it=membership_listeners.iterator(); it.hasNext();) { 327 l=(MembershipListener)it.next(); 328 try { 329 l.block(); 330 } 331 catch(Throwable ex) { 332 if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex); 333 } 334 } 335 } 336 337 public void channelConnected(Channel channel) { 338 if(log.isTraceEnabled()) 339 log.trace("channel is connected"); 340 } 341 342 public void channelDisconnected(Channel channel) { 343 if(log.isTraceEnabled()) 344 log.trace("channel is disconnected"); 345 } 346 347 public void channelClosed(Channel channel) { 348 } 349 350 public void channelShunned() { 351 if(log.isTraceEnabled()) 352 log.trace("channel is shunned"); 353 } 354 355 public void channelReconnected(Address addr) { 356 start(); 357 } 358 359 360 361 362 public static final class PullHeader extends Header { 363 Serializable identifier=null; 364 365 public PullHeader() { 366 ; } 368 369 public PullHeader(Serializable identifier) { 370 this.identifier=identifier; 371 } 372 373 public Serializable getIdentifier() { 374 return identifier; 375 } 376 377 public long size() { 378 if(identifier == null) 379 return 12; 380 else 381 return 64; 382 } 383 384 385 public String toString() { 386 return "PullHeader"; 387 } 388 389 public void writeExternal(ObjectOutput out) throws IOException { 390 out.writeObject(identifier); 391 } 392 393 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 394 identifier=(Serializable )in.readObject(); 395 } 396 } 397 398 399 402 public MessageListener getListener() { 403 return listener; 404 } 405 } 406 | Popular Tags |