KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > SHUFFLE


1 package org.jgroups.protocols;
2
3 import org.jgroups.Event;
4 import org.jgroups.Message;
5 import org.jgroups.stack.Protocol;
6
7 import java.util.*;
8
9
10
11 /**
12  * This layer shuffles upcoming messages, put it just above your bottom layer.
13  * If you system sends less than 2 messages per sec you can notice a latency due
14  * to this layer.
15  *
16  * @author Gianluca Collot
17  *
18  */

19
20 public class SHUFFLE extends Protocol implements Runnable JavaDoc {
21
22     String JavaDoc name="SHUFFLE";
23     final List messages;
24     Thread JavaDoc messagesHandler;
25
26     public SHUFFLE() {
27         messages = Collections.synchronizedList(new ArrayList());
28     }
29
30     public String JavaDoc getName() {
31         return name;
32     }
33
34     public boolean setProperties(Properties props) {
35         String JavaDoc str;
36
37         super.setProperties(props);
38         str=props.getProperty("name");
39         if(str != null) {
40             name=str;
41             props.remove("name");
42         }
43
44         if(props.size() > 0) {
45             System.err.println("DUMMY.setProperties(): these properties are not recognized:");
46             props.list(System.out);
47             return false;
48         }
49         return true;
50     }
51
52     /**
53      * Adds upcoming messages to the <code>messages List<\code> where the <code>messagesHandler<\code>
54      * retrieves them.
55      */

56
57     public void up(Event evt) {
58         Message msg;
59
60         switch (evt.getType()) {
61
62     case Event.MSG:
63             msg=(Message)evt.getArg();
64             // Do something with the event, e.g. extract the message and remove a header.
65
// Optionally pass up
66
messages.add(msg);
67             return;
68         }
69
70         passUp(evt); // Pass up to the layer above us
71
}
72
73
74
75
76     /**
77      * Starts the <code>messagesHandler<\code>
78      */

79     public void start() throws Exception JavaDoc {
80         messagesHandler = new Thread JavaDoc(this,"MessagesHandler");
81         messagesHandler.setDaemon(true);
82         messagesHandler.start();
83     }
84
85     /**
86      * Stops the messagesHandler
87      */

88     public void stop() {
89         Thread JavaDoc tmp = messagesHandler;
90         messagesHandler = null;
91         try {
92             tmp.join();
93         } catch (Exception JavaDoc ex) {ex.printStackTrace();}
94     }
95
96     /**
97      * Removes a random chosen message from the <code>messages List<\code> if there
98      * are less than 10 messages in the List it waits some time to ensure to chose from
99      * a set of messages > 1.
100      */

101
102     public void run() {
103         Message msg;
104         while (messagesHandler != null) {
105             if ( messages.size() > 0 ) {
106                 msg = (Message) messages.remove(rnd(messages.size()));
107                 passUp(new Event(Event.MSG,msg));
108             }
109             if (messages.size() < 5) {
110                 try {
111                     Thread.sleep(300); /** @todo make this time user configurable */
112                 }
113                 catch (Exception JavaDoc ex) {
114                     ex.printStackTrace();
115                 }
116             }
117         }// while
118
// PassUp remaining messages
119
Iterator iter = messages.iterator();
120         while (iter.hasNext()) {
121             msg = (Message) iter.next();
122             passUp(new Event(Event.MSG,msg));
123         }
124     }
125
126     // random integer between 0 and n-1
127
int rnd(int n) { return (int)(Math.random()*n); }
128
129 }
130
Popular Tags