package org.mr.api.rmi.thin;

import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import org.mr.MantaAgent;
import org.mr.MantaAgentConstants;
import org.mr.api.rmi.MantaRMIServer;
import org.mr.core.protocol.MantaBusMessage;
import org.mr.core.protocol.MantaBusMessageConsts;
import org.mr.core.util.SystemTime;
import org.mr.core.util.byteable.ByteableText;
import org.mr.kernel.services.MantaService;
import org.mr.kernel.services.ServiceConsumer;
import org.mr.kernel.services.ServiceProducer;
import org.mr.kernel.services.topics.TopicGatherListener;
import org.mr.kernel.world.WorldModeler;

/**
 * Implementation of {@link ThinMessagingInterface} that forwards queue and
 * topic operations to the agent held in {@code MantaRMIServer.manta}.
 *
 * <p>Producers, consumers and topic listeners are created lazily and cached
 * per caller under the key {@code userKey + serviceName}.
 *
 * <p>Thread-safety: only {@link #subscribe} and {@link #unsubscribe} hold the
 * instance monitor; the remaining methods read and write the caches without
 * synchronization.
 */
public class ThinRMIMantarayImpl implements ThinMessagingInterface {

    /** 600000 ms (ten minutes); added to the current GMT time when sending. */
    private static final int TEN_MINUTES_MILLIS = 600000;

    /** Value passed to {@code receive}; presumably a timeout in milliseconds - TODO confirm. */
    private static final int RECEIVE_WAIT = 1000;

    /** Cached {@link ServiceConsumer}s keyed by {@code userKey + serviceName}. */
    private final HashMap consumers = new HashMap();

    /** Cached {@link ServiceProducer}s keyed by {@code userKey + serviceName}. */
    private final HashMap producers = new HashMap();

    /** Active {@link TopicGatherListener}s keyed by {@code userKey + topicName}. */
    private final HashMap listeners = new HashMap();

    public ThinRMIMantarayImpl() {
    }

