1 package org.jacorb.notification.engine; 2 3 23 24 import java.util.Collections ; 25 import java.util.HashMap ; 26 import java.util.Map ; 27 28 import org.jacorb.notification.interfaces.FilterStage; 29 import org.jacorb.notification.interfaces.Message; 30 import org.omg.CORBA.AnyHolder ; 31 import org.omg.CosNotifyFilter.UnsupportedFilterableData; 32 33 37 38 public class FilterProxySupplierTask extends AbstractFilterTask 39 { 40 static class AlternateMessageMap 41 { 42 private final Map alternateMessages_; 43 44 46 public AlternateMessageMap() 47 { 48 this(new HashMap ()); 49 } 50 51 AlternateMessageMap(Map m) 52 { 53 alternateMessages_ = m; 54 } 55 56 58 public Message getAlternateMessage(FilterStage s) 59 { 60 if (alternateMessages_.containsKey(s)) 61 { 62 return (Message) alternateMessages_.get(s); 63 } 64 return null; 65 } 66 67 public void putAlternateMessage(FilterStage s, Message e) 68 { 69 alternateMessages_.put(s, e); 70 } 71 72 public void clear() 73 { 74 alternateMessages_.clear(); 75 } 76 } 77 78 public static final AlternateMessageMap EMPTY_MAP = new AlternateMessageMap( 79 Collections.EMPTY_MAP) 80 { 81 public void clear() 82 { 83 } 84 }; 85 86 88 final AlternateMessageMap changedMessages_ = new AlternateMessageMap(); 89 90 private static int sCount = 0; 91 92 private int id_ = ++sCount; 93 94 96 public FilterProxySupplierTask(TaskFactory taskFactory, TaskExecutor taskExecutor) 97 { 98 super(taskFactory, taskExecutor); 99 } 100 101 103 public String toString() 104 { 105 return "[FilterProxySupplierTask#" + id_ + "]"; 106 } 107 108 public void reset() 109 { 110 super.reset(); 111 112 changedMessages_.clear(); 113 } 114 115 public void doFilter() throws InterruptedException 116 { 117 filter(); 118 119 getTaskFactory().enqueueMessage(this); 120 } 121 122 private Message updatePriority(int indexOfCurrentEvent, Message message) 123 { 124 AnyHolder _priorityFilterResult = new AnyHolder (); 125 126 Message _currentMessage = message; 127 128 try 129 { 130 boolean priorityMatch = message.match(arrayCurrentFilterStage_[indexOfCurrentEvent] 131 .getPriorityFilter(), _priorityFilterResult); 132 133 if (priorityMatch) 134 { 135 _currentMessage = (Message) getMessage().clone(); 136 137 _currentMessage.setPriority(_priorityFilterResult.value.extract_long()); 138 } 139 } catch (UnsupportedFilterableData e) 140 { 141 } 143 144 return _currentMessage; 145 } 146 147 private Message updateTimeout(int indexOfCurrentFilterStage, Message message) 148 { 149 AnyHolder _lifetimeFilterResult = new AnyHolder (); 150 Message _currentMessage = message; 151 152 try 153 { 154 boolean lifetimeMatch = _currentMessage.match( 155 arrayCurrentFilterStage_[indexOfCurrentFilterStage].getLifetimeFilter(), 156 _lifetimeFilterResult); 157 158 if (lifetimeMatch && (_currentMessage == getMessage())) 159 { 160 164 _currentMessage = (Message) getMessage().clone(); 165 166 _currentMessage.setTimeout(_lifetimeFilterResult.value.extract_long()); 167 } 168 169 } catch (UnsupportedFilterableData e) 170 { 171 } 173 174 return _currentMessage; 175 } 176 177 private void filter() 178 { 179 for (int x = 0; x < arrayCurrentFilterStage_.length; ++x) 180 { 181 boolean _forward = false; 182 183 if (!arrayCurrentFilterStage_[x].isDisposed()) 184 { 185 Message _currentMessage = getMessage(); 186 187 if (arrayCurrentFilterStage_[x].hasPriorityFilter()) 188 { 189 _currentMessage = updatePriority(x, _currentMessage); 190 } 191 192 if (arrayCurrentFilterStage_[x].hasLifetimeFilter()) 193 { 194 _currentMessage = updateTimeout(x, _currentMessage); 195 } 196 197 if (_currentMessage != getMessage()) 198 { 199 changedMessages_.putAlternateMessage(arrayCurrentFilterStage_[x], 204 _currentMessage); 205 } 206 207 _forward = _currentMessage.match(arrayCurrentFilterStage_[x]); 208 } 209 210 if (_forward) 211 { 212 addFilterStage(arrayCurrentFilterStage_[x].getSubsequentFilterStages()); 214 } 215 } 216 } 217 } | Popular Tags |