001/* 002 * Licensed under the Apache License, Version 2.0 (the "License"); 003 * you may not use this file except in compliance with the License. 004 * You may obtain a copy of the License at 005 * 006 * http://www.apache.org/licenses/LICENSE-2.0 007 * 008 * Unless required by applicable law or agreed to in writing, software 009 * distributed under the License is distributed on an "AS IS" BASIS, 010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 011 * See the License for the specific language governing permissions and 012 * limitations under the License. 013 */ 014package org.apache.commons.io.input; 015 016import static org.apache.commons.io.IOUtils.EOF; 017 018// import javax.annotation.concurrent.GuardedBy; 019import java.io.EOFException; 020import java.io.FilterInputStream; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InterruptedIOException; 024import java.nio.ByteBuffer; 025import java.util.Objects; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.locks.Condition; 031import java.util.concurrent.locks.ReentrantLock; 032 033import org.apache.commons.io.build.AbstractStreamBuilder; 034 035/** 036 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current 037 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a 038 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we 039 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O. 040 * <p> 041 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19. 042 * </p> 043 * 044 * @since 2.9.0 045 */ 046public class ReadAheadInputStream extends FilterInputStream { 047 048 /** 049 * Builds a new {@link ReadAheadInputStream} instance. 050 * <p> 051 * For example: 052 * </p> 053 * <pre>{@code 054 * ReadAheadInputStream s = ReadAheadInputStream.builder() 055 * .setPath(path) 056 * .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread)) 057 * .get()} 058 * </pre> 059 * <p> 060 * @since 2.12.0 061 */ 062 public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> { 063 064 private ExecutorService executorService; 065 066 /** 067 * Constructs a new instance. 068 * 069 * @throws UnsupportedOperationException if the origin cannot be converted to an InputStream. 070 */ 071 @SuppressWarnings("resource") 072 @Override 073 public ReadAheadInputStream get() throws IOException { 074 return new ReadAheadInputStream(getOrigin().getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(), 075 executorService == null); 076 } 077 078 /** 079 * Sets the executor service for the read-ahead thread. 080 * 081 * @param executorService the executor service for the read-ahead thread. 082 * @return this 083 */ 084 public Builder setExecutorService(final ExecutorService executorService) { 085 this.executorService = executorService; 086 return this; 087 } 088 089 } 090 091 private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]); 092 093 /** 094 * Constructs a new {@link Builder}. 095 * 096 * @return a new {@link Builder}. 097 * @since 2.12.0 098 */ 099 public static Builder builder() { 100 return new Builder(); 101 } 102 103 /** 104 * Creates a new daemon thread. 105 * 106 * @param r the thread's runnable. 107 * @return a new daemon thread. 108 */ 109 private static Thread newDaemonThread(final Runnable r) { 110 final Thread thread = new Thread(r, "commons-io-read-ahead"); 111 thread.setDaemon(true); 112 return thread; 113 } 114 115 /** 116 * Creates a new daemon executor service. 117 * 118 * @return a new daemon executor service. 119 */ 120 private static ExecutorService newExecutorService() { 121 return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread); 122 } 123 124 private final ReentrantLock stateChangeLock = new ReentrantLock(); 125 126 // @GuardedBy("stateChangeLock") 127 private ByteBuffer activeBuffer; 128 129 // @GuardedBy("stateChangeLock") 130 private ByteBuffer readAheadBuffer; 131 132 // @GuardedBy("stateChangeLock") 133 private boolean endOfStream; 134 135 // @GuardedBy("stateChangeLock") 136 // true if async read is in progress 137 private boolean readInProgress; 138 139 // @GuardedBy("stateChangeLock") 140 // true if read is aborted due to an exception in reading from underlying input stream. 141 private boolean readAborted; 142 143 // @GuardedBy("stateChangeLock") 144 private Throwable readException; 145 146 // @GuardedBy("stateChangeLock") 147 // whether the close method is called. 148 private boolean isClosed; 149 150 // @GuardedBy("stateChangeLock") 151 // true when the close method will close the underlying input stream. This is valid only if 152 // `isClosed` is true. 153 private boolean isUnderlyingInputStreamBeingClosed; 154 155 // @GuardedBy("stateChangeLock") 156 // whether there is a read ahead task running, 157 private boolean isReading; 158 159 // Whether there is a reader waiting for data. 160 private final AtomicBoolean isWaiting = new AtomicBoolean(false); 161 162 private final ExecutorService executorService; 163 164 private final boolean shutdownExecutorService; 165 166 private final Condition asyncReadComplete = stateChangeLock.newCondition(); 167 168 /** 169 * Creates an instance with the specified buffer size and read-ahead threshold 170 * 171 * @param inputStream The underlying input stream. 172 * @param bufferSizeInBytes The buffer size. 173 * @deprecated Use {@link #builder()} 174 */ 175 @Deprecated 176 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) { 177 this(inputStream, bufferSizeInBytes, newExecutorService(), true); 178 } 179 180 /** 181 * Creates an instance with the specified buffer size and read-ahead threshold 182 * 183 * @param inputStream The underlying input stream. 184 * @param bufferSizeInBytes The buffer size. 185 * @param executorService An executor service for the read-ahead thread. 186 * @deprecated Use {@link #builder()} 187 */ 188 @Deprecated 189 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) { 190 this(inputStream, bufferSizeInBytes, executorService, false); 191 } 192 193 /** 194 * Creates an instance with the specified buffer size and read-ahead threshold 195 * 196 * @param inputStream The underlying input stream. 197 * @param bufferSizeInBytes The buffer size. 198 * @param executorService An executor service for the read-ahead thread. 199 * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close. 200 */ 201 private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService, 202 final boolean shutdownExecutorService) { 203 super(Objects.requireNonNull(inputStream, "inputStream")); 204 if (bufferSizeInBytes <= 0) { 205 throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); 206 } 207 this.executorService = Objects.requireNonNull(executorService, "executorService"); 208 this.shutdownExecutorService = shutdownExecutorService; 209 this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); 210 this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); 211 this.activeBuffer.flip(); 212 this.readAheadBuffer.flip(); 213 } 214 215 @Override 216 public int available() throws IOException { 217 stateChangeLock.lock(); 218 // Make sure we have no integer overflow. 219 try { 220 return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining()); 221 } finally { 222 stateChangeLock.unlock(); 223 } 224 } 225 226 private void checkReadException() throws IOException { 227 if (readAborted) { 228 if (readException instanceof IOException) { 229 throw (IOException) readException; 230 } 231 throw new IOException(readException); 232 } 233 } 234 235 @Override 236 public void close() throws IOException { 237 boolean isSafeToCloseUnderlyingInputStream = false; 238 stateChangeLock.lock(); 239 try { 240 if (isClosed) { 241 return; 242 } 243 isClosed = true; 244 if (!isReading) { 245 // Nobody is reading, so we can close the underlying input stream in this method. 246 isSafeToCloseUnderlyingInputStream = true; 247 // Flip this to make sure the read ahead task will not close the underlying input stream. 248 isUnderlyingInputStreamBeingClosed = true; 249 } 250 } finally { 251 stateChangeLock.unlock(); 252 } 253 254 if (shutdownExecutorService) { 255 try { 256 executorService.shutdownNow(); 257 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); 258 } catch (final InterruptedException e) { 259 final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); 260 iio.initCause(e); 261 throw iio; 262 } finally { 263 if (isSafeToCloseUnderlyingInputStream) { 264 super.close(); 265 } 266 } 267 } 268 } 269 270 private void closeUnderlyingInputStreamIfNecessary() { 271 boolean needToCloseUnderlyingInputStream = false; 272 stateChangeLock.lock(); 273 try { 274 isReading = false; 275 if (isClosed && !isUnderlyingInputStreamBeingClosed) { 276 // close method cannot close underlyingInputStream because we were reading. 277 needToCloseUnderlyingInputStream = true; 278 } 279 } finally { 280 stateChangeLock.unlock(); 281 } 282 if (needToCloseUnderlyingInputStream) { 283 try { 284 super.close(); 285 } catch (final IOException ignored) { 286 // TODO Rethrow as UncheckedIOException? 287 } 288 } 289 } 290 291 private boolean isEndOfStream() { 292 return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream; 293 } 294 295 @Override 296 public int read() throws IOException { 297 if (activeBuffer.hasRemaining()) { 298 // short path - just get one byte. 299 return activeBuffer.get() & 0xFF; 300 } 301 final byte[] oneByteArray = BYTE_ARRAY_1.get(); 302 oneByteArray[0] = 0; 303 return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF; 304 } 305 306 @Override 307 public int read(final byte[] b, final int offset, int len) throws IOException { 308 if (offset < 0 || len < 0 || len > b.length - offset) { 309 throw new IndexOutOfBoundsException(); 310 } 311 if (len == 0) { 312 return 0; 313 } 314 315 if (!activeBuffer.hasRemaining()) { 316 // No remaining in active buffer - lock and switch to write ahead buffer. 317 stateChangeLock.lock(); 318 try { 319 waitForAsyncReadComplete(); 320 if (!readAheadBuffer.hasRemaining()) { 321 // The first read. 322 readAsync(); 323 waitForAsyncReadComplete(); 324 if (isEndOfStream()) { 325 return EOF; 326 } 327 } 328 // Swap the newly read ahead buffer in place of empty active buffer. 329 swapBuffers(); 330 // After swapping buffers, trigger another async read for read ahead buffer. 331 readAsync(); 332 } finally { 333 stateChangeLock.unlock(); 334 } 335 } 336 len = Math.min(len, activeBuffer.remaining()); 337 activeBuffer.get(b, offset, len); 338 339 return len; 340 } 341 342 /** 343 * Read data from underlyingInputStream to readAheadBuffer asynchronously. 344 * 345 * @throws IOException if an I/O error occurs. 346 */ 347 private void readAsync() throws IOException { 348 stateChangeLock.lock(); 349 final byte[] arr; 350 try { 351 arr = readAheadBuffer.array(); 352 if (endOfStream || readInProgress) { 353 return; 354 } 355 checkReadException(); 356 readAheadBuffer.position(0); 357 readAheadBuffer.flip(); 358 readInProgress = true; 359 } finally { 360 stateChangeLock.unlock(); 361 } 362 executorService.execute(() -> { 363 stateChangeLock.lock(); 364 try { 365 if (isClosed) { 366 readInProgress = false; 367 return; 368 } 369 // Flip this so that the close method will not close the underlying input stream when we 370 // are reading. 371 isReading = true; 372 } finally { 373 stateChangeLock.unlock(); 374 } 375 376 // Please note that it is safe to release the lock and read into the read ahead buffer 377 // because either of following two conditions will hold: 378 // 379 // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer. 380 // 381 // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits 382 // for this async read to complete. 383 // 384 // So there is no race condition in both the situations. 385 int read = 0; 386 int off = 0, len = arr.length; 387 Throwable exception = null; 388 try { 389 // try to fill the read ahead buffer. 390 // if a reader is waiting, possibly return early. 391 do { 392 read = in.read(arr, off, len); 393 if (read <= 0) { 394 break; 395 } 396 off += read; 397 len -= read; 398 } while (len > 0 && !isWaiting.get()); 399 } catch (final Throwable ex) { 400 exception = ex; 401 if (ex instanceof Error) { 402 // `readException` may not be reported to the user. Rethrow Error to make sure at least 403 // The user can see Error in UncaughtExceptionHandler. 404 throw (Error) ex; 405 } 406 } finally { 407 stateChangeLock.lock(); 408 try { 409 readAheadBuffer.limit(off); 410 if (read < 0 || exception instanceof EOFException) { 411 endOfStream = true; 412 } else if (exception != null) { 413 readAborted = true; 414 readException = exception; 415 } 416 readInProgress = false; 417 signalAsyncReadComplete(); 418 } finally { 419 stateChangeLock.unlock(); 420 } 421 closeUnderlyingInputStreamIfNecessary(); 422 } 423 }); 424 } 425 426 private void signalAsyncReadComplete() { 427 stateChangeLock.lock(); 428 try { 429 asyncReadComplete.signalAll(); 430 } finally { 431 stateChangeLock.unlock(); 432 } 433 } 434 435 @Override 436 public long skip(final long n) throws IOException { 437 if (n <= 0L) { 438 return 0L; 439 } 440 if (n <= activeBuffer.remaining()) { 441 // Only skipping from active buffer is sufficient 442 activeBuffer.position((int) n + activeBuffer.position()); 443 return n; 444 } 445 stateChangeLock.lock(); 446 final long skipped; 447 try { 448 skipped = skipInternal(n); 449 } finally { 450 stateChangeLock.unlock(); 451 } 452 return skipped; 453 } 454 455 /** 456 * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before 457 * calling this function. 458 * 459 * @param n the number of bytes to be skipped. 460 * @return the actual number of bytes skipped. 461 * @throws IOException if an I/O error occurs. 462 */ 463 private long skipInternal(final long n) throws IOException { 464 assert stateChangeLock.isLocked(); 465 waitForAsyncReadComplete(); 466 if (isEndOfStream()) { 467 return 0; 468 } 469 if (available() >= n) { 470 // we can skip from the internal buffers 471 int toSkip = (int) n; 472 // We need to skip from both active buffer and read ahead buffer 473 toSkip -= activeBuffer.remaining(); 474 assert toSkip > 0; // skipping from activeBuffer already handled. 475 activeBuffer.position(0); 476 activeBuffer.flip(); 477 readAheadBuffer.position(toSkip + readAheadBuffer.position()); 478 swapBuffers(); 479 // Trigger async read to emptied read ahead buffer. 480 readAsync(); 481 return n; 482 } 483 final int skippedBytes = available(); 484 final long toSkip = n - skippedBytes; 485 activeBuffer.position(0); 486 activeBuffer.flip(); 487 readAheadBuffer.position(0); 488 readAheadBuffer.flip(); 489 final long skippedFromInputStream = in.skip(toSkip); 490 readAsync(); 491 return skippedBytes + skippedFromInputStream; 492 } 493 494 /** 495 * Flips the active and read ahead buffer 496 */ 497 private void swapBuffers() { 498 final ByteBuffer temp = activeBuffer; 499 activeBuffer = readAheadBuffer; 500 readAheadBuffer = temp; 501 } 502 503 private void waitForAsyncReadComplete() throws IOException { 504 stateChangeLock.lock(); 505 try { 506 isWaiting.set(true); 507 // There is only one reader, and one writer, so the writer should signal only once, 508 // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups. 509 while (readInProgress) { 510 asyncReadComplete.await(); 511 } 512 } catch (final InterruptedException e) { 513 final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); 514 iio.initCause(e); 515 throw iio; 516 } finally { 517 try { 518 isWaiting.set(false); 519 } finally { 520 stateChangeLock.unlock(); 521 } 522 } 523 checkReadException(); 524 } 525}