KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > tcp > ReplicationTransmitter


1 /*
2  * Copyright 1999,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy of
6  * the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations under
14  * the License.
15  */

16
17 package org.apache.catalina.cluster.tcp;
18
19 import java.io.IOException JavaDoc;
20 import java.util.HashMap JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.Map JavaDoc;
23
24 import javax.management.MBeanServer JavaDoc;
25 import javax.management.ObjectName JavaDoc;
26
27 import org.apache.catalina.cluster.ClusterSender;
28 import org.apache.catalina.cluster.Member;
29 import org.apache.catalina.cluster.io.XByteBuffer;
30 import org.apache.catalina.util.StringManager;
31 import org.apache.tomcat.util.IntrospectionUtils;
32
33 /**
34  * Transmit message to ohter cluster members create sender from replicationMode
35  * type
36  * FIXME i18n log messages
37  * FIXME compress data depends on message type and size
38  * FIXME send very big messages at some block see FarmWarDeployer!
39  * TODO pause and resume senders
40  *
41  * @author Peter Rossbach
42  * @author Filip Hanik
43  * @version $Revision: 1.22 $ $Date: 2005/03/25 22:12:31 $
44  */

45 public class ReplicationTransmitter implements ClusterSender {
46     private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
47             .getLog(ReplicationTransmitter.class);
48
49     /**
50      * The descriptive information about this implementation.
51      */

52     private static final String JavaDoc info = "ReplicationTransmitter/1.3";
53
54     /**
55      * The string manager for this package.
56      */

57     protected StringManager sm = StringManager.getManager(Constants.Package);
58
59     private Map JavaDoc map = new HashMap JavaDoc();
60
61     public ReplicationTransmitter() {
62     }
63
64     /**
65      * number of transmitted messages>
66      */

67     private long nrOfRequests = 0;
68
69     /**
70      * number of transmitted bytes
71      */

72     private long totalBytes = 0;
73
74     private long failureCounter = 0;
75
76     /**
77      * current sender replication mode
78      */

79     private String JavaDoc replicationMode;
80
81     /**
82      * sender default ackTimeout
83      */

84     private long ackTimeout = 15000; //15 seconds by default
85

86     /**
87      * enabled wait for ack
88      */

89     private boolean waitForAck = true;
90
91     /**
92      * autoConnect sender when next message send
93      */

94     private boolean autoConnect = true;
95
96     /**
97      * Compress message data bytes
98      */

99     private boolean compress = true;
100
101     /**
102      * dynamic sender <code>properties</code>
103      */

104     private Map JavaDoc properties = new HashMap JavaDoc();
105
106     /**
107      * my cluster
108      */

109     private SimpleTcpCluster cluster;
110
111     /**
112      * Transmitter Mbean name
113      */

114     private ObjectName JavaDoc objectName;
115
116     // ------------------------------------------------------------- Properties
117

118     /**
119      * Return descriptive information about this implementation and the
120      * corresponding version number, in the format
121      * <code>&lt;description&gt;/&lt;version&gt;</code>.
122      */

123     public String JavaDoc getInfo() {
124
125         return (info);
126
127     }
128
129     /**
130      * @return Returns the nrOfRequests.
131      */

132     public long getNrOfRequests() {
133         return nrOfRequests;
134     }
135
136     /**
137      * @return Returns the totalBytes.
138      */

139     public long getTotalBytes() {
140         return totalBytes;
141     }
142
143     /**
144      * @return Returns the failureCounter.
145      */

146     public long getFailureCounter() {
147         return failureCounter;
148     }
149
150     /**
151      * current replication mode
152      *
153      * @return
154      */

155     public String JavaDoc getReplicationMode() {
156         return replicationMode;
157     }
158
159     /**
160      * set replication Mode (pooled, synchonous, asynchonous, fastasyncqueue)
161      *
162      * @see IDataSenderFactory#validateMode(String)
163      * @param mode
164      */

165     public void setReplicationMode(String JavaDoc mode) {
166         String JavaDoc msg = IDataSenderFactory.validateMode(mode);
167         if (msg == null) {
168             if (log.isDebugEnabled())
169                 log.debug("Setting replcation mode to " + mode);
170             this.replicationMode = mode;
171         } else
172             throw new IllegalArgumentException JavaDoc(msg);
173
174     }
175
176     /**
177      * Transmitter ObjectName
178      *
179      * @param name
180      */

181     public void setObjectName(ObjectName JavaDoc name) {
182         objectName = name;
183     }
184
185     public ObjectName JavaDoc getObjectName() {
186         return objectName;
187     }
188
189     /**
190      * @return Returns the compress.
191      */

192     public boolean isCompress() {
193         return compress;
194     }
195
196     /**
197      * @param compress
198      * The compress to set.
199      */

200     public void setCompress(boolean compressMessageData) {
201         this.compress = compressMessageData;
202     }
203
204     /**
205      * @return Returns the autoConnect.
206      */

207     public boolean isAutoConnect() {
208         return autoConnect;
209     }
210
211     /**
212      * @param autoConnect
213      * The autoConnect to set.
214      */

215     public void setAutoConnect(boolean autoConnect) {
216         this.autoConnect = autoConnect;
217         setProperty("autoConnect", String.valueOf(autoConnect));
218
219     }
220
221     /**
222      * @return
223      */

224     public long getAckTimeout() {
225         return ackTimeout;
226     }
227
228     /**
229      * @param ackTimeout
230      */

231     public void setAckTimeout(long ackTimeout) {
232         this.ackTimeout = ackTimeout;
233         setProperty("ackTimeout", String.valueOf(ackTimeout));
234     }
235
236     /**
237      * @return Returns the waitForAck.
238      */

239     public boolean isWaitForAck() {
240         return waitForAck;
241     }
242
243     /**
244      * @param waitForAck
245      * The waitForAck to set.
246      */

247     public void setWaitForAck(boolean waitForAck) {
248         this.waitForAck = waitForAck;
249         setProperty("waitForAck", String.valueOf(waitForAck));
250     }
251
252     /*
253      * configured in cluster
254      *
255      * @see org.apache.catalina.cluster.ClusterSender#setCatalinaCluster(org.apache.catalina.cluster.tcp.SimpleTcpCluster)
256      */

257     public void setCatalinaCluster(SimpleTcpCluster cluster) {
258         this.cluster = cluster;
259
260     }
261
262     /**
263      * @return
264      * @deprecated since version 5.5.7
265      */

266     public boolean getIsSenderSynchronized() {
267         return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
268                 || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
269     }
270
271     // ------------------------------------------------------------- dynamic
272
// sender property handling
273

274     /**
275      * set config attributes with reflect
276      *
277      * @param name
278      * @param value
279      */

280     public void setProperty(String JavaDoc name, Object JavaDoc value) {
281         if (log.isTraceEnabled())
282             log.trace(sm.getString("ReplicationTransmitter.setProperty", name,
283                     value));
284
285         properties.put(name, value);
286     }
287
288     /**
289      * get current config
290      *
291      * @param key
292      * @return
293      */

294     public Object JavaDoc getProperty(String JavaDoc key) {
295         if (log.isTraceEnabled())
296             log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
297         return properties.get(key);
298     }
299
300     /**
301      * Get all properties keys
302      *
303      * @return
304      */

305     public Iterator JavaDoc getPropertyNames() {
306         return properties.keySet().iterator();
307     }
308
309     /**
310      * remove a configured property.
311      *
312      * @param key
313      */

314     public void removeProperty(String JavaDoc key) {
315         properties.remove(key);
316     }
317
318     // ------------------------------------------------------------- public
319

320     /**
321      * Send data to one member
322      *
323      * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
324      * byte[], org.apache.catalina.cluster.Member)
325      */

326     public void sendMessage(String JavaDoc sessionId, byte[] indata, Member member)
327             throws java.io.IOException JavaDoc {
328         byte[] data = convertSenderData(indata);
329         String JavaDoc key = getKey(member);
330         IDataSender sender = (IDataSender) map.get(key);
331         sendMessageData(sessionId, data, sender);
332     }
333
334     /**
335      * send message to all senders (broadcast)
336      *
337      * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
338      * byte[])
339      */

340     public void sendMessage(String JavaDoc sessionId, byte[] indata)
341             throws java.io.IOException JavaDoc {
342         IDataSender[] senders = getSenders();
343         byte[] data = convertSenderData(indata);
344         for (int i = 0; i < senders.length; i++) {
345
346             IDataSender sender = senders[i];
347             try {
348                 sendMessageData(sessionId, data, sender);
349             } catch (Exception JavaDoc x) {
350
351                 if (!sender.getSuspect())
352                     log.warn("Unable to send replicated message to " + sender
353                             + ", is server down?", x);
354                 sender.setSuspect(true);
355             }
356         }
357     }
358
359     /**
360      * start the sender and register transmitter mbean
361      *
362      * @see org.apache.catalina.cluster.ClusterSender#start()
363      */

364     public void start() throws java.io.IOException JavaDoc {
365         if (cluster != null) {
366             ObjectName JavaDoc clusterName = cluster.getObjectName();
367             try {
368                 MBeanServer JavaDoc mserver = cluster.getMBeanServer();
369                 ObjectName JavaDoc transmitterName = new ObjectName JavaDoc(clusterName
370                         .getDomain()
371                         + ":type=ClusterSender,host="
372                         + clusterName.getKeyProperty("host"));
373                 if (mserver.isRegistered(transmitterName)) {
374                     if (log.isWarnEnabled())
375                         log.warn(sm.getString(
376                                 "cluster.mbean.register.allready",
377                                 transmitterName));
378                     return;
379                 }
380                 setObjectName(transmitterName);
381                 mserver.registerMBean(cluster.getManagedBean(this),
382                         getObjectName());
383             } catch (Exception JavaDoc e) {
384                 log.warn(e);
385             }
386         }
387
388     }
389
390     /*
391      * stop the sender and deregister mbeans (transmitter, senders)
392      *
393      * @see org.apache.catalina.cluster.ClusterSender#stop()
394      */

395     public synchronized void stop() {
396         Iterator JavaDoc i = map.entrySet().iterator();
397         while (i.hasNext()) {
398             IDataSender sender = (IDataSender) ((java.util.Map.Entry) i.next())
399                     .getValue();
400             try {
401                 unregisterSenderMBean(sender);
402                 sender.disconnect();
403             } catch (Exception JavaDoc x) {
404             }
405             i.remove();
406         }
407         if (cluster != null && getObjectName() != null) {
408             try {
409                 MBeanServer JavaDoc mserver = cluster.getMBeanServer();
410                 mserver.unregisterMBean(getObjectName());
411             } catch (Exception JavaDoc e) {
412                 log.error(e);
413             }
414         }
415
416     }
417
418     /**
419      * get all current senders
420      *
421      * @return
422      */

423     public IDataSender[] getSenders() {
424         java.util.Iterator JavaDoc iter = map.entrySet().iterator();
425         IDataSender[] array = new IDataSender[map.size()];
426         int i = 0;
427         while (iter.hasNext()) {
428             IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
429                     .next()).getValue();
430             if (sender != null)
431                 array[i] = sender;
432             i++;
433         }
434         return array;
435     }
436
437     /**
438      * get all current senders
439      *
440      * @return
441      */

