1 56 package org.objectstyle.cayenne.event; 57 58 import java.io.Serializable ; 59 60 import org.apache.log4j.Logger; 61 import org.jgroups.Channel; 62 import org.jgroups.JChannel; 63 import org.jgroups.Message; 64 import org.jgroups.MessageListener; 65 import org.jgroups.blocks.PullPushAdapter; 66 67 74 public class JavaGroupsBridge extends EventBridge implements MessageListener { 75 private static Logger logObj = Logger.getLogger(JavaGroupsBridge.class); 76 77 protected byte[] state; 79 80 protected Channel channel; 81 protected PullPushAdapter adapter; 82 protected String multicastAddress; 83 protected String multicastPort; 84 protected String configURL; 85 86 89 public JavaGroupsBridge(EventSubject localSubject, String externalSubject) { 90 super(localSubject, externalSubject); 91 } 92 93 public String getConfigURL() { 94 return configURL; 95 } 96 97 public void setConfigURL(String configURL) { 98 this.configURL = configURL; 99 } 100 101 public String getMulticastAddress() { 102 return multicastAddress; 103 } 104 105 public void setMulticastAddress(String multicastAddress) { 106 this.multicastAddress = multicastAddress; 107 } 108 109 public String getMulticastPort() { 110 return multicastPort; 111 } 112 113 public void setMulticastPort(String multicastPort) { 114 this.multicastPort = multicastPort; 115 } 116 117 public byte[] getState() { 118 return state; 119 } 120 121 public void setState(byte[] state) { 122 this.state = state; 123 } 124 125 129 public void receive(Message message) { 130 try { 131 if (logObj.isDebugEnabled()) { 132 logObj.debug("Received Message from: " + message.getSrc()); 133 } 134 135 CayenneEvent event = messageObjectToEvent((Serializable ) message.getObject()); 136 if (event != null) { 137 if (logObj.isDebugEnabled()) { 138 logObj.debug("Received CayenneEvent: " + event.getClass().getName()); 139 } 140 141 onExternalEvent(event); 142 } 143 } catch (Exception ex) { 144 logObj.info("Exception while processing message: ", ex); 145 } 146 147 } 148 149 protected void startupExternal() throws Exception { 150 153 if (configURL != null) { 156 logObj.debug("creating channel with configuration from " + configURL); 157 channel = new JChannel(configURL); 158 } else { 159 String configString = buildConfigString(); 160 logObj.debug("creating channel with properties: " + configString); 161 channel = new JChannel(configString); 162 } 163 164 channel.setOpt(Channel.LOCAL, Boolean.FALSE); 166 channel.connect(externalSubject); 167 logObj.debug("channel connected."); 168 169 if (receivesExternalEvents()) { 170 adapter = new PullPushAdapter(channel, this); 171 } 172 } 173 174 178 protected String buildConfigString() { 179 if (multicastAddress == null) { 180 throw new IllegalStateException ("'multcastAddress' is not set"); 181 } 182 183 if (multicastPort == null) { 184 throw new IllegalStateException ("'multcastPort' is not set"); 185 } 186 187 return "UDP(mcast_addr=" 188 + multicastAddress 189 + ";mcast_port=" 190 + multicastPort 191 + ";ip_ttl=32):" 192 + "PING(timeout=3000;num_initial_members=6):" 193 + "FD(timeout=3000):" 194 + "VERIFY_SUSPECT(timeout=1500):" 195 + "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):" 196 + "pbcast.STABLE(desired_avg_gossip=10000):" 197 + "FRAG:" 198 + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" 199 + "shun=true;print_local_addr=false)"; 200 } 201 202 protected void shutdownExternal() throws Exception { 203 try { 204 if (adapter != null) { 205 adapter.stop(); 206 } 207 208 channel.close(); 209 } finally { 210 adapter = null; 211 channel = null; 212 } 213 } 214 215 protected void sendExternalEvent(CayenneEvent localEvent) throws Exception { 216 logObj.debug("Sending event remotely: " + localEvent); 217 Message message = new Message(null, null, eventToMessageObject(localEvent)); 218 channel.send(message); 219 } 220 221 226 protected Serializable eventToMessageObject(CayenneEvent event) throws Exception { 227 return event; 228 } 229 230 235 protected CayenneEvent messageObjectToEvent(Serializable object) throws Exception { 236 return (object instanceof CayenneEvent) ? (CayenneEvent) object : null; 237 } 238 } 239 | Popular Tags |