1 23 package org.objectweb.joram.mom.proxies; 24 25 import java.io.IOException ; 26 import java.io.ObjectInputStream ; 27 import java.io.ObjectOutputStream ; 28 import java.util.Collection ; 29 import java.util.Enumeration ; 30 import java.util.Hashtable ; 31 import java.util.Iterator ; 32 33 import org.objectweb.joram.shared.client.AbstractJmsReply; 34 import org.objectweb.joram.shared.client.AbstractJmsRequest; 35 import org.objectweb.joram.shared.client.CnxCloseRequest; 36 import org.objectweb.joram.shared.client.JmsRequestGroup; 37 import org.objectweb.joram.shared.client.ProducerMessages; 38 import org.objectweb.joram.shared.client.ServerReply; 39 import org.objectweb.joram.shared.client.MomExceptionReply; 40 import org.objectweb.joram.shared.excepts.MomException; 41 42 import org.objectweb.joram.mom.proxies.ProxyImpl; 43 import org.objectweb.joram.mom.proxies.SendReplyNot; 44 import org.objectweb.joram.mom.proxies.ProxyAgentItf; 45 import org.objectweb.joram.mom.notifications.WakeUpNot; 46 47 48 import fr.dyade.aaa.agent.Agent; 49 import fr.dyade.aaa.agent.AgentId; 50 import fr.dyade.aaa.agent.BagSerializer; 51 import fr.dyade.aaa.agent.Notification; 52 import fr.dyade.aaa.agent.UnknownNotificationException; 53 54 import fr.dyade.aaa.util.Timer; 55 import fr.dyade.aaa.util.TimerTask; 56 57 import fr.dyade.aaa.util.Queue; 58 import fr.dyade.aaa.util.management.MXWrapper; 59 import org.objectweb.joram.shared.JoramTracing; 60 import org.objectweb.util.monolog.api.BasicLevel; 61 62 65 public class UserAgent extends Agent implements BagSerializer, ProxyAgentItf { 66 69 private ProxyImpl proxyImpl; 70 71 76 private transient Hashtable connections; 77 78 private transient Hashtable heartBeatTasks; 79 80 83 private int keyCounter; 84 85 90 public UserAgent() { 91 super(true); 92 init(); 93 } 94 95 100 public UserAgent(int stamp) { 101 super("JoramAdminProxy", true, stamp); 102 init(); 103 } 104 105 private void init() { 106 proxyImpl = new ProxyImpl(this); 107 keyCounter = 0; 108 } 109 110 private transient CleaningTask cleaningTask; 111 112 113 public void agentInitialize(boolean firstTime) throws Exception { 114 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 115 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 116 "UserAgent.agentInitialize(" + firstTime + ')'); 117 118 super.agentInitialize(firstTime); 119 proxyImpl.initialize(firstTime); 120 cleaningTask = new CleaningTask(); 121 cleaningTask.schedule(); 122 MXWrapper.registerMBean(proxyImpl, "Joram", getMBeanName()); 123 } 124 125 126 public void agentFinalize(boolean lastTime) { 127 try { 128 MXWrapper.unregisterMBean("Joram", getMBeanName()); 129 } catch (Exception exc) { 130 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 131 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "", exc); 132 } 133 super.agentFinalize(lastTime); 134 } 135 136 private String getMBeanName() { 137 return new StringBuffer ().append("type=User").append(",name=").append( 138 (name == nullName) ? getId().toString() : name).toString(); 139 } 140 141 150 public void react(AgentId from, Notification not) throws Exception { 151 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 152 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 153 "UserAgent.react(" + from + ',' + not + ')'); 154 155 setNoSave(); 158 159 if (not instanceof OpenConnectionNot) { 160 doReact((OpenConnectionNot) not); 161 } else if (not instanceof GetConnectionNot) { 162 doReact((GetConnectionNot) not); 163 } else if (not instanceof CloseConnectionNot) { 164 doReact((CloseConnectionNot) not); 165 } else if (not instanceof ResetCollocatedConnectionsNot) { 166 doReact((ResetCollocatedConnectionsNot) not); 167 } else if (not instanceof SendReplyNot) { 168 doReact((SendReplyNot) not); 169 } else if (not instanceof RequestNot) { 170 doReact((RequestNot) not); 171 } else if (not instanceof ReturnConnectionNot) { 172 doReact((ReturnConnectionNot) not); 173 } else if (not instanceof SendRepliesNot) { 174 doReact((SendRepliesNot) not); 175 } else if (not instanceof ProxyRequestGroupNot) { 176 doReact((ProxyRequestGroupNot) not); 177 } else if (not instanceof WakeUpNot) { 178 try { 179 proxyImpl.cleanPendingMessages(System.currentTimeMillis()); 180 } catch (Exception exc) { 181 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) 182 JoramTracing.dbgDestination.log(BasicLevel.ERROR, 183 "--- " + this + " Proxy(...)", exc); 184 } 185 if (cleaningTask == null) 186 cleaningTask = new CleaningTask(); 187 cleaningTask.schedule(); 188 } else { 189 try { 190 proxyImpl.react(from, not); 191 } catch (UnknownNotificationException exc) { 192 super.react(from, not); 193 } 194 } 195 } 196 197 200 private void doReact(OpenConnectionNot not) { 201 setSave(); 203 204 if (connections == null) { 205 connections = new Hashtable (); 206 heartBeatTasks = new Hashtable (); 207 } 208 209 Integer objKey = new Integer (keyCounter); 210 ConnectionContext ctx; 211 if (not.getReliable()) { 212 ctx = new ReliableConnectionContext( 213 proxyImpl, keyCounter, 214 not.getHeartBeat()); 215 connections.put(objKey, ctx); 216 } else { 217 ctx = new StandardConnectionContext( 218 proxyImpl, keyCounter); 219 connections.put(objKey, ctx); 220 } 221 222 if (not.getHeartBeat() > 0) { 223 HeartBeatTask heartBeatTask = new HeartBeatTask(2 * not.getHeartBeat(), 224 objKey); 225 heartBeatTasks.put(objKey, heartBeatTask); 226 heartBeatTask.start(); 227 } 228 229 sendTo(getId(), new ReturnConnectionNot(not, ctx)); 232 keyCounter++; 233 } 234 235 239 private void doReact(ReturnConnectionNot not) { 240 not.Return(); 241 } 242 243 private void doReact(GetConnectionNot not) { 244 int key = not.getKey(); 245 if (connections == null) { 246 not.Throw(new Exception ("Connection " + key + " not found")); 247 } else { 248 Integer objKey = new Integer (key); 249 ReliableConnectionContext ctx = (ReliableConnectionContext) connections 250 .get(objKey); 251 if (ctx == null) { 252 not.Throw(new Exception ("Connection " + key + " not found")); 253 } else { 254 not.Return(ctx); 255 } 256 } 257 } 258 259 private void doReact(RequestNot not) { 260 Integer key = new Integer (not.getConnectionKey()); 261 if (connections != null) { 262 ConnectionContext ctx = (ConnectionContext) connections.get(key); 263 if (ctx != null) { 264 HeartBeatTask heartBeatTask = (HeartBeatTask) heartBeatTasks.get(key); 265 if (heartBeatTask != null) { 266 heartBeatTask.touch(); 267 } 268 269 AbstractJmsRequest request = ctx.getRequest(not.getMessage()); 270 proxyImpl.reactToClientRequest(key.intValue(), request); 271 272 if (ctx.isClosed()) { 273 connections.remove(key); 275 HeartBeatTask hbt = (HeartBeatTask) heartBeatTasks.remove(key); 276 if (hbt != null) { 277 hbt.cancel(); 278 } 279 } 280 } 281 } 282 } 287 288 private void doReact(ProxyRequestGroupNot not) { 289 RequestNot[] requests = not.getRequests(); 290 RequestBuffer rm = new RequestBuffer(this); 291 for (int i = 0; i < requests.length; i++) { 292 RequestNot req = requests[i]; 293 Integer key = new Integer (req.getConnectionKey()); 294 HeartBeatTask heartBeatTask = (HeartBeatTask) heartBeatTasks.get(key); 295 if (heartBeatTask != null) { 296 heartBeatTask.touch(); 297 } 298 ConnectionContext ctx = (ConnectionContext) connections.get(key); 299 if (ctx != null) { 300 AbstractJmsRequest request = ctx.getRequest(req.getMessage()); 301 if (request instanceof ProducerMessages) { 302 ProducerMessages pm = (ProducerMessages) request; 303 rm.put(req.getConnectionKey(), pm); 304 } else if (request instanceof JmsRequestGroup) { 305 JmsRequestGroup jrg = (JmsRequestGroup)request; 306 AbstractJmsRequest[] groupedRequests = jrg.getRequests(); 307 for (int j = 0; j < groupedRequests.length; j++) { 308 if (groupedRequests[i] instanceof ProducerMessages) { 309 ProducerMessages pm = (ProducerMessages) groupedRequests[i]; 310 rm.put(req.getConnectionKey(), pm); 311 } else { 312 proxyImpl.reactToClientRequest(key.intValue(), groupedRequests[i]); 313 } 314 } 315 } else { 316 proxyImpl.reactToClientRequest(key.intValue(), request); 317 } 318 } 319 } 320 rm.flush(); 321 } 322 323 private void doReact(CloseConnectionNot not) { 324 if (connections != null) { 325 Integer key = new Integer (not.getKey()); 326 if (connections.remove(key) != null) { 329 proxyImpl.reactToClientRequest(not.getKey(), new CnxCloseRequest()); 330 heartBeatTasks.remove(key); 331 } 332 } 333 } 338 339 private void doReact(ResetCollocatedConnectionsNot not) { 340 if (connections != null) { 341 Collection values = connections.values(); 342 Iterator iterator = values.iterator(); 343 while (iterator.hasNext()) { 344 Object obj = iterator.next(); 345 if (obj instanceof StandardConnectionContext) { 348 ConnectionContext cc = (ConnectionContext) obj; 349 proxyImpl.reactToClientRequest( 350 cc.getKey(), new CnxCloseRequest()); 351 iterator.remove(); 352 } 353 } 354 } 355 } 356 357 private void doReact(SendRepliesNot not) { 358 Enumeration en = not.getReplies(); 359 while (en.hasMoreElements()) { 360 SendReplyNot sr = (SendReplyNot) en.nextElement(); 361 doReact(sr); 362 } 363 } 364 365 370 private void doReact(SendReplyNot not) { 371 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 372 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "UserAgent.doReact(" + not + ')'); 373 ClientContext cc = proxyImpl.getClientContext(not.getKey()); 374 if (cc != null) { 375 if (cc.setReply(not.getRequestId()) == 0) { 376 sendToClient(not.getKey(), new ServerReply(not.getRequestId())); 377 } 378 } else if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) { 379 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 382 "UserAgent: unknown client context for " + not); 383 } 384 } 385 386 392 public void sendNot(AgentId to, Notification not) { 393 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 394 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 395 "UserAgent.sendNot(" + to + ',' + not + ')'); 396 sendTo(to, not); 397 } 398 399 407 public void sendToClient(int key, AbstractJmsReply reply) { 408 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 409 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 410 "UserAgent.sendToClient(" + key + ',' + reply + ')'); 411 Integer objKey = new Integer (key); 412 if (connections != null) { 413 ConnectionContext ctx = (ConnectionContext)connections.get(objKey); 414 if (ctx != null) { 415 ctx.pushReply(reply); 416 } 417 } 418 } 420 421 425 class HeartBeatTask extends fr.dyade.aaa.util.TimerTask implements 426 java.io.Serializable { 427 private int timeout; 428 429 private Integer key; 430 431 private long lastRequestDate; 432 433 HeartBeatTask(int timeout, Integer key) { 434 this.timeout = timeout; 435 this.key = key; 436 } 437 438 public void run() { 439 long date = System.currentTimeMillis(); 440 if ((date - lastRequestDate) > timeout) { 441 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 442 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 443 "HeartBeatTask: close connection"); 444 ConnectionContext ctx = (ConnectionContext) connections.remove(key); 445 heartBeatTasks.remove(key); 446 proxyImpl.reactToClientRequest(key.intValue(), new CnxCloseRequest()); 447 MomException exc = new MomException(MomExceptionReply.HBCloseConnection, 448 "Connection " + getId() + ':' + key + " closed"); 449 ctx.pushError(exc); 450 } else { 451 start(); 452 } 453 } 454 455 public void start() { 456 try { 457 ConnectionManager.getTimer().schedule(this, timeout); 458 } catch (Exception exc) { 459 throw new Error (exc.toString()); 460 } 461 } 462 463 public void touch() { 464 lastRequestDate = System.currentTimeMillis(); 465 } 466 } 467 468 class CleaningTask extends TimerTask { 469 CleaningTask() { 470 } 471 472 473 public void run() { 474 sendTo(getId(), new WakeUpNot()); 475 } 476 477 public void schedule() { 478 long period = proxyImpl.getPeriod(); 479 480 if (period != -1) { 481 try { 482 Timer timer = ConnectionManager.getTimer(); 483 timer.schedule(this, period); 484 } catch (Exception exc) { 485 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) 486 JoramTracing.dbgDestination.log(BasicLevel.ERROR, 487 "--- " + this + " Proxy(...)", exc); 488 } 489 } 490 } 491 } 492 493 public void setNoSave() { 494 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 495 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "setNoSave()"); 496 497 super.setNoSave(); 498 } 499 500 public void setSave() { 501 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 502 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "UserAgent.setSave()"); 503 504 super.setSave(); 505 } 506 507 public void readBag(ObjectInputStream in) throws IOException , 508 ClassNotFoundException { 509 connections = (Hashtable ) in.readObject(); 510 heartBeatTasks = (Hashtable ) in.readObject(); 511 512 if (heartBeatTasks != null) { 513 Enumeration tasks = heartBeatTasks.elements(); 515 while (tasks.hasMoreElements()) { 516 HeartBeatTask task = (HeartBeatTask) tasks.nextElement(); 517 task.start(); 518 } 519 } 520 521 proxyImpl.readBag(in); 522 } 523 524 public void writeBag(ObjectOutputStream out) throws IOException { 525 out.writeObject(connections); 526 out.writeObject(heartBeatTasks); 527 proxyImpl.writeBag(out); 528 } 529 } 530 | Popular Tags |