From 9ba5afc26057672fd0491bb3f3e50a55ce1c32b1 Mon Sep 17 00:00:00 2001
From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com>
Date: Tue, 21 Jan 2025 09:52:41 +0100
Subject: [PATCH] Revert "[FLINK-36652][Formats/ORC] Upgrade Apache ORC
 Version to 1.9.4 (#25711)"

This reverts commit 363a69dad3c22c9b7063f4eb9ba9ca17b930432a.
---
 flink-formats/flink-orc-nohive/pom.xml        |  70 --
 .../writer/NoHivePhysicalWriterImpl.java      |   2 +-
 .../OrcColumnarRowSplitReaderNoHiveTest.java  |   4 +-
 flink-formats/flink-orc/pom.xml               |  36 +-
 .../orc/writer/OrcBulkWriterFactory.java      |  23 +-
 .../flink/orc/writer/PhysicalWriterImpl.java  | 741 +++++------
 .../orc/OrcColumnarRowInputFormatTest.java    |  34 +-
 .../orc/OrcColumnarRowSplitReaderTest.java    |  53 +-
 .../orc/OrcFormatStatisticsReportTest.java    |   4 +-
 .../src/main/resources/META-INF/NOTICE        |  10 +-
 pom.xml                                       |   3 +-
 11 files changed, 246 insertions(+), 734 deletions(-)

diff --git a/flink-formats/flink-orc-nohive/pom.xml b/flink-formats/flink-orc-nohive/pom.xml
index 0e3c9f70b90..e623f8cd31e 100644
--- a/flink-formats/flink-orc-nohive/pom.xml
+++ b/flink-formats/flink-orc-nohive/pom.xml
@@ -87,76 +87,6 @@ under the License.
     org.slf4j
     slf4j-reload4j
-
-    org.slf4j
-    slf4j-api
-
-
-
-
-    com.google.protobuf
-    protobuf-java
-    ${protoc.version}
-    provided
-
-
-
-    org.apache.hive
-    hive-storage-api
-    ${storage-api.version}
-    provided
-
-
-    org.apache.hadoop
-    hadoop-hdfs
-
-
-    org.apache.hadoop
-    hadoop-common
-
-
-    org.slf4j
-    slf4j-api
-
-
-
-
-    org.apache.hadoop
-    hadoop-common
-    provided
-
-
-    ch.qos.reload4j
-    reload4j
-
-
-    org.slf4j
-    slf4j-reload4j
-
-
-    com.google.protobuf
-    protobuf-java
-
-
-
-
-
-    org.apache.hadoop
-    hadoop-hdfs
-    provided
-
-
-    ch.qos.reload4j
-    reload4j
-
-
-    org.slf4j
-    slf4j-reload4j
-
-
-    com.google.protobuf
-    protobuf-java
-
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java
index 30c417111cb..0734e9bf90b 100644
--- a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java
+++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java
@@ -21,9 +21,9 @@ package org.apache.flink.orc.nohive.writer;
 
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.orc.writer.PhysicalWriterImpl;
 
+import com.google.protobuf25.CodedOutputStream;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
-import org.apache.orc.protobuf.CodedOutputStream;
 
 import java.io.IOException;
 
diff --git a/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java b/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java
index de2f4fd8662..425ae3105a3 100644
--- a/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java
+++ b/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.stream.IntStream;
 
 /** Test for {@link OrcColumnarRowSplitReader}. */
 class OrcColumnarRowSplitReaderNoHiveTest extends OrcColumnarRowSplitReaderTest {
@@ -99,13 +100,12 @@ class OrcColumnarRowSplitReaderNoHiveTest extends OrcColumnarRowSplitReaderTest
     protected OrcColumnarRowSplitReader createReader(
             int[] selectedFields,
             DataType[] fullTypes,
-            String[] fullNames,
             Map<String, Object> partitionSpec,
             FileInputSplit split)
             throws IOException {
         return OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(
                 new Configuration(),
-                fullNames,
+                IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new),
                 fullTypes,
                 partitionSpec,
                 selectedFields,
diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml
index d5176a8a036..3f92726ba62 100644
--- a/flink-formats/flink-orc/pom.xml
+++ b/flink-formats/flink-orc/pom.xml
@@ -101,11 +101,7 @@ under the License.
     org.slf4j
-    slf4j-api
-
-
-    org.apache.commons
-    commons-lang3
+    slf4j-reload4j
@@ -129,10 +125,6 @@ under the License.
     org.slf4j
     slf4j-reload4j
-
-    com.google.protobuf
-    protobuf-java
-
@@ -149,32 +141,6 @@ under the License.
     org.slf4j
     slf4j-reload4j
-
-    com.google.protobuf
-    protobuf-java
-
-
-
-
-
-    com.google.protobuf
-    protobuf-java
-    ${protoc.version}
-
-
-
-    org.apache.hive
-    hive-storage-api
-    ${storage-api.version}
-
-
-    org.apache.hadoop
-    hadoop-hdfs
-
-
-    org.slf4j
-    slf4j-api
-
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
index 010deb824a0..5e4310107ab 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
@@ -28,10 +28,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.OrcFile;
 import org.apache.orc.impl.WriterImpl;
-import org.apache.orc.impl.writer.WriterEncryptionVariant;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -98,29 +96,14 @@ public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
     @Override
     public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
         OrcFile.WriterOptions opts = getWriterOptions();
-        PhysicalWriterImpl physicalWriter = new PhysicalWriterImpl(out, opts);
-        opts.physicalWriter(physicalWriter);
+        opts.physicalWriter(new PhysicalWriterImpl(out, opts));
+
         // The path of the Writer is not used to indicate the destination file
         // in this case since we have used a dedicated physical writer to write
         // to the given output stream directly. However, the path would be used as
         // the key of the writer in the ORC memory manager, thus we need to make it unique.
         Path unusedPath = new Path(UUID.randomUUID().toString());
-        WriterImpl writer = new WriterImpl(null, unusedPath, opts);
-
-        // Obtaining encryption variant from Writer, and setting encryption variant for
-        // physicalWriter.
-        try {
-            Field encryptionFiled = WriterImpl.class.getDeclaredField("encryption");
-            encryptionFiled.setAccessible(true);
-            WriterEncryptionVariant[] encryption =
-                    (WriterEncryptionVariant[]) encryptionFiled.get(writer);
-            physicalWriter.setEncryptionVariant(encryption);
-        } catch (NoSuchFieldException | IllegalAccessException e) {
-            throw new RuntimeException(
-                    "Can not access to the encryption field in Class org.apache.orc.impl.WriterImpl",
-                    e);
-        }
-        return new OrcBulkWriter<>(vectorizer, writer);
+        return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts));
     }
 
     @VisibleForTesting
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
index 7dd8b378964..56e16c99c0a 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
@@ -21,24 +21,16 @@ package org.apache.flink.orc.writer;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FSDataOutputStream;
 
