1 24 25 package org.objectweb.tribe.channel; 26 27 import java.io.ByteArrayInputStream ; 28 import java.io.ObjectInputStream ; 29 import java.io.Serializable ; 30 import java.util.ArrayList ; 31 import java.util.HashMap ; 32 33 import org.jgroups.Address; 34 import org.jgroups.BlockEvent; 35 import org.jgroups.ChannelClosedException; 36 import org.jgroups.ChannelNotConnectedException; 37 import org.jgroups.JChannel; 38 import org.jgroups.Message; 39 import org.jgroups.SuspectEvent; 40 import org.jgroups.TimeoutException; 41 import org.jgroups.View; 42 import org.objectweb.tribe.common.Group; 43 import org.objectweb.tribe.common.GroupIdentifier; 44 import org.objectweb.tribe.common.Member; 45 import org.objectweb.tribe.exceptions.AlreadyMemberException; 46 import org.objectweb.tribe.exceptions.ChannelException; 47 import org.objectweb.tribe.exceptions.NotConnectedException; 48 import org.objectweb.tribe.gms.JGroupsMembershipService; 49 import org.objectweb.tribe.messages.FragmentedMessage; 50 import org.objectweb.tribe.messages.GroupMessage; 51 52 60 public class JGroupsReliableChannelWithGms extends ReliableGroupChannelWithGms 61 { 62 private JChannel jgroupsChannel; 64 private JGroupsMembershipService jgroupsGMS; 65 66 private static final int FRAGMENT_SIZE = 32000; 68 private HashMap fragmentList = new HashMap (); 69 70 76 public JGroupsReliableChannelWithGms(JGroupsMembershipService gms) 77 { 78 super(gms); 79 this.jgroupsGMS = gms; 80 this.jgroupsChannel = gms.getJGroupsChannel(); 81 } 82 83 86 public void join(Group g) throws AlreadyMemberException, ChannelException, 87 NotConnectedException 88 { 89 if ((currentGroup != null) && currentGroup.equals(g)) 90 throw new AlreadyMemberException(); 91 92 if (currentGroup != null) 94 try 95 { 96 quit(); 97 } 98 catch (Exception ignore) 99 { 100 } 101 102 currentGroup = g; 103 me = jgroupsGMS.join(this, g.getGroupIdentifier()); 104 } 105 106 109 public void quit() throws ChannelException, NotConnectedException 110 { 111 if (currentGroup == null) 112 throw new NotConnectedException(); 113 jgroupsChannel.close(); 114 currentGroup = null; 115 } 116 117 120 public Serializable receive() throws ChannelException, NotConnectedException 121 { 122 if (currentGroup == null) 123 throw new NotConnectedException(); 124 while (true) 125 { try 127 { 128 Object obj = jgroupsChannel.receive(0); 129 130 if (obj == null) 133 continue; 134 135 if (obj instanceof View) 136 { 137 jgroupsGMS.viewAccepted((View) obj); 138 continue; 139 } 140 else if (obj instanceof SuspectEvent) 141 { 142 jgroupsGMS.suspect((Address) ((SuspectEvent) obj).getMember()); 143 continue; 144 } 145 else if (obj instanceof BlockEvent) 146 { 147 jgroupsGMS.block(); 148 continue; 149 } 150 else if (obj instanceof Message) 151 { 152 Object content = ((Message) obj).getObject(); 153 GroupMessage groupMessage = null; 154 if (content instanceof FragmentedMessage) 155 { 156 FragmentedMessage fragment = (FragmentedMessage) content; 157 byte[] message; 158 if (fragment.getFragmentId() == 0) 159 { message = new byte[fragment.getMessageSize()]; 161 fragmentList.put(fragment.getMessageId(), message); 162 } 163 else 164 { message = (byte[]) fragmentList.get(fragment.getMessageId()); 166 } 167 168 int offset = fragment.getFragmentId() * FRAGMENT_SIZE; 170 int fragmentLength = fragment.getFragmentData().length; 171 System.arraycopy(fragment.getFragmentData(), 0, message, offset, 172 fragmentLength); 173 174 if (offset + fragmentLength == fragment.getMessageSize()) 176 { try 178 { 179 ObjectInputStream ois = new ObjectInputStream ( 180 new ByteArrayInputStream (message)); 181 groupMessage = (GroupMessage) ois.readObject(); 182 } 183 catch (Exception e) 184 { 185 e.printStackTrace(); 186 throw new ChannelException( 187 "Failed to reassemble fragmented message", e); 188 } 189 finally 190 { fragmentList.remove(fragment.getMessageId()); 192 } 193 } 194 else 195 { continue; 197 } 198 } 199 else if (content instanceof GroupMessage) 200 groupMessage = (GroupMessage) content; 201 202 if (groupMessage == null) 205 return null; 206 else 207 { ArrayList members = (ArrayList ) groupMessage.getChunks().get(1); 209 if (members.contains(getLocalMembership())) 210 return groupMessage.getMessage(); 211 } 212 } 213 else 214 System.out.println("Unhandled JGroups message type (" 215 + obj.getClass() + "): " + obj); 216 } 217 catch (ChannelNotConnectedException e) 218 { 219 throw new NotConnectedException(e); 220 } 221 catch (ChannelClosedException e) 222 { 223 throw new NotConnectedException(e); 224 } 225 catch (TimeoutException e) 226 { 227 throw new ChannelException( 228 "Timeout while retrieving message from channel", e); 229 } 230 } 231 } 232 233 237 public ArrayList send(Serializable msg, GroupIdentifier gid, ArrayList members) 238 throws ChannelException, NotConnectedException 239 { 240 GroupMessage tribeMessage = new GroupMessage(msg, gid); 244 tribeMessage.addChunk(members); 245 int msgSize = tribeMessage.getByteArray().length; 246 try 247 { 248 if (msgSize > FRAGMENT_SIZE) 249 { byte[] completeMessageInBytes = tribeMessage.getByteArray(); 251 byte[] fragmentData = new byte[FRAGMENT_SIZE]; 252 FragmentedMessage fragment = new FragmentedMessage(msgSize); 253 int nbOfFragments = msgSize / FRAGMENT_SIZE; 254 int currentFragment = 0; 255 while (currentFragment < nbOfFragments) 256 { 257 System.arraycopy(completeMessageInBytes, currentFragment 258 * FRAGMENT_SIZE, fragmentData, 0, FRAGMENT_SIZE); 259 fragment.setFragment(currentFragment, fragmentData); 260 jgroupsChannel.send(new Message(null, null, fragment)); 261 currentFragment++; 262 } 263 int lastFragmentSize = msgSize % FRAGMENT_SIZE; 265 if (lastFragmentSize > 0) 266 { 267 fragmentData = new byte[lastFragmentSize]; 269 System.arraycopy(completeMessageInBytes, currentFragment 270 * FRAGMENT_SIZE, fragmentData, 0, lastFragmentSize); 271 fragment.setFragment(currentFragment, fragmentData); 272 jgroupsChannel.send(new Message(null, null, fragment)); 273 } 274 } 275 else 276 { Message jgroupsMsg = new Message(null, null, tribeMessage); 278 jgroupsChannel.send(jgroupsMsg); 279 } 280 return null; 283 } 284 catch (ChannelNotConnectedException e) 285 { 286 throw new NotConnectedException(e); 287 } 288 catch (ChannelClosedException e) 289 { 290 throw new NotConnectedException(e); 291 } 292 } 293 294 297 public String toString() 298 { 299 return "JGroups channel wrapper: " + jgroupsChannel; 300 } 301 302 307 public String getProperties() 308 { 309 return jgroupsChannel.getProperties(); 310 } 311 312 315 public Member getLocalMembership() 316 { 317 return JGroupsMembershipService 318 .memberFromJGroupsAddress((org.jgroups.stack.IpAddress) jgroupsChannel 319 .getLocalAddress()); 320 } 321 322 325 public Group getCurrentGroup() 326 { 327 if (currentGroup == null) 328 return null; 329 else 330 return gms.getGroup(currentGroup.getGroupIdentifier()); 331 } 332 333 } | Popular Tags |