package org.apache.catalina.cluster.tcp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.catalina.cluster.ClusterSender;
import org.apache.catalina.cluster.Member;
import org.apache.catalina.cluster.io.XByteBuffer;
import org.apache.catalina.util.StringManager;
import org.apache.tomcat.util.IntrospectionUtils;

/**
 * {@link ClusterSender} implementation that keeps one {@link IDataSender} per
 * cluster member (keyed by <code>host:port</code>) and forwards packaged
 * message data to a single member or to all known members.
 * <p>
 * Thread-safety: the sender map and the statistics counters are only mutated
 * while holding this instance's monitor. Readers that need the senders take a
 * snapshot under the same monitor and perform the actual network I/O outside
 * of it.
 * <p>
 * The transmitter and every sender are published as JMX MBeans when a
 * {@link SimpleTcpCluster} has been set via
 * {@link #setCatalinaCluster(SimpleTcpCluster)}.
 */
public class ReplicationTransmitter implements ClusterSender {

    private static final org.apache.commons.logging.Log log =
            org.apache.commons.logging.LogFactory
                    .getLog(ReplicationTransmitter.class);

    /**
     * Descriptive information about this implementation.
     */
    private static final String info = "ReplicationTransmitter/1.3";

    /**
     * String manager used for the localized log messages of this package.
     */
    protected StringManager sm = StringManager.getManager(Constants.Package);

    /**
     * Senders keyed by {@link #getKey(Member)}. Guarded by <code>this</code>.
     */
    private Map map = new HashMap();

    /**
     * Creates a transmitter without senders; members are added with
     * {@link #add(Member)}.
     */
    public ReplicationTransmitter() {
    }

    /**
     * Number of successfully sent messages. Guarded by <code>this</code>.
     */
    private long nrOfRequests = 0;

    /**
     * Number of bytes successfully sent. Guarded by <code>this</code>.
     */
    private long totalBytes = 0;

    /**
     * Number of failed send attempts. Guarded by <code>this</code>.
     */
    private long failureCounter = 0;

    /**
     * Replication mode handed to {@link IDataSenderFactory} when a sender is
     * created.
     */
    private String replicationMode;

    /**
     * Acknowledge timeout forwarded to the senders as the
     * <code>ackTimeout</code> property (presumably milliseconds - confirm
     * against the IDataSender implementations).
     */
    private long ackTimeout = 15000;

    /**
     * Forwarded to the senders as the <code>waitForAck</code> property.
     */
    private boolean waitForAck = true;

    /**
     * When <code>true</code>, a disconnected sender is connected on demand
     * before a message is sent.
     */
    private boolean autoConnect = true;

    /**
     * Compression flag passed to {@link XByteBuffer#createDataPackage} when
     * message data is packaged.
     */
    private boolean compress = true;

    /**
     * Properties copied onto every newly created sender.
     */
    private Map properties = new HashMap();

    /**
     * Owning cluster; provides the MBean server and the base object name.
     * May be <code>null</code>, in which case no JMX registration happens.
     */
    private SimpleTcpCluster cluster;

    /**
     * JMX name under which this transmitter was registered by
     * {@link #start()}.
     */
    private ObjectName objectName;

    // ------------------------------------------------------------- Properties

    /**
     * @return descriptive information about this implementation
     */
    public String getInfo() {
        return (info);
    }

    /**
     * @return number of messages sent successfully since the last
     *         {@link #resetStatistics()}
     */
    public long getNrOfRequests() {
        return nrOfRequests;
    }

    /**
     * @return number of bytes sent successfully since the last
     *         {@link #resetStatistics()}
     */
    public long getTotalBytes() {
        return totalBytes;
    }

    /**
     * @return number of failed send attempts since the last
     *         {@link #resetStatistics()}
     */
    public long getFailureCounter() {
        return failureCounter;
    }

    /**
     * @return the current replication mode, or <code>null</code> if none was
     *         set
     */
    public String getReplicationMode() {
        return replicationMode;
    }

