KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > nio > SelectorWorker


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.nio;
19
20 import java.io.IOException JavaDoc;
21 import java.nio.channels.SelectionKey JavaDoc;
22 import java.nio.channels.Selector JavaDoc;
23 import java.util.Iterator JavaDoc;
24 import java.util.Set JavaDoc;
25 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
26
27
28 public class SelectorWorker implements Runnable JavaDoc {
29
30     private final static AtomicInteger JavaDoc NEXT_ID = new AtomicInteger JavaDoc();
31
32     final SelectorManager manager;
33     final Selector JavaDoc selector;
34     final int id = NEXT_ID.getAndIncrement();
35     final AtomicInteger JavaDoc useCounter = new AtomicInteger JavaDoc();
36     final private int maxChannelsPerWorker;
37
38
39     public SelectorWorker(SelectorManager manager) throws IOException JavaDoc {
40         this.manager = manager;
41         selector = Selector.open();
42         maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
43     }
44     
45     void incrementUseCounter() {
46         int use = useCounter.getAndIncrement();
47         if( use == 0 ) {
48             manager.getSelectorExecutor().execute(this);
49         } else if( use+1 == maxChannelsPerWorker ) {
50             manager.onWorkerFullEvent(this);
51         }
52     }
53
54     void decrementUseCounter() {
55         int use = useCounter.getAndDecrement();
56         if (use == 1) {
57             manager.onWorkerEmptyEvent(this);
58         } else if (use == maxChannelsPerWorker ) {
59             manager.onWorkerNotFullEvent(this);
60         }
61     }
62
63     boolean isRunning() {
64         return useCounter.get()!=0;
65     }
66
67     public void run() {
68
69         String JavaDoc origName = Thread.currentThread().getName();
70         try {
71             Thread.currentThread().setName("Selector Worker: " + id);
72             while (isRunning()) {
73
74                 int count = selector.select(10);
75                 if (count == 0)
76                     continue;
77                 
78                 if (!isRunning())
79                     return;
80
81                 // Get a java.util.Set containing the SelectionKey objects
82
// for all channels that are ready for I/O.
83
Set JavaDoc keys = selector.selectedKeys();
84
85                 for (Iterator JavaDoc i = keys.iterator(); i.hasNext();) {
86                     final SelectionKey JavaDoc key = (SelectionKey JavaDoc) i.next();
87                     i.remove();
88
89                     final SelectorSelection s = (SelectorSelection) key.attachment();
90                     try {
91                         s.disable();
92                         
93                         // Kick off another thread to find newly selected keys while we process the
94
// currently selected keys
95
manager.getChannelExecutor().execute(new Runnable JavaDoc() {
96                             public void run() {
97                                 try {
98                                     s.onSelect();
99                                     s.enable();
100                                 } catch (Throwable JavaDoc e) {
101                                     s.onError(e);
102                                 }
103                             }
104                         });
105                         
106                     } catch ( Throwable JavaDoc e ) {
107                         s.onError(e);
108                     }
109                     
110                 }
111
112             }
113         } catch (IOException JavaDoc e) {
114             
115             // Don't accept any more slections
116
manager.onWorkerEmptyEvent(this);
117
118             // Notify all the selections that the error occurred.
119
Set JavaDoc keys = selector.keys();
120             for (Iterator JavaDoc i = keys.iterator(); i.hasNext();) {
121                 SelectionKey JavaDoc key = (SelectionKey JavaDoc) i.next();
122                 SelectorSelection s = (SelectorSelection) key.attachment();
123                 s.onError(e);
124             }
125             
126         } finally {
127             Thread.currentThread().setName(origName);
128         }
129     }
130 }
131
Popular Tags