442     public ObjectName JavaDoc[] getSenderObjectNames() {
443         java.util.Iterator JavaDoc iter = map.entrySet().iterator();
444         ObjectName JavaDoc array[] = new ObjectName JavaDoc[map.size()];
445         int i = 0;
446         while (iter.hasNext()) {
447             IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
448                     .next()).getValue();
449             if (sender != null)
450                 array[i] = getSenderObjectName(sender);
451             i++;
452         }
453         return array;
454     }
455
456     /*
457      * Reset sender statistics
458      */

459     public synchronized void resetStatistics() {
460         nrOfRequests = 0;
461         totalBytes = 0;
462         failureCounter = 0;
463     }
464
465     /*
466      * add new cluster member and create sender ( s. replicationMode) transfer
467      * current properties to sender
468      *
469      * @see org.apache.catalina.cluster.ClusterSender#add(org.apache.catalina.cluster.Member)
470      */

471     public synchronized void add(Member member) {
472         try {
473             String JavaDoc key = getKey(member);
474             if (!map.containsKey(key)) {
475                 IDataSender sender = IDataSenderFactory.getIDataSender(
476                         replicationMode, member);
477                 transferSenderProperty(sender);
478                 map.put(key, sender);
479                 registerSenderMBean(member, sender);
480             }
481         } catch (java.io.IOException JavaDoc x) {
482             log.error("Unable to create and add a IDataSender object.", x);
483         }
484     }
485
486     /**
487      * remove sender from transmitter. ( deregister mbean and disconnect sender )
488      *
489      * @see org.apache.catalina.cluster.ClusterSender#remove(org.apache.catalina.cluster.Member)
490      */

