1 19 20 package org.apache.cayenne.event; 21 22 import java.io.Serializable ; 23 import java.util.Collection ; 24 25 import org.jgroups.Channel; 26 import org.jgroups.JChannel; 27 import org.jgroups.Message; 28 import org.jgroups.MessageListener; 29 import org.jgroups.blocks.PullPushAdapter; 30 31 38 public class JavaGroupsBridge extends EventBridge implements MessageListener { 39 40 protected byte[] state; 42 43 protected Channel channel; 44 protected PullPushAdapter adapter; 45 protected String multicastAddress; 46 protected String multicastPort; 47 protected String configURL; 48 49 52 public JavaGroupsBridge(EventSubject localSubject, String externalSubject) { 53 super(localSubject, externalSubject); 54 } 55 56 59 public JavaGroupsBridge(Collection localSubjects, String externalSubject) { 60 super(localSubjects, externalSubject); 61 } 62 63 public String getConfigURL() { 64 return configURL; 65 } 66 67 public void setConfigURL(String configURL) { 68 this.configURL = configURL; 69 } 70 71 public String getMulticastAddress() { 72 return multicastAddress; 73 } 74 75 public void setMulticastAddress(String multicastAddress) { 76 this.multicastAddress = multicastAddress; 77 } 78 79 public String getMulticastPort() { 80 return multicastPort; 81 } 82 83 public void setMulticastPort(String multicastPort) { 84 this.multicastPort = multicastPort; 85 } 86 87 public byte[] getState() { 88 return state; 89 } 90 91 public void setState(byte[] state) { 92 this.state = state; 93 } 94 95 99 public void receive(Message message) { 100 try { 101 CayenneEvent event = messageObjectToEvent((Serializable ) message.getObject()); 102 if (event != null) { 103 104 onExternalEvent(event); 105 } 106 } 107 catch (Exception ex) { 108 } 111 } 112 113 protected void startupExternal() throws Exception { 114 118 if (configURL != null) { 121 channel = new JChannel(configURL); 122 } 123 else { 124 String configString = buildConfigString(); 125 channel = new JChannel(configString); 126 } 127 128 channel.setOpt(Channel.LOCAL, Boolean.FALSE); 130 channel.connect(externalSubject); 131 132 if (receivesExternalEvents()) { 133 adapter = new PullPushAdapter(channel, this); 134 } 135 } 136 137 141 protected String buildConfigString() { 142 if (multicastAddress == null) { 143 throw new IllegalStateException ("'multcastAddress' is not set"); 144 } 145 146 if (multicastPort == null) { 147 throw new IllegalStateException ("'multcastPort' is not set"); 148 } 149 150 return "UDP(mcast_addr=" 151 + multicastAddress 152 + ";mcast_port=" 153 + multicastPort 154 + ";ip_ttl=32):" 155 + "PING(timeout=3000;num_initial_members=6):" 156 + "FD(timeout=3000):" 157 + "VERIFY_SUSPECT(timeout=1500):" 158 + "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):" 159 + "pbcast.STABLE(desired_avg_gossip=10000):" 160 + "FRAG:" 161 + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" 162 + "shun=true;print_local_addr=false)"; 163 } 164 165 protected void shutdownExternal() throws Exception { 166 try { 167 if (adapter != null) { 168 adapter.stop(); 169 } 170 171 channel.close(); 172 } 173 finally { 174 adapter = null; 175 channel = null; 176 } 177 } 178 179 protected void sendExternalEvent(CayenneEvent localEvent) throws Exception { 180 Message message = new Message(null, null, eventToMessageObject(localEvent)); 181 channel.send(message); 182 } 183 184 189 protected Serializable eventToMessageObject(CayenneEvent event) throws Exception { 190 return event; 191 } 192 193 198 protected CayenneEvent messageObjectToEvent(Serializable object) throws Exception { 199 return (object instanceof CayenneEvent) ? (CayenneEvent) object : null; 200 } 201 } 202 | Popular Tags |