    /**
     * Sets the replication mode after validating it with
     * {@link IDataSenderFactory#validateMode(String)}.
     *
     * @param mode the replication mode to use for senders created afterwards
     * @throws IllegalArgumentException if the factory rejects the mode; the
     *         exception message is the factory's validation message
     */
    public void setReplicationMode(String mode) {
        String msg = IDataSenderFactory.validateMode(mode);
        if (msg != null) {
            throw new IllegalArgumentException(msg);
        }
        if (log.isDebugEnabled()) {
            log.debug("Setting replcation mode to " + mode);
        }
        this.replicationMode = mode;
    }

    /**
     * Sets the JMX name of this transmitter.
     *
     * @param name the object name
     */
    public void setObjectName(ObjectName name) {
        objectName = name;
    }

    /**
     * @return the JMX name of this transmitter, or <code>null</code> if it has
     *         not been set
     */
    public ObjectName getObjectName() {
        return objectName;
    }

    /**
     * @return <code>true</code> if message data is packaged with compression
     */
    public boolean isCompress() {
        return compress;
    }

    /**
     * @param compressMessageData whether message data is packaged with
     *        compression
     */
    public void setCompress(boolean compressMessageData) {
        this.compress = compressMessageData;
    }

    /**
     * @return <code>true</code> if disconnected senders are connected on
     *         demand
     */
    public boolean isAutoConnect() {
        return autoConnect;
    }

    /**
     * Sets the auto-connect flag and records it as the
     * <code>autoConnect</code> sender property.
     *
     * @param autoConnect the new flag value
     */
    public void setAutoConnect(boolean autoConnect) {
        this.autoConnect = autoConnect;
        setProperty("autoConnect", String.valueOf(autoConnect));
    }

    /**
     * @return the acknowledge timeout
     */
    public long getAckTimeout() {
        return ackTimeout;
    }

    /**
     * Sets the acknowledge timeout and records it as the
     * <code>ackTimeout</code> sender property.
     *
     * @param ackTimeout the new timeout
     */
    public void setAckTimeout(long ackTimeout) {
        this.ackTimeout = ackTimeout;
        setProperty("ackTimeout", String.valueOf(ackTimeout));
    }

    /**
     * @return the wait-for-acknowledge flag
     */
    public boolean isWaitForAck() {
        return waitForAck;
    }

    /**
     * Sets the wait-for-acknowledge flag and records it as the
     * <code>waitForAck</code> sender property.
     *
     * @param waitForAck the new flag value
     */
    public void setWaitForAck(boolean waitForAck) {
        this.waitForAck = waitForAck;
        setProperty("waitForAck", String.valueOf(waitForAck));
    }

    /**
     * Sets the owning cluster used for JMX registration.
     *
     * @param cluster the cluster, may be <code>null</code>
     */
    public void setCatalinaCluster(SimpleTcpCluster cluster) {
        this.cluster = cluster;
    }

    /**
     * @return <code>true</code> if the replication mode is
     *         {@link IDataSenderFactory#SYNC_MODE} or
     *         {@link IDataSenderFactory#POOLED_SYNC_MODE}
     */
    public boolean getIsSenderSynchronized() {
        return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
                || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
    }

    // ------------------------------------------------------ Sender properties

    /**
     * Stores a property that is copied onto every sender created afterwards.
     *
     * @param name property name
     * @param value property value
     */
    public void setProperty(String name, Object value) {
        if (log.isTraceEnabled()) {
            log.trace(sm.getString("ReplicationTransmitter.setProperty", name,
                    value));
        }
        properties.put(name, value);
    }

    /**
     * Looks up a sender property.
     *
     * @param key property name
     * @return the stored value, or <code>null</code> if there is none
     */
    public Object getProperty(String key) {
        if (log.isTraceEnabled()) {
            log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
        }
        return properties.get(key);
    }

    /**
     * @return an iterator over the names of all stored sender properties
     */
    public Iterator getPropertyNames() {
        return properties.keySet().iterator();
    }

    /**
     * Removes a sender property.
     *
     * @param key property name
     */
    public void removeProperty(String key) {
        properties.remove(key);
    }

    // --------------------------------------------------------- Public methods

