1 package org.jboss.mx.remoting.rmi; 2 3 import java.util.HashMap ; 4 import java.util.Map ; 5 import java.util.List ; 6 import java.util.ArrayList ; 7 import java.util.Set ; 8 import java.util.Iterator ; 9 import java.util.TimerTask ; 10 import java.util.Timer ; 11 import java.io.IOException ; 12 13 import javax.management.ObjectName ; 14 import javax.management.NotificationListener ; 15 import javax.management.Notification ; 16 import javax.management.NotificationFilter ; 17 import javax.management.remote.rmi.RMIConnection ; 18 import javax.management.remote.NotificationResult ; 19 import javax.management.remote.TargetedNotification ; 20 21 import org.jboss.logging.Logger; 22 import org.jboss.util.threadpool.BasicThreadPool; 23 import org.jboss.util.threadpool.BlockingMode; 24 25 import EDU.oswego.cs.dl.util.concurrent.ReaderPreferenceReadWriteLock; 26 import EDU.oswego.cs.dl.util.concurrent.SyncMap; 27 28 31 39 public class ClientNotifier extends TimerTask 40 { 41 private Map clientListeners = null; 42 private Timer fetchTimer = null; 43 private RMIConnection connection = null; 44 45 private long fetchTimeout = 1000; 47 private long clientSequenceNumber = -1; 48 private int maxNotifications = 10; 49 50 private BasicThreadPool notifierPool = null; 51 private int maxNumberThreads = 20; 52 53 private static final Logger log = Logger.getLogger(ClientNotifier.class); 54 55 56 public ClientNotifier(RMIConnection rmiConnection) 57 { 58 this.connection = rmiConnection; 59 clientListeners = new SyncMap(new HashMap (), new ReaderPreferenceReadWriteLock()); 60 fetchTimer = new Timer (true); 61 fetchTimer.schedule(this, 1000, 1000); 63 64 notifierPool = new BasicThreadPool("JBoss JMX Remoting client notifier"); 65 notifierPool.setMaximumPoolSize(maxNumberThreads); 66 notifierPool.setBlockingMode(BlockingMode.WAIT); 67 } 68 69 72 public void run() 73 { 74 try 75 { 76 NotificationResult result = connection.fetchNotifications(clientSequenceNumber, maxNotifications, fetchTimeout); 77 if(result != null) 78 { 79 clientSequenceNumber = result.getNextSequenceNumber(); 80 TargetedNotification [] targetedNotifications = result.getTargetedNotifications(); 81 if(targetedNotifications != null) 82 { 83 deliverNotifications(targetedNotifications); 84 } 85 } 86 else 87 { 88 log.error("Fetched notifications and result was null."); 89 } 90 } 91 catch(IOException e) 92 { 93 log.error("Error fetching notifications for sequence number " + clientSequenceNumber, e); 94 } 95 } 96 97 private void deliverNotifications(TargetedNotification [] targetedNotifications) 98 { 99 for(int x = 0; x < targetedNotifications.length; x++) 100 { 101 TargetedNotification targetedNotification = targetedNotifications[x]; 102 Integer id = targetedNotification.getListenerID(); 103 ClientListenerHolder holder = (ClientListenerHolder)clientListeners.get(id); 104 if(holder != null) 105 { 106 final Notification notification = targetedNotification.getNotification(); 107 boolean deliverNotification = true; 108 if(holder.getFilterOnClient()) 109 { 110 NotificationFilter filter = holder.getFilter(); 111 if(!filter.isNotificationEnabled(notification)) 112 { 113 deliverNotification = false; 114 } 115 } 116 if(deliverNotification) 117 { 118 final NotificationListener listener = holder.getListener(); 119 final Object handback = holder.getHandback(); 120 Runnable notifyRun = new Runnable () 121 { 122 public void run() 123 { 124 try 125 { 126 listener.handleNotification(notification, handback); 127 } 128 catch(Throwable e) 129 { 130 log.error("Error delivering notification to listener: " + listener, e); 131 } 132 } 133 }; 134 notifierPool.run(notifyRun); 135 } 136 } 137 } 138 } 139 140 public boolean exists(ClientListenerHolder holder) 141 { 142 return clientListeners.containsValue(holder); 143 } 144 145 146 public void addNotificationListener(Integer listenerID, ClientListenerHolder holder) 147 { 148 clientListeners.put(listenerID, holder); 149 } 150 151 public Integer [] getListeners(ObjectName name, NotificationListener listener) 152 { 153 List idList = new ArrayList (); 154 Set keys = clientListeners.keySet(); 155 Iterator itr = keys.iterator(); 156 while(itr.hasNext()) 157 { 158 Integer id = (Integer )itr.next(); 159 ClientListenerHolder holder = (ClientListenerHolder)clientListeners.get(id); 160 if(holder.getObjectName().equals(name) && holder.getListener().equals(listener)) 161 { 162 idList.add(id); 163 } 164 } 165 Integer [] ids = new Integer [idList.size()]; 166 ids = (Integer [])idList.toArray(ids); 167 return ids; 168 } 169 170 public void removeListeners(Integer [] ids) 171 { 172 for(int x = 0; x < ids.length; x++) 173 { 174 clientListeners.remove(ids[x]); 175 } 176 } 177 178 public Integer getListener(ClientListenerHolder clientListenerHolder) 179 { 180 Iterator itr = clientListeners.entrySet().iterator(); 181 while(itr.hasNext()) 182 { 183 Map.Entry value = (Map.Entry )itr.next(); 184 if(value.getValue().equals(clientListenerHolder)) 185 { 186 return (Integer )value.getKey(); 187 } 188 } 189 return null; 190 } 191 192 public void close() 193 { 194 fetchTimer.cancel(); 195 } 196 } 197 | Popular Tags |