001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.PipedInputStream; 024import java.io.PipedOutputStream; 025import java.time.Duration; 026import java.util.Objects; 027import java.util.concurrent.BlockingQueue; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.TimeUnit; 030 031import org.apache.commons.io.build.AbstractStreamBuilder; 032import org.apache.commons.io.output.QueueOutputStream; 033 034/** 035 * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream. 036 * 037 * <p> 038 * Example usage: 039 * </p> 040 * 041 * <pre> 042 * QueueInputStream inputStream = new QueueInputStream(); 043 * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); 044 * 045 * outputStream.write("hello world".getBytes(UTF_8)); 046 * inputStream.read(); 047 * </pre> 048 * <p> 049 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads. 050 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited. 051 * </p> 052 * <p> 053 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an 054 * {@link IOException}. 055 * </p> 056 * 057 * @see QueueOutputStream 058 * @since 2.9.0 059 */ 060public class QueueInputStream extends InputStream { 061 062 /** 063 * Builds a new {@link QueueInputStream} instance. 064 * <p> 065 * For example: 066 * </p> 067 * 068 * <pre>{@code 069 * QueueInputStream s = QueueInputStream.builder() 070 * .setBlockingQueue(new LinkedBlockingQueue<>()) 071 * .setTimeout(Duration.ZERO) 072 * .get()} 073 * </pre> 074 * <p> 075 * 076 * @since 2.12.0 077 */ 078 public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> { 079 080 private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); 081 private Duration timeout = Duration.ZERO; 082 083 @Override 084 public QueueInputStream get() { 085 return new QueueInputStream(blockingQueue, timeout); 086 } 087 088 /** 089 * Sets backing queue for the stream. 090 * 091 * @param blockingQueue backing queue for the stream. 092 * @return this 093 */ 094 public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) { 095 this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>(); 096 return this; 097 } 098 099 /** 100 * Sets the polling timeout. 101 * 102 * @param timeout the polling timeout. 103 * @return this. 104 */ 105 public Builder setTimeout(final Duration timeout) { 106 if (timeout != null && timeout.toNanos() < 0) { 107 throw new IllegalArgumentException("timeout must not be negative"); 108 } 109 this.timeout = timeout != null ? timeout : Duration.ZERO; 110 return this; 111 } 112 113 } 114 115 /** 116 * Constructs a new {@link Builder}. 117 * 118 * @return a new {@link Builder}. 119 * @since 2.12.0 120 */ 121 public static Builder builder() { 122 return new Builder(); 123 } 124 125 private final BlockingQueue<Integer> blockingQueue; 126 127 private final long timeoutNanos; 128 129 /** 130 * Constructs a new instance with no limit to its internal queue size and zero timeout. 131 */ 132 public QueueInputStream() { 133 this(new LinkedBlockingQueue<>()); 134 } 135 136 /** 137 * Constructs a new instance with given queue and zero timeout. 138 * 139 * @param blockingQueue backing queue for the stream. 140 * @deprecated Use {@link #builder()}. 141 */ 142 @Deprecated 143 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) { 144 this(blockingQueue, Duration.ZERO); 145 } 146 147 /** 148 * Constructs a new instance with given queue and timeout. 149 * 150 * @param blockingQueue backing queue for the stream. 151 * @param timeout how long to wait before giving up when polling the queue. 152 */ 153 private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout) { 154 this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue"); 155 this.timeoutNanos = Objects.requireNonNull(timeout, "timeout").toNanos(); 156 } 157 158 /** 159 * Gets the blocking queue. 160 * 161 * @return the blocking queue. 162 */ 163 BlockingQueue<Integer> getBlockingQueue() { 164 return blockingQueue; 165 } 166 167 /** 168 * Gets the timeout duration. 169 * 170 * @return the timeout duration. 171 */ 172 Duration getTimeout() { 173 return Duration.ofNanos(timeoutNanos); 174 } 175 176 /** 177 * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. 178 * 179 * @return QueueOutputStream connected to this stream. 180 */ 181 public QueueOutputStream newQueueOutputStream() { 182 return new QueueOutputStream(blockingQueue); 183 } 184 185 /** 186 * Reads and returns a single byte. 187 * 188 * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available. 189 * @throws IllegalStateException if thread is interrupted while waiting. 190 */ 191 @Override 192 public int read() { 193 try { 194 final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS); 195 return value == null ? EOF : 0xFF & value; 196 } catch (final InterruptedException e) { 197 Thread.currentThread().interrupt(); 198 // throw runtime unchecked exception to maintain signature backward-compatibilty of 199 // this read method, which does not declare IOException 200 throw new IllegalStateException(e); 201 } 202 } 203 204}