491     public synchronized void remove(Member member) {
492         String JavaDoc key = getKey(member);
493         IDataSender toberemoved = (IDataSender) map.get(key);
494         if (toberemoved == null)
495             return;
496         unregisterSenderMBean(toberemoved);
497         toberemoved.disconnect();
498         map.remove(key);
499
500     }
501
502     // ------------------------------------------------------------- protected
503

504     /**
505      * calc number of requests and transfered bytes. Log stats all 100 requets
506      *
507      * @param length
508      */

509     protected synchronized void addStats(int length) {
510         nrOfRequests++;
511         totalBytes += length;
512         if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
513             log.debug("Nr of bytes sent=" + totalBytes + " over "
514                     + nrOfRequests + "; avg=" + (totalBytes / nrOfRequests)
515                     + " bytes/request; failures=" + failureCounter);
516         }
517
518     }
519
520     /**
521      * Transfer all properties from transmitter to concrete sender
522      *
523      * @param sender
524      */

525     protected void transferSenderProperty(IDataSender sender) {
526         for (Iterator JavaDoc iter = getPropertyNames(); iter.hasNext();) {
527             String JavaDoc pkey = (String JavaDoc) iter.next();
528             Object JavaDoc value = getProperty(pkey);
529             IntrospectionUtils.setProperty(sender, pkey, value.toString());
530         }
531     }
532
533     /**
534      * set unique key to find sender
535      *
536      * @param member
537      * @return concat member.host:member.port
538      */