-import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import org.apache.orc.CompressionCodec;
-import org.apache.orc.EncryptionVariant;
+import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.PhysicalWriter;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.CryptoUtils;
 import org.apache.orc.impl.HadoopShims;
 import org.apache.orc.impl.OrcCodecPool;
 import org.apache.orc.impl.OutStream;
-import org.apache.orc.impl.SerializationUtils;
 import org.apache.orc.impl.StreamName;
-import org.apache.orc.impl.WriterImpl;
-import org.apache.orc.impl.writer.StreamOptions;
-import org.apache.orc.impl.writer.WriterEncryptionKey;
-import org.apache.orc.impl.writer.WriterEncryptionVariant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,11 +38,12 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
+
 /**
  * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
  *
@@ -62,227 +55,247 @@
  */
 @Internal
 public class PhysicalWriterImpl implements PhysicalWriter {
-    private static final Logger LOG = LoggerFactory.getLogger(PhysicalWriterImpl.class);
-    private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+    private static final Logger LOG = LoggerFactory.getLogger(PhysicalWriterImpl.class);
     private static final byte[] ZEROS = new byte[64 * 1024];
-
-    private FSDataOutputStream rawWriter;
-    private final DirectStream rawStream;
+    private static final int HDFS_BUFFER_SIZE = 256 * 1024;
 
     protected final OutStream writer;
-    private final CodedOutputStream codedCompressStream;
-
+    private final CodedOutputStream protobufWriter;
+    private final CompressionKind compress;
+    private final Map<StreamName, BufferedStream> streams;
     private final HadoopShims shims;
-    private final long blockSize;
     private final int maxPadding;
-    private final StreamOptions compress;
-    private final OrcFile.CompressionStrategy compressionStrategy;
+    private final int bufferSize;
+    private final long blockSize;
     private final boolean addBlockPadding;
     private final boolean writeVariableLengthBlocks;
-    private final VariantTracker unencrypted;
 
+    private CompressionCodec codec;
+    private FSDataOutputStream out;
     private long headerLength;
     private long stripeStart;
-
     private long blockOffset;
     private int metadataLength;
-    private int stripeStatisticsLength = 0;
     private int footerLength;
-    private int stripeNumber = 0;
-
-    private final Map<WriterEncryptionVariant, VariantTracker> variants = new TreeMap<>();
 
     public PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts)
             throws IOException {
-        this(out, opts, new WriterEncryptionVariant[0]);
-    }
-
-    public PhysicalWriterImpl(
-            FSDataOutputStream out,
-            OrcFile.WriterOptions opts,
-            WriterEncryptionVariant[] encryption)
-            throws IOException {
-        this.rawWriter = out;
-        long defaultStripeSize = opts.getStripeSize();
-        this.addBlockPadding = opts.getBlockPadding();
         if (opts.isEnforceBufferSize()) {
-            this.compress = new StreamOptions(opts.getBufferSize());
+            this.bufferSize = opts.getBufferSize();
         } else {
-            this.compress =
-                    new StreamOptions(
-                            WriterImpl.getEstimatedBufferSize(
-                                    defaultStripeSize,
-                                    opts.getSchema().getMaximumId() + 1,
-                                    opts.getBufferSize()));
-        }
-        CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
-        if (codec != null) {
-            compress.withCodec(codec, codec.getDefaultOptions());
+            this.bufferSize =
+                    getEstimatedBufferSize(
+                            opts.getStripeSize(),
+                            opts.getSchema().getMaximumId() + 1,
+                            opts.getBufferSize());
         }
-        this.compressionStrategy = opts.getCompressionStrategy();
-        this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
+
+        this.out = out;
+        this.blockOffset = 0;
         this.blockSize = opts.getBlockSize();
-        blockOffset = 0;
-        unencrypted = new VariantTracker(opts.getSchema(), compress);
-        writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
-        shims = opts.getHadoopShims();
-        rawStream = new DirectStream(rawWriter);
-        writer = new OutStream("stripe footer", compress, rawStream);
-        codedCompressStream = CodedOutputStream.newInstance(writer);
-        for (WriterEncryptionVariant variant : encryption) {
-            WriterEncryptionKey key = variant.getKeyDescription();
-            StreamOptions encryptOptions =
-                    new StreamOptions(unencrypted.options)
-                            .withEncryption(key.getAlgorithm(), variant.getFileFooterKey());
-            variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions));
-        }
+        this.maxPadding = (int) (opts.getPaddingTolerance() * (double) opts.getBufferSize());
+        this.compress = opts.getCompress();
+        this.codec = OrcCodecPool.getCodec(this.compress);
+        this.streams = new TreeMap<>();
+        this.writer =
+                new OutStream("metadata", this.bufferSize, this.codec, new DirectStream(this.out));
+        this.shims = opts.getHadoopShims();
+        this.addBlockPadding = opts.getBlockPadding();
+        this.protobufWriter = CodedOutputStream.newInstance(this.writer);
+        this.writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
     }
 
