1 21 package org.dbunit.dataset.stream; 22 23 import org.dbunit.DatabaseUnitRuntimeException; 24 import org.dbunit.dataset.*; 25 import org.dbunit.util.concurrent.BoundedBuffer; 26 import org.dbunit.util.concurrent.Channel; 27 import org.dbunit.util.concurrent.Puttable; 28 import org.dbunit.util.concurrent.Takable; 29 30 35 public class StreamingIterator implements ITableIterator 36 { 37 private static final Object EOD = new Object (); 39 private final Takable _channel; 40 private StreamingTable _activeTable; 41 private Object _taken = null; 42 private boolean _eod = false; 43 44 public StreamingIterator(IDataSetProducer source) throws DataSetException 45 { 46 Channel channel = new BoundedBuffer(30); 47 _channel = channel; 48 49 AsynchronousConsumer consumer = new AsynchronousConsumer(source, channel); 50 Thread thread = new Thread (consumer); 51 thread.setDaemon(true); 52 thread.start(); 53 54 try 56 { 57 _taken = _channel.take(); 58 } 59 catch (InterruptedException e) 60 { 61 throw new DataSetException(e); 62 } 63 } 64 65 68 public boolean next() throws DataSetException 69 { 70 if (_eod) 72 { 73 return false; 74 } 75 76 while (_activeTable != null && _activeTable.next()) 78 ; 79 80 if (_taken == EOD) 82 { 83 _eod = true; 84 _activeTable = null; 85 86 return false; 88 } 89 90 if (_taken instanceof ITableMetaData) 92 { 93 _activeTable = new StreamingTable((ITableMetaData)_taken); 94 return true; 95 } 96 97 throw new IllegalStateException ( 98 "Unexpected object taken from asyncronous handler: " + _taken); 99 } 100 101 public ITableMetaData getTableMetaData() throws DataSetException 102 { 103 return _activeTable.getTableMetaData(); 104 } 105 106 public ITable getTable() throws DataSetException 107 { 108 return _activeTable; 109 } 110 111 114 private class StreamingTable extends AbstractTable 115 { 116 private ITableMetaData _metaData; 117 private int _lastRow = -1; 118 private boolean _eot = false; 119 private Object [] _rowValues; 120 121 public StreamingTable(ITableMetaData metaData) 122 { 123 _metaData = metaData; 124 } 125 126 boolean next() throws DataSetException 127 { 128 if (_eot) 130 { 131 return false; 132 } 133 134 try 135 { 136 _taken = _channel.take(); 137 if (!(_taken instanceof Object [])) 138 { 139 _eot = true; 140 return false; 141 } 142 143 _lastRow++; 144 _rowValues = (Object [])_taken; 145 return true; 146 } 147 catch (InterruptedException e) 148 { 149 throw new DataSetException(); 150 } 151 } 152 153 156 public ITableMetaData getTableMetaData() 157 { 158 return _metaData; 159 } 160 161 public int getRowCount() 162 { 163 throw new UnsupportedOperationException (); 164 } 165 166 public Object getValue(int row, String column) throws DataSetException 167 { 168 while (!_eot && row > _lastRow) 170 { 171 next(); 172 } 173 174 if (row < _lastRow) 175 { 176 throw new UnsupportedOperationException ("Cannot go backward!"); 177 } 178 179 if (_eot || row > _lastRow) 180 { 181 throw new RowOutOfBoundsException(row + " > " + _lastRow); 182 } 183 184 return _rowValues[getColumnIndex(column)]; 185 } 186 187 } 188 189 192 private static class AsynchronousConsumer implements Runnable , IDataSetConsumer 193 { 194 private final IDataSetProducer _producer; 195 private final Puttable _channel; 196 197 public AsynchronousConsumer(IDataSetProducer source, Puttable channel) 198 { 199 _producer = source; 200 _channel = channel; 201 } 202 203 206 public void run() 207 { 208 try 209 { 210 _producer.setConsumer(this); 211 _producer.produce(); 212 } 214 catch (DataSetException e) 215 { 216 throw new DatabaseUnitRuntimeException(e); 217 } 218 } 219 220 223 public void startDataSet() throws DataSetException 224 { 225 } 226 227 public void endDataSet() throws DataSetException 228 { 229 try 230 { 231 _channel.put(EOD); 232 } 233 catch (InterruptedException e) 234 { 235 throw new DataSetException(); 236 } 237 } 238 239 public void startTable(ITableMetaData metaData) throws DataSetException 240 { 241 try 242 { 243 _channel.put(metaData); 244 } 245 catch (InterruptedException e) 246 { 247 throw new DataSetException(); 248 } 249 } 250 251 public void endTable() throws DataSetException 252 { 253 } 254 255 public void row(Object [] values) throws DataSetException 256 { 257 try 258 { 259 _channel.put(values); 260 } 261 catch (InterruptedException e) 262 { 263 throw new DataSetException(); 264 } 265 } 266 } 267 } 268 | Popular Tags |