    /**
     * Packages the data and sends it to the sender registered for one member.
     *
     * @param sessionId id passed through to the sender
     * @param indata raw message data
     * @param member destination member
     * @throws java.io.IOException if packaging fails or no sender is
     *         registered for the member
     */
    public void sendMessage(String sessionId, byte[] indata, Member member)
            throws java.io.IOException {
        byte[] data = convertSenderData(indata);
        IDataSender sender = findSender(getKey(member));
        sendMessageData(sessionId, data, sender);
    }

    /**
     * Packages the data once and sends it to every known sender. A failure
     * for one sender is logged (once, until the sender recovers) and does not
     * stop delivery to the remaining senders.
     *
     * @param sessionId id passed through to the senders
     * @param indata raw message data
     * @throws java.io.IOException if packaging the data fails
     */
    public void sendMessage(String sessionId, byte[] indata)
            throws java.io.IOException {
        IDataSender[] senders = getSenders();
        byte[] data = convertSenderData(indata);
        for (int i = 0; i < senders.length; i++) {
            IDataSender sender = senders[i];
            if (sender == null) {
                // Nothing to send to and nothing to mark as suspect; skipping
                // avoids dereferencing null in the failure handling below.
                continue;
            }
            try {
                sendMessageData(sessionId, data, sender);
            } catch (Exception x) {
                if (!sender.getSuspect()) {
                    log.warn("Unable to send replicated message to " + sender
                            + ", is server down?", x);
                }
                sender.setSuspect(true);
            }
        }
    }

    /**
     * Registers this transmitter as an MBean named
     * <code>&lt;domain&gt;:type=ClusterSender,host=&lt;host&gt;</code>, derived
     * from the cluster's object name. Does nothing when no cluster is set or
     * when the name is already registered. Registration failures are logged
     * and not propagated.
     *
     * @throws java.io.IOException declared by the interface contract; not
     *         thrown by this implementation
     */
    public void start() throws java.io.IOException {
        if (cluster == null) {
            return;
        }
        ObjectName clusterName = cluster.getObjectName();
        try {
            MBeanServer mserver = cluster.getMBeanServer();
            ObjectName transmitterName = new ObjectName(clusterName
                    .getDomain()
                    + ":type=ClusterSender,host="
                    + clusterName.getKeyProperty("host"));
            if (mserver.isRegistered(transmitterName)) {
                if (log.isWarnEnabled()) {
                    log.warn(sm.getString("cluster.mbean.register.allready",
                            transmitterName));
                }
                return;
            }
            setObjectName(transmitterName);
            mserver.registerMBean(cluster.getManagedBean(this),
                    getObjectName());
        } catch (Exception e) {
            // Pass the exception as the throwable so the stack trace is kept.
            log.warn("Unable to register the ReplicationTransmitter MBean", e);
        }
    }

    /**
     * Unregisters and disconnects every sender, empties the sender map and
     * unregisters this transmitter's MBean. Best effort: a failure for one
     * sender is logged and the remaining senders are still processed.
     */
    public synchronized void stop() {
        for (Iterator i = map.values().iterator(); i.hasNext();) {
            IDataSender sender = (IDataSender) i.next();
            if (sender != null) {
                try {
                    unregisterSenderMBean(sender);
                    sender.disconnect();
                } catch (Exception x) {
                    log.warn("Unable to disconnect sender " + sender
                            + " while stopping", x);
                }
            }
            i.remove();
        }
        if (cluster != null && getObjectName() != null) {
            try {
                MBeanServer mserver = cluster.getMBeanServer();
                mserver.unregisterMBean(getObjectName());
            } catch (Exception e) {
                log.error(
                        "Unable to unregister the ReplicationTransmitter MBean",
                        e);
            }
        }
    }

    /**
     * Returns a snapshot of the current senders. The snapshot is taken under
     * this instance's monitor so that concurrent {@link #add(Member)} /
     * {@link #remove(Member)} calls cannot disturb the iteration.
     *
     * @return a new array with one slot per map entry
     */
    public synchronized IDataSender[] getSenders() {
        IDataSender[] array = new IDataSender[map.size()];
        int i = 0;
        for (Iterator iter = map.values().iterator(); iter.hasNext(); i++) {
            array[i] = (IDataSender) iter.next();
        }
        return array;
    }