-    public void setEncryptionVariant(WriterEncryptionVariant[] encryption) {
-        if (encryption == null) {
-            return;
-        }
-        for (WriterEncryptionVariant variant : encryption) {
-            WriterEncryptionKey key = variant.getKeyDescription();
-            StreamOptions encryptOptions =
-                    new StreamOptions(unencrypted.options)
-                            .withEncryption(key.getAlgorithm(), variant.getFileFooterKey());
-            variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions));
-        }
+    @Override
+    public void writeHeader() throws IOException {
+        this.out.write("ORC".getBytes());
+        this.headerLength = this.out.getPos();
     }
 
-    protected static class VariantTracker {
-        // the streams that make up the current stripe
-        protected final Map<StreamName, BufferedStream> streams = new TreeMap<>();
-        private final int rootColumn;
-        private final int lastColumn;
-        protected final StreamOptions options;
-        // a list for each column covered by this variant
-        // the elements in the list correspond to each stripe in the file
-        protected final List<OrcProto.ColumnStatistics>[] stripeStats;
-        protected final List<OrcProto.Stream> stripeStatsStreams = new ArrayList<>();
-        protected final OrcProto.ColumnStatistics[] fileStats;
-
-        VariantTracker(TypeDescription schema, StreamOptions options) {
-            rootColumn = schema.getId();
-            lastColumn = schema.getMaximumId();
-            this.options = options;
-            stripeStats = new List[schema.getMaximumId() - schema.getId() + 1];
-            for (int i = 0; i < stripeStats.length; ++i) {
-                stripeStats[i] = new ArrayList<>();
-            }
-            fileStats = new OrcProto.ColumnStatistics[stripeStats.length];
-        }
+    @Override
+    public OutputReceiver createDataStream(StreamName name) throws IOException {
+        BufferedStream result = streams.get(name);
 
-        public BufferedStream createStream(StreamName name) {
-            BufferedStream result = new BufferedStream();
+        if (result == null) {
+            result = new BufferedStream();
             streams.put(name, result);
-            return result;
         }
 
-        /**
-         * Place the streams in the appropriate area while updating the sizes with the number of
-         * bytes in the area.
-         *
-         * @param area the area to write
-         * @param sizes the sizes of the areas
-         * @return the list of stream descriptions to add
-         */
-        public List<OrcProto.Stream> placeStreams(StreamName.Area area, SizeCounters sizes) {
-            List<OrcProto.Stream> result = new ArrayList<>(streams.size());
-            for (Map.Entry<StreamName, BufferedStream> stream : streams.entrySet()) {
-                StreamName name = stream.getKey();
-                BufferedStream bytes = stream.getValue();
-                if (name.getArea() == area && !bytes.isSuppressed) {
-                    OrcProto.Stream.Builder builder = OrcProto.Stream.newBuilder();
-                    long size = bytes.getOutputSize();
-                    if (area == StreamName.Area.INDEX) {
-                        sizes.index += size;
-                    } else {
-                        sizes.data += size;
-                    }
-                    builder.setColumn(name.getColumn()).setKind(name.getKind()).setLength(size);
-                    result.add(builder.build());
-                }
-            }
-            return result;
-        }
+        return result;
+    }
 
-        /**
-         * Write the streams in the appropriate area.
-         *
-         * @param area the area to write
-         * @param raw the raw stream to write to
-         */
-        public void writeStreams(StreamName.Area area, FSDataOutputStream raw) throws IOException {
-            for (Map.Entry<StreamName, BufferedStream> stream : streams.entrySet()) {
-                if (stream.getKey().getArea() == area) {
-                    stream.getValue().spillToDiskAndClear(raw);
+    @Override
+    public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec)
+            throws IOException {
+        OutputStream stream =
+                new OutStream(this.toString(), bufferSize, codec, createDataStream(name));
+        index.build().writeTo(stream);
+        stream.flush();
+    }
+
+    @Override
+    public void writeBloomFilter(
+            StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec)
+            throws IOException {
+        OutputStream stream =
+                new OutStream(this.toString(), bufferSize, codec, createDataStream(name));
+        bloom.build().writeTo(stream);
+        stream.flush();
+    }
+
+    @Override
+    public void finalizeStripe(
+            OrcProto.StripeFooter.Builder footerBuilder,
+            OrcProto.StripeInformation.Builder dirEntry)
+            throws IOException {
+        long indexSize = 0;
+        long dataSize = 0;
+
+        for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+            BufferedStream receiver = pair.getValue();
+            if (!receiver.isSuppressed) {
+                long streamSize = receiver.getOutputSize();
+                StreamName name = pair.getKey();
+                footerBuilder.addStreams(
+                        OrcProto.Stream.newBuilder()
+                                .setColumn(name.getColumn())
+                                .setKind(name.getKind())
+                                .setLength(streamSize));
+                if (StreamName.Area.INDEX == name.getArea()) {
+                    indexSize += streamSize;
+                } else {
+                    dataSize += streamSize;
                 }
             }
         }
 
-        /**
-         * Computed the size of the given column on disk for this stripe. It excludes the index
-         * streams.
-         *
-         * @param column a column id
-         * @return the total number of bytes
-         */
-        public long getFileBytes(int column) {
-            long result = 0;
-            if (column >= rootColumn && column <= lastColumn) {
-                for (Map.Entry<StreamName, BufferedStream> entry : streams.entrySet()) {
-                    StreamName name = entry.getKey();
-                    if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) {
-                        result += entry.getValue().getOutputSize();
-                    }
-                }
-            }
-            return result;
+        dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
+        OrcProto.StripeFooter footer = footerBuilder.build();
+        // Do we need to pad the file so the stripe doesn't straddle a block boundary?
+        padStripe(indexSize + dataSize + footer.getSerializedSize());
+
+        // write out the data streams
+        for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+            pair.getValue().spillToDiskAndClear(out);
+        }
+
+        // Write out the footer.
+        writeStripeFooter(footer, dataSize, indexSize, dirEntry);
+    }
+
+    @Override
+    public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
+        long startPosition = out.getPos();
+        OrcProto.Metadata metadata = builder.build();
+        writeMetadata(metadata);
+        this.metadataLength = (int) (out.getPos() - startPosition);
+    }
+
+    @Override
+    public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+        long bodyLength = out.getPos() - metadataLength;
+        builder.setContentLength(bodyLength);
+        builder.setHeaderLength(headerLength);
+        long startPosition = out.getPos();
+        OrcProto.Footer footer = builder.build();
+        writeFileFooter(footer);
+        this.footerLength = (int) (out.getPos() - startPosition);
     }
 
