1 21 package net.sf.hajdbc.distributable; 22 23 import java.io.Serializable ; 24 import java.util.Collection ; 25 import java.util.concurrent.locks.Lock ; 26 27 import javax.management.MBeanServer ; 28 import javax.management.ObjectName ; 29 30 import net.sf.hajdbc.Database; 31 import net.sf.hajdbc.DatabaseClusterFactory; 32 import net.sf.hajdbc.Messages; 33 import net.sf.hajdbc.SQLException; 34 import net.sf.hajdbc.local.LocalDatabaseCluster; 35 36 import org.jgroups.Address; 37 import org.jgroups.Channel; 38 import org.jgroups.JChannel; 39 import org.jgroups.blocks.NotificationBus; 40 import org.jgroups.jmx.JmxConfigurator; 41 import org.slf4j.Logger; 42 import org.slf4j.LoggerFactory; 43 44 51 public class DistributableDatabaseCluster extends LocalDatabaseCluster implements NotificationBus.Consumer 52 { 53 static Logger logger = LoggerFactory.getLogger(DistributableDatabaseCluster.class); 54 55 private NotificationBus notificationBus; 56 private DistributableLock lock; 57 private DistributableDatabaseClusterBuilder builder; 58 59 63 public DistributableDatabaseCluster(DistributableDatabaseClusterBuilder builder) 64 { 65 this.builder = builder; 66 } 67 68 71 @Override 72 public boolean deactivate(Database database) 73 { 74 boolean deactivated = super.deactivate(database); 75 76 if (deactivated) 77 { 78 this.notificationBus.sendNotification(new DatabaseDeactivationCommand(database)); 79 } 80 81 return deactivated; 82 } 83 84 87 @Override 88 public boolean activate(Database database) 89 { 90 boolean activated = super.activate(database); 91 92 if (activated) 93 { 94 this.notificationBus.sendNotification(new DatabaseActivationCommand(database)); 95 } 96 97 return activated; 98 } 99 100 103 public void handleNotification(Serializable command) 104 { 105 logger.info(Messages.getMessage(Messages.DATABASE_COMMAND_RECEIVED, command)); 106 107 DatabaseCommand.class.cast(command).execute(this); 108 } 109 110 113 public Serializable getCache() 114 { 115 Collection <String > databases = super.getActiveDatabases(); 116 117 return databases.toArray(new String [databases.size()]); 118 } 119 120 123 public void memberJoined(Address address) 124 { 125 String channel = this.notificationBus.getChannel().getChannelName(); 126 127 logger.info(Messages.getMessage(Messages.GROUP_MEMBER_JOINED, address, channel)); 128 } 129 130 133 public void memberLeft(Address address) 134 { 135 String channel = this.notificationBus.getChannel().getChannelName(); 136 137 logger.info(Messages.getMessage(Messages.GROUP_MEMBER_LEFT, address, channel)); 138 } 139 140 143 @Override 144 public String [] loadState() throws java.sql.SQLException 145 { 146 String [] state = String [].class.cast(this.notificationBus.getCacheFromCoordinator(this.builder.getTimeout(), 1)); 147 148 return (state != null) ? state : super.loadState(); 149 } 150 151 154 @Override 155 public synchronized void start() throws java.sql.SQLException 156 { 157 try 158 { 159 this.notificationBus = new NotificationBus(this.getId(), this.builder.getProtocol()); 160 this.notificationBus.setConsumer(this); 161 this.notificationBus.start(); 162 163 this.register(this.notificationBus.getChannel()); 164 165 this.lock = new DistributableLock(this.getId() + "-lock", this.builder.getProtocol(), this.builder.getTimeout(), super.writeLock()); 166 167 this.register(this.lock.getChannel()); 168 169 super.start(); 170 } 171 catch (Exception e) 172 { 173 throw new SQLException(e.toString(), e); 174 } 175 } 176 177 private void register(Channel channel) throws Exception 178 { 179 MBeanServer server = DatabaseClusterFactory.getMBeanServer(); 180 181 ObjectName name = this.getObjectName(channel); 182 183 if (!server.isRegistered(name)) 184 { 185 JmxConfigurator.registerChannel(JChannel.class.cast(channel), server, name.getCanonicalName(), true); 186 } 187 } 188 189 private ObjectName getObjectName(Channel channel) throws Exception 190 { 191 return ObjectName.getInstance("org.jgroups", "channel", ObjectName.quote(channel.getChannelName())); 192 } 193 194 197 @Override 198 public synchronized void stop() 199 { 200 if (this.notificationBus != null) 201 { 202 this.unregister(this.notificationBus.getChannel()); 203 204 this.notificationBus.stop(); 205 } 206 207 if (this.lock != null) 208 { 209 this.unregister(this.lock.getChannel()); 210 211 this.lock.stop(); 212 } 213 214 super.stop(); 215 } 216 217 private void unregister(Channel channel) 218 { 219 MBeanServer server = DatabaseClusterFactory.getMBeanServer(); 220 221 try 222 { 223 ObjectName name = this.getObjectName(channel); 224 225 if (server.isRegistered(name)) 226 { 227 JmxConfigurator.unregisterChannel(server, name); 228 JmxConfigurator.unregisterProtocols(server, JChannel.class.cast(channel), name.getCanonicalName()); 229 } 230 } 231 catch (Exception e) 232 { 233 logger.warn(e.getMessage(), e); 234 } 235 } 236 237 240 @Override 241 public Lock writeLock() 242 { 243 return this.lock; 244 } 245 } 246 | Popular Tags |