    /**
     * Wraps {@code msg} in a client-type bus message and enqueues it on the
     * given queue, creating and advertising a producer on first use.
     *
     * @param userKey   caller identifier used as part of the cache key
     * @param queueName name of the target queue
     * @param msg       text payload
     * @return always {@code true} when no exception is thrown
     * @throws RemoteException wrapping any failure, including an unknown queue
     */
    public boolean enqueueMessage(String userKey, String queueName, String msg) throws RemoteException {
        try {
            MantaBusMessage message = newClientMessage(msg);
            String key = cacheKey(userKey, queueName);
            ServiceProducer producer = (ServiceProducer) producers.get(key);
            if (producer == null) {
                MantaService service = MantaRMIServer.manta.getService(queueName, MantaService.SERVICE_TYPE_QUEUE);
                requireService(service, queueName);
                producer = ServiceProducer.createNew(service);
                MantaRMIServer.manta.advertiseService(producer);
                producers.put(key, producer);
            }
            // Last argument is now + 10 minutes; presumably the message expiration time - TODO confirm.
            MantaRMIServer.manta.enqueueMessage(message, producer, MantaAgentConstants.NON_PERSISTENT,
                    MantaAgentConstants.NORMAL, TEN_MINUTES_MILLIS + SystemTime.gmtCurrentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
            throw new RemoteException("error in manta", e);
        }
        return true;
    }

    /**
     * Lists the names of all queue services in the default domain of the
     * world modeler.
     *
     * @return queue names, possibly empty, never {@code null}
     */
    public String[] getQueues() throws RemoteException {
        WorldModeler world = MantaAgent.getInstance().getSingletonRepository().getWorldModeler();

        ArrayList list = new ArrayList();
        Iterator iter = world.getServices(world.getDefaultDomainName()).iterator();
        while (iter.hasNext()) {
            MantaService service = (MantaService) iter.next();
            if (service.getServiceType() == MantaService.SERVICE_TYPE_QUEUE) {
                list.add(service.getServiceName());
            }
        }
        return (String[]) list.toArray(new String[list.size()]);
    }

    /**
     * Receives one message from the given queue, creating and advertising an
     * auto-acknowledging consumer on first use.
     *
     * @param userKey   caller identifier used as part of the cache key
     * @param queueName name of the queue to read from
     * @return {@code String.valueOf} of the payload, or {@code null} when
     *         {@code receive} returned no message
     * @throws RemoteException wrapping any failure, including an unknown queue
     */
    public String denqueueMessage(String userKey, String queueName) throws RemoteException {
        String result = null;
        try {
            String key = cacheKey(userKey, queueName);
            ServiceConsumer consumer = (ServiceConsumer) consumers.get(key);

            if (consumer == null) {
                MantaService service = MantaRMIServer.manta.getService(queueName, MantaService.SERVICE_TYPE_QUEUE);
                requireService(service, queueName);
                consumer = new ServiceConsumer(MantaRMIServer.manta.getAgentName(),
                        MantaRMIServer.manta.getDomainName(), service.getServiceName(),
                        service.getServiceType(), MantaAgentConstants.AUTO_ACK);
                MantaRMIServer.manta.advertiseService(consumer);
                consumers.put(key, consumer);
            }
            MantaBusMessage msg = MantaRMIServer.manta.receive(consumer, RECEIVE_WAIT);
            if (msg != null) {
                result = String.valueOf(msg.getPayload());
            }
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RemoteException("error in manta", e);
        }
    }

    /**
     * Wraps {@code msg} in a client-type bus message and publishes it on the
     * given topic, creating and advertising a producer on first use.
     *
     * @param userKey   caller identifier used as part of the cache key
     * @param topicName name of the target topic
     * @param msg       text payload
     * @throws RemoteException wrapping any failure, including an unknown topic
     */
    public void publishMessage(String userKey, String topicName, String msg) throws RemoteException {
        try {
            MantaBusMessage message = newClientMessage(msg);
            String key = cacheKey(userKey, topicName);
            ServiceProducer producer = (ServiceProducer) producers.get(key);
            if (producer == null) {
                MantaService service = MantaRMIServer.manta.getService(topicName, MantaService.SERVICE_TYPE_TOPIC);
                requireService(service, topicName);
                producer = ServiceProducer.createNew(service);
                MantaRMIServer.manta.advertiseService(producer);
                producers.put(key, producer);
            }
            // Last argument is now + 10 minutes; presumably the message expiration time - TODO confirm.
            MantaRMIServer.manta.publish(message, producer, MantaAgentConstants.NON_PERSISTENT,
                    MantaAgentConstants.NORMAL, TEN_MINUTES_MILLIS + SystemTime.gmtCurrentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
            throw new RemoteException("error in manta", e);
        }
    }

    /**
     * Subscribes the caller to a topic with a gathering listener. Does nothing
     * when a listener is already registered for this user and topic.
     *
     * @param userKey        caller identifier used as part of the cache key
     * @param topicName      name of the topic
     * @param messagesToCash value handed to the {@link TopicGatherListener}
     *                       constructor; presumably the number of messages it
     *                       retains - TODO confirm
     * @throws RemoteException wrapping any failure, including an unknown topic
     */
    public synchronized void subscribe(String userKey, String topicName, int messagesToCash) throws RemoteException {
        String key = cacheKey(userKey, topicName);
        TopicGatherListener listener = (TopicGatherListener) listeners.get(key);
        if (listener != null) {
            return;
        }

        try {
            ServiceConsumer consumer = (ServiceConsumer) consumers.get(key);

            if (consumer == null) {
                MantaService service = MantaRMIServer.manta.getService(topicName, MantaService.SERVICE_TYPE_TOPIC);
                requireService(service, topicName);
                consumer = new ServiceConsumer(MantaRMIServer.manta.getAgentName(),
                        MantaRMIServer.manta.getDomainName(), service.getServiceName(),
                        service.getServiceType(), MantaAgentConstants.AUTO_ACK);
                MantaRMIServer.manta.advertiseService(consumer);
                consumers.put(key, consumer);
            }
            listener = new TopicGatherListener(messagesToCash);
            MantaRMIServer.manta.subscribeToTopic(listener, consumer.getServiceName());
            // Register only after the subscription succeeded; otherwise a failed
            // attempt would leave a dead listener that blocks every retry.
            listeners.put(key, listener);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RemoteException("error in manta", e);
        }
    }

    /**
     * Cancels the caller's subscription to a topic, if one exists, and forgets
     * its listener so that a later {@link #subscribe} creates a fresh one.
     *
     * @param userKey   caller identifier used as part of the cache key
     * @param topicName name of the topic
     */
    public synchronized void unsubscribe(String userKey, String topicName) throws RemoteException {
        String key = cacheKey(userKey, topicName);
        TopicGatherListener listener = (TopicGatherListener) listeners.get(key);
        if (listener != null) {
            MantaRMIServer.manta.unsubscribeFromTopic(listener, topicName);
            // The map is keyed by userKey + topicName, so remove by key, not by listener.
            listeners.remove(key);
        }
    }

    /**
     * Returns the payloads currently held by the caller's topic listener.
     *
     * @param userKey   caller identifier used as part of the cache key
     * @param topicName name of the topic
     * @return {@code String.valueOf} of each gathered message's payload
     * @throws RemoteException if the caller has no listener for this topic
     */
    public String[] getMessageFromTopic(String userKey, String topicName) throws RemoteException {
        TopicGatherListener listener = (TopicGatherListener) listeners.get(cacheKey(userKey, topicName));
        if (listener == null) {
            throw new RemoteException("not subscribed to topic " + topicName);
        }
        List messages = listener.getMessages();

        String[] result = new String[messages.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = String.valueOf(((MantaBusMessage) messages.get(i)).getPayload());
        }
        return result;
    }

    /**
     * Lists the names of all topic services in the default domain of the
     * world modeler.
     *
     * @return topic names, possibly empty, never {@code null}
     */
    public String[] getTopics() throws RemoteException {
        WorldModeler world = MantaAgent.getInstance().getSingletonRepository().getWorldModeler();

        ArrayList list = new ArrayList();
        Iterator iter = world.getServices(world.getDefaultDomainName()).iterator();
        while (iter.hasNext()) {
            MantaService service = (MantaService) iter.next();
            if (service.getServiceType() == MantaService.SERVICE_TYPE_TOPIC) {
                list.add(service.getServiceName());
            }
        }
        return (String[]) list.toArray(new String[list.size()]);
    }

    /** Builds the cache key shared by the producer, consumer and listener maps. */
    private static String cacheKey(String userKey, String serviceName) {
        return userKey + serviceName;
    }

    /** Creates a client-type bus message carrying {@code text} as its payload. */
    private static MantaBusMessage newClientMessage(String text) {
        MantaBusMessage message = MantaBusMessage.getInstance();
        message.setPayload(new ByteableText(text));
        message.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT);
        return message;
    }

    /** Fails with a descriptive exception when a service lookup returned {@code null}. */
    private static void requireService(MantaService service, String serviceName) throws RemoteException {
        if (service == null) {
            throw new RemoteException("no service " + serviceName);
        }
    }
}