1 18 19 package org.objectweb.jac.aspects.queue; 20 21 22 23 import org.objectweb.jac.core.rtti.FieldItem; 24 import org.objectweb.jac.core.rtti.MethodItem; 25 import java.util.Collections ; 26 import java.util.HashMap ; 27 import java.util.HashSet ; 28 import java.util.LinkedList ; 29 import java.util.List ; 30 import java.util.Map ; 31 import java.util.Set ; 32 import org.objectweb.jac.util.Log; 33 34 35 38 public class MessageQueue { 39 40 NotifyThread thread; 41 public MessageQueue() { 42 thread = new NotifyThread(); 43 thread.start(); 44 } 45 46 HashMap fieldClients = new HashMap (); 48 49 54 public Map getFieldClients() { 55 return fieldClients; 56 } 57 58 List queue = Collections.synchronizedList(new LinkedList ()); 59 60 66 public void fieldChanged(Object substance, FieldItem field, 67 Object previousValue, Object currentValue) { 68 Log.trace("mqueue","fieldChanged("+substance+","+field+","+ 69 previousValue+" -> "+currentValue); 70 queue.add(new FieldChangeEvent(substance,field,previousValue,currentValue)); 71 thread.notifyClients(); 72 } 73 74 80 public void registerFieldChange(FieldItem field, MethodItem callback) { 81 Set clients = (Set )fieldClients.get(field); 82 if (clients==null) { 83 clients = new HashSet (); 84 fieldClients.put(field,clients); 85 } 86 clients.add(callback); 87 } 88 89 93 public void notifyFieldChange(FieldChangeEvent event) { 94 Set clients = (Set )fieldClients.get(event.getField()); 95 if (clients!=null) { 96 MethodItem[] array = (MethodItem[])clients.toArray(new MethodItem[] {}); 97 Object [] params = new Object [] {event}; 98 for(int i=0; i<array.length; i++) { 99 try { 100 Log.warning("mqueue","notifying "+event+" to "+array[i]); 101 array[i].invokeStatic(params); 102 } catch (Exception e) { 103 Log.warning("mqueue","Failed to invoke "+array[i]+": "+e); 104 } 105 } 106 } 107 } 108 109 115 public void unregisterFieldChange(FieldItem field, MethodItem callback) { 116 Set clients = (Set )fieldClients.get(field); 117 if (clients!=null) { 118 clients.remove(callback); 119 } 120 } 121 122 class NotifyThread extends Thread { 123 public void run() { 124 while (true) { 125 try { 126 synchronized(this) { 127 this.wait(); 128 } 129 } catch (InterruptedException e) { 130 } 131 Log.trace("mqueue","Queue = "+queue); 132 if (!queue.isEmpty()) { 133 Object events[] = queue.toArray(); 134 for (int i=0; i<events.length; i++) { 135 if (events[i] instanceof FieldChangeEvent) { 136 FieldChangeEvent event = (FieldChangeEvent)events[i]; 137 notifyFieldChange(event); 138 queue.remove(event); 139 } 140 } 141 } 142 } 143 } 144 145 148 public synchronized void notifyClients() { 149 this.notify(); 150 } 151 } 152 } 153 | Popular Tags |