1 23 package com.sun.enterprise.web.connector.grizzly; 24 25 import java.io.IOException ; 26 import java.util.ArrayList ; 27 import java.util.Iterator ; 28 import java.util.Set ; 29 import java.util.logging.Level ; 30 import java.nio.channels.CancelledKeyException ; 31 import java.nio.channels.ClosedChannelException ; 32 import java.nio.channels.Selector ; 33 import java.nio.channels.SelectionKey ; 34 import java.nio.channels.SocketChannel ; 35 36 37 43 public class SelectorReadThread extends SelectorThread{ 44 45 48 ArrayList <SocketChannel > channels = new ArrayList <SocketChannel >(); 49 50 51 54 public int countName; 55 56 57 60 protected void initPipeline(){ 61 62 initKeepAlivePipeline(); 63 64 processorPipeline = newPipeline(maxProcessorWorkerThreads, 65 minWorkerThreads, "http-SelectorRead-" 66 + countName + "-", 67 port, 68 Thread.MAX_PRIORITY); 69 processorPipeline.initPipeline(); 70 71 if ( maxReadWorkerThreads > 0 ){ 73 readPipeline = newPipeline(maxReadWorkerThreads, 74 minWorkerThreads, "read", 75 port,Thread.NORM_PRIORITY); 76 readPipeline.initPipeline(); 77 } else { 78 readPipeline = processorPipeline; 79 } 80 } 81 82 83 87 public synchronized void addChannel(SocketChannel channel) 88 throws IOException , ClosedChannelException { 89 channels.add(channel); 90 selector.wakeup(); 91 } 92 93 94 97 private synchronized void registerNewChannels() throws IOException { 98 int size = channels.size(); 99 for (int i = 0; i < size; i++) { 100 SocketChannel sc = channels.get(i); 101 sc.configureBlocking(false); 102 try { 103 SelectionKey readKey = 104 sc.register(selector, SelectionKey.OP_READ); 105 setSocketOptions(((SocketChannel )readKey.channel()).socket()); 106 } catch (ClosedChannelException cce) { 107 } 108 } 109 channels.clear(); 110 } 111 112 113 116 public void initEndpoint() throws IOException , InstantiationException { 117 setName("SelectorReaderThread-" + port); 118 initAlgorithm(); 119 initPipeline(); 120 initProcessorTask(maxProcessorWorkerThreads); 121 initReadTask(minReadQueueLength); 122 } 123 124 125 128 public void startEndpoint() throws IOException , InstantiationException { 129 running = true; 130 131 if (readPipeline != null){ 132 readPipeline.startPipeline(); 133 } 134 135 while (running) { 136 try{ 137 if ( selector == null ){ 138 selector = Selector.open(); 139 } 140 141 registerNewChannels(); 142 doSelect(); 143 } catch (Throwable t){ 144 logger.log(Level.FINE,"selectorThread.errorOnRequest", t); 145 } 146 } 147 } 148 149 150 156 public int getCurrentBusyProcessorThreads() { 157 return (processorPipeline.getCurrentThreadsBusy()); 158 } 159 160 } 161 | Popular Tags |