    /**
     * Returns the JMX names of the current senders, in the order of
     * {@link #getSenders()}. A slot is <code>null</code> when the sender is
     * <code>null</code> or its name could not be built.
     *
     * @return a new array of sender object names
     */
    public ObjectName[] getSenderObjectNames() {
        IDataSender[] senders = getSenders();
        ObjectName[] names = new ObjectName[senders.length];
        for (int i = 0; i < senders.length; i++) {
            if (senders[i] != null) {
                names[i] = getSenderObjectName(senders[i]);
            }
        }
        return names;
    }

    /**
     * Resets the request, byte and failure counters to zero.
     */
    public synchronized void resetStatistics() {
        nrOfRequests = 0;
        totalBytes = 0;
        failureCounter = 0;
    }

    /**
     * Creates, configures and registers a sender for the member unless one
     * already exists under the member's key. A failure to create the sender
     * is logged and not propagated.
     *
     * @param member the member to add
     */
    public synchronized void add(Member member) {
        try {
            String key = getKey(member);
            if (!map.containsKey(key)) {
                IDataSender sender = IDataSenderFactory.getIDataSender(
                        replicationMode, member);
                transferSenderProperty(sender);
                map.put(key, sender);
                registerSenderMBean(member, sender);
            }
        } catch (java.io.IOException x) {
            log.error("Unable to create and add a IDataSender object.", x);
        }
    }

    /**
     * Removes the member's sender, unregisters its MBean and disconnects it.
     * Does nothing when no sender is registered for the member.
     *
     * @param member the member to remove
     */
    public synchronized void remove(Member member) {
        String key = getKey(member);
        // Take the entry out first so that a failing disconnect cannot leave
        // a stale sender behind in the map.
        IDataSender toberemoved = (IDataSender) map.remove(key);
        if (toberemoved == null) {
            return;
        }
        unregisterSenderMBean(toberemoved);
        toberemoved.disconnect();
    }

    // ------------------------------------------------------ Protected methods

