1 21 package com.presumo.mobileagent; 22 23 import com.presumo.util.log.Logger; 24 import com.presumo.util.log.LoggerFactory; 25 26 import java.io.BufferedReader ; 27 import java.io.InputStreamReader ; 28 import java.io.Serializable ; 29 30 import java.util.Vector ; 31 32 import javax.jms.ExceptionListener ; 33 import javax.jms.Topic ; 34 import javax.jms.TopicConnection ; 35 import javax.jms.TopicSession ; 36 import javax.jms.TopicSubscriber ; 37 import javax.jms.TopicPublisher ; 38 import javax.jms.Message ; 39 import javax.jms.MessageListener ; 40 import javax.jms.ObjectMessage ; 41 import javax.jms.JMSException ; 42 import javax.jms.Session ; 43 44 45 63 public abstract class Agent implements Serializable , Runnable 64 { 65 66 public static final String AGENT_TOPIC = "MobileAgentAdmin"; 67 68 69 public static final String RUNNER_PROP = "RunnerTargets"; 70 71 72 public static final String RUNNER_NAME = "RunnerName"; 73 74 75 public static final String MESSAGE_TYPE = "AgentMessageType"; 76 77 public static final int AGENT_MOVE = 0; 78 public static final int AGENT_START = 1; 79 public static final int AGENT_STOP = 2; 80 public static final int RUNNER_QUERY = 3; 81 public static final int QUERY_RESPONSE = 4; 82 83 84 85 protected transient TopicConnection connx; 86 protected transient TopicSession agentSession; 87 private transient Topic agentTopic; 88 89 93 97 public Agent() 98 { 99 logger.entry("Agent"); 100 logger.exit("Agent", this); 101 } 102 103 107 public Agent(TopicConnection connx) 108 { 109 logger.entry("Agent", connx); 110 setConnection(connx); 111 logger.exit("Agent", this); 112 } 113 114 118 121 public void moveTo(String runnerName) throws JMSException 122 { 123 logger.entry("moveTo", runnerName); 124 moveTo(new String [] { runnerName }); 125 logger.exit("moveTo"); 126 } 127 128 131 public synchronized void moveTo(String [] runnerNames) 132 throws JMSException 133 { 134 logger.entry("moveTo", runnerNames); 135 TopicPublisher publisher = agentSession.createPublisher(agentTopic); 136 try { 137 Message msg = createRunnerMoveMessage(runnerNames); 138 publisher.publish(msg); 139 } finally { 140 publisher.close(); 141 } 142 logger.exit("moveTo"); 143 } 144 145 146 149 public synchronized String [] getAvailableRunners(long timeout) 150 throws JMSException 151 { 152 logger.entry("getAvailableRunners", new Long (timeout)); 153 154 TopicSubscriber sub = null; 155 TopicPublisher pub = null; 156 Vector runners = new Vector (); 157 158 try { 159 sub = createQueryResponseReceiver(); 160 pub = agentSession.createPublisher(agentTopic); 161 pub.publish( createQueryMessage() ); 162 163 164 long start = System.currentTimeMillis(); 165 long diff = 0; 166 while(diff < timeout) { 167 Message msg = sub.receive(timeout - diff); 168 if (msg != null) { 169 String name = (String ) msg.getObjectProperty(RUNNER_NAME); 170 logger.debug("----> Found runner: " + name); 171 if (name != null && !runners.contains(name)) { 172 runners.add(name); 173 } 174 } 175 diff = System.currentTimeMillis() - start; 176 } 177 } finally { 178 if (sub != null) sub.close(); 179 if (pub != null) pub.close(); 180 } 181 182 String [] retval = new String [runners.size()]; 183 runners.toArray(retval); 184 185 logger.exit("getAvailableRunners", retval); 186 return retval; 187 } 188 189 193 public void setConnection(TopicConnection connx) 194 { 195 logger.entry("setConnection", connx); 196 197 try { 198 this.connx = connx; 199 agentSession = connx.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 200 agentTopic = agentSession.createTopic(AGENT_TOPIC); 201 } catch (JMSException jmsex) { 202 logger.exception(jmsex); 203 } 204 205 logger.exit("setConnection"); 206 } 207 208 219 public abstract void startAgent(); 220 221 229 public abstract void stopAgent(); 230 231 232 public abstract void runAgent(); 233 234 public final void run() 235 { 236 logger.entry("run"); 237 238 runAgent(); 239 240 if (agentSession != null) { 241 try { 242 agentSession.close(); 243 } catch (JMSException jmsex) { 244 logger.exception(jmsex); 245 } 246 } 247 248 logger.exit("run"); 249 } 250 256 260 269 protected Message createRunnerMoveMessage(String [] runnerTargets) 270 throws JMSException 271 { 272 logger.entry("createRunnerMoveMessage", runnerTargets); 273 274 StringBuffer buf = new StringBuffer (); 275 for (int i=0; i < runnerTargets.length; ++i) { 276 buf.append(runnerTargets[i]); 277 if (i < (runnerTargets.length-1)) buf.append(';'); 278 } 279 280 ObjectMessage msg = agentSession.createObjectMessage(this); 281 msg.setIntProperty(MESSAGE_TYPE, AGENT_MOVE); 282 msg.setStringProperty(RUNNER_PROP, buf.toString()); 283 284 logger.exit("createRunnerMoveMessage", msg); 285 return msg; 286 } 287 288 289 293 protected Message createQueryMessage() throws JMSException 294 { 295 logger.entry("createQueryMessage"); 296 297 Message msg = agentSession.createMessage(); 298 msg.setIntProperty(MESSAGE_TYPE, RUNNER_QUERY); 299 300 logger.exit("createQueryMessage", msg); 301 return msg; 302 } 303 304 305 309 protected TopicSubscriber createQueryResponseReceiver() throws JMSException 310 { 311 logger.entry("createQueryResponseReceiver"); 312 313 StringBuffer filter = new StringBuffer (); 314 filter.append(MESSAGE_TYPE); 315 filter.append("="); 316 filter.append(QUERY_RESPONSE); 317 318 TopicSubscriber sub = 319 agentSession.createSubscriber(agentTopic, filter.toString(), false); 320 321 logger.exit("createqueryResponseReceiver", sub); 322 return sub; 323 } 324 325 326 327 329 private static Logger logger = LoggerFactory.getLogger(Agent.class, null); 330 331 333 } 334 | Popular Tags |