001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018import java.io.File;
019import java.io.IOException;
020import java.io.InputStream;
021import java.nio.ByteBuffer;
022import java.nio.channels.FileChannel;
023import java.nio.file.Path;
024import java.nio.file.StandardOpenOption;
025import java.util.Objects;
026
027import org.apache.commons.io.IOUtils;
028import org.apache.commons.io.build.AbstractStreamBuilder;
029
030/**
031 * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java and native memory which happens when
032 * using {@link java.io.BufferedInputStream}. Unfortunately, this is not something already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports
033 * reading a file using NIO, but does not support buffering.
034 * <p>
035 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
036 * </p>
037 *
038 * @since 2.9.0
039 */
040public final class BufferedFileChannelInputStream extends InputStream {
041
042    /**
043     * Builds a new {@link BufferedFileChannelInputStream} instance.
044     * <p>
045     * Using File IO:
046     * </p>
047     *
048     * <pre>{@code
049     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
050     *   .setFile(file)
051     *   .setBufferSize(4096)
052     *   .get()}
053     * </pre>
054     * <p>
055     * Using NIO Path:
056     * </p>
057     *
058     * <pre>{@code
059     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
060     *   .setPath(path)
061     *   .setBufferSize(4096)
062     *   .get()}
063     * </pre>
064     *
065     * @since 2.12.0
066     */
067    public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
068
069        /**
070         * Constructs a new instance.
071         *
072         * @throws UnsupportedOperationException if the origin cannot be converted to a Path.
073         */
074        @Override
075        public BufferedFileChannelInputStream get() throws IOException {
076            return new BufferedFileChannelInputStream(getOrigin().getPath(), getBufferSize());
077        }
078
079    }
080
081    /**
082     * Constructs a new {@link Builder}.
083     *
084     * @return a new {@link Builder}.
085     * @since 2.12.0
086     */
087    public static Builder builder() {
088        return new Builder();
089    }
090
091    private final ByteBuffer byteBuffer;
092
093    private final FileChannel fileChannel;
094
095    /**
096     * Constructs a new instance for the given File.
097     *
098     * @param file The file to stream.
099     * @throws IOException If an I/O error occurs
100     * @deprecated Use {@link #builder()}
101     */
102    @Deprecated
103    public BufferedFileChannelInputStream(final File file) throws IOException {
104        this(file, IOUtils.DEFAULT_BUFFER_SIZE);
105    }
106
107    /**
108     * Constructs a new instance for the given File and buffer size.
109     *
110     * @param file       The file to stream.
111     * @param bufferSize buffer size.
112     * @throws IOException If an I/O error occurs
113     * @deprecated Use {@link #builder()}
114     */
115    @Deprecated
116    public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
117        this(file.toPath(), bufferSize);
118    }
119
120    /**
121     * Constructs a new instance for the given Path.
122     *
123     * @param path The path to stream.
124     * @throws IOException If an I/O error occurs
125     * @deprecated Use {@link #builder()}
126     */
127    @Deprecated
128    public BufferedFileChannelInputStream(final Path path) throws IOException {
129        this(path, IOUtils.DEFAULT_BUFFER_SIZE);
130    }
131
132    /**
133     * Constructs a new instance for the given Path and buffer size.
134     *
135     * @param path       The path to stream.
136     * @param bufferSize buffer size.
137     * @throws IOException If an I/O error occurs
138     * @deprecated Use {@link #builder()}
139     */
140    @Deprecated
141    public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
142        Objects.requireNonNull(path, "path");
143        fileChannel = FileChannel.open(path, StandardOpenOption.READ);
144        byteBuffer = ByteBuffer.allocateDirect(bufferSize);
145        byteBuffer.flip();
146    }
147
148    @Override
149    public synchronized int available() throws IOException {
150        return byteBuffer.remaining();
151    }
152
153    /**
154     * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
155     * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
156     * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
157     * API to manually dispose of these kinds of buffers.
158     *
159     * @param buffer the buffer to clean.
160     */
161    private void clean(final ByteBuffer buffer) {
162        if (buffer.isDirect()) {
163            cleanDirectBuffer(buffer);
164        }
165    }
166
167    /**
168     * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
169     * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
170     * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
171     * reflection.
172     *
173     * @param buffer the buffer to clean. must be a DirectBuffer.
174     */
175    private void cleanDirectBuffer(final ByteBuffer buffer) {
176        if (ByteBufferCleaner.isSupported()) {
177            ByteBufferCleaner.clean(buffer);
178        }
179    }
180
181    @Override
182    public synchronized void close() throws IOException {
183        try {
184            fileChannel.close();
185        } finally {
186            clean(byteBuffer);
187        }
188    }
189
190    @Override
191    public synchronized int read() throws IOException {
192        if (!refill()) {
193            return EOF;
194        }
195        return byteBuffer.get() & 0xFF;
196    }
197
198    @Override
199    public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
200        if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
201            throw new IndexOutOfBoundsException();
202        }
203        if (!refill()) {
204            return EOF;
205        }
206        len = Math.min(len, byteBuffer.remaining());
207        byteBuffer.get(b, offset, len);
208        return len;
209    }
210
211    /**
212     * Checks whether data is left to be read from the input stream.
213     *
214     * @return true if data is left, false otherwise
215     * @throws IOException if an I/O error occurs.
216     */
217    private boolean refill() throws IOException {
218        if (!byteBuffer.hasRemaining()) {
219            byteBuffer.clear();
220            int nRead = 0;
221            while (nRead == 0) {
222                nRead = fileChannel.read(byteBuffer);
223            }
224            byteBuffer.flip();
225            return nRead >= 0;
226        }
227        return true;
228    }
229
230    @Override
231    public synchronized long skip(final long n) throws IOException {
232        if (n <= 0L) {
233            return 0L;
234        }
235        if (byteBuffer.remaining() >= n) {
236            // The buffered content is enough to skip
237            byteBuffer.position(byteBuffer.position() + (int) n);
238            return n;
239        }
240        final long skippedFromBuffer = byteBuffer.remaining();
241        final long toSkipFromFileChannel = n - skippedFromBuffer;
242        // Discard everything we have read in the buffer.
243        byteBuffer.position(0);
244        byteBuffer.flip();
245        return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
246    }
247
248    private long skipFromFileChannel(final long n) throws IOException {
249        final long currentFilePosition = fileChannel.position();
250        final long size = fileChannel.size();
251        if (n > size - currentFilePosition) {
252            fileChannel.position(size);
253            return size - currentFilePosition;
254        }
255        fileChannel.position(currentFilePosition + n);
256        return n;
257    }
258
259}