1 4 package demo.sharedqueue; 5 6 import java.io.File ; 7 import java.net.InetAddress ; 8 import java.net.UnknownHostException ; 9 import javax.management.MBeanServer ; 10 import javax.management.MBeanServerFactory ; 11 import javax.management.MBeanServerNotification ; 12 import javax.management.Notification ; 13 import javax.management.NotificationFilter ; 14 import javax.management.NotificationListener ; 15 import javax.management.ObjectName ; 16 import org.mortbay.http.handler.ResourceHandler; 17 import org.mortbay.http.HttpContext; 18 import org.mortbay.http.HttpHandler; 19 import org.mortbay.http.HttpServer; 20 import org.mortbay.http.SocketListener; 21 22 public class Main { 23 private final File cwd = new File (System.getProperty("user.dir")); 24 private int lastPortUsed; 25 private Queue queue; 26 private Worker worker; 27 28 34 public void start(int port) 35 throws Exception { 36 String nodeId = registerForNotifications(); 37 port = setPort(port); 38 39 System.out.println("DSO SharedQueue (node " + nodeId + ")"); 40 System.out.println("Open your browser and go to - http://" + getHostName() + ":" + port + "/webapp\n"); 41 HttpServer server = new HttpServer(); 42 SocketListener listener = new SocketListener(); 43 listener.setPort(port); 44 server.addListener(listener); 45 46 HttpContext context = server.addContext("/"); 47 String resourceBase = cwd.getPath(); 48 context.setResourceBase(resourceBase); 49 context.addHandler(new ResourceHandler()); 50 51 queue = new Queue(port); 52 worker = queue.createWorker(nodeId); 53 54 HttpContext ajaxContext = server.addContext(SimpleHttpHandler.ACTION); 55 HttpHandler ajaxHandler = new SimpleHttpHandler(queue, resourceBase); 56 ajaxContext.addHandler(ajaxHandler); 57 58 startReaper(); 59 server.start(); 60 } 61 62 private int setPort(int port) { 63 if (port == -1) { 64 if (lastPortUsed == 0) { 65 port = lastPortUsed = 1990; 66 } 67 else { 68 port = ++lastPortUsed; 69 } 70 } 71 else { 72 lastPortUsed = port; 73 } 74 75 return port; 76 } 77 78 82 private void startReaper() { 83 Thread reaper = new Thread ( 84 new Runnable () { 85 public void run() { 86 while (true) { 87 Main.this.queue.reap(); 88 try { 89 Thread.sleep(1000); 90 } 91 catch (InterruptedException ie) { 92 System.err.println(ie.getMessage()); 93 } 94 } 95 } 96 }); 97 reaper.start(); 98 } 99 100 106 public static final void main(String [] args) 107 throws Exception { 108 int port = -1; 109 try { port = Integer.parseInt(args[0]); } 110 catch (Exception e) { } 111 112 (new Main()).start(port); 113 } 114 115 118 static String getHostName() { 119 try { 120 InetAddress addr = InetAddress.getLocalHost(); 121 byte[] ipAddr = addr.getAddress(); 122 return addr.getHostName(); 123 } 124 catch (UnknownHostException e) { 125 return "Unknown"; 126 } 127 } 128 129 133 private String registerForNotifications() throws Exception { 134 java.util.List servers = MBeanServerFactory.findMBeanServer(null); 135 if (servers.size() == 0) { 136 System.err.println("WARNING: No JMX servers found, unable to register for notifications."); 137 return "0"; 138 } 139 140 MBeanServer server = (MBeanServer )servers.get(0); 141 final ObjectName clusterBean = new ObjectName ("org.terracotta:type=Terracotta Cluster,name=Terracotta Cluster Bean"); 142 ObjectName delegateName = ObjectName.getInstance("JMImplementation:type=MBeanServerDelegate"); 143 final java.util.List clusterBeanBag = new java.util.ArrayList (); 144 145 NotificationListener listener0 = new NotificationListener () { 147 public void handleNotification(Notification notification, Object handback) { 148 synchronized (clusterBeanBag) { 149 clusterBeanBag.add(handback); 150 clusterBeanBag.notifyAll(); 151 } 152 } 153 }; 154 155 NotificationFilter filter0 = new NotificationFilter () { 157 public boolean isNotificationEnabled(Notification notification) { 158 if (notification.getType().equals("JMX.mbean.registered") 159 && ((MBeanServerNotification ) notification) 160 .getMBeanName().equals(clusterBean)) 161 return true; 162 return false; 163 } 164 }; 165 166 server.addNotificationListener(delegateName, listener0, filter0, 168 clusterBean); 169 170 java.util.Set allObjectNames = server.queryNames(null, null); 173 174 if (!allObjectNames.contains(clusterBean)) { 175 synchronized (clusterBeanBag) { 176 while (clusterBeanBag.isEmpty()) { 177 clusterBeanBag.wait(); 178 } 179 } 180 } 181 182 server.removeNotificationListener(delegateName, listener0); 184 185 NotificationListener listener1 = new NotificationListener () { 187 public void handleNotification(Notification notification, Object handback) { 188 String nodeId = notification.getMessage(); 189 Worker worker = Main.this.queue.getWorker(nodeId); 190 if (worker != null) { 191 worker.markForExpiration(); 192 } else { 193 System.err.println("Worker for nodeId: " + nodeId + " not found."); 194 } 195 } 196 }; 197 198 NotificationFilter filter1 = new NotificationFilter () { 200 public boolean isNotificationEnabled(Notification notification) { 201 return notification.getType().equals("com.tc.cluster.event.nodeDisconnected"); 202 } 203 }; 204 205 server.addNotificationListener(clusterBean, listener1, filter1, clusterBean); 207 return (server.getAttribute(clusterBean, "NodeId")).toString(); 208 } 209 } 210 | Popular Tags |