1 22 23 24 package com.mchange.v2.async; 25 26 import com.mchange.v2.log.*; 27 import com.mchange.v2.util.ResourceClosedException; 28 29 35 public class RoundRobinAsynchronousRunner implements AsynchronousRunner, Queuable 36 { 37 private final static MLogger logger = MLog.getLogger( RoundRobinAsynchronousRunner.class ); 38 39 final RunnableQueue[] rqs; 41 42 int task_turn = 0; 44 45 int view_turn = 0; 47 48 public RoundRobinAsynchronousRunner( int num_threads, boolean daemon ) 49 { 50 this.rqs = new RunnableQueue[ num_threads ]; 51 for(int i = 0; i < num_threads; ++i) 52 rqs[i] = new CarefulRunnableQueue( daemon, false ); 53 } 54 55 public synchronized void postRunnable(Runnable r) 56 { 57 try 58 { 59 int index = task_turn; 60 task_turn = (task_turn + 1) % rqs.length; 61 rqs[index].postRunnable( r ); 62 63 64 65 66 67 68 69 70 71 72 } 75 catch ( NullPointerException e ) 76 { 77 if ( Debug.DEBUG ) 79 { 80 if ( logger.isLoggable( MLevel.FINE ) ) 81 logger.log( MLevel.FINE, "NullPointerException while posting Runnable -- Probably we're closed.", e ); 82 } 83 this.close( true ); 84 throw new ResourceClosedException("Attempted to use a RoundRobinAsynchronousRunner in a closed or broken state."); 85 } 86 } 87 88 public synchronized RunnableQueue asRunnableQueue() 89 { 90 try 91 { 92 int index = view_turn; 93 view_turn = (view_turn + 1) % rqs.length; 94 return new RunnableQueueView( index ); 95 96 97 98 } 102 catch ( NullPointerException e ) 103 { 104 if ( Debug.DEBUG ) 106 { 107 if ( logger.isLoggable( MLevel.FINE ) ) 108 logger.log( MLevel.FINE, "NullPointerException in asRunnableQueue() -- Probably we're closed.", e ); 109 } 110 this.close( true ); 111 throw new ResourceClosedException("Attempted to use a RoundRobinAsynchronousRunner in a closed or broken state."); 112 } 113 } 114 115 public synchronized void close( boolean skip_remaining_tasks ) 116 { 117 for (int i = 0, len = rqs.length; i < len; ++i) 118 { 119 attemptClose( rqs[i], skip_remaining_tasks ); 120 rqs[i] = null; 121 } 122 } 123 124 public void close() 125 { close( true ); } 126 127 static void attemptClose(RunnableQueue rq, boolean skip_remaining_tasks) 128 { 129 try { rq.close( skip_remaining_tasks ); } 130 catch ( Exception e ) 131 { 132 if ( logger.isLoggable( MLevel.WARNING ) ) 134 logger.log( MLevel.WARNING, "RunnableQueue close FAILED.", e ); 135 } 136 } 137 138 class RunnableQueueView implements RunnableQueue 139 { 140 final int rq_num; 141 142 RunnableQueueView( int rq_num ) 143 { this.rq_num = rq_num; } 144 145 public void postRunnable(Runnable r) 146 { rqs[ rq_num ].postRunnable( r ); } 147 148 public void close( boolean skip_remaining_tasks ) 149 { } 150 151 public void close() 152 { } 153 } 154 } 155 | Popular Tags |