diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java
index cce30fd1c7..61616a9f23 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorBuilder.java
@@ -25,19 +25,11 @@ import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnVector;

 class ColumnVectorBuilder {
-  private boolean[] isDeleted;
-  private int[] rowIdMapping;
-
-  public ColumnVectorBuilder withDeletedRows(int[] rowIdMappingArray, boolean[] isDeletedArray) {
-    this.rowIdMapping = rowIdMappingArray;
-    this.isDeleted = isDeletedArray;
-    return this;
-  }

   public ColumnVector build(VectorHolder holder, int numRows) {
     if (holder.isDummy()) {
       if (holder instanceof VectorHolder.DeletedVectorHolder) {
-        return new DeletedColumnVector(Types.BooleanType.get(), isDeleted);
+        return new DeletedColumnVector(Types.BooleanType.get());
       } else if (holder instanceof ConstantVectorHolder) {
         ConstantVectorHolder<?> constantHolder = (ConstantVectorHolder<?>) holder;
         Type icebergType = constantHolder.icebergType();
@@ -46,8 +38,6 @@ class ColumnVectorBuilder {
       } else {
         throw new IllegalStateException("Unknown dummy vector holder: " + holder);
       }
-    } else if (rowIdMapping != null) {
-      return new ColumnVectorWithFilter(holder, rowIdMapping);
     } else {
       return new IcebergArrowColumnVector(holder);
     }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index c6f1fe8dfe..c65c24d02f 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -18,15 +18,12 @@
  */
 package org.apache.iceberg.spark.data.vectorized;

-import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
 import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.Pair;
@@ -88,44 +85,51 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
   }

   private class ColumnBatchLoader {
-    private final int numRowsToRead;
-    // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when
-    // there is no deletes
-    private int[] rowIdMapping;
-    // the array to indicate if a row is deleted or not, it is null when there is no "_deleted"
-    // metadata column
-    private boolean[] isDeleted;
+    private final int batchSize;

     ColumnBatchLoader(int numRowsToRead) {
       Preconditions.checkArgument(
           numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
-      this.numRowsToRead = numRowsToRead;
-      if (hasIsDeletedColumn) {
-        isDeleted = new boolean[numRowsToRead];
-      }
+      this.batchSize = numRowsToRead;
     }

     ColumnarBatch loadDataToColumnBatch() {
-      int numRowsUndeleted = initRowIdMapping();
-
       ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
-
-      ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
-      newColumnarBatch.setNumRows(numRowsUndeleted);
-
-      if (hasEqDeletes()) {
-        applyEqDelete(newColumnarBatch);
-        newColumnarBatch = removeExtraColumns(arrowColumnVectors, newColumnarBatch);
+      int numLiveRows = batchSize;
+      if (hasIsDeletedColumn) {
+        boolean[] isDeleted =
+            ColumnarBatchUtil.buildIsDeleted(
+                arrowColumnVectors, deletes, rowStartPosInBatch, batchSize);
+        for (int i = 0; i < arrowColumnVectors.length; i++) {
+          ColumnVector vector = arrowColumnVectors[i];
+          if (vector instanceof DeletedColumnVector) {
+            ((DeletedColumnVector) vector).setValue(isDeleted);
+          }
+        }
+      } else {
+        Pair<int[], Integer> pair =
+            ColumnarBatchUtil.buildRowIdMapping(
+                arrowColumnVectors, deletes, rowStartPosInBatch, batchSize);
+        if (pair != null) {
+          int[] rowIdMapping = pair.first();
+          numLiveRows = pair.second();
+          for (int i = 0; i < arrowColumnVectors.length; i++) {
+            ColumnVector vector = arrowColumnVectors[i];
+            if (vector instanceof IcebergArrowColumnVector) {
+              arrowColumnVectors[i] =
+                  new ColumnVectorWithFilter(
+                      ((IcebergArrowColumnVector) vector).vector(), rowIdMapping);
+            }
+          }
+        }
       }

-      if (hasIsDeletedColumn && rowIdMapping != null) {
-        // reset the row id mapping array, so that it doesn't filter out the deleted rows
-        for (int i = 0; i < numRowsToRead; i++) {
-          rowIdMapping[i] = i;
-        }
-        newColumnarBatch.setNumRows(numRowsToRead);
+      if (deletes != null && deletes.hasEqDeletes()) {
+        arrowColumnVectors = ColumnarBatchUtil.removeExtraColumns(deletes, arrowColumnVectors);
       }

+      ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
+      newColumnarBatch.setNumRows(numLiveRows);
       return newColumnarBatch;
     }

@@ -134,159 +138,17 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {

       ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder();
       for (int i = 0; i < readers.length; i += 1) {
-        vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
+        vectorHolders[i] = readers[i].read(vectorHolders[i], batchSize);
         int numRowsInVector = vectorHolders[i].numValues();
         Preconditions.checkState(
-            numRowsInVector == numRowsToRead,
+            numRowsInVector == batchSize,
             "Number of rows in the vector %s didn't match expected %s ",
             numRowsInVector,
-            numRowsToRead);
+            batchSize);

-        arrowColumnVectors[i] =
-            columnVectorBuilder
-                .withDeletedRows(rowIdMapping, isDeleted)
-                .build(vectorHolders[i], numRowsInVector);
+        arrowColumnVectors[i] = columnVectorBuilder.build(vectorHolders[i], numRowsInVector);
       }
       return arrowColumnVectors;
     }
-
-    boolean hasEqDeletes() {
-      return deletes != null && deletes.hasEqDeletes();
-    }
-
-    int initRowIdMapping() {
-      Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping();
-      if (posDeleteRowIdMapping != null) {
-        rowIdMapping = posDeleteRowIdMapping.first();
-        return posDeleteRowIdMapping.second();
-      } else {
-        rowIdMapping = initEqDeleteRowIdMapping();
-        return numRowsToRead;
-      }
-    }
-
-    Pair<int[], Integer> posDelRowIdMapping() {
-      if (deletes != null && deletes.hasPosDeletes()) {
-        return buildPosDelRowIdMapping(deletes.deletedRowPositions());
-      } else {
-        return null;
-      }
-    }
-
-    /**
-     * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we
-     * delete 2 rows in a batch with 8 rows in total. [0,1,2,3,4,5,6,7] -- Original status of the
-     * row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array Position
-     * delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
-     * [F,F,T,F,F,F,T,F] -- After applying position deletes
-     *
-     * @param deletedRowPositions a set of deleted row positions
-     * @return the mapping array and the new num of rows in a batch, null if no row is deleted
-     */
-    Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
-      if (deletedRowPositions == null) {
-        return null;
-      }
-
-      int[] posDelRowIdMapping = new int[numRowsToRead];
-      int originalRowId = 0;
-      int currentRowId = 0;
-      while (originalRowId < numRowsToRead) {
-        if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
-          posDelRowIdMapping[currentRowId] = originalRowId;
-          currentRowId++;
-        } else {
-          if (hasIsDeletedColumn) {
-            isDeleted[originalRowId] = true;
-          }
-
-          deletes.incrementDeleteCount();
-        }
-        originalRowId++;
-      }
-
-      if (currentRowId == numRowsToRead) {
-        // there is no delete in this batch
-        return null;
-      } else {
-        return Pair.of(posDelRowIdMapping, currentRowId);
-      }
-    }
-
-    int[] initEqDeleteRowIdMapping() {
-      int[] eqDeleteRowIdMapping = null;
-      if (hasEqDeletes()) {
-        eqDeleteRowIdMapping = new int[numRowsToRead];
-        for (int i = 0; i < numRowsToRead; i++) {
-          eqDeleteRowIdMapping[i] = i;
-        }
-      }
-
-      return eqDeleteRowIdMapping;
-    }
-
-    /**
-     * Filter out the equality deleted rows. Here is an example, [0,1,2,3,4,5,6,7] -- Original
-     * status of the row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted
-     * array Position delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num
-     * records to 6] [F,F,T,F,F,F,T,F] -- After applying position deletes Equality delete 1 <= x <=
-     * 3 [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
-     * [F,T,T,T,F,F,T,F] -- After applying equality deletes
-     *
-     * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
-     */
-    void applyEqDelete(ColumnarBatch columnarBatch) {
-      Iterator<InternalRow> it = columnarBatch.rowIterator();
-      int rowId = 0;
-      int currentRowId = 0;
-      while (it.hasNext()) {
-        InternalRow row = it.next();
-        if (deletes.eqDeletedRowFilter().test(row)) {
-          // the row is NOT deleted
-          // skip deleted rows by pointing to the next undeleted row Id
-          rowIdMapping[currentRowId] = rowIdMapping[rowId];
-          currentRowId++;
-        } else {
-          if (hasIsDeletedColumn) {
-            isDeleted[rowIdMapping[rowId]] = true;
-          }
-
-          deletes.incrementDeleteCount();
-        }
-
-        rowId++;
-      }
-
-      columnarBatch.setNumRows(currentRowId);
-    }
-
-    /**
-     * Removes extra columns added for processing equality delete filters that are not part of the
-     * final query output.
-     *
-     * <p>During query execution, additional columns may be included in the schema to evaluate
-     * equality delete filters. For example, if the table schema contains columns C1, C2, C3, C4,
-     * and C5, and the query is 'SELECT C5 FROM table' while equality delete filters are applied on
-     * C3 and C4, the processing schema includes C5, C3, and C4. These extra columns (C3 and C4) are
-     * needed to identify rows to delete but are not included in the final result.
-     *
-     * <p>This method removes these extra columns from the end of {@code arrowColumnVectors},
-     * ensuring only the expected columns remain.
-     *
-     * @param arrowColumnVectors the array of column vectors representing query result data
-     * @param columnarBatch the original {@code ColumnarBatch} containing query results
-     * @return a new {@code ColumnarBatch} with extra columns removed, or the original batch if no
-     *     extra columns were found
-     */
-    ColumnarBatch removeExtraColumns(
-        ColumnVector[] arrowColumnVectors, ColumnarBatch columnarBatch) {
-      int expectedColumnSize = deletes.expectedSchema().columns().size();
-      if (arrowColumnVectors.length > expectedColumnSize) {
-        ColumnVector[] newColumns = Arrays.copyOf(arrowColumnVectors, expectedColumnSize);
-        return new ColumnarBatch(newColumns, columnarBatch.numRows());
-      } else {
-        return columnarBatch;
-      }
-    }
   }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java
new file mode 100644
index 0000000000..177f2d168d
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.Arrays;
+import java.util.function.Predicate;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatchRow;
+
+public class ColumnarBatchUtil {
+
+  private ColumnarBatchUtil() {}
+
+  /**
+   * Build a row id mapping inside a batch to skip deleted rows. Here is an example:
+   * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array [F,F,F,F,F,F,F,F] -- Original
+   * status of the isDeleted array Position delete 2, 6 Equality delete 1 <= x <= 3
+   * [0,4,5,7,-,-,-,-] -- After applying position and equality deletes [Set Num records to 4]
+   * [F,T,T,T,F,F,T,F] -- After applying position and equality deletes
+   *
+   * @param columnVectors the array of column vectors for the batch.
+   * @param deletes the delete filter containing delete information.
+   * @param rowStartPosInBatch the starting position of the row in the batch.
+   * @param batchSize the size of the batch.
+   * @return the mapping array and the new num of rows in a batch, null if no row is deleted
+   */
+  public static Pair<int[], Integer> buildRowIdMapping(
+      ColumnVector[] columnVectors,
+      DeleteFilter<InternalRow> deletes,
+      long rowStartPosInBatch,
+      int batchSize) {
+    if (deletes == null) {
+      return null;
+    }
+
+    PositionDeleteIndex deletedPositions = deletes.deletedRowPositions();
+    Predicate<InternalRow> eqDeleteFilter = deletes.eqDeletedRowFilter();
+    ColumnarBatchRow row = new ColumnarBatchRow(columnVectors);
+    int[] rowIdMapping = new int[batchSize];
+    int liveRowId = 0;
+
+    for (int rowId = 0; rowId < batchSize; rowId++) {
+      long pos = rowStartPosInBatch + rowId;
+      row.rowId = rowId;
+      if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) {
+        deletes.incrementDeleteCount();
+      } else {
+        rowIdMapping[liveRowId] = rowId;
+        liveRowId++;
+      }
+    }
+
+    return liveRowId == batchSize ? null : Pair.of(rowIdMapping, liveRowId);
+  }
+
+  /**
+   * Constructs an array indicating whether each row in a batch is deleted based on the specified
+   * delete filters. This method processes the column vectors and applies the delete filters to
+   * determine the deleted status for each row starting from a specified row position within the
+   * batch.
+   *
+   * @param columnVectors the array of column vectors for the batch.
+   * @param deletes the delete filter containing information about which rows should be deleted.
+   * @param rowStartPosInBatch the starting position of the row in the batch, used to calculate the
+   *     absolute position of the rows in the context of the entire dataset.
+   * @param batchSize the number of rows in the current batch.
+   * @return an array of boolean values to indicate if a row is deleted or not
+   */
+  public static boolean[] buildIsDeleted(
+      ColumnVector[] columnVectors,
+      DeleteFilter<InternalRow> deletes,
+      long rowStartPosInBatch,
+      int batchSize) {
+    boolean[] isDeleted = new boolean[batchSize];
+
+    if (deletes == null) {
+      return isDeleted;
+    }
+
+    PositionDeleteIndex deletedPositions = deletes.deletedRowPositions();
+    Predicate<InternalRow> eqDeleteFilter = deletes.eqDeletedRowFilter();
+    ColumnarBatchRow row = new ColumnarBatchRow(columnVectors);
+
+    for (int rowId = 0; rowId < batchSize; rowId++) {
+      long pos = rowStartPosInBatch + rowId;
+      row.rowId = rowId;
+      if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) {
+        deletes.incrementDeleteCount();
+        isDeleted[rowId] = true;
+      }
+    }
+
+    return isDeleted;
+  }
+
+  private static boolean isDeleted(
+      long pos,
+      InternalRow row,
+      PositionDeleteIndex deletedPositions,
+      Predicate<InternalRow> eqDeleteFilter) {
+    // use separate if statements to reduce the chance of speculative execution for equality tests
+    if (deletedPositions != null && deletedPositions.isDeleted(pos)) {
+      return true;
+    }
+
+    if (eqDeleteFilter != null && !eqDeleteFilter.test(row)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Removes extra column vectors added for processing equality delete filters that are not part of
+   * the final query output.
+   *
+   * <p>During query execution, additional columns may be included in the schema to evaluate
+   * equality delete filters. For example, if the table schema contains columns C1, C2, C3, C4, and
+   * C5, and the query is 'SELECT C5 FROM table' while equality delete filters are applied on C3
+   * and C4, the processing schema includes C5, C3, and C4. These extra columns (C3 and C4) are
+   * needed to identify rows to delete but are not included in the final result.
+   *
+   * <p>This method removes the extra column vectors from the end of the column vectors array,
+   * ensuring only the expected column vectors remain.
+   *
+   * @param deletes the delete filter containing delete information.
+   * @param columnVectors the array of column vectors representing query result data
+   * @return a new column vectors array with extra column vectors removed, or the original column
+   *     vectors array if no extra column vectors are found
+   */
+  public static ColumnVector[] removeExtraColumns(
+      DeleteFilter<InternalRow> deletes, ColumnVector[] columnVectors) {
+    int expectedColumnSize = deletes.expectedSchema().columns().size();
+    if (columnVectors.length > expectedColumnSize) {
+      return Arrays.copyOf(columnVectors, expectedColumnSize);
+    } else {
+      return columnVectors;
+    }
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
index eec6ecb9ac..a453247068 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.data.vectorized;

-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Type;
 import org.apache.spark.sql.types.Decimal;
@@ -28,12 +27,14 @@ import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;

 public class DeletedColumnVector extends ColumnVector {
-  private final boolean[] isDeleted;
+  private boolean[] isDeleted;

-  public DeletedColumnVector(Type type, boolean[] isDeleted) {
+  public DeletedColumnVector(Type type) {
     super(SparkSchemaUtil.convert(type));
-    Preconditions.checkArgument(isDeleted != null, "Boolean array isDeleted cannot be null");
-    this.isDeleted = isDeleted;
+  }
+
+  public void setValue(boolean[] deleted) {
+    this.isDeleted = deleted;
   }

   @Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
index dc118aebe3..21b49a9bca 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
@@ -39,11 +39,17 @@ public class IcebergArrowColumnVector extends ColumnVector {

   private final ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> accessor;
   private final NullabilityHolder nullabilityHolder;
+  private final VectorHolder holder;

   public IcebergArrowColumnVector(VectorHolder holder) {
     super(SparkSchemaUtil.convert(holder.icebergType()));
     this.nullabilityHolder = holder.nullabilityHolder();
     this.accessor = ArrowVectorAccessors.getVectorAccessor(holder);
+    this.holder = holder;
+  }
+
+  public VectorHolder vector() {
+    return holder;
   }

   protected ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> accessor() {
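
Reviewer note (not part of the patch): a minimal sketch of how the new ColumnarBatchUtil helpers compose, mirroring ColumnBatchLoader.loadDataToColumnBatch() above. The wrapper class and names (BatchAssembler, assemble, vectors, deleteFilter, rowStartPos) are hypothetical; only calls that appear in the diff are used.

package org.apache.iceberg.spark.data.vectorized;

import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class BatchAssembler {
  static ColumnarBatch assemble(
      ColumnVector[] vectors,
      DeleteFilter<InternalRow> deleteFilter,
      long rowStartPos,
      int batchSize) {
    int numLiveRows = batchSize;

    // Position and equality deletes are folded into a single row id mapping; a null result
    // means no row in this batch is deleted.
    Pair<int[], Integer> mapping =
        ColumnarBatchUtil.buildRowIdMapping(vectors, deleteFilter, rowStartPos, batchSize);
    if (mapping != null) {
      int[] rowIdMapping = mapping.first();
      numLiveRows = mapping.second();
      for (int i = 0; i < vectors.length; i++) {
        if (vectors[i] instanceof IcebergArrowColumnVector) {
          vectors[i] =
              new ColumnVectorWithFilter(
                  ((IcebergArrowColumnVector) vectors[i]).vector(), rowIdMapping);
        }
      }
    }

    // Columns that were read only to evaluate equality delete predicates are dropped from the tail.
    if (deleteFilter != null && deleteFilter.hasEqDeletes()) {
      vectors = ColumnarBatchUtil.removeExtraColumns(deleteFilter, vectors);
    }

    ColumnarBatch batch = new ColumnarBatch(vectors);
    batch.setNumRows(numLiveRows);
    return batch;
  }
}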