package org.objectweb.tribe.adapters;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;

import org.objectweb.tribe.channel.ReliableGroupChannel;
import org.objectweb.tribe.common.Member;
import org.objectweb.tribe.common.log.Trace;
import org.objectweb.tribe.exceptions.ChannelException;
import org.objectweb.tribe.exceptions.NotConnectedException;
import org.objectweb.tribe.exceptions.TimeoutException;
import org.objectweb.tribe.messages.MessageListener;

/**
 * Request/reply layer on top of a {@link ReliableGroupChannel}.
 * <p>
 * Outgoing requests are wrapped in a {@link MulticastRequestAdapterMessage}
 * carrying a sequence number; incoming replies are matched against that number
 * and collected in the {@link MulticastResponse} of the pending request.
 * Incoming requests are handed to the {@link MulticastRequestListener}, and any
 * message that is not a {@link MulticastRequestAdapterMessage} is forwarded
 * untouched to the plain {@link MessageListener}.
 */
public class MulticastRequestAdapter implements MessageListener
{
  /**
   * Send the request as <code>REQUEST_ONLY</code> and return without waiting
   * for any reply.
   */
  public static final int     WAIT_NONE      = 0;

  /**
   * Wait mode handed to {@link MulticastResponse}. NOTE(review): presumably
   * "return after the first reply"; the exact semantics live in
   * MulticastResponse, confirm there.
   */
  public static final int     WAIT_FIRST     = 1;

  /**
   * Wait mode handed to {@link MulticastResponse}. NOTE(review): presumably
   * "return once a majority of members replied"; confirm in MulticastResponse.
   */
  public static final int     WAIT_MAJORITY  = 2;

  /**
   * Wait mode handed to {@link MulticastResponse}. NOTE(review): presumably
   * "return once every member replied"; confirm in MulticastResponse.
   */
  public static final int     WAIT_ALL       = 3;

  private ReliableGroupChannel     channel;
  private Member                   me;
  private MessageListener          msgListener;
  private MulticastRequestListener multicastRequestListener;
  /** Pending requests: Integer sequence number -> MulticastResponse. */
  private HashMap                  pendingQueries;
  private PullPushAdapter          pullPushAdapter = null;
  private int                      sequenceNumber  = 0;
  private static Trace             logger          = Trace
      .getLogger("org.objectweb.tribe.blocks.multicastadapter");

  /**
   * Creates a new <code>MulticastRequestAdapter</code> bound to the given
   * channel.
   *
   * @param channel the group channel used to send and receive messages
   * @param msgListener listener receiving every message that is not a
   *          {@link MulticastRequestAdapterMessage}
   * @param dispatcherListener listener handling incoming requests
   */
  public MulticastRequestAdapter(ReliableGroupChannel channel,
      MessageListener msgListener, MulticastRequestListener dispatcherListener)
  {
    this.channel = channel;
    this.msgListener = msgListener;
    this.multicastRequestListener = dispatcherListener;
    this.me = channel.getLocalMembership();
    this.pendingQueries = new HashMap();
    // Created last on purpose: 'this' is handed out as the adapter's listener,
    // so every field receive() reads must already be set in case messages are
    // delivered as soon as the adapter exists.
    // NOTE(review): PullPushAdapter's constructor is not visible here; confirm
    // whether it starts delivering messages immediately.
    this.pullPushAdapter = new PullPushAdapter(channel, this);
  }

  /**
   * Returns the channel given at construction time.
   *
   * @return the underlying <code>ReliableGroupChannel</code>
   */
  public ReliableGroupChannel getChannel()
  {
    return channel;
  }

  /**
   * Stops the underlying {@link PullPushAdapter}.
   */
  public void stop()
  {
    pullPushAdapter.stop();
  }

