1 21 package com.presumo.mobileagent; 22 23 import com.presumo.util.log.Logger; 24 import com.presumo.util.log.LoggerFactory; 25 26 import java.io.BufferedReader ; 27 import java.io.InputStreamReader ; 28 import java.io.IOException ; 29 30 import java.net.InetAddress ; 31 import java.util.Iterator ; 32 import java.util.Vector ; 33 34 import javax.jms.ExceptionListener ; 35 import javax.jms.Topic ; 36 import javax.jms.TopicConnection ; 37 import javax.jms.TopicConnectionFactory ; 38 import javax.jms.TopicSession ; 39 import javax.jms.TopicSubscriber ; 40 import javax.jms.TopicPublisher ; 41 import javax.jms.Message ; 42 import javax.jms.MessageListener ; 43 import javax.jms.ObjectMessage ; 44 import javax.jms.JMSException ; 45 import javax.jms.Session ; 46 47 48 52 public class AgentRunner implements MessageListener , ExceptionListener  53 { 54 protected String name; 55 protected TopicConnection connection; 56 protected Vector agents; 57 58 62 public AgentRunner() throws JMSException  63 { 64 this("localhost", 2323); 65 } 66 67 public AgentRunner(String server, int port) throws JMSException  68 { 69 logger.entry("AgentRunner", server, new Integer (port)); 70 71 com.presumo.jms.client.JmsTopicConnectionFactory factory = 72 new com.presumo.jms.client.JmsTopicConnectionFactory(); 73 factory.setHost(server); 74 factory.setPort(port); 75 76 connection = factory.createTopicConnection(); 77 connection.setExceptionListener(this); 78 79 TopicSession session = 80 connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 81 name = session.createMessage().getJMSMessageID().toString(); 83 84 Topic topic = session.createTopic(Agent.AGENT_TOPIC); 85 TopicSubscriber sub = session.createSubscriber(topic, 86 createFilter(), 87 false); 88 sub.setMessageListener(this); 89 90 connection.start(); 91 92 agents = new Vector (); 93 94 logger.exit("AgentRunner", this); 95 } 96 97 public AgentRunner(TopicConnection connection) throws JMSException  98 { 99 logger.entry("AgentRunner", connection); 100 101 this.connection = connection; 102 103 try { 104 name = InetAddress.getLocalHost().getHostName(); 105 } catch (IOException ioe) { 106 name = "agentrunner"; 107 } 108 109 TopicSession session = 110 connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 111 Topic topic = session.createTopic(Agent.AGENT_TOPIC); 112 TopicSubscriber sub = session.createSubscriber(topic); 113 sub.setMessageListener(this); 114 115 agents = new Vector (); 116 117 logger.exit("AgentRunner", this); 118 } 119 120 124 public void onException(JMSException jmsex) 125 { 126 logger.exception(jmsex); 127 } 128 129 public void onMessage(Message msg) 130 { 131 logger.entry("onMessage", msg); 132 int type; 133 try { 134 type = msg.getIntProperty(Agent.MESSAGE_TYPE); 135 136 switch(type) { 137 case(Agent.AGENT_MOVE): 138 Agent agent = (Agent) ((ObjectMessage )msg).getObject(); 139 logger.debug("Received agent: " + agent); 140 agent.setConnection(connection); 141 AgentAdapter aa = new AgentAdapter(agent); 142 Thread t = new Thread (aa); 143 t.start(); 144 break; 145 case(Agent.AGENT_START): 146 logger.debug("Received start message"); 147 startAgents(); 148 break; 149 case(Agent.AGENT_STOP): 150 logger.debug("Received stop message"); 151 stopAgents(); 152 break; 153 case(Agent.RUNNER_QUERY): 154 logger.debug("Received agent runner query"); 155 sendQueryResponse(); 156 break; 157 } 158 } catch (JMSException jmsex) { 159 jmsex.printStackTrace(); 160 } 161 162 logger.exit("onMessage"); 163 } 164 165 166 public void stop() 167 { 168 logger.entry("stop"); 169 try { 170 connection.close(); 171 } catch (JMSException jmsex) {} 172 logger.exit("stop"); 173 } 174 175 176 protected String createFilter() 180 { 181 logger.entry("creatFilter"); 182 StringBuffer result = new StringBuffer (); 183 result.append(Agent.MESSAGE_TYPE); 184 result.append("<>"); 185 result.append(Agent.AGENT_MOVE); 186 result.append(" OR "); 187 result.append(Agent.RUNNER_PROP); 188 result.append(" LIKE '"); 189 result.append("%"+name+"%'"); 190 191 logger.exit("createFilter", result); 192 return result.toString(); 193 } 194 195 protected void sendQueryResponse() throws JMSException  196 { 197 logger.entry("sendQueryResponse"); 198 TopicSession session = 199 connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 200 Topic topic = session.createTopic(Agent.AGENT_TOPIC); 201 TopicPublisher publisher = session.createPublisher(topic); 202 203 Message msg = session.createMessage(); 204 msg.setIntProperty(Agent.MESSAGE_TYPE, Agent.QUERY_RESPONSE); 205 msg.setStringProperty(Agent.RUNNER_NAME, this.name); 206 publisher.publish(msg); 207 session.close(); 208 logger.exit("sendQueryResponse"); 209 } 210 211 212 protected void startAgents() 213 { 214 logger.entry("startAgents"); 215 216 synchronized (agents) { 217 Iterator itr = agents.iterator(); 218 while(itr.hasNext()) { 219 Agent agent = (Agent) itr.next(); 220 agent.startAgent(); 221 } 222 } 223 224 logger.exit("startAgents"); 225 } 226 227 protected void stopAgents() 228 { 229 logger.entry("stopAgents"); 230 231 synchronized (agents) { 232 Iterator itr = agents.iterator(); 233 while(itr.hasNext()) { 234 Agent agent = (Agent) itr.next(); 235 agent.stopAgent(); 236 } 237 } 238 239 logger.exit("stopAgents"); 240 } 241 242 246 protected class AgentAdapter implements Runnable  247 { 248 private final Agent agent; 249 250 AgentAdapter(Agent a) 251 { 252 logger.entry("AgentAdapter", a); 253 agent = a; 254 logger.exit("AgentAdapter", this); 255 } 256 257 public void run() 258 { 259 logger.entry("AgentAdapter-run"); 260 agents.add(agent); 261 agent.run(); 262 agents.remove(agent); 263 logger.exit("AgentAdapter-run"); 264 } 265 266 } 267 268 272 public static void main(String [] args) throws Exception  273 { 274 logger.entry("main", args); 275 if (args.length > 1) { 276 System.err.println("Usage: pubsubclient server:port"); 277 System.exit(1); 278 } 279 280 AgentRunner runner; 281 if (args.length == 0) { 282 runner = new AgentRunner(); 283 } else { 284 String server = args[0]; 285 int port = 2323; 286 int loc = server.indexOf(':'); 287 if (loc != -1) { 288 port = Integer.parseInt(server.substring(loc+1)); 289 server = server.substring(0, loc); 290 } 291 runner = new AgentRunner(server, port); 292 } 293 294 System.out.println("Agent runner started."); 295 296 BufferedReader input = new BufferedReader (new InputStreamReader (System.in)); 297 String choice = ""; 298 while(! choice.toLowerCase().equals("exit")) { 299 System.out.println("Type <exit> then ENTER to shutdown."); 300 choice = input.readLine(); 301 } 302 303 runner.stop(); 304 305 logger.exit("main"); 306 } 307 308 310 private static Logger logger = 311 LoggerFactory.getLogger(AgentRunner.class, null); 312 313 315 } 316
| Popular Tags
|