539     protected String JavaDoc getKey(Member member) {
540         return member.getHost() + ":" + member.getPort();
541     }
542
543     /**
544      * unregsister sendern Mbean
545      *
546      * @see #getSenderObjectName(IDataSender)
547      * @param sender
548      */

549     protected void unregisterSenderMBean(IDataSender sender) {
550         try {
551             MBeanServer JavaDoc mserver = cluster.getMBeanServer();
552             if (mserver != null) {
553                 mserver.unregisterMBean(getSenderObjectName(sender));
554             }
555         } catch (Exception JavaDoc e) {
556             log.warn(e);
557         }
558     }
559
560     /**
561      * register MBean and check it exist (big problem!)
562      *
563      * @param member
564      * @param sender
565      */

566     protected void registerSenderMBean(Member member, IDataSender sender) {
567         if (member != null && cluster != null) {
568             try {
569                 MBeanServer JavaDoc mserver = cluster.getMBeanServer();
570                 ObjectName JavaDoc senderName = getSenderObjectName(sender);
571                 if (mserver.isRegistered(senderName)) {
572                     if (log.isWarnEnabled())
573                         log.warn(sm.getString(
574                                 "cluster.mbean.register.allready", senderName));
575                     return;
576                 }
577                 mserver.registerMBean(cluster.getManagedBean(sender),
578                         senderName);
579             } catch (Exception JavaDoc e) {
580                 log.warn(e);
581             }
582         }
583     }
584
585     /**
586      * build sender ObjectName (
587      * engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port" )
588      *
589      * @param sender
590      * @return
591      */

592     protected ObjectName JavaDoc getSenderObjectName(IDataSender sender) {
593         ObjectName JavaDoc senderName = null;
594         try {
595             ObjectName JavaDoc clusterName = cluster.getObjectName();
596             MBeanServer JavaDoc mserver = cluster.getMBeanServer();
597             senderName = new ObjectName JavaDoc(clusterName.getDomain()
598                     + ":type=IDataSender,host="
599                     + clusterName.getKeyProperty("host") + ",senderAddress="
600                     + sender.getAddress().getHostAddress() + ",senderPort="
601                     + sender.getPort());
602         } catch (Exception JavaDoc e) {
603             log.warn(e);
604         }
605         return senderName;
606     }
607
608     /**
609      * compress data
610      *
611      * @see XByteBuffer#createDataPackage(byte[])
612      * @param indata
613      * @return
614      * @throws IOException
615      * FIXME get CompressMessageDate from cluster instanz
616      */

617     protected byte[] convertSenderData(byte[] data) throws IOException JavaDoc {
618         return XByteBuffer.createDataPackage(data, isCompress());
619     }
620
621     /**
622      * Send message to concrete sender. If autoConnect is true, check is
623      * connection broken and the reconnect the complete sender.
624      * <ul>
625      * <li>failure the suspect flag is set true. After successfully sending the
626      * suspect flag is set to false.</li>
627      * <li>Stats is only update after sussesfull sending</li>
628      * </ul>
629      *
630      * @param sessionId
631      * Unique Message Id
632      * @param data
633      * message Data
634      * @param sender
635      * concrete message sender
636      * @throws java.io.IOException
637      */

638     protected void sendMessageData(String JavaDoc sessionId, byte[] data,
639             IDataSender sender) throws java.io.IOException JavaDoc {
640         if (sender == null)
641             throw new java.io.IOException JavaDoc(
642                     "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
643         try {
644             if (autoConnect && !sender.isConnected())
645                 sender.connect();
646             sender.sendMessage(sessionId, data);
647             sender.setSuspect(false);
648             addStats(data.length);
649         } catch (Exception JavaDoc x) {
650             if (log.isWarnEnabled()) {
651                 if (!sender.getSuspect()) {
652                     log
653                             .warn(
654                                     "Unable to send replicated message, is server down?",
655                                     x);
656                 }
657             }
658             sender.setSuspect(true);
659             failureCounter++;
660         }
661
662     }
663
664 }
Popular Tags