1 46 package org.mr.api.rmi.thin; 47 48 import java.rmi.RemoteException ; 49 import java.util.ArrayList ; 50 import java.util.HashMap ; 51 import java.util.Iterator ; 52 import java.util.LinkedList ; 53 54 import javax.jms.Connection ; 55 import javax.jms.ConnectionFactory ; 56 import javax.jms.JMSException ; 57 import javax.jms.Message ; 58 import javax.jms.MessageConsumer ; 59 import javax.jms.MessageListener ; 60 import javax.jms.MessageProducer ; 61 import javax.jms.Queue ; 62 import javax.jms.Session ; 63 import javax.jms.TextMessage ; 64 import javax.jms.Topic ; 65 66 import org.mr.MantaAgent; 67 import org.mr.api.jms.MantaConnectionFactory; 68 import org.mr.kernel.services.MantaService; 69 import org.mr.kernel.world.WorldModeler; 70 71 public class ThinRMIJMSImpl implements ThinMessagingInterface { 72 78 class Key { 79 String user; 80 String destination; 81 82 public Key(String user, String destination) { 83 this.user = user; 84 this.destination = destination; 85 } 86 87 public int hashCode() { 88 return user.hashCode() + destination.hashCode(); 89 } 90 91 public boolean equals(Object other) { 92 if (!(other instanceof Key)) { 93 return false; 94 } 95 if (other == null) { 96 return false; 97 } 98 if (this == other) { 99 return true; 100 } 101 return 102 this.user.equals(((Key) other).user) && 103 this.destination.equals(((Key) other).destination); 104 } 105 } 106 107 class ConsumerPlus implements MessageListener { 108 MessageConsumer consumer; 109 LinkedList messages; 110 int maxMessages; 111 112 public void onMessage(Message message) { 113 synchronized (this.messages) { 114 if (this.messages.size() < this.maxMessages) { 115 this.messages.add(message); 116 } 117 } 118 } 119 } 120 121 private ConnectionFactory factory; 122 private Connection connection; 123 private Session session; 124 private HashMap consumers; 125 private HashMap producers; 126 private boolean initJMS; 127 public ThinRMIJMSImpl() { 128 this.consumers = new HashMap (); 129 this.producers = new HashMap (); 130 this.initJMS = false; 131 } 132 133 private void initJMS() throws RemoteException { 134 if (!initJMS) { 135 try { 136 this.factory = new MantaConnectionFactory(); 137 this.connection = factory.createConnection(); 138 this.session = 139 connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 140 connection.start(); 141 initJMS = true; 142 } catch (JMSException e) { 143 throw new RemoteException ("Error in Manta", e); 144 } 145 } 146 } 147 148 public boolean enqueueMessage(String userKey, String queueName, 149 String text) 150 throws RemoteException 151 { 152 initJMS(); 153 try { 154 TextMessage message = this.session.createTextMessage(); 155 Key key = new Key(userKey, queueName); 156 MessageProducer producer = 157 (MessageProducer ) this.producers.get(key); 158 159 if (producer == null) { 160 Queue queue = this.session.createQueue(queueName); 161 producer = this.session.createProducer(queue); 162 this.producers.put(key, producer); 163 } 164 165 message.setText(text); 166 producer.send(message); 167 return true; 168 } catch (JMSException e) { 169 throw new RemoteException ("Error in Manta", e); 170 } 171 } 172 173 public String denqueueMessage(String userKey, String queueName) 174 throws RemoteException 175 { 176 initJMS(); 177 try { 178 String result = null; 179 Key key = new Key(userKey, queueName); 180 MessageConsumer consumer = 181 (MessageConsumer ) this.consumers.get(key); 182 183 if (consumer == null) { 184 Queue queue = this.session.createQueue(queueName); 185 consumer = this.session.createConsumer(queue); 186 this.consumers.put(key, consumer); 187 } 188 189 TextMessage message = (TextMessage ) consumer.receive(1000); 190 if (message != null) { 191 result = message.getText(); 192 } 193 return result; 194 } catch (JMSException e) { 195 throw new RemoteException ("Error in Manta", e); 196 } 197 } 198 199 public void publishMessage(String userKey, String topicName, String text) 200 throws RemoteException 201 { 202 initJMS(); 203 try { 204 TextMessage message = this.session.createTextMessage(); 205 Key key = new Key(userKey, topicName); 206 MessageProducer producer = 207 (MessageProducer ) this.producers.get(key); 208 209 if (producer == null) { 210 Topic topic = this.session.createTopic(topicName); 211 producer = this.session.createProducer(topic); 212 this.producers.put(key, producer); 213 } 214 215 message.setText(text); 216 producer.send(message); 217 } catch (JMSException e) { 218 throw new RemoteException ("Error in Manta", e); 219 } 220 } 221 222 public void subscribe(String userKey, String topicName, int cacheSize) 223 throws RemoteException 224 { 225 initJMS(); 226 try { 227 Key key = new Key(userKey, topicName); 228 ConsumerPlus cp = (ConsumerPlus) this.consumers.get(key); 229 if (cp != null) { 230 return; 231 } 232 233 234 Topic topic = this.session.createTopic(topicName); 235 MessageConsumer consumer = this.session.createConsumer(topic); 236 237 cp = new ConsumerPlus(); 238 cp.consumer = consumer; 239 cp.messages = new LinkedList (); 240 cp.maxMessages = cacheSize; 241 242 consumer.setMessageListener(cp); 243 244 this.consumers.put(key, cp); 245 } catch (JMSException e) { 246 throw new RemoteException ("Error in Manta", e); 247 } 248 } 249 250 public void unsubscribe(String userKey, String topicName) 251 throws RemoteException 252 { 253 initJMS(); 254 try { 255 Key key = new Key(userKey, topicName); 256 ConsumerPlus cp = (ConsumerPlus) this.consumers.get(key); 257 if (cp != null) { 258 cp.consumer.close(); 259 } 260 } catch (JMSException e) { 261 throw new RemoteException ("Error in Manta", e); 262 } 263 } 264 265 public String [] getMessageFromTopic(String userKey, String topicName) 266 throws RemoteException 267 { 268 initJMS(); 269 try { 270 String [] results = null; 271 Key key = new Key(userKey, topicName); 272 ConsumerPlus cp = (ConsumerPlus) this.consumers.get(key); 273 if (cp != null) { 274 synchronized (cp.messages) { 275 results = new String [cp.messages.size()]; 276 Iterator iter = cp.messages.iterator(); 277 int index = 0; 278 while (iter.hasNext()) { 279 TextMessage message = (TextMessage ) iter.next(); 280 results[index++] = message.getText(); 281 } 282 } 283 } 284 285 return results; 286 } catch (JMSException e) { 287 throw new RemoteException ("Error in Manta", e); 288 } 289 } 290 291 294 public String [] getTopics() throws RemoteException { 295 initJMS(); 296 WorldModeler world = MantaAgent.getInstance().getSingletonRepository().getWorldModeler(); 297 298 ArrayList list = new ArrayList (); 299 Iterator iter = world.getServices(world.getDefaultDomainName()).iterator(); 300 while(iter.hasNext()){ 301 MantaService service = (MantaService)iter.next(); 302 if(service.getServiceType() == MantaService.SERVICE_TYPE_TOPIC){ 303 list.add(service.getServiceName()); 304 } 305 } 306 String [] result = new String [list.size()]; 307 for (int i = 0; i < list.size(); i++) { 308 result[i] = (String )list.get(i); 309 } 310 return result; 311 } 312 313 public String [] getQueues() throws RemoteException { 314 initJMS(); 315 WorldModeler world = MantaAgent.getInstance().getSingletonRepository().getWorldModeler(); 316 317 ArrayList list = new ArrayList (); 318 Iterator iter = world.getServices(world.getDefaultDomainName()).iterator(); 319 while(iter.hasNext()){ 320 MantaService service = (MantaService)iter.next(); 321 if(service.getServiceType() == MantaService.SERVICE_TYPE_QUEUE){ 322 list.add(service.getServiceName()); 323 } 324 } 325 String [] result = new String [list.size()]; 326 for (int i = 0; i < list.size(); i++) { 327 result[i] = (String )list.get(i); 328 } 329 return result; 330 } 331 } | Popular Tags |