1 19 package org.fjank.jcache.distribution; 20 21 import java.util.Enumeration ; 22 import org.fjank.jcache.CacheImpl; 23 import org.jgroups.Address; 24 import org.jgroups.ChannelClosedException; 25 import org.jgroups.ChannelException; 26 import org.jgroups.ChannelNotConnectedException; 27 import org.jgroups.JChannel; 28 import org.jgroups.Membership; 29 import org.jgroups.MembershipListener; 30 import org.jgroups.Message; 31 import org.jgroups.MessageListener; 32 import org.jgroups.View; 33 import org.jgroups.blocks.PullPushAdapter; 34 35 public class JGroupsDistributionEngine 36 extends DistributionEngine 37 implements MessageListener, MembershipListener 38 { 39 private static JGroupsDistributionEngine _singleton; 40 41 public static synchronized JGroupsDistributionEngine instanceOf(CacheImpl cache) 42 { 43 if (_singleton == null) 44 { 45 _singleton = new JGroupsDistributionEngine(cache); 46 } 47 return _singleton; 48 } 49 50 private JChannel channel; 51 52 private Membership members = new Membership(); 53 54 55 private JGroupsDistributionEngine(CacheImpl cache) 56 57 { 58 this.cache = cache; 59 if (cache.getAttributes().isDistributed()) 60 { 61 try 62 { 63 channel = new JChannel(null); 64 channel.connect("FKacheOS"); 65 new PullPushAdapter(channel, this, this); 66 } 67 catch (ChannelException e) 68 { 69 throw new IllegalStateException (e.getMessage()); 70 } 71 } 72 73 } 74 75 public void block() 76 { 77 } 78 79 82 public Enumeration getCacheAddr() 83 { 84 return members.getMembers().elements(); 85 } 86 87 protected JChannel getChannel() 88 { 89 return channel; 90 } 91 92 95 public byte[] getState() 96 { 97 return new byte[0]; 98 } 99 100 public void receive(Message msg) 101 { 102 if (msg.getObject() instanceof ClusterNotification 104 && !msg.getSrc().equals(channel.getLocalAddress())) 105 { 106 ClusterNotification clusterNotification = 107 (ClusterNotification) msg.getObject(); 108 handleClusterNotification(clusterNotification); 109 } 110 } 111 112 public void sendNotification(ClusterNotification clusterNotification) 113 114 { 115 if (cache.getAttributes().isDistributed()) 116 { 117 Message message = new Message(); 118 message.setObject(clusterNotification); 119 try 120 { 121 channel.send(message); 122 } 123 catch (ChannelNotConnectedException e) 124 { 125 throw new IllegalStateException (e.getMessage()); 126 } 127 catch (ChannelClosedException e) 128 { 129 throw new IllegalStateException (e.getMessage()); 130 } 131 } 132 } 133 134 137 public void setState(byte[] state) 138 { 139 140 } 141 142 public void suspect(Address suspected_mbr) 143 { 144 } 145 146 149 public void viewAccepted(View new_view) 150 { 151 members.add(new_view.getMembers()); 152 } 153 } 154 | Popular Tags |