1 23 24 package com.sun.enterprise.web.ara; 25 26 import com.sun.enterprise.web.connector.grizzly.Pipeline; 27 import com.sun.enterprise.web.connector.grizzly.ReadTask; 28 import com.sun.enterprise.web.connector.grizzly.SelectorThread; 29 import com.sun.enterprise.web.connector.grizzly.SelectorThreadConfig; 30 import com.sun.enterprise.web.connector.grizzly.StreamAlgorithm; 31 import com.sun.enterprise.web.connector.grizzly.Task; 32 import com.sun.enterprise.web.connector.grizzly.TaskEvent; 33 import com.sun.enterprise.web.connector.grizzly.TaskListener; 34 35 import java.io.IOException ; 36 import java.net.Socket ; 37 import java.nio.ByteBuffer ; 38 import java.nio.channels.SelectionKey ; 39 import java.nio.channels.SocketChannel ; 40 import java.util.ArrayList ; 41 import java.util.concurrent.ConcurrentHashMap ; 42 import java.util.logging.Logger ; 43 import java.util.logging.Level ; 44 45 51 public class IsolatedTask extends TaskWrapper implements TaskListener{ 52 53 public final static int ISOLATED_TASK = 4; 54 55 59 protected StreamAlgorithm algorithm; 60 61 62 65 protected RulesExecutor rulesExecutor; 66 67 68 71 protected ArrayList <TaskListener> listeners = new ArrayList <TaskListener>(); 72 73 74 78 protected int initialBytePosition; 79 80 81 85 protected int initialByteLimit; 86 87 88 92 protected TaskEvent<IsolatedTask> taskEvent; 93 94 95 99 private static ConcurrentHashMap <SelectionKey ,Pipeline> cacheKey = 100 new ConcurrentHashMap <SelectionKey ,Pipeline>(); 101 102 103 public IsolatedTask(){ 104 taskEvent = new TaskEvent<IsolatedTask>(); 105 taskEvent.attach(this); 106 } 107 108 109 114 public void doTask() throws IOException { 115 ReadTask readTask = (ReadTask)wrappedTask; 116 117 Pipeline pipeline = cacheKey.get(readTask.getSelectionKey()); 118 if ( pipeline != null ){ 119 readTask.setPipeline(pipeline); 120 readTask.execute(); 121 return; 122 } 123 124 ByteBuffer byteBuffer = readTask.getByteBuffer(); 125 try { 126 SocketChannel socketChannel = 127 (SocketChannel )readTask.getSelectionKey().channel(); 128 Socket socket = socketChannel.socket(); 129 130 socketChannel.read(byteBuffer); 131 132 int position = byteBuffer.position(); 133 int limit = byteBuffer.limit(); 134 135 boolean execute = true; 138 139 if (algorithm.parse(byteBuffer)) { 141 execute = rulesExecutor.execute(this); 142 } 143 144 if ( execute ){ 145 readTask.setBytesAvailable(true); 148 byteBuffer.limit(limit); 149 byteBuffer.position(position); 150 151 if ( rulesExecutor.isCachingAllowed()) { 152 cacheKey.put(readTask.getSelectionKey(), 154 readTask.getPipeline()); 155 } 156 157 readTask.addTaskListener(this); 159 160 readTask.execute(); 161 } 162 } catch (Exception ex){ 163 SelectorThread.logger() 164 .log(Level.SEVERE,"IsolatedTask logic exception.",ex); 165 } finally { 166 fireTaskEvent(taskEvent); 167 } 168 } 169 170 171 174 public void setRulesExecutor(RulesExecutor rulesExecutor){ 175 this.rulesExecutor = rulesExecutor; 176 } 177 178 181 public void setAlgorithm(StreamAlgorithm algorithm){ 182 this.algorithm = algorithm; 183 } 184 185 186 189 public IsolatedTask wrap(Task task){ 190 wrappedTask = task; 191 return this; 192 } 193 194 195 197 198 201 public void execute(){ 202 run(); 203 } 204 205 206 209 public void run(){ 210 try{ 211 doTask(); 212 } catch (IOException ex){ 213 throw new RuntimeException (ex); 214 }; 215 } 216 217 219 220 223 public void addTaskListener(TaskListener task){ 224 listeners.add(task); 225 } 226 227 228 232 public void removeTaskListener(TaskListener task){ 233 listeners.remove(task); 234 } 235 236 237 240 public void clearTaskListeners(){ 241 listeners.clear(); 242 } 243 244 245 248 protected void fireTaskEvent(TaskEvent<?> event){ 249 for (int i=0; i < listeners.size(); i++){ 250 listeners.get(i).taskEvent(event); 251 } 252 } 253 254 255 258 public void taskEvent(TaskEvent event) { 259 if (event.getStatus() == TaskEvent.COMPLETED) 260 cacheKey.remove(event.attachement()); 261 } 262 263 264 267 public int getType(){ 268 return ISOLATED_TASK; 269 } 270 271 } 272 | Popular Tags |