package org.apache.activemq.transport.nio;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Runs a select loop on its own {@link Selector} and hands every ready key's
 * attached {@link SelectorSelection} to the manager's channel executor.
 *
 * <p>The number of current users is tracked in {@link #useCounter}. The loop is
 * submitted to the manager's selector executor when the counter leaves zero and
 * exits once the counter is back at zero. The owning {@link SelectorManager} is
 * told when the worker becomes full, stops being full, or becomes empty.
 *
 * <p>NOTE(review): the selector opened in the constructor is never closed by
 * this class; confirm whether the manager closes it or whether an emptied
 * worker is expected to be reused.
 */
public class SelectorWorker implements Runnable {

    /** Source of the numeric id used in the worker thread name. */
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    final SelectorManager manager;
    final Selector selector;
    final int id = NEXT_ID.getAndIncrement();
    final AtomicInteger useCounter = new AtomicInteger();
    private final int maxChannelsPerWorker;

    /**
     * Opens a new selector and reads the per-worker channel limit from the manager.
     *
     * @param manager the manager that owns this worker and supplies the executors
     * @throws IOException if the selector cannot be opened
     */
    public SelectorWorker(SelectorManager manager) throws IOException {
        this.manager = manager;
        selector = Selector.open();
        maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
    }

    /**
     * Records one more user of this worker. The first user starts the select
     * loop; reaching the configured limit reports the worker as full.
     */
    void incrementUseCounter() {
        int previous = useCounter.getAndIncrement();
        if (previous == 0) {
            manager.getSelectorExecutor().execute(this);
        }
        // Checked independently of the start-up branch: with a limit of one the
        // very first user both starts the loop and fills the worker.
        if (previous + 1 == maxChannelsPerWorker) {
            manager.onWorkerFullEvent(this);
        }
    }

    /**
     * Records that one user has gone away. Dropping to zero reports the worker
     * as empty; otherwise, dropping below the limit reports it as no longer full.
     */
    void decrementUseCounter() {
        int previous = useCounter.getAndDecrement();
        if (previous == 1) {
            manager.onWorkerEmptyEvent(this);
        } else if (previous == maxChannelsPerWorker) {
            manager.onWorkerNotFullEvent(this);
        }
    }

    /**
     * @return {@code true} while at least one user is counted against this worker
     */
    boolean isRunning() {
        return useCounter.get() != 0;
    }

    /**
     * Select loop. Renames the current thread for the duration of the loop,
     * polls the selector with a 10 ms timeout while the worker has users, and
     * dispatches each selected key. If the loop fails, the manager is told the
     * worker is empty and every registered selection receives the error. The
     * original thread name is always restored.
     */
    public void run() {
        String origName = Thread.currentThread().getName();
        try {
            Thread.currentThread().setName("Selector Worker: " + id);
            while (isRunning()) {
                int count = selector.select(10);
                if (count == 0) {
                    continue;
                }

                // The last user may have left while we were blocked in select().
                if (!isRunning()) {
                    return;
                }

                Set<SelectionKey> keys = selector.selectedKeys();
                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext();) {
                    SelectionKey key = i.next();
                    // Selected keys must be removed explicitly or they are
                    // reported again by the next select().
                    i.remove();
                    dispatch(key);
                }
            }
        } catch (Exception e) {
            // Unchecked failures (e.g. a closed selector) are handled like I/O
            // failures; otherwise the worker thread would end without anybody
            // being told.
            manager.onWorkerEmptyEvent(this);
            notifySelectionsOfFailure(e);
        } finally {
            Thread.currentThread().setName(origName);
        }
    }

    /**
     * Hands one ready key to the channel executor. Any failure while disabling
     * the selection or submitting the task is reported to the selection itself.
     */
    private void dispatch(SelectionKey key) {
        final SelectorSelection selection = (SelectorSelection) key.attachment();
        if (selection == null) {
            // Nothing to call back; skip rather than fail the whole loop.
            return;
        }
        try {
            // Presumably suspends interest in the key while the handler runs;
            // confirm against SelectorSelection.
            selection.disable();

            manager.getChannelExecutor().execute(new Runnable() {
                public void run() {
                    try {
                        selection.onSelect();
                        selection.enable();
                    } catch (Throwable e) {
                        selection.onError(e);
                    }
                }
            });
        } catch (Throwable e) {
            selection.onError(e);
        }
    }

    /**
     * Reports {@code cause} to every selection still registered with the selector.
     */
    private void notifySelectionsOfFailure(Throwable cause) {
        if (!selector.isOpen()) {
            // keys() throws ClosedSelectorException on a closed selector.
            return;
        }
        for (SelectionKey key : selector.keys()) {
            SelectorSelection selection = (SelectorSelection) key.attachment();
            if (selection != null) {
                selection.onError(cause);
            }
        }
    }
}