1 package org.jgroups.service; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jgroups.*; 6 import org.jgroups.blocks.PullPushAdapter; 7 8 import java.util.LinkedList ; 9 10 20 public abstract class AbstractService implements MembershipListener { 21 22 protected final Channel serviceChannel; 23 24 protected final PullPushAdapter serviceAdapter; 25 26 protected final Channel clientChannel; 27 28 protected final LinkedList members = new LinkedList (); 29 30 protected boolean blocked; 31 32 protected final Object blockMonitor = new Object (); 37 38 protected final Object threadMonitor = new Object (); 41 protected boolean runThread; 42 43 protected final Log log=LogFactory.getLog(this.getClass()); 44 45 46 60 public AbstractService(Channel serviceChannel, Channel clientChannel) { 61 this.serviceChannel = serviceChannel; 62 this.serviceAdapter = new PullPushAdapter(serviceChannel, this); 63 this.clientChannel = clientChannel; 64 65 try { 66 serviceChannel.getState(null, 1000); 67 } 68 catch(Exception ex) { 69 if(log.isErrorEnabled()) log.error("exception fetching state: " + ex); 70 } 71 72 } 73 74 81 public abstract String getName(); 82 83 86 protected void setMessageListener(MessageListener listener) { 87 this.serviceAdapter.setListener(listener); 88 } 89 90 93 public Address getAddress() { 94 return serviceChannel.getLocalAddress(); 95 } 96 97 103 public boolean isCoordinator() { 104 return getAddress().equals(members.getFirst()); 105 } 106 107 111 public boolean isBlocked() { 112 return blocked; 113 } 114 115 120 public void waitOnBlocked() throws InterruptedException { 121 synchronized(blockMonitor) { 122 blockMonitor.wait(); 123 } 124 } 125 126 131 public void block() { 132 blocked = true; 133 } 134 135 139 public void suspect(Address suspectedMember) { 140 } 141 142 149 public void viewAccepted(View view) { 150 synchronized(members) { 151 members.clear(); 152 members.addAll(view.getMembers()); 153 } 154 155 synchronized(blockMonitor) { 157 blocked = false; 158 blockMonitor.notifyAll(); 159 } 160 } 161 162 166 public void start() { 167 runThread = true; 168 169 Runnable runnable = new Runnable () { 170 public void run() { 171 while(runThread) 172 synchronized(threadMonitor) { 173 try { 174 threadMonitor.wait(); 175 } catch(InterruptedException ex) { 176 } 177 } 178 } 179 }; 180 181 Thread thread = new Thread (runnable, getName() + " Thread [" + getAddress() + ']'); 182 thread.start(); 184 } 185 186 190 public void stop() { 191 if (!runThread) 192 return; 193 194 runThread = false; 195 196 synchronized(threadMonitor) { 197 threadMonitor.notifyAll(); 198 } 199 } 200 201 } 202 | Popular Tags |