-    VariantTracker getVariant(EncryptionVariant column) {
-        if (column == null) {
-            return unencrypted;
+    @Override
+    public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+        builder.setFooterLength(footerLength);
+        builder.setMetadataLength(metadataLength);
+
+        OrcProto.PostScript ps = builder.build();
+        // need to write this uncompressed
+        long startPosition = out.getPos();
+        ps.writeTo(out);
+        long length = out.getPos() - startPosition;
+
+        if (length > 255) {
+            throw new IllegalArgumentException("PostScript too large at " + length);
         }
-        return variants.get(column);
+
+        out.write((int) length);
+        return out.getPos();
     }
 
     @Override
-    public long getFileBytes(int column, WriterEncryptionVariant variant) {
-        return getVariant(variant).getFileBytes(column);
+    public void close() {
+        // Just release the codec but don't close the internal stream here to avoid
+        // Stream Closed or ClosedChannelException when Flink performs checkpoint.
+        OrcCodecPool.returnCodec(compress, codec);
+        codec = null;
     }
 
     @Override
-    public StreamOptions getStreamOptions() {
-        return unencrypted.options;
+    public void flush() throws IOException {
+        out.flush();
     }
 
-    private static void writeZeros(OutputStream output, long remaining) throws IOException {
-        while (remaining > 0) {
-            long size = Math.min(ZEROS.length, remaining);
-            output.write(ZEROS, 0, (int) size);
-            remaining -= size;
+    @Override
+    public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry)
+            throws IOException {
+        long start = out.getPos();
+        int length = buffer.remaining();
+        long availBlockSpace = blockSize - (start % blockSize);
+
+        // see if stripe can fit in the current hdfs block, else pad the remaining
+        // space in the block
+        if (length < blockSize && length > availBlockSpace && addBlockPadding) {
+            byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+            LOG.info(String.format("Padding ORC by %d bytes while merging..", availBlockSpace));
+            start += availBlockSpace;
+            while (availBlockSpace > 0) {
+                int writeLen = (int) Math.min(availBlockSpace, pad.length);
+                out.write(pad, 0, writeLen);
+                availBlockSpace -= writeLen;
+            }
         }
+
+        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
+        dirEntry.setOffset(start);
+    }
+
+    @Override
+    public CompressionCodec getCompressionCodec() {
+        return this.codec;
+    }
+
+    @Override
+    public long getFileBytes(int column) {
+        long size = 0;
+
+        for (final Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+            final BufferedStream receiver = pair.getValue();
+            if (!receiver.isSuppressed) {
+
+                final StreamName name = pair.getKey();
+                if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) {
+                    size += receiver.getOutputSize();
+                }
+            }
+        }
+
+        return size;
     }
 
     private void padStripe(long stripeSize) throws IOException {
-        this.stripeStart = rawWriter.getPos();
+        this.stripeStart = out.getPos();
         long previousBytesInBlock = (stripeStart - blockOffset) % blockSize;
+
         // We only have options if this isn't the first stripe in the block
         if (previousBytesInBlock > 0) {
             if (previousBytesInBlock + stripeSize >= blockSize) {
                 // Try making a short block
-                if (writeVariableLengthBlocks && shims.endVariableLengthBlock(rawWriter)) {
+                if (writeVariableLengthBlocks && shims.endVariableLengthBlock(out)) {
                     blockOffset = stripeStart;
                 } else if (addBlockPadding) {
                     // if we cross the block boundary, figure out what we should do
                     long padding = blockSize - previousBytesInBlock;
                     if (padding <= maxPadding) {
-                        writeZeros(rawWriter, padding);
+                        writeZeros(out, padding);
                         stripeStart += padding;
                     }
                 }
@@ -290,225 +303,62 @@
             }
         }
     }
 
-    private static class DirectStream implements OutputReceiver {
-        private final FSDataOutputStream output;
-
-        DirectStream(FSDataOutputStream output) {
-            this.output = output;
-        }
-
-        @Override
-        public void output(ByteBuffer buffer) throws IOException {
-            output.write(
-                    buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
-        }
-
-        @Override
-        public void suppress() {
-            throw new UnsupportedOperationException("Can't suppress direct stream");
-        }
-    }
-
     private void writeStripeFooter(
             OrcProto.StripeFooter footer,
-            SizeCounters sizes,
+            long dataSize,
+            long indexSize,
             OrcProto.StripeInformation.Builder dirEntry)
             throws IOException {
         writeStripeFooter(footer);
+
         dirEntry.setOffset(stripeStart);
-        dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - sizes.total());
+        dirEntry.setFooterLength(out.getPos() - stripeStart - dataSize - indexSize);
     }
 
     protected void writeMetadata(OrcProto.Metadata metadata) throws IOException {
-        metadata.writeTo(codedCompressStream);
-        codedCompressStream.flush();
+        metadata.writeTo(protobufWriter);
+        protobufWriter.flush();
         writer.flush();
     }
 
     protected void writeFileFooter(OrcProto.Footer footer) throws IOException {
-        footer.writeTo(codedCompressStream);
-        codedCompressStream.flush();
+        footer.writeTo(protobufWriter);
+        protobufWriter.flush();
         writer.flush();
     }
 
     protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException {
-        footer.writeTo(codedCompressStream);
-        codedCompressStream.flush();
+        footer.writeTo(protobufWriter);
+        protobufWriter.flush();
         writer.flush();
     }
 
-    static void writeEncryptedStripeStatistics(
-            DirectStream output, int stripeNumber, VariantTracker tracker) throws IOException {
-        StreamOptions options = new StreamOptions(tracker.options);
-        tracker.stripeStatsStreams.clear();
-        for (int col = tracker.rootColumn;
-                col < tracker.rootColumn + tracker.stripeStats.length;
-                ++col) {
-            options.modifyIv(
-                    CryptoUtils.modifyIvForStream(
-                            col, OrcProto.Stream.Kind.STRIPE_STATISTICS, stripeNumber + 1));
-            OutStream stream = new OutStream("stripe stats for " + col, options, output);
-            OrcProto.ColumnarStripeStatistics stats =
-                    OrcProto.ColumnarStripeStatistics.newBuilder()
-                            .addAllColStats(tracker.stripeStats[col - tracker.rootColumn])
-                            .build();
-            long start = output.output.getPos();
-            stats.writeTo(stream);
-            stream.flush();
-            OrcProto.Stream description =
-                    OrcProto.Stream.newBuilder()
-                            .setColumn(col)
-                            .setKind(OrcProto.Stream.Kind.STRIPE_STATISTICS)
-                            .setLength(output.output.getPos() - start)
-                            .build();
-            tracker.stripeStatsStreams.add(description);
-        }
-    }
-
-    static void setUnencryptedStripeStatistics(
-            OrcProto.Metadata.Builder builder,
-            int stripeCount,
-            List<OrcProto.ColumnStatistics>[] stats) {
-        // Make the unencrypted stripe stats into lists of StripeStatistics.
-        builder.clearStripeStats();
-        for (int s = 0; s < stripeCount; ++s) {
-            OrcProto.StripeStatistics.Builder stripeStats = OrcProto.StripeStatistics.newBuilder();
-            for (List<OrcProto.ColumnStatistics> col : stats) {
-                stripeStats.addColStats(col.get(s));
-            }
-            builder.addStripeStats(stripeStats.build());
-        }
-    }
-
-    static void setEncryptionStatistics(
-            OrcProto.Encryption.Builder encryption,
-            int stripeNumber,
-            Collection<VariantTracker> variants)
-            throws IOException {
-        int v = 0;
-        for (VariantTracker variant : variants) {
-            OrcProto.EncryptionVariant.Builder variantBuilder = encryption.getVariantsBuilder(v++);
-
-            // Add the stripe statistics streams to the variant description.
-            variantBuilder.clearStripeStatistics();
-            variantBuilder.addAllStripeStatistics(variant.stripeStatsStreams);
-
-            // Serialize and encrypt the file statistics.
-            OrcProto.FileStatistics.Builder file = OrcProto.FileStatistics.newBuilder();
-            for (OrcProto.ColumnStatistics col : variant.fileStats) {
-                file.addColumn(col);
-            }
-            StreamOptions options = new StreamOptions(variant.options);
-            options.modifyIv(
-                    CryptoUtils.modifyIvForStream(
-                            variant.rootColumn,
-                            OrcProto.Stream.Kind.FILE_STATISTICS,
-                            stripeNumber + 1));
-            BufferedStream buffer = new BufferedStream();
-            OutStream stream = new OutStream("stats for " + variant, options, buffer);
-            file.build().writeTo(stream);
-            stream.flush();
-            variantBuilder.setFileStatistics(buffer.getBytes());
-        }
-    }
-
-    @Override
-    public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
-        long stripeStatisticsStart = rawWriter.getPos();
-        for (VariantTracker variant : variants.values()) {
-            writeEncryptedStripeStatistics(rawStream, stripeNumber, variant);
-        }
-        setUnencryptedStripeStatistics(builder, stripeNumber, unencrypted.stripeStats);
-        long metadataStart = rawWriter.getPos();
-        writeMetadata(builder.build());
-        this.stripeStatisticsLength = (int) (metadataStart - stripeStatisticsStart);
-        this.metadataLength = (int) (rawWriter.getPos() - metadataStart);
-    }
-
-    static void addUnencryptedStatistics(
-            OrcProto.Footer.Builder builder, OrcProto.ColumnStatistics[] stats) {
-        for (OrcProto.ColumnStatistics stat : stats) {
-            builder.addStatistics(stat);
+    private static void writeZeros(OutputStream output, long remaining) throws IOException {
+        while (remaining > 0) {
+            long size = Math.min(ZEROS.length, remaining);
+            output.write(ZEROS, 0, (int) size);
+            remaining -= size;
         }
     }
 
-    @Override
-    public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
-        if (!variants.isEmpty()) {
-            OrcProto.Encryption.Builder encryption = builder.getEncryptionBuilder();
-            setEncryptionStatistics(encryption, stripeNumber, variants.values());
-        }
-        addUnencryptedStatistics(builder, unencrypted.fileStats);
-        long bodyLength = rawWriter.getPos() - metadataLength - stripeStatisticsLength;
-        builder.setContentLength(bodyLength);
-        builder.setHeaderLength(headerLength);
-        long startPosn = rawWriter.getPos();
-        OrcProto.Footer footer = builder.build();
-        writeFileFooter(footer);
-        this.footerLength = (int) (rawWriter.getPos() - startPosn);
-    }
+    private static class DirectStream implements OutputReceiver {
+        private final FSDataOutputStream output;
 
-    @Override
-    public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
-        builder.setFooterLength(footerLength);
-        builder.setMetadataLength(metadataLength);
-        if (!variants.isEmpty()) {
-            builder.setStripeStatisticsLength(stripeStatisticsLength);
-        }
-        OrcProto.PostScript ps = builder.build();
-        // need to write this uncompressed
-        long startPosn = rawWriter.getPos();
-        ps.writeTo(rawWriter);
-        long length = rawWriter.getPos() - startPosn;
-        if (length > 255) {
-            throw new IllegalArgumentException("PostScript too large at " + length);
+        DirectStream(FSDataOutputStream output) {
+            this.output = output;
         }
-        rawWriter.write((int) length);
-        return rawWriter.getPos();
-    }
 
-    @Override
-    public void close() throws IOException {
-        // Just release the codec but don't close the internal stream here to avoid
-        // Stream Closed or ClosedChannelException when Flink performs checkpoint.
-
-        CompressionCodec codec = compress.getCodec();
-        if (codec != null) {
-            OrcCodecPool.returnCodec(codec.getKind(), codec);
+        public void output(ByteBuffer buffer) throws IOException {
+            this.output.write(
+                    buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
         }
-        compress.withCodec(null, null);
-    }
 
-    @Override
-    public void flush() throws IOException {
-        rawWriter.flush();
-    }
-
-    @Override
-    public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry)
-            throws IOException {
-        long start = rawWriter.getPos();
-        int length = buffer.remaining();
-        long availBlockSpace = blockSize - (start % blockSize);
-
-        // see if stripe can fit in the current hdfs block, else pad the remaining
-        // space in the block
-        if (length < blockSize && length > availBlockSpace && addBlockPadding) {
-            byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
-            LOG.debug("Padding ORC by {} bytes while merging", availBlockSpace);
-            start += availBlockSpace;
-            while (availBlockSpace > 0) {
-                int writeLen = (int) Math.min(availBlockSpace, pad.length);
-                rawWriter.write(pad, 0, writeLen);
-                availBlockSpace -= writeLen;
-            }
+        public void suppress() {
+            throw new UnsupportedOperationException("Can't suppress direct stream");
         }
-        rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
-        dirEntry.setOffset(start);
-        stripeNumber += 1;
     }
 
-    static final class BufferedStream implements OutputReceiver {
+    private static final class BufferedStream implements OutputReceiver {
         private boolean isSuppressed = false;
         private final List<ByteBuffer> output = new ArrayList<>();
@@ -519,18 +369,12 @@
             }
         }
 
-        @Override
         public void suppress() {
             isSuppressed = true;
             output.clear();
         }
 
-        /**
-         * Write any saved buffers to the OutputStream if needed, and clears all the buffers.
-         *
-         * @return true if the stream was written
-         */
-        boolean spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
+        void spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
             if (!isSuppressed) {
                 for (ByteBuffer buffer : output) {
                     raw.write(
                             buffer.array(),
                             buffer.arrayOffset() + buffer.position(),
                             buffer.remaining());
                 }
                 output.clear();
-                return true;
             }
             isSuppressed = false;
-            return false;
-        }
-
-        /**
-         * Get the buffer as a protobuf ByteString and clears the BufferedStream.
-         *
-         * @return the bytes
-         */
-        ByteString getBytes() {
-            int len = output.size();
-            if (len == 0) {
-                return ByteString.EMPTY;
-            } else {
-                ByteString result = ByteString.copyFrom(output.get(0));
-                for (int i = 1; i < output.size(); ++i) {
-                    result = result.concat(ByteString.copyFrom(output.get(i)));
-                }
-                output.clear();
-                return result;
-            }
         }
 
-        /**
-         * Get the number of bytes that will be written to the output.
-         *
-         * <p>Assumes the stream writing into this receiver has already been flushed.
-         *
-         * @return number of bytes
-         */
         public long getOutputSize() {
             long result = 0;
             for (ByteBuffer buffer : output) {
                 result += buffer.remaining();
             }
             return result;
         }
     }
-
-    static class SizeCounters {
-        long index = 0;
-        long data = 0;
-
-        long total() {
-            return index + data;
-        }
-    }
-
-    void buildStreamList(OrcProto.StripeFooter.Builder footerBuilder, SizeCounters sizes)
-            throws IOException {
-        footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.INDEX, sizes));
-        final long unencryptedIndexSize = sizes.index;
-        int v = 0;
-        for (VariantTracker variant : variants.values()) {
-            OrcProto.StripeEncryptionVariant.Builder builder =
-                    footerBuilder.getEncryptionBuilder(v++);
-            builder.addAllStreams(variant.placeStreams(StreamName.Area.INDEX, sizes));
-        }
-        if (sizes.index != unencryptedIndexSize) {
-            // add a placeholder that covers the hole where the encrypted indexes are
-            footerBuilder.addStreams(
-                    OrcProto.Stream.newBuilder()
-                            .setKind(OrcProto.Stream.Kind.ENCRYPTED_INDEX)
-                            .setLength(sizes.index - unencryptedIndexSize));
-        }
-        footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.DATA, sizes));
-        final long unencryptedDataSize = sizes.data;
-        v = 0;
-        for (VariantTracker variant : variants.values()) {
-            OrcProto.StripeEncryptionVariant.Builder builder =
-                    footerBuilder.getEncryptionBuilder(v++);
-            builder.addAllStreams(variant.placeStreams(StreamName.Area.DATA, sizes));
-        }
-        if (sizes.data != unencryptedDataSize) {
-            // add a placeholder that covers the hole where the encrypted indexes are
-            footerBuilder.addStreams(
-                    OrcProto.Stream.newBuilder()
-                            .setKind(OrcProto.Stream.Kind.ENCRYPTED_DATA)
-                            .setLength(sizes.data - unencryptedDataSize));
-        }
-    }
-
-    @Override
-    public void finalizeStripe(
-            OrcProto.StripeFooter.Builder footerBuilder,
-            OrcProto.StripeInformation.Builder dirEntry)
-            throws IOException {
-        SizeCounters sizes = new SizeCounters();
-        buildStreamList(footerBuilder, sizes);
-
-        OrcProto.StripeFooter footer = footerBuilder.build();
-
-        // Do we need to pad the file so the stripe doesn't straddle a block boundary?
-        padStripe(sizes.total() + footer.getSerializedSize());
-
-        // write the unencrypted index streams
-        unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter);
-        // write the encrypted index streams
-        for (VariantTracker variant : variants.values()) {
-            variant.writeStreams(StreamName.Area.INDEX, rawWriter);
-        }
-
-        // write the unencrypted data streams
-        unencrypted.writeStreams(StreamName.Area.DATA, rawWriter);
-        // write out the unencrypted data streams
-        for (VariantTracker variant : variants.values()) {
-            variant.writeStreams(StreamName.Area.DATA, rawWriter);
-        }
-
-        // Write out the footer.
-        writeStripeFooter(footer, sizes, dirEntry);
-
-        // fill in the data sizes
-        dirEntry.setDataLength(sizes.data);
-        dirEntry.setIndexLength(sizes.index);
-
-        stripeNumber += 1;
-    }
-
-    @Override
-    public void writeHeader() throws IOException {
-        rawWriter.write(OrcFile.MAGIC.getBytes());
-        headerLength = rawWriter.getPos();
-    }
-
-    @Override
-    public BufferedStream createDataStream(StreamName name) {
-        VariantTracker variant = getVariant(name.getEncryption());
-        BufferedStream result = variant.streams.get(name);
-        if (result == null) {
-            result = new BufferedStream();
-            variant.streams.put(name, result);
-        }
-        return result;
-    }
-
-    protected OutputStream createIndexStream(StreamName name) {
-        BufferedStream buffer = createDataStream(name);
-        VariantTracker tracker = getVariant(name.getEncryption());
-        StreamOptions options =
-                SerializationUtils.getCustomizedCodec(
-                        tracker.options, compressionStrategy, name.getKind());
-        if (options.isEncrypted()) {
-            if (options == tracker.options) {
-                options = new StreamOptions(options);
-            }
-            options.modifyIv(CryptoUtils.modifyIvForStream(name, stripeNumber + 1));
-        }
-        return new OutStream(name.toString(), options, buffer);
-    }
-
-    @Override
-    public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index) throws IOException {
-        OutputStream stream = createIndexStream(name);
-        index.build().writeTo(stream);
-        stream.flush();
-    }
-
-    @Override
-    public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom)
-            throws IOException {
-        OutputStream stream = createIndexStream(name);
-        bloom.build().writeTo(stream);
-        stream.flush();
-    }
-
-    @Override
-    public void writeStatistics(StreamName name, OrcProto.ColumnStatistics.Builder statistics) {
-        VariantTracker tracker = getVariant(name.getEncryption());
-        if (name.getKind() == OrcProto.Stream.Kind.FILE_STATISTICS) {
-            tracker.fileStats[name.getColumn() - tracker.rootColumn] = statistics.build();
-        } else {
-            tracker.stripeStats[name.getColumn() - tracker.rootColumn].add(statistics.build());
-        }
-    }
 }
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
index 04d6bd74ab5..2b1458c8179 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
@@ -204,26 +204,20 @@ class OrcColumnarRowInputFormatTest {
         RowType tableType =
                 RowType.of(
-                        new LogicalType[] {
-                            /* 0 */ DataTypes.INT().getLogicalType(),
-                            /* 1 */ DataTypes.INT().getLogicalType(), // part-1
-                            /* 2 */ DataTypes.STRING().getLogicalType(),
-                            /* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2
-                            /* 4 */ DataTypes.STRING().getLogicalType(),
-                            /* 5 */ DataTypes.STRING().getLogicalType(), // part-3
-                            /* 6 */ DataTypes.STRING().getLogicalType(),
-                            /* 7 */ DataTypes.INT().getLogicalType(),
-                            /* 8 */ DataTypes.DECIMAL(10, 5).getLogicalType(), // part-4
-                            /* 9 */ DataTypes.STRING().getLogicalType(),
-                            /* 10*/ DataTypes.INT().getLogicalType(),
-                            /* 11*/ DataTypes.INT().getLogicalType(),
-                            /* 12*/ DataTypes.STRING().getLogicalType(), // part-5
-                            /* 13*/ DataTypes.INT().getLogicalType()
-                        },
-                        new String[] {
-                            "_col0", "f1", "_col1", "f3", "_col2", "f5", "_col3", "_col4", "f8",
-                            "_col5", "_col6", "_col7", "f13", "_col8"
-                        });
+                        /* 0 */ DataTypes.INT().getLogicalType(),
+                        /* 1 */ DataTypes.INT().getLogicalType(), // part-1
+                        /* 2 */ DataTypes.STRING().getLogicalType(),
+                        /* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2
+                        /* 4 */ DataTypes.STRING().getLogicalType(),
+                        /* 5 */ DataTypes.STRING().getLogicalType(), // part-3
+                        /* 6 */ DataTypes.STRING().getLogicalType(),
+                        /* 7 */ DataTypes.INT().getLogicalType(),
+                        /* 8 */ DataTypes.DECIMAL(10, 5).getLogicalType(), // part-4
+                        /* 9 */ DataTypes.STRING().getLogicalType(),
+                        /* 11*/ DataTypes.INT().getLogicalType(),
+                        /* 12*/ DataTypes.INT().getLogicalType(),
+                        /* 13*/ DataTypes.STRING().getLogicalType(), // part-5
+                        /* 14*/ DataTypes.INT().getLogicalType());
 
         int[] projectedFields = {8, 1, 3, 0, 5, 2};
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
index a7fd08c343d..bf0418c657f 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
@@ -73,11 +73,6 @@ public class OrcColumnarRowSplitReaderTest {
                 DataTypes.INT()
             };
 
