1 46 51 package org.mr.kernel.delivery; 52 53 import java.io.IOException ; 54 55 import org.apache.commons.logging.LogFactory; 56 import org.mr.MantaAgent; 57 import org.mr.core.protocol.MantaBusMessage; 58 import org.mr.core.protocol.PayloadContainer; 59 import org.mr.core.util.SynchronizedQueue; 60 61 68 public class NetworkModerator { 69 private SynchronizedQueue unBufferedMessages = new SynchronizedQueue(); 71 private SynchronizedQueue bufferedMessages = new SynchronizedQueue(); 74 75 private String recipient; 77 private int maxNumberOfMessages; 79 private MantaBusMessage messageInPipe = null; 83 84 85 86 public NetworkModerator(String recipient , int maxNumberOfMessages ){ 87 this.recipient = recipient; 88 this.maxNumberOfMessages =maxNumberOfMessages; 89 } 90 91 95 public final synchronized void sendToNetwork(MantaBusMessage msg){ 96 if(messageInPipe != null){ 97 if(unBufferedMessages.isEmpty() == false || bufferedMessages.size() >=maxNumberOfMessages){ 98 unBufferedMessages.enqueue(msg); 99 }else{ 100 try { 101 if(msg.getPayloadContainer()!= null) 102 msg.getPayloadContainer().getSerializedPayload(); 103 bufferedMessages.enqueue(msg); 104 } catch (IOException e) { 105 LogFactory.getLog("NetworkModerator").error(e); 106 } 107 } 108 }else{ 109 messageInPipe = msg; 110 MantaAgent.getInstance().getSingletonRepository().getNetworkManager().sendBuffer(msg); 111 } 112 113 } 115 120 public final synchronized void messageSentByNetwork( MantaBusMessage sent){ 121 if(sent != messageInPipe){ 122 return; 123 } 124 if(messageInPipe != null) 125 messageInPipe.releaseBuffers(); 126 MantaBusMessage msg = null; 127 if(bufferedMessages.size() <maxNumberOfMessages){ 128 msg = (MantaBusMessage) unBufferedMessages.dequeueNoBlock(); 129 if(msg!= null){ 130 try { 131 PayloadContainer con = msg.getPayloadContainer(); 132 if(con != null) 133 con.getSerializedPayload(); 134 bufferedMessages.enqueue(msg); 135 } catch (IOException e) { 136 LogFactory.getLog("NetworkModerator").error(e); 137 } 138 } 139 140 } 141 if(!bufferedMessages.isEmpty()){ 142 msg=(MantaBusMessage) bufferedMessages.dequeueNoBlock(); 143 } 144 if(msg!= null){ 145 messageInPipe=msg; 146 MantaAgent.getInstance().getSingletonRepository().getNetworkManager().sendBuffer(msg); 147 }else{ 148 messageInPipe = null; 149 } 150 151 } 153 public final synchronized void messageSendFailByNetwork(MantaBusMessage sent){ 154 if(messageInPipe != null) { 155 if(sent != messageInPipe){ 156 return; 157 } 158 messageInPipe.releaseBuffers(); 159 messageInPipe = null; 160 } 161 } 162 163 164 167 public synchronized void clear() { 168 this.unBufferedMessages.clear(); 169 MantaBusMessage msg = null; 170 while((msg=(MantaBusMessage) bufferedMessages.dequeueNoBlock())!= null){ 171 msg.releaseBuffers(); 172 } 173 bufferedMessages.clear(); 174 175 } 176 177 } 178 | Popular Tags |