Revert "[FLINK-36652][Formats/ORC] Upgrade Apache ORC Version to 1.9.4 (#25711)"

This reverts commit 363a69dad3.
pull/26019/head
Martijn Visser 2 days ago
parent e000678091
commit 9ba5afc260

@@ -87,76 +87,6 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protoc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
<version>${storage-api.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>

@@ -21,9 +21,9 @@ package org.apache.flink.orc.nohive.writer;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.orc.writer.PhysicalWriterImpl;
import com.google.protobuf25.CodedOutputStream;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.protobuf.CodedOutputStream;
import java.io.IOException;

@@ -36,6 +36,7 @@ import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Map;
import java.util.stream.IntStream;
/** Test for {@link OrcColumnarRowSplitReader}. */
class OrcColumnarRowSplitReaderNoHiveTest extends OrcColumnarRowSplitReaderTest {
@@ -99,13 +100,12 @@ class OrcColumnarRowSplitReaderNoHiveTest extends OrcColumnarRowSplitReaderTest
protected OrcColumnarRowSplitReader createReader(
int[] selectedFields,
DataType[] fullTypes,
String[] fullNames,
Map<String, Object> partitionSpec,
FileInputSplit split)
throws IOException {
return OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(
new Configuration(),
fullNames,
IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new),
fullTypes,
partitionSpec,
selectedFields,

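The hunk above drops the explicit fullNames argument and instead derives placeholder column names from the type array. A minimal, self-contained sketch of that expression (the class name and field count below are illustrative, not part of the commit):

import java.util.Arrays;
import java.util.stream.IntStream;

public class GeneratedFieldNamesSketch {
    public static void main(String[] args) {
        // Stand-in for fullTypes.length in the test above.
        int fieldCount = 4;
        String[] names =
                IntStream.range(0, fieldCount).mapToObj(i -> "f" + i).toArray(String[]::new);
        System.out.println(Arrays.toString(names)); // prints [f0, f1, f2, f3]
    }
}
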
@@ -101,11 +101,7 @@ under the License.
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -129,10 +125,6 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -149,32 +141,6 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protoc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
<version>${storage-api.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

@@ -28,10 +28,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.impl.WriterImpl;
import org.apache.orc.impl.writer.WriterEncryptionVariant;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -98,29 +96,14 @@ public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
@Override
public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
OrcFile.WriterOptions opts = getWriterOptions();
PhysicalWriterImpl physicalWriter = new PhysicalWriterImpl(out, opts);
opts.physicalWriter(physicalWriter);
opts.physicalWriter(new PhysicalWriterImpl(out, opts));
// The path of the Writer is not used to indicate the destination file
// in this case since we have used a dedicated physical writer to write
// to the given output stream directly. However, the path would be used as
// the key of writer in the ORC memory manager, thus we need to make it unique.
Path unusedPath = new Path(UUID.randomUUID().toString());
WriterImpl writer = new WriterImpl(null, unusedPath, opts);
// Obtaining encryption variant from Writer, and setting encryption variant for
// physicalWriter.
try {
Field encryptionFiled = WriterImpl.class.getDeclaredField("encryption");
encryptionFiled.setAccessible(true);
WriterEncryptionVariant[] encryption =
(WriterEncryptionVariant[]) encryptionFiled.get(writer);
physicalWriter.setEncryptionVariant(encryption);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(
"Can not access to the encryption field in Class org.apache.orc.impl.WriterImpl",
e);
}
return new OrcBulkWriter<>(vectorizer, writer);
return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts));
}
@VisibleForTesting

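The block removed above pulled the WriterEncryptionVariant[] out of WriterImpl's private encryption field via reflection and handed it to the custom physical writer. A stripped-down sketch of that reflection pattern on a plain class (the Writer class and field below are illustrative stand-ins, not the ORC API):

import java.lang.reflect.Field;

public class PrivateFieldReflectionSketch {
    // Stand-in for org.apache.orc.impl.WriterImpl and its private "encryption" field.
    static class Writer {
        private final String[] encryption = {"variant-0", "variant-1"};
    }

    public static void main(String[] args) throws Exception {
        Writer writer = new Writer();
        Field field = Writer.class.getDeclaredField("encryption");
        field.setAccessible(true); // needed because the field is private
        String[] variants = (String[]) field.get(writer);
        System.out.println(variants.length); // prints 2
    }
}
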
@@ -21,24 +21,16 @@ package org.apache.flink.orc.writer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataOutputStream;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import org.apache.orc.CompressionCodec;
import org.apache.orc.EncryptionVariant;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.PhysicalWriter;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.CryptoUtils;
import org.apache.orc.impl.HadoopShims;
import org.apache.orc.impl.OrcCodecPool;
import org.apache.orc.impl.OutStream;
import org.apache.orc.impl.SerializationUtils;
import org.apache.orc.impl.StreamName;
import org.apache.orc.impl.WriterImpl;
import org.apache.orc.impl.writer.StreamOptions;
import org.apache.orc.impl.writer.WriterEncryptionKey;
import org.apache.orc.impl.writer.WriterEncryptionVariant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,11 +38,12 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
/**
* A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
*
@@ -62,227 +55,247 @@ import java.util.TreeMap;
*/
@Internal
public class PhysicalWriterImpl implements PhysicalWriter {
private static final Logger LOG = LoggerFactory.getLogger(PhysicalWriterImpl.class);
private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(PhysicalWriterImpl.class);
private static final byte[] ZEROS = new byte[64 * 1024];
private FSDataOutputStream rawWriter;
private final DirectStream rawStream;
private static final int HDFS_BUFFER_SIZE = 256 * 1024;
protected final OutStream writer;
private final CodedOutputStream codedCompressStream;
private final CodedOutputStream protobufWriter;
private final CompressionKind compress;
private final Map<StreamName, BufferedStream> streams;
private final HadoopShims shims;
private final long blockSize;
private final int maxPadding;
private final StreamOptions compress;
private final OrcFile.CompressionStrategy compressionStrategy;
private final int bufferSize;
private final long blockSize;
private final boolean addBlockPadding;
private final boolean writeVariableLengthBlocks;
private final VariantTracker unencrypted;
private CompressionCodec codec;
private FSDataOutputStream out;
private long headerLength;
private long stripeStart;
private long blockOffset;
private int metadataLength;
private int stripeStatisticsLength = 0;
private int footerLength;
private int stripeNumber = 0;
private final Map<WriterEncryptionVariant, VariantTracker> variants = new TreeMap<>();
public PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts)
throws IOException {
this(out, opts, new WriterEncryptionVariant[0]);
}
public PhysicalWriterImpl(
FSDataOutputStream out,
OrcFile.WriterOptions opts,
WriterEncryptionVariant[] encryption)
throws IOException {
this.rawWriter = out;
long defaultStripeSize = opts.getStripeSize();
this.addBlockPadding = opts.getBlockPadding();
if (opts.isEnforceBufferSize()) {
this.compress = new StreamOptions(opts.getBufferSize());
this.bufferSize = opts.getBufferSize();
} else {
this.compress =
new StreamOptions(
WriterImpl.getEstimatedBufferSize(
defaultStripeSize,
opts.getSchema().getMaximumId() + 1,
opts.getBufferSize()));
}
CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
if (codec != null) {
compress.withCodec(codec, codec.getDefaultOptions());
this.bufferSize =
getEstimatedBufferSize(
opts.getStripeSize(),
opts.getSchema().getMaximumId() + 1,
opts.getBufferSize());
}
this.compressionStrategy = opts.getCompressionStrategy();
this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
this.out = out;
this.blockOffset = 0;
this.blockSize = opts.getBlockSize();
blockOffset = 0;
unencrypted = new VariantTracker(opts.getSchema(), compress);
writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
shims = opts.getHadoopShims();
rawStream = new DirectStream(rawWriter);
writer = new OutStream("stripe footer", compress, rawStream);
codedCompressStream = CodedOutputStream.newInstance(writer);
for (WriterEncryptionVariant variant : encryption) {
WriterEncryptionKey key = variant.getKeyDescription();
StreamOptions encryptOptions =
new StreamOptions(unencrypted.options)
.withEncryption(key.getAlgorithm(), variant.getFileFooterKey());
variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions));
}
this.maxPadding = (int) (opts.getPaddingTolerance() * (double) opts.getBufferSize());
this.compress = opts.getCompress();
this.codec = OrcCodecPool.getCodec(this.compress);
this.streams = new TreeMap<>();
this.writer =
new OutStream("metadata", this.bufferSize, this.codec, new DirectStream(this.out));
this.shims = opts.getHadoopShims();
this.addBlockPadding = opts.getBlockPadding();
this.protobufWriter = CodedOutputStream.newInstance(this.writer);
this.writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
}
public void setEncryptionVariant(WriterEncryptionVariant[] encryption) {
if (encryption == null) {
return;
}
for (WriterEncryptionVariant variant : encryption) {
WriterEncryptionKey key = variant.getKeyDescription();
StreamOptions encryptOptions =
new StreamOptions(unencrypted.options)
.withEncryption(key.getAlgorithm(), variant.getFileFooterKey());
variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions));
}
@Override
public void writeHeader() throws IOException {
this.out.write("ORC".getBytes());
this.headerLength = this.out.getPos();
}
protected static class VariantTracker {
// the streams that make up the current stripe
protected final Map<StreamName, BufferedStream> streams = new TreeMap<>();
private final int rootColumn;
private final int lastColumn;
protected final StreamOptions options;
// a list for each column covered by this variant
// the elements in the list correspond to each stripe in the file
protected final List<OrcProto.ColumnStatistics>[] stripeStats;
protected final List<OrcProto.Stream> stripeStatsStreams = new ArrayList<>();
protected final OrcProto.ColumnStatistics[] fileStats;
VariantTracker(TypeDescription schema, StreamOptions options) {
rootColumn = schema.getId();
lastColumn = schema.getMaximumId();
this.options = options;
stripeStats = new List[schema.getMaximumId() - schema.getId() + 1];
for (int i = 0; i < stripeStats.length; ++i) {
stripeStats[i] = new ArrayList<>();
}
fileStats = new OrcProto.ColumnStatistics[stripeStats.length];
}
@Override
public OutputReceiver createDataStream(StreamName name) throws IOException {
BufferedStream result = streams.get(name);
public BufferedStream createStream(StreamName name) {
BufferedStream result = new BufferedStream();
if (result == null) {
result = new BufferedStream();
streams.put(name, result);
return result;
}
/**
* Place the streams in the appropriate area while updating the sizes with the number of
* bytes in the area.
*
* @param area the area to write
* @param sizes the sizes of the areas
* @return the list of stream descriptions to add
*/
public List<OrcProto.Stream> placeStreams(StreamName.Area area, SizeCounters sizes) {
List<OrcProto.Stream> result = new ArrayList<>(streams.size());
for (Map.Entry<StreamName, BufferedStream> stream : streams.entrySet()) {
StreamName name = stream.getKey();
BufferedStream bytes = stream.getValue();
if (name.getArea() == area && !bytes.isSuppressed) {
OrcProto.Stream.Builder builder = OrcProto.Stream.newBuilder();
long size = bytes.getOutputSize();
if (area == StreamName.Area.INDEX) {
sizes.index += size;
} else {
sizes.data += size;
}
builder.setColumn(name.getColumn()).setKind(name.getKind()).setLength(size);
result.add(builder.build());
}
}
return result;
}
return result;
}
/**
* Write the streams in the appropriate area.
*
* @param area the area to write
* @param raw the raw stream to write to
*/
public void writeStreams(StreamName.Area area, FSDataOutputStream raw) throws IOException {
for (Map.Entry<StreamName, BufferedStream> stream : streams.entrySet()) {
if (stream.getKey().getArea() == area) {
stream.getValue().spillToDiskAndClear(raw);
@Override
public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec)
throws IOException {
OutputStream stream =
new OutStream(this.toString(), bufferSize, codec, createDataStream(name));
index.build().writeTo(stream);
stream.flush();
}
@Override
public void writeBloomFilter(
StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec)
throws IOException {
OutputStream stream =
new OutStream(this.toString(), bufferSize, codec, createDataStream(name));
bloom.build().writeTo(stream);
stream.flush();
}
@Override
public void finalizeStripe(
OrcProto.StripeFooter.Builder footerBuilder,
OrcProto.StripeInformation.Builder dirEntry)
throws IOException {
long indexSize = 0;
long dataSize = 0;
for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
BufferedStream receiver = pair.getValue();
if (!receiver.isSuppressed) {
long streamSize = receiver.getOutputSize();
StreamName name = pair.getKey();
footerBuilder.addStreams(
OrcProto.Stream.newBuilder()
.setColumn(name.getColumn())
.setKind(name.getKind())
.setLength(streamSize));
if (StreamName.Area.INDEX == name.getArea()) {
indexSize += streamSize;
} else {
dataSize += streamSize;
}
}
}
/**
* Computed the size of the given column on disk for this stripe. It excludes the index
* streams.
*
* @param column a column id
* @return the total number of bytes
*/
public long getFileBytes(int column) {
long result = 0;
if (column >= rootColumn && column <= lastColumn) {
for (Map.Entry<StreamName, BufferedStream> entry : streams.entrySet()) {
StreamName name = entry.getKey();
if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) {
result += entry.getValue().getOutputSize();
}
}
}
return result;
dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
OrcProto.StripeFooter footer = footerBuilder.build();
// Do we need to pad the file so the stripe doesn't straddle a block boundary?
padStripe(indexSize + dataSize + footer.getSerializedSize());
// write out the data streams
for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
pair.getValue().spillToDiskAndClear(out);
}
// Write out the footer.
writeStripeFooter(footer, dataSize, indexSize, dirEntry);
}
@Override
public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
long startPosition = out.getPos();
OrcProto.Metadata metadata = builder.build();
writeMetadata(metadata);
this.metadataLength = (int) (out.getPos() - startPosition);
}
@Override
public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
long bodyLength = out.getPos() - metadataLength;
builder.setContentLength(bodyLength);
builder.setHeaderLength(headerLength);
long startPosition = out.getPos();
OrcProto.Footer footer = builder.build();
writeFileFooter(footer);
this.footerLength = (int) (out.getPos() - startPosition);
}
VariantTracker getVariant(EncryptionVariant column) {
if (column == null) {
return unencrypted;
@Override
public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
builder.setFooterLength(footerLength);
builder.setMetadataLength(metadataLength);
OrcProto.PostScript ps = builder.build();
// need to write this uncompressed
long startPosition = out.getPos();
ps.writeTo(out);
long length = out.getPos() - startPosition;
if (length > 255) {
throw new IllegalArgumentException("PostScript too large at " + length);
}
return variants.get(column);
out.write((int) length);
return out.getPos();
}
@Override
public long getFileBytes(int column, WriterEncryptionVariant variant) {
return getVariant(variant).getFileBytes(column);
public void close() {
// Just release the codec but don't close the internal stream here to avoid
// Stream Closed or ClosedChannelException when Flink performs checkpoint.
OrcCodecPool.returnCodec(compress, codec);
codec = null;
}
@Override
public StreamOptions getStreamOptions() {
return unencrypted.options;
public void flush() throws IOException {
out.flush();
}
private static void writeZeros(OutputStream output, long remaining) throws IOException {
while (remaining > 0) {
long size = Math.min(ZEROS.length, remaining);
output.write(ZEROS, 0, (int) size);
remaining -= size;
@Override
public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry)
throws IOException {
long start = out.getPos();
int length = buffer.remaining();
long availBlockSpace = blockSize - (start % blockSize);
// see if stripe can fit in the current hdfs block, else pad the remaining
// space in the block
if (length < blockSize && length > availBlockSpace && addBlockPadding) {
byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
LOG.info(String.format("Padding ORC by %d bytes while merging..", availBlockSpace));
start += availBlockSpace;
while (availBlockSpace > 0) {
int writeLen = (int) Math.min(availBlockSpace, pad.length);
out.write(pad, 0, writeLen);
availBlockSpace -= writeLen;
}
}
out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
dirEntry.setOffset(start);
}
@Override
public CompressionCodec getCompressionCodec() {
return this.codec;
}
@Override
public long getFileBytes(int column) {
long size = 0;
for (final Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
final BufferedStream receiver = pair.getValue();
if (!receiver.isSuppressed) {
final StreamName name = pair.getKey();
if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) {
size += receiver.getOutputSize();
}
}
}
return size;
}
private void padStripe(long stripeSize) throws IOException {
this.stripeStart = rawWriter.getPos();
this.stripeStart = out.getPos();
long previousBytesInBlock = (stripeStart - blockOffset) % blockSize;
// We only have options if this isn't the first stripe in the block
if (previousBytesInBlock > 0) {
if (previousBytesInBlock + stripeSize >= blockSize) {
// Try making a short block
if (writeVariableLengthBlocks && shims.endVariableLengthBlock(rawWriter)) {
if (writeVariableLengthBlocks && shims.endVariableLengthBlock(out)) {
blockOffset = stripeStart;
} else if (addBlockPadding) {
// if we cross the block boundary, figure out what we should do
long padding = blockSize - previousBytesInBlock;
if (padding <= maxPadding) {
writeZeros(rawWriter, padding);
writeZeros(out, padding);
stripeStart += padding;
}
}
@@ -290,225 +303,62 @@ public class PhysicalWriterImpl implements PhysicalWriter {
}
}
private static class DirectStream implements OutputReceiver {
private final FSDataOutputStream output;
DirectStream(FSDataOutputStream output) {
this.output = output;
}
@Override
public void output(ByteBuffer buffer) throws IOException {
output.write(
buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
}
@Override
public void suppress() {
throw new UnsupportedOperationException("Can't suppress direct stream");
}
}
private void writeStripeFooter(
OrcProto.StripeFooter footer,
SizeCounters sizes,
long dataSize,
long indexSize,
OrcProto.StripeInformation.Builder dirEntry)
throws IOException {
writeStripeFooter(footer);
dirEntry.setOffset(stripeStart);
dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - sizes.total());
dirEntry.setFooterLength(out.getPos() - stripeStart - dataSize - indexSize);
}
protected void writeMetadata(OrcProto.Metadata metadata) throws IOException {
metadata.writeTo(codedCompressStream);
codedCompressStream.flush();
metadata.writeTo(protobufWriter);
protobufWriter.flush();
writer.flush();
}
protected void writeFileFooter(OrcProto.Footer footer) throws IOException {
footer.writeTo(codedCompressStream);
codedCompressStream.flush();
footer.writeTo(protobufWriter);
protobufWriter.flush();
writer.flush();
}
protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException {
footer.writeTo(codedCompressStream);
codedCompressStream.flush();
footer.writeTo(protobufWriter);
protobufWriter.flush();
writer.flush();
}
static void writeEncryptedStripeStatistics(
DirectStream output, int stripeNumber, VariantTracker tracker) throws IOException {
StreamOptions options = new StreamOptions(tracker.options);
tracker.stripeStatsStreams.clear();
for (int col = tracker.rootColumn;
col < tracker.rootColumn + tracker.stripeStats.length;
++col) {
options.modifyIv(
CryptoUtils.modifyIvForStream(
col, OrcProto.Stream.Kind.STRIPE_STATISTICS, stripeNumber + 1));
OutStream stream = new OutStream("stripe stats for " + col, options, output);
OrcProto.ColumnarStripeStatistics stats =
OrcProto.ColumnarStripeStatistics.newBuilder()
.addAllColStats(tracker.stripeStats[col - tracker.rootColumn])
.build();
long start = output.output.getPos();
stats.writeTo(stream);
stream.flush();
OrcProto.Stream description =
OrcProto.Stream.newBuilder()
.setColumn(col)
.setKind(OrcProto.Stream.Kind.STRIPE_STATISTICS)
.setLength(output.output.getPos() - start)
.build();
tracker.stripeStatsStreams.add(description);
}
}
static void setUnencryptedStripeStatistics(
OrcProto.Metadata.Builder builder,
int stripeCount,
List<OrcProto.ColumnStatistics>[] stats) {
// Make the unencrypted stripe stats into lists of StripeStatistics.
builder.clearStripeStats();
for (int s = 0; s < stripeCount; ++s) {
OrcProto.StripeStatistics.Builder stripeStats = OrcProto.StripeStatistics.newBuilder();
for (List<OrcProto.ColumnStatistics> col : stats) {
stripeStats.addColStats(col.get(s));
}
builder.addStripeStats(stripeStats.build());
}
}
static void setEncryptionStatistics(
OrcProto.Encryption.Builder encryption,
int stripeNumber,
Collection<VariantTracker> variants)
throws IOException {
int v = 0;
for (VariantTracker variant : variants) {
OrcProto.EncryptionVariant.Builder variantBuilder = encryption.getVariantsBuilder(v++);
// Add the stripe statistics streams to the variant description.
variantBuilder.clearStripeStatistics();
variantBuilder.addAllStripeStatistics(variant.stripeStatsStreams);
// Serialize and encrypt the file statistics.
OrcProto.FileStatistics.Builder file = OrcProto.FileStatistics.newBuilder();
for (OrcProto.ColumnStatistics col : variant.fileStats) {
file.addColumn(col);
}
StreamOptions options = new StreamOptions(variant.options);
options.modifyIv(
CryptoUtils.modifyIvForStream(
variant.rootColumn,
OrcProto.Stream.Kind.FILE_STATISTICS,
stripeNumber + 1));
BufferedStream buffer = new BufferedStream();
OutStream stream = new OutStream("stats for " + variant, options, buffer);
file.build().writeTo(stream);
stream.flush();
variantBuilder.setFileStatistics(buffer.getBytes());
}
}
@Override
public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
long stripeStatisticsStart = rawWriter.getPos();
for (VariantTracker variant : variants.values()) {
writeEncryptedStripeStatistics(rawStream, stripeNumber, variant);
}
setUnencryptedStripeStatistics(builder, stripeNumber, unencrypted.stripeStats);
long metadataStart = rawWriter.getPos();
writeMetadata(builder.build());
this.stripeStatisticsLength = (int) (metadataStart - stripeStatisticsStart);
this.metadataLength = (int) (rawWriter.getPos() - metadataStart);
}
static void addUnencryptedStatistics(
OrcProto.Footer.Builder builder, OrcProto.ColumnStatistics[] stats) {
for (OrcProto.ColumnStatistics stat : stats) {
builder.addStatistics(stat);
private static void writeZeros(OutputStream output, long remaining) throws IOException {
while (remaining > 0) {
long size = Math.min(ZEROS.length, remaining);
output.write(ZEROS, 0, (int) size);
remaining -= size;
}
}
@Override
public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
if (!variants.isEmpty()) {
OrcProto.Encryption.Builder encryption = builder.getEncryptionBuilder();
setEncryptionStatistics(encryption, stripeNumber, variants.values());
}
addUnencryptedStatistics(builder, unencrypted.fileStats);
long bodyLength = rawWriter.getPos() - metadataLength - stripeStatisticsLength;
builder.setContentLength(bodyLength);
builder.setHeaderLength(headerLength);
long startPosn = rawWriter.getPos();
OrcProto.Footer footer = builder.build();
writeFileFooter(footer);
this.footerLength = (int) (rawWriter.getPos() - startPosn);
}
private static class DirectStream implements OutputReceiver {
private final FSDataOutputStream output;
@Override
public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
builder.setFooterLength(footerLength);
builder.setMetadataLength(metadataLength);
if (!variants.isEmpty()) {
builder.setStripeStatisticsLength(stripeStatisticsLength);
}
OrcProto.PostScript ps = builder.build();
// need to write this uncompressed
long startPosn = rawWriter.getPos();
ps.writeTo(rawWriter);
long length = rawWriter.getPos() - startPosn;
if (length > 255) {
throw new IllegalArgumentException("PostScript too large at " + length);
DirectStream(FSDataOutputStream output) {
this.output = output;
}
rawWriter.write((int) length);
return rawWriter.getPos();
}
@Override
public void close() throws IOException {
// Just release the codec but don't close the internal stream here to avoid
// Stream Closed or ClosedChannelException when Flink performs checkpoint.
CompressionCodec codec = compress.getCodec();
if (codec != null) {
OrcCodecPool.returnCodec(codec.getKind(), codec);
public void output(ByteBuffer buffer) throws IOException {
this.output.write(
buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
}
compress.withCodec(null, null);
}
@Override
public void flush() throws IOException {
rawWriter.flush();
}
@Override
public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry)
throws IOException {
long start = rawWriter.getPos();
int length = buffer.remaining();
long availBlockSpace = blockSize - (start % blockSize);
// see if stripe can fit in the current hdfs block, else pad the remaining
// space in the block
if (length < blockSize && length > availBlockSpace && addBlockPadding) {
byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
LOG.debug("Padding ORC by {} bytes while merging", availBlockSpace);
start += availBlockSpace;
while (availBlockSpace > 0) {
int writeLen = (int) Math.min(availBlockSpace, pad.length);
rawWriter.write(pad, 0, writeLen);
availBlockSpace -= writeLen;
}
public void suppress() {
throw new UnsupportedOperationException("Can't suppress direct stream");
}
rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
dirEntry.setOffset(start);
stripeNumber += 1;
}
static final class BufferedStream implements OutputReceiver {
private static final class BufferedStream implements OutputReceiver {
private boolean isSuppressed = false;
private final List<ByteBuffer> output = new ArrayList<>();
@@ -519,18 +369,12 @@ public class PhysicalWriterImpl implements PhysicalWriter {
}
}
@Override
public void suppress() {
isSuppressed = true;
output.clear();
}
/**
* Write any saved buffers to the OutputStream if needed, and clears all the buffers.
*
* @return true if the stream was written
*/
boolean spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
void spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
if (!isSuppressed) {
for (ByteBuffer buffer : output) {
raw.write(
@@ -539,38 +383,10 @@ public class PhysicalWriterImpl implements PhysicalWriter {
buffer.remaining());
}
output.clear();
return true;
}
isSuppressed = false;
return false;
}
/**
* Get the buffer as a protobuf ByteString and clears the BufferedStream.
*
* @return the bytes
*/
ByteString getBytes() {
int len = output.size();
if (len == 0) {
return ByteString.EMPTY;
} else {
ByteString result = ByteString.copyFrom(output.get(0));
for (int i = 1; i < output.size(); ++i) {
result = result.concat(ByteString.copyFrom(output.get(i)));
}
output.clear();
return result;
}
}
/**
* Get the number of bytes that will be written to the output.
*
* <p>Assumes the stream writing into this receiver has already been flushed.
*
* @return number of bytes
*/
public long getOutputSize() {
long result = 0;
for (ByteBuffer buffer : output) {
@@ -579,141 +395,4 @@ public class PhysicalWriterImpl implements PhysicalWriter {
return result;
}
}
static class SizeCounters {
long index = 0;
long data = 0;
long total() {
return index + data;
}
}
void buildStreamList(OrcProto.StripeFooter.Builder footerBuilder, SizeCounters sizes)
throws IOException {
footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.INDEX, sizes));
final long unencryptedIndexSize = sizes.index;
int v = 0;
for (VariantTracker variant : variants.values()) {
OrcProto.StripeEncryptionVariant.Builder builder =
footerBuilder.getEncryptionBuilder(v++);
builder.addAllStreams(variant.placeStreams(StreamName.Area.INDEX, sizes));
}
if (sizes.index != unencryptedIndexSize) {
// add a placeholder that covers the hole where the encrypted indexes are
footerBuilder.addStreams(
OrcProto.Stream.newBuilder()
.setKind(OrcProto.Stream.Kind.ENCRYPTED_INDEX)
.setLength(sizes.index - unencryptedIndexSize));
}
footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.DATA, sizes));
final long unencryptedDataSize = sizes.data;
v = 0;
for (VariantTracker variant : variants.values()) {
OrcProto.StripeEncryptionVariant.Builder builder =
footerBuilder.getEncryptionBuilder(v++);
builder.addAllStreams(variant.placeStreams(StreamName.Area.DATA, sizes));
}
if (sizes.data != unencryptedDataSize) {
// add a placeholder that covers the hole where the encrypted indexes are
footerBuilder.addStreams(
OrcProto.Stream.newBuilder()
.setKind(OrcProto.Stream.Kind.ENCRYPTED_DATA)
.setLength(sizes.data - unencryptedDataSize));
}
}
@Override
public void finalizeStripe(
OrcProto.StripeFooter.Builder footerBuilder,
OrcProto.StripeInformation.Builder dirEntry)
throws IOException {
SizeCounters sizes = new SizeCounters();
buildStreamList(footerBuilder, sizes);
OrcProto.StripeFooter footer = footerBuilder.build();
// Do we need to pad the file so the stripe doesn't straddle a block boundary?
padStripe(sizes.total() + footer.getSerializedSize());
// write the unencrypted index streams
unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter);
// write the encrypted index streams
for (VariantTracker variant : variants.values()) {
variant.writeStreams(StreamName.Area.INDEX, rawWriter);
}
// write the unencrypted data streams
unencrypted.writeStreams(StreamName.Area.DATA, rawWriter);
// write out the unencrypted data streams
for (VariantTracker variant : variants.values()) {
variant.writeStreams(StreamName.Area.DATA, rawWriter);
}
// Write out the footer.
writeStripeFooter(footer, sizes, dirEntry);
// fill in the data sizes
dirEntry.setDataLength(sizes.data);
dirEntry.setIndexLength(sizes.index);
stripeNumber += 1;
}
@Override
public void writeHeader() throws IOException {
rawWriter.write(OrcFile.MAGIC.getBytes());
headerLength = rawWriter.getPos();
}
@Override
public BufferedStream createDataStream(StreamName name) {
VariantTracker variant = getVariant(name.getEncryption());
BufferedStream result = variant.streams.get(name);
if (result == null) {
result = new BufferedStream();
variant.streams.put(name, result);
}
return result;
}
protected OutputStream createIndexStream(StreamName name) {
BufferedStream buffer = createDataStream(name);
VariantTracker tracker = getVariant(name.getEncryption());
StreamOptions options =
SerializationUtils.getCustomizedCodec(
tracker.options, compressionStrategy, name.getKind());
if (options.isEncrypted()) {
if (options == tracker.options) {
options = new StreamOptions(options);
}
options.modifyIv(CryptoUtils.modifyIvForStream(name, stripeNumber + 1));
}
return new OutStream(name.toString(), options, buffer);
}
@Override
public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index) throws IOException {
OutputStream stream = createIndexStream(name);
index.build().writeTo(stream);
stream.flush();
}
@Override
public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom)
throws IOException {
OutputStream stream = createIndexStream(name);
bloom.build().writeTo(stream);
stream.flush();
}
@Override
public void writeStatistics(StreamName name, OrcProto.ColumnStatistics.Builder statistics) {
VariantTracker tracker = getVariant(name.getEncryption());
if (name.getKind() == OrcProto.Stream.Kind.FILE_STATISTICS) {
tracker.fileStats[name.getColumn() - tracker.rootColumn] = statistics.build();
} else {
tracker.stripeStats[name.getColumn() - tracker.rootColumn].add(statistics.build());
}
}
}

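Both versions of padStripe and appendRawStripe above rely on the same HDFS block arithmetic: the writer's position modulo the block size tells how far into the current block the next stripe would start, and if the stripe would straddle a block boundary the remaining space is padded with zeros. A worked sketch with illustrative numbers (the 128 MiB block size is an assumption, and the maxPadding tolerance check is omitted):

public class StripePaddingSketch {
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;   // assumed HDFS block size
        long stripeStart = 200L * 1024 * 1024; // current writer position
        long stripeSize = 70L * 1024 * 1024;   // index + data + serialized stripe footer

        long previousBytesInBlock = stripeStart % blockSize; // 72 MiB into the second block
        if (previousBytesInBlock > 0 && previousBytesInBlock + stripeSize >= blockSize) {
            long padding = blockSize - previousBytesInBlock;
            // prints: pad 58720256 bytes (56 MiB) so the stripe starts on a block boundary
            System.out.println("pad " + padding + " bytes so the stripe starts on a block boundary");
        }
    }
}
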
@@ -204,26 +204,20 @@ class OrcColumnarRowInputFormatTest {
RowType tableType =
RowType.of(
new LogicalType[] {
/* 0 */ DataTypes.INT().getLogicalType(),
/* 1 */ DataTypes.INT().getLogicalType(), // part-1
/* 2 */ DataTypes.STRING().getLogicalType(),
/* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2
/* 4 */ DataTypes.STRING().getLogicalType(),
/* 5 */ DataTypes.STRING().getLogicalType(), // part-3
/* 6 */ DataTypes.STRING().getLogicalType(),
/* 7 */ DataTypes.INT().getLogicalType(),
/* 8 */ DataTypes.DECIMAL(10, 5).getLogicalType(), // part-4
/* 9 */ DataTypes.STRING().getLogicalType(),
/* 10*/ DataTypes.INT().getLogicalType(),
/* 11*/ DataTypes.INT().getLogicalType(),
/* 12*/ DataTypes.STRING().getLogicalType(), // part-5
/* 13*/ DataTypes.INT().getLogicalType()
},
new String[] {
"_col0", "f1", "_col1", "f3", "_col2", "f5", "_col3", "_col4", "f8",
"_col5", "_col6", "_col7", "f13", "_col8"
});
/* 0 */ DataTypes.INT().getLogicalType(),
/* 1 */ DataTypes.INT().getLogicalType(), // part-1
/* 2 */ DataTypes.STRING().getLogicalType(),
/* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2
/* 4 */ DataTypes.STRING().getLogicalType(),
/* 5 */ DataTypes.STRING().getLogicalType(), // part-3
/* 6 */ DataTypes.STRING().getLogicalType(),
/* 7 */ DataTypes.INT().getLogicalType(),
/* 8 */ DataTypes.DECIMAL(10, 5).getLogicalType(), // part-4
/* 9 */ DataTypes.STRING().getLogicalType(),
/* 11*/ DataTypes.INT().getLogicalType(),
/* 12*/ DataTypes.INT().getLogicalType(),
/* 13*/ DataTypes.STRING().getLogicalType(), // part-5
/* 14*/ DataTypes.INT().getLogicalType());
int[] projectedFields = {8, 1, 3, 0, 5, 2};

@@ -73,11 +73,6 @@ public class OrcColumnarRowSplitReaderTest {
DataTypes.INT()
};
private final String[] testSchemaNameFlat =
new String[] {
"_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8"
};
private final DataType[] testSchemaDecimal = new DataType[] {DataTypes.DECIMAL(10, 5)};
private static Path testFileFlat;
@@ -102,12 +97,7 @@ public class OrcColumnarRowSplitReaderTest {
for (FileInputSplit split : splits) {
try (OrcColumnarRowSplitReader reader =
createReader(
new int[] {0, 1},
testSchemaFlat,
testSchemaNameFlat,
new HashMap<>(),
split)) {
createReader(new int[] {0, 1}, testSchemaFlat, new HashMap<>(), split)) {
// read and count all rows
while (!reader.reachedEnd()) {
RowData row = reader.nextRecord(null);
@@ -129,12 +119,7 @@ public class OrcColumnarRowSplitReaderTest {
FileInputSplit[] splits = createSplits(testFileDecimal, 1);
try (OrcColumnarRowSplitReader reader =
createReader(
new int[] {0},
testSchemaDecimal,
new String[] {"_col0"},
new HashMap<>(),
splits[0])) {
createReader(new int[] {0}, testSchemaDecimal, new HashMap<>(), splits[0])) {
assertThat(reader.reachedEnd()).isFalse();
RowData row = reader.nextRecord(null);
@@ -191,14 +176,10 @@ public class OrcColumnarRowSplitReaderTest {
/* 7 */ DataTypes.INT(),
/* 8 */ DataTypes.DECIMAL(10, 5), // part-4
/* 9 */ DataTypes.STRING(),
/* 10*/ DataTypes.INT(),
/* 11*/ DataTypes.INT(),
/* 12*/ DataTypes.STRING(), // part-5
/* 13*/ DataTypes.INT()
},
new String[] {
"_col0", "f1", "_col1", "f3", "_col2", "f5", "_col3", "_col4", "f8",
"_col5", "_col6", "_col7", "f13", "_col8"
/* 12*/ DataTypes.INT(),
/* 13*/ DataTypes.STRING(), // part-5
/* 14*/ DataTypes.INT()
},
partSpec,
split)) {
@@ -241,12 +222,7 @@ public class OrcColumnarRowSplitReaderTest {
for (FileInputSplit split : splits) {
try (OrcColumnarRowSplitReader reader =
createReader(
new int[] {2, 0, 1},
testSchemaFlat,
testSchemaNameFlat,
new HashMap<>(),
split)) {
createReader(new int[] {2, 0, 1}, testSchemaFlat, new HashMap<>(), split)) {
// read and count all rows
while (!reader.reachedEnd()) {
RowData row = reader.nextRecord(null);
@@ -427,25 +403,10 @@ public class OrcColumnarRowSplitReaderTest {
Map<String, Object> partitionSpec,
FileInputSplit split)
throws IOException {
return createReader(
selectedFields,
fullTypes,
IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new),
partitionSpec,
split);
}
protected OrcColumnarRowSplitReader createReader(
int[] selectedFields,
DataType[] fullTypes,
String[] fullNames,
Map<String, Object> partitionSpec,
FileInputSplit split)
throws IOException {
return OrcSplitReaderUtil.genPartColumnarRowReader(
"2.3.0",
new Configuration(),
fullNames,
IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new),
fullTypes,
partitionSpec,
selectedFields,

@@ -206,10 +206,10 @@ public class OrcFormatStatisticsReportTest extends StatisticsReportTestBase {
"f_timestamp9",
new ColumnStats.Builder()
.setMax(
DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123456789", 9)
DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123", 3)
.toTimestamp())
.setMin(
DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123456789", 9)
DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123", 3)
.toTimestamp())
.setNullCount(0L)
.build());

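The expected statistics above move from nanosecond strings (precision 9) back to millisecond strings (precision 3), presumably because the ORC version restored by this revert keeps timestamp column statistics only at millisecond granularity. A small sketch of that truncation with plain java.sql.Timestamp (values taken from the test, the truncation itself is illustrative):

import java.sql.Timestamp;

public class TimestampStatsPrecisionSketch {
    public static void main(String[] args) {
        Timestamp nanos = Timestamp.valueOf("1990-10-16 12:12:43.123456789");

        // Keep only the millisecond part of the fractional seconds.
        Timestamp millis = new Timestamp(nanos.getTime());
        millis.setNanos((nanos.getNanos() / 1_000_000) * 1_000_000);

        System.out.println(nanos);  // 1990-10-16 12:12:43.123456789
        System.out.println(millis); // 1990-10-16 12:12:43.123
    }
}
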
@@ -6,13 +6,13 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
- org.apache.orc:orc-core:1.9.4
- org.apache.orc:orc-shims:1.9.4
- org.apache.hive:hive-storage-api:2.8.1
- io.airlift:aircompressor:0.27
- org.apache.orc:orc-core:1.5.6
- org.apache.orc:orc-shims:1.5.6
- org.apache.hive:hive-storage-api:2.6.0
- io.airlift:aircompressor:0.10
- commons-lang:commons-lang:2.6
This project bundles the following dependencies under the BSD license.
See bundled license files for details.
- com.google.protobuf:protobuf-java:3.21.7
- com.google.protobuf:protobuf-java:2.5.0

@@ -170,8 +170,7 @@ under the License.
-->
<minikdc.version>3.2.4</minikdc.version>
<hive.version>2.3.10</hive.version>
<orc.version>1.9.4</orc.version>
<storage-api.version>2.8.1</storage-api.version>
<orc.version>1.5.6</orc.version>
<japicmp.referenceVersion>1.20.0</japicmp.referenceVersion>
<japicmp.outputDir>tools/japicmp-output</japicmp.outputDir>
<checkstyle.version>10.18.2</checkstyle.version>