-    private final String[] testSchemaNameFlat =
-            new String[] {
-                "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8"
-            };
-
     private final DataType[] testSchemaDecimal = new DataType[] {DataTypes.DECIMAL(10, 5)};
 
     private static Path testFileFlat;
@@ -102,12 +97,7 @@ public class OrcColumnarRowSplitReaderTest {
         for (FileInputSplit split : splits) {
             try (OrcColumnarRowSplitReader reader =
-                    createReader(
-                            new int[] {0, 1},
-                            testSchemaFlat,
-                            testSchemaNameFlat,
-                            new HashMap<>(),
-                            split)) {
+                    createReader(new int[] {0, 1}, testSchemaFlat, new HashMap<>(), split)) {
                 // read and count all rows
                 while (!reader.reachedEnd()) {
                     RowData row = reader.nextRecord(null);
@@ -129,12 +119,7 @@ public class OrcColumnarRowSplitReaderTest {
         FileInputSplit[] splits = createSplits(testFileDecimal, 1);
 
         try (OrcColumnarRowSplitReader reader =
-                createReader(
-                        new int[] {0},
-                        testSchemaDecimal,
-                        new String[] {"_col0"},
-                        new HashMap<>(),
-                        splits[0])) {
+                createReader(new int[] {0}, testSchemaDecimal, new HashMap<>(), splits[0])) {
             assertThat(reader.reachedEnd()).isFalse();
             RowData row = reader.nextRecord(null);
@@ -191,14 +176,10 @@ public class OrcColumnarRowSplitReaderTest {
                         /* 7 */ DataTypes.INT(),
                         /* 8 */ DataTypes.DECIMAL(10, 5), // part-4
                         /* 9 */ DataTypes.STRING(),
-                        /* 10*/ DataTypes.INT(),
                         /* 11*/ DataTypes.INT(),
-                        /* 12*/ DataTypes.STRING(), // part-5
-                        /* 13*/ DataTypes.INT()
-                    },
-                    new String[] {
-                        "_col0", "f1", "_col1", "f3", "_col2", "f5", "_col3", "_col4", "f8",
-                        "_col5", "_col6", "_col7", "f13", "_col8"
+                        /* 12*/ DataTypes.INT(),
+                        /* 13*/ DataTypes.STRING(), // part-5
+                        /* 14*/ DataTypes.INT()
                     },
                     partSpec,
                     split)) {
@@ -241,12 +222,7 @@ public class OrcColumnarRowSplitReaderTest {
         for (FileInputSplit split : splits) {
             try (OrcColumnarRowSplitReader reader =
-                    createReader(
-                            new int[] {2, 0, 1},
-                            testSchemaFlat,
-                            testSchemaNameFlat,
-                            new HashMap<>(),
-                            split)) {
+                    createReader(new int[] {2, 0, 1}, testSchemaFlat, new HashMap<>(), split)) {
                 // read and count all rows
                 while (!reader.reachedEnd()) {
                     RowData row = reader.nextRecord(null);
@@ -427,25 +403,10 @@
             Map<String, Object> partitionSpec,
             FileInputSplit split)
             throws IOException {
-        return createReader(
-                selectedFields,
-                fullTypes,
-                IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new),
-                partitionSpec,
split); - } - - protected OrcColumnarRowSplitReader createReader( - int[] selectedFields, - DataType[] fullTypes, - String[] fullNames, - Map partitionSpec, - FileInputSplit split) - throws IOException { return OrcSplitReaderUtil.genPartColumnarRowReader( "2.3.0", new Configuration(), - fullNames, + IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new), fullTypes, partitionSpec, selectedFields, diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java index 17e912c2c8e..f2d2bdaf65a 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java @@ -206,10 +206,10 @@ public class OrcFormatStatisticsReportTest extends StatisticsReportTestBase { "f_timestamp9", new ColumnStats.Builder() .setMax( - DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123456789", 9) + DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123", 3) .toTimestamp()) .setMin( - DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123456789", 9) + DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123", 3) .toTimestamp()) .setNullCount(0L) .build()); diff --git a/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE b/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE index 872bd712535..8554030115d 100644 --- a/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE +++ b/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE @@ -6,13 +6,13 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) -- org.apache.orc:orc-core:1.9.4 -- org.apache.orc:orc-shims:1.9.4 -- org.apache.hive:hive-storage-api:2.8.1 -- io.airlift:aircompressor:0.27 +- org.apache.orc:orc-core:1.5.6 +- org.apache.orc:orc-shims:1.5.6 +- org.apache.hive:hive-storage-api:2.6.0 +- io.airlift:aircompressor:0.10 - commons-lang:commons-lang:2.6 This project bundles the following dependencies under the BSD license. See bundled license files for details. -- com.google.protobuf:protobuf-java:3.21.7 +- com.google.protobuf:protobuf-java:2.5.0 diff --git a/pom.xml b/pom.xml index 04267393d89..0903bea943a 100644 --- a/pom.xml +++ b/pom.xml @@ -170,8 +170,7 @@ under the License. --> 3.2.4 2.3.10 - 1.9.4 - 2.8.1 + 1.5.6 1.20.0 tools/japicmp-output 10.18.2