  /**
   * Dispatches an incoming message.
   * <ul>
   * <li>A reply is added to the {@link MulticastResponse} registered under its
   * uid; if no request is pending for that uid the reply is dropped.</li>
   * <li>A request is first passed to
   * <code>handleMessageSingleThreaded</code> in the calling thread, then a
   * {@link MulticastRequestAdapterThread} is started with the result.</li>
   * <li>Any other message is forwarded to the plain message listener.</li>
   * </ul>
   *
   * @param msg the message received from the channel
   * @see org.objectweb.tribe.messages.MessageListener#receive(java.io.Serializable)
   */
  public void receive(Serializable msg)
  {
    if (!(msg instanceof MulticastRequestAdapterMessage))
    {
      // Not part of the request/reply protocol: hand it over unchanged.
      msgListener.receive(msg);
      return;
    }

    MulticastRequestAdapterMessage requestReply = (MulticastRequestAdapterMessage) msg;
    if (requestReply.isReply())
    {
      MulticastResponse resp;
      Integer key = new Integer(requestReply.getUid());
      synchronized (pendingQueries)
      {
        resp = (MulticastResponse) pendingQueries.get(key);
      }
      if (resp == null)
      {
        // No request is pending for this uid (any more): ignore the reply.
        if (logger.isDebugEnabled())
          logger.debug("Dropping response to request "
              + requestReply.getUid());
        return;
      }
      if (logger.isDebugEnabled())
        logger.debug("Received reply from " + requestReply.getSender()
            + " to message " + requestReply.getUid());
      resp.addResult(requestReply.getSender(), requestReply.getMessage());
    }
    else
    {
      // Single-threaded part runs in the receiving thread, in arrival order.
      Object callback1 = multicastRequestListener
          .handleMessageSingleThreaded(requestReply.getMessage(),
              requestReply.getSender());

      // The rest of the processing is delegated to a dedicated thread.
      new MulticastRequestAdapterThread(multicastRequestListener,
          requestReply, callback1, pullPushAdapter, me).start();
    }
  }

  /**
   * Sends a message to a list of members and, unless <code>waitMode</code> is
   * {@link #WAIT_NONE}, blocks until the response is complete.
   *
   * @param dests list of destination members
   * @param msg the payload to send
   * @param waitMode one of {@link #WAIT_NONE}, {@link #WAIT_FIRST},
   *          {@link #WAIT_MAJORITY} or {@link #WAIT_ALL}
   * @param timeout passed unchanged to
   *          <code>MulticastResponse.waitForCompletion</code>; not used when
   *          <code>waitMode</code> is <code>WAIT_NONE</code>
   * @return the response object, with its failed members set to the list
   *         returned by the send operation
   * @throws TimeoutException NOTE(review): presumably raised by
   *           <code>waitForCompletion</code> when the timeout elapses; confirm
   *           in MulticastResponse
   * @throws ChannelException propagated from the underlying send/wait calls
   * @throws NotConnectedException propagated from the underlying send/wait
   *           calls
   */
  public MulticastResponse multicastMessage(ArrayList dests, Serializable msg,
      int waitMode, long timeout) throws TimeoutException, ChannelException,
      NotConnectedException
  {
    int seq = nextSequenceNumber();
    MulticastResponse reply = new MulticastResponse(dests, waitMode);
    MulticastRequestAdapterMessage toSend;

    if (waitMode == WAIT_NONE)
    {
      // Fire and forget: nothing is registered since no reply is expected.
      toSend = new MulticastRequestAdapterMessage(msg, me, seq,
          MulticastRequestAdapterMessage.REQUEST_ONLY);
      if (logger.isDebugEnabled())
        logger.debug("Sending request " + seq + " to " + dests.size()
            + " members.");
      ArrayList failed = pullPushAdapter.send(toSend, dests);
      reply.setFailedMembers(failed);
      return reply;
    }

    toSend = new MulticastRequestAdapterMessage(msg, me, seq,
        MulticastRequestAdapterMessage.REQUEST);

    // Register before sending so that a fast reply finds its response object.
    Integer key = new Integer(seq);
    synchronized (pendingQueries)
    {
      pendingQueries.put(key, reply);
    }

    try
    {
      if (logger.isDebugEnabled())
        logger.debug("Sending message " + key + " to " + dests.size()
            + " members.");
      ArrayList failed = pullPushAdapter.send(toSend, dests);
      reply.setFailedMembers(failed);
      reply.waitForCompletion(timeout);
    }
    finally
    {
      // Always unregister, including when send or wait throws; otherwise the
      // entry would stay in pendingQueries forever.
      synchronized (pendingQueries)
      {
        pendingQueries.remove(key);
      }
    }
    return reply;
  }

  /**
   * Returns the current sequence number and increments it. Synchronized so
   * that concurrent senders never obtain the same number.
   */
  private synchronized int nextSequenceNumber()
  {
    return sequenceNumber++;
  }

}