    /**
     * Records one successful send of <code>length</code> bytes and, when
     * debug logging is enabled, prints a summary every 100 requests.
     *
     * @param length number of bytes sent
     */
    protected synchronized void addStats(int length) {
        nrOfRequests++;
        totalBytes += length;
        if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
            log.debug("Nr of bytes sent=" + totalBytes + " over "
                    + nrOfRequests + "; avg=" + (totalBytes / nrOfRequests)
                    + " bytes/request; failures=" + failureCounter);
        }
    }

    /**
     * Copies every stored property onto the sender through
     * {@link IntrospectionUtils#setProperty}, using the value's string form.
     * Properties stored with a <code>null</code> value are skipped.
     *
     * @param sender the sender to configure
     */
    protected void transferSenderProperty(IDataSender sender) {
        for (Iterator iter = getPropertyNames(); iter.hasNext();) {
            String pkey = (String) iter.next();
            Object value = getProperty(pkey);
            if (value == null) {
                // No string form to hand over; leave the sender's default.
                continue;
            }
            IntrospectionUtils.setProperty(sender, pkey, value.toString());
        }
    }

    /**
     * Builds the map key for a member.
     *
     * @param member the member
     * @return <code>host:port</code> of the member
     */
    protected String getKey(Member member) {
        return member.getHost() + ":" + member.getPort();
    }

    /**
     * Unregisters the sender's MBean. Does nothing when no cluster is set,
     * the sender is <code>null</code>, no MBean server is available or the
     * sender's name cannot be built. Failures are logged and not propagated.
     *
     * @param sender the sender whose MBean is removed
     */
    protected void unregisterSenderMBean(IDataSender sender) {
        if (cluster == null || sender == null) {
            return;
        }
        try {
            MBeanServer mserver = cluster.getMBeanServer();
            if (mserver != null) {
                ObjectName senderName = getSenderObjectName(sender);
                if (senderName != null) {
                    mserver.unregisterMBean(senderName);
                }
            }
        } catch (Exception e) {
            log.warn("Unable to unregister the MBean of sender " + sender, e);
        }
    }

    /**
     * Registers the sender as an MBean under
     * {@link #getSenderObjectName(IDataSender)}. Does nothing when the member
     * or the cluster is <code>null</code>, when the name cannot be built or
     * when the name is already registered. Failures are logged and not
     * propagated.
     *
     * @param member the member the sender belongs to
     * @param sender the sender to register
     */
    protected void registerSenderMBean(Member member, IDataSender sender) {
        if (member == null || cluster == null) {
            return;
        }
        try {
            MBeanServer mserver = cluster.getMBeanServer();
            ObjectName senderName = getSenderObjectName(sender);
            if (senderName == null) {
                // Building the name failed and was already logged.
                return;
            }
            if (mserver.isRegistered(senderName)) {
                if (log.isWarnEnabled()) {
                    log.warn(sm.getString("cluster.mbean.register.allready",
                            senderName));
                }
                return;
            }
            mserver.registerMBean(cluster.getManagedBean(sender), senderName);
        } catch (Exception e) {
            log.warn("Unable to register the MBean of sender " + sender, e);
        }
    }

    /**
     * Builds the JMX name of a sender:
     * <code>&lt;domain&gt;:type=IDataSender,host=&lt;host&gt;,senderAddress=&lt;ip&gt;,senderPort=&lt;port&gt;</code>,
     * where domain and host come from the cluster's object name.
     *
     * @param sender the sender
     * @return the object name, or <code>null</code> when no cluster is set or
     *         the name cannot be built (the failure is logged)
     */
    protected ObjectName getSenderObjectName(IDataSender sender) {
        if (cluster == null) {
            return null;
        }
        ObjectName senderName = null;
        try {
            ObjectName clusterName = cluster.getObjectName();
            senderName = new ObjectName(clusterName.getDomain()
                    + ":type=IDataSender,host="
                    + clusterName.getKeyProperty("host") + ",senderAddress="
                    + sender.getAddress().getHostAddress() + ",senderPort="
                    + sender.getPort());
        } catch (Exception e) {
            log.warn("Unable to build the MBean name of sender " + sender, e);
        }
        return senderName;
    }

    /**
     * Packages raw message data for the wire with
     * {@link XByteBuffer#createDataPackage}, honouring {@link #isCompress()}.
     *
     * @param data raw message data
     * @return the packaged data
     * @throws IOException if packaging fails
     */
    protected byte[] convertSenderData(byte[] data) throws IOException {
        return XByteBuffer.createDataPackage(data, isCompress());
    }

    /**
     * Sends already packaged data through one sender, connecting it first
     * when auto-connect is enabled and the sender is not connected.
     * <p>
     * On success the sender's suspect flag is cleared and the statistics are
     * updated. On failure the problem is logged (only on the first failure
     * after a success), the sender is marked suspect and the failure counter
     * is incremented; the exception is <em>not</em> propagated.
     *
     * @param sessionId id passed through to the sender
     * @param data packaged message data
     * @param sender destination sender
     * @throws java.io.IOException if <code>sender</code> is <code>null</code>
     */
    protected void sendMessageData(String sessionId, byte[] data,
            IDataSender sender) throws java.io.IOException {
        if (sender == null) {
            throw new java.io.IOException(
                    "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
        }
        try {
            if (autoConnect && !sender.isConnected()) {
                sender.connect();
            }
            sender.sendMessage(sessionId, data);
            sender.setSuspect(false);
            addStats(data.length);
        } catch (Exception x) {
            if (log.isWarnEnabled() && !sender.getSuspect()) {
                log.warn("Unable to send replicated message, is server down?",
                        x);
            }
            sender.setSuspect(true);
            addFailure();
        }
    }

    // -------------------------------------------------------- Private helpers

    /**
     * Looks up a sender under the same monitor that guards map mutations.
     *
     * @param key key built by {@link #getKey(Member)}
     * @return the sender, or <code>null</code> if none is registered
     */
    private synchronized IDataSender findSender(String key) {
        return (IDataSender) map.get(key);
    }

    /**
     * Counts one failed send under the same monitor as the other statistics.
     */
    private synchronized void addFailure() {
        failureCounter++;
    }

}