This value is always in the range {@code 0} through {@code count}. If it is less than {@code + * count}, then {@code buf[pos]} is the next byte to be supplied as input; if it is equal to + * {@code count}, then the next {@code read} or {@code skip} operation will require more bytes to + * be read from the contained input stream. + * + * @see NoSyncBufferedInputStream#buf + */ + protected int pos; + + /** + * The value of the {@code pos} field at the time the last {@code mark} method was called. + * + *
This value is always in the range {@code -1} through {@code pos}. If there is no marked + * position in the input stream, this field is {@code -1}. If there is a marked position in the + * input stream, then {@code buf[markpos]} is the first byte to be supplied as input after a + * {@code reset} operation. If {@code markpos} is not {@code -1}, then all bytes from positions + * {@code buf[markpos]} through {@code buf[pos-1]} must remain in the buffer array (though they + * may be moved to another place in the buffer array, with suitable adjustments to the values of + * {@code count}, {@code pos}, and {@code markpos}); they may not be discarded unless and until + * the difference between {@code pos} and {@code markpos} exceeds {@code marklimit}. + * + * @see NoSyncBufferedInputStream#mark(int) + * @see NoSyncBufferedInputStream#pos + */ + protected int markpos = -1; + + /** + * The maximum read ahead allowed after a call to the {@code mark} method before subsequent calls + * to the {@code reset} method fail. Whenever the difference between {@code pos} and {@code + * markpos} exceeds {@code marklimit}, then the mark may be dropped by setting {@code markpos} to + * {@code -1}. + * + * @see NoSyncBufferedInputStream#mark(int) + * @see NoSyncBufferedInputStream#reset() + */ + protected int marklimit; + + /** + * Check to make sure that underlying input stream has not been nulled out due to close; if not + * return it; + */ + private InputStream getInIfOpen() throws IOException { + InputStream input = in; + if (input == null) throw new IOException("Stream closed"); + return input; + } + + /** Check to make sure that buffer has not been nulled out due to close; if not return it; */ + private byte[] getBufIfOpen() throws IOException { + byte[] buffer = buf; + if (buffer == null) throw new IOException("Stream closed"); + return buffer; + } + + /** + * Creates a {@code BufferedInputStream} and saves its argument, the input stream {@code in}, for + * later use. An internal buffer array is created and stored in {@code buf}. + * + * @param in the underlying input stream. + */ + public NoSyncBufferedInputStream(InputStream in) { + this(in, DEFAULT_BUFFER_SIZE); + } + + /** + * Creates a {@code BufferedInputStream} with the specified buffer size, and saves its argument, + * the input stream {@code in}, for later use. An internal buffer array of length {@code size} is + * created and stored in {@code buf}. + * + * @param in the underlying input stream. + * @param size the buffer size. + * @throws IllegalArgumentException if {@code size <= 0}. + */ + public NoSyncBufferedInputStream(InputStream in, int size) { + super(in); + if (size <= 0) { + throw new IllegalArgumentException("Buffer size <= 0"); + } + buf = new byte[size]; + } + + /** + * Fills the buffer with more data, taking into account shuffling and other tricks for dealing + * with marks. Assumes that it is being called by a synchronized method. This method also assumes + * that all data has already been read in, hence pos > count. + */ + private void fill() throws IOException { + byte[] buffer = getBufIfOpen(); + if (markpos < 0) pos = 0; /* no mark: throw away the buffer */ + else if (pos >= buffer.length) { + /* no room left in buffer */ + if (markpos > 0) { + /* can throw away early part of the buffer */ + int sz = pos - markpos; + System.arraycopy(buffer, markpos, buffer, 0, sz); + pos = sz; + markpos = 0; + } else if (buffer.length >= marklimit) { + markpos = -1; /* buffer got too big, invalidate mark */ + pos = 0; /* drop buffer contents */ + } + } + count = pos; + int n = getInIfOpen().read(buffer, pos, buffer.length - pos); + if (n > 0) count = n + pos; + } + + /** + * See the general contract of the {@code read} method of {@code InputStream}. + * + * @return the next byte of data, or {@code -1} if the end of the stream is reached. + * @throws IOException if this input stream has been closed by invoking its {@link #close()} + * method, or an I/O error occurs. + * @see FilterInputStream#in + */ + public int read() throws IOException { + if (pos >= count) { + fill(); + if (pos >= count) return -1; + } + return getBufIfOpen()[pos++] & 0xff; + } + + /** + * Read characters into a portion of an array, reading from the underlying stream at most once if + * necessary. + */ + private int read1(byte[] b, int off, int len) throws IOException { + int avail = count - pos; + if (avail <= 0) { + /* If the requested length is at least as large as the buffer, and + if there is no mark/reset activity, do not bother to copy the + bytes into the local buffer. In this way buffered streams will + cascade harmlessly. */ + if (len >= getBufIfOpen().length && markpos < 0) { + return getInIfOpen().read(b, off, len); + } + fill(); + avail = count - pos; + if (avail <= 0) return -1; + } + int cnt = (avail < len) ? avail : len; + System.arraycopy(getBufIfOpen(), pos, b, off, cnt); + pos += cnt; + return cnt; + } + + /** + * Reads bytes from this byte-input stream into the specified byte array, starting at the given + * offset. + * + *
This method implements the general contract of the corresponding {@link + * InputStream#read(byte[], int, int) read} method of the {@link InputStream} class. As an + * additional convenience, it attempts to read as many bytes as possible by repeatedly invoking + * the {@code read} method of the underlying stream. This iterated {@code read} continues until + * one of the following conditions becomes true: + * + *
Subclasses of this class are encouraged, but not required, to attempt to read as many bytes + * as possible in the same fashion. + * + * @param b destination buffer. + * @param off offset at which to start storing bytes. + * @param len maximum number of bytes to read. + * @return the number of bytes read, or {@code -1} if the end of the stream has been reached. + * @throws IOException if this input stream has been closed by invoking its {@link #close()} + * method, or an I/O error occurs. + */ + public int read(byte[] b, int off, int len) throws IOException { + getBufIfOpen(); // Check for closed stream + if ((off | len | (off + len) | (b.length - (off + len))) < 0) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + int n = 0; + for (; ; ) { + int nread = read1(b, off + n, len - n); + if (nread <= 0) return (n == 0) ? nread : n; + n += nread; + if (n >= len) return n; + // if not closed but no bytes available, return + InputStream input = in; + if (input != null && input.available() <= 0) return n; + } + } + + /** + * See the general contract of the {@code skip} method of {@code InputStream}. + * + * @throws IOException if this input stream has been closed by invoking its {@link #close()} + * method, {@code in.skip(n)} throws an IOException, or an I/O error occurs. + */ + public long skip(long n) throws IOException { + getBufIfOpen(); // Check for closed stream + if (n <= 0) { + return 0; + } + long avail = count - pos; + + if (avail <= 0) { + // If no mark position set then don't keep in buffer + if (markpos < 0) return getInIfOpen().skip(n); + + // Fill in buffer to save bytes for reset + fill(); + avail = count - pos; + if (avail <= 0) return 0; + } + + long skipped = (avail < n) ? avail : n; + pos += skipped; + return skipped; + } + + /** + * Returns an estimate of the number of bytes that can be read (or skipped over) from this input + * stream without blocking by the next invocation of a method for this input stream. The next + * invocation might be the same thread or another thread. A single read or skip of this many bytes + * will not block, but may read or skip fewer bytes. + * + *
This method returns the sum of the number of bytes remaining to be read in the buffer + * ({@code count - pos}) and the result of calling the {@link FilterInputStream#in in}{@code + * .available()}. + * + * @return an estimate of the number of bytes that can be read (or skipped over) from this input + * stream without blocking. + * @throws IOException if this input stream has been closed by invoking its {@link #close()} + * method, or an I/O error occurs. + */ + public int available() throws IOException { + int n = count - pos; + int avail = getInIfOpen().available(); + return n > (Integer.MAX_VALUE - avail) ? Integer.MAX_VALUE : n + avail; + } + + /** + * See the general contract of the {@code mark} method of {@code InputStream}. + * + * @param readlimit the maximum limit of bytes that can be read before the mark position becomes + * invalid. + * @see NoSyncBufferedInputStream#reset() + */ + public void mark(int readlimit) { + marklimit = readlimit; + markpos = pos; + } + + /** + * See the general contract of the {@code reset} method of {@code InputStream}. + * + *
If {@code markpos} is {@code -1} (no mark has been set or the mark has been invalidated), an + * {@code IOException} is thrown. Otherwise, {@code pos} is set equal to {@code markpos}. + * + * @throws IOException if this stream has not been marked or, if the mark has been invalidated, or + * the stream has been closed by invoking its {@link #close()} method, or an I/O error occurs. + * @see NoSyncBufferedInputStream#mark(int) + */ + public void reset() throws IOException { + getBufIfOpen(); // Cause exception if closed + if (markpos < 0) throw new IOException("Resetting to invalid mark"); + pos = markpos; + } + + /** + * Tests if this input stream supports the {@code mark} and {@code reset} methods. The {@code + * markSupported} method of {@code BufferedInputStream} returns {@code true}. + * + * @return a {@code boolean} indicating if this stream type supports the {@code mark} and {@code + * reset} methods. + * @see InputStream#mark(int) + * @see InputStream#reset() + */ + public boolean markSupported() { + return false; + } + + /** + * Closes this input stream and releases any system resources associated with the stream. Once the + * stream has been closed, further read(), available(), reset(), or skip() invocations will throw + * an IOException. Closing a previously closed stream has no effect. + * + * @throws IOException if an I/O error occurs. + */ + public void close() throws IOException { + InputStream input = in; + in = null; + if (input != null) input.close(); + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/utils/NoSyncBufferedOutputStream.java b/java/tsfile/src/main/java/org/apache/tsfile/utils/NoSyncBufferedOutputStream.java new file mode 100644 index 000000000..5a7cd216e --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/utils/NoSyncBufferedOutputStream.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.utils; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +/** + * The class is a copy of java.io.BufferedOutputStream, without adding synchronized to the methods. + * Therefore, the caller must guarantee the concurrent safety. + */ +public class NoSyncBufferedOutputStream extends FilterOutputStream { + /** The internal buffer where data is stored. */ + protected byte buf[]; + + /** + * The number of valid bytes in the buffer. This value is always in the range {@code 0} through + * {@code buf.length}; elements {@code buf[0]} through {@code buf[count-1]} contain valid byte + * data. + */ + protected int count; + + /** + * Creates a new buffered output stream to write data to the specified underlying output stream. + * + * @param out the underlying output stream. + */ + public NoSyncBufferedOutputStream(OutputStream out) { + this(out, 8192); + } + + /** + * Creates a new buffered output stream to write data to the specified underlying output stream + * with the specified buffer size. + * + * @param out the underlying output stream. + * @param size the buffer size. + * @throws IllegalArgumentException if size <= 0. + */ + public NoSyncBufferedOutputStream(OutputStream out, int size) { + super(out); + if (size <= 0) { + throw new IllegalArgumentException("Buffer size <= 0"); + } + buf = new byte[size]; + } + + /** Flush the internal buffer */ + private void flushBuffer() throws IOException { + if (count > 0) { + out.write(buf, 0, count); + count = 0; + } + } + + /** + * Writes the specified byte to this buffered output stream. + * + * @param b the byte to be written. + * @throws IOException if an I/O error occurs. + */ + @Override + public void write(int b) throws IOException { + if (count >= buf.length) { + flushBuffer(); + } + buf[count++] = (byte) b; + } + + /** + * Writes {@code len} bytes from the specified byte array starting at offset {@code off} to this + * buffered output stream. + * + *
Ordinarily this method stores bytes from the given array into this stream's buffer, flushing
+ * the buffer to the underlying output stream as needed. If the requested length is at least as
+ * large as this stream's buffer, however, then this method will flush the buffer and write the
+ * bytes directly to the underlying output stream. Thus redundant {@code BufferedOutputStream}s
+ * will not copy data unnecessarily.
+ *
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (len >= buf.length) {
+ /* If the request length exceeds the size of the output buffer,
+ flush the output buffer and then write the data directly.
+ In this way buffered streams will cascade harmlessly. */
+ flushBuffer();
+ out.write(b, off, len);
+ return;
+ }
+ if (len > buf.length - count) {
+ flushBuffer();
+ }
+ System.arraycopy(b, off, buf, count, len);
+ count += len;
+ }
+
+ /**
+ * Flushes this buffered output stream. This forces any buffered output bytes to be written out to
+ * the underlying output stream.
+ *
+ * @throws IOException if an I/O error occurs.
+ * @see FilterOutputStream#out
+ */
+ @Override
+ public void flush() throws IOException {
+ flushBuffer();
+ out.flush();
+ }
+}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/LocalTsFileOutput.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/LocalTsFileOutput.java
index 9b7a18a0a..e6a3d9c89 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/LocalTsFileOutput.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/LocalTsFileOutput.java
@@ -18,7 +18,8 @@
*/
package org.apache.tsfile.write.writer;
-import java.io.BufferedOutputStream;
+import org.apache.tsfile.utils.NoSyncBufferedOutputStream;
+
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -31,12 +32,12 @@
public class LocalTsFileOutput extends OutputStream implements TsFileOutput {
private FileOutputStream outputStream;
- private BufferedOutputStream bufferedStream;
+ private OutputStream bufferedStream;
private long position;
public LocalTsFileOutput(FileOutputStream outputStream) {
this.outputStream = outputStream;
- this.bufferedStream = new BufferedOutputStream(outputStream);
+ this.bufferedStream = new NoSyncBufferedOutputStream(outputStream);
position = 0;
}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index e5d465f8b..f1df08cd7 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -461,7 +461,7 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
currentPath = timeseriesMetadataPair.left;
// build bloom filter
- filter.add(currentPath.getFullPath());
+ filter.add(currentPath);
// construct the index tree node for the series
currentDevice = currentPath.getIDeviceID();
if (!currentDevice.equals(prevDevice)) {
@@ -726,7 +726,7 @@ public int checkMetadataSizeAndMayFlush() throws IOException {
protected int sortAndFlushChunkMetadata() throws IOException {
int writtenSize = 0;
// group by series
- final List