Spark 3.5: Refactor delete logic in batch reading (#11933)

main
Huaxin Gao 10 hours ago committed by GitHub
parent e86d25f28c
commit be6e9daf89

ColumnVectorBuilder.java
@@ -25,19 +25,11 @@ import org.apache.iceberg.types.Types;
import org.apache.spark.sql.vectorized.ColumnVector;
class ColumnVectorBuilder {
private boolean[] isDeleted;
private int[] rowIdMapping;
public ColumnVectorBuilder withDeletedRows(int[] rowIdMappingArray, boolean[] isDeletedArray) {
this.rowIdMapping = rowIdMappingArray;
this.isDeleted = isDeletedArray;
return this;
}
public ColumnVector build(VectorHolder holder, int numRows) {
if (holder.isDummy()) {
if (holder instanceof VectorHolder.DeletedVectorHolder) {
return new DeletedColumnVector(Types.BooleanType.get(), isDeleted);
return new DeletedColumnVector(Types.BooleanType.get());
} else if (holder instanceof ConstantVectorHolder) {
ConstantVectorHolder<?> constantHolder = (ConstantVectorHolder<?>) holder;
Type icebergType = constantHolder.icebergType();
@@ -46,8 +38,6 @@ class ColumnVectorBuilder {
} else {
throw new IllegalStateException("Unknown dummy vector holder: " + holder);
}
} else if (rowIdMapping != null) {
return new ColumnVectorWithFilter(holder, rowIdMapping);
} else {
return new IcebergArrowColumnVector(holder);
}

ColumnarBatchReader.java
@@ -18,15 +18,12 @@
*/
package org.apache.iceberg.spark.data.vectorized;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.Pair;
@@ -88,44 +85,51 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
}
private class ColumnBatchLoader {
private final int numRowsToRead;
// the row id mapping used by all column vectors in a batch to skip deleted rows; it is null
// when there are no deletes
private int[] rowIdMapping;
// the array indicating whether each row is deleted; it is null when there is no "_deleted"
// metadata column
private boolean[] isDeleted;
private final int batchSize;
ColumnBatchLoader(int numRowsToRead) {
Preconditions.checkArgument(
numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
this.numRowsToRead = numRowsToRead;
if (hasIsDeletedColumn) {
isDeleted = new boolean[numRowsToRead];
}
this.batchSize = numRowsToRead;
}
ColumnarBatch loadDataToColumnBatch() {
int numRowsUndeleted = initRowIdMapping();
ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
newColumnarBatch.setNumRows(numRowsUndeleted);
if (hasEqDeletes()) {
applyEqDelete(newColumnarBatch);
newColumnarBatch = removeExtraColumns(arrowColumnVectors, newColumnarBatch);
int numLiveRows = batchSize;
if (hasIsDeletedColumn) {
boolean[] isDeleted =
ColumnarBatchUtil.buildIsDeleted(
arrowColumnVectors, deletes, rowStartPosInBatch, batchSize);
for (int i = 0; i < arrowColumnVectors.length; i++) {
ColumnVector vector = arrowColumnVectors[i];
if (vector instanceof DeletedColumnVector) {
((DeletedColumnVector) vector).setValue(isDeleted);
}
}
} else {
Pair<int[], Integer> pair =
ColumnarBatchUtil.buildRowIdMapping(
arrowColumnVectors, deletes, rowStartPosInBatch, batchSize);
if (pair != null) {
int[] rowIdMapping = pair.first();
numLiveRows = pair.second();
for (int i = 0; i < arrowColumnVectors.length; i++) {
ColumnVector vector = arrowColumnVectors[i];
if (vector instanceof IcebergArrowColumnVector) {
arrowColumnVectors[i] =
new ColumnVectorWithFilter(
((IcebergArrowColumnVector) vector).vector(), rowIdMapping);
}
}
}
}
if (hasIsDeletedColumn && rowIdMapping != null) {
// reset the row id mapping array, so that it doesn't filter out the deleted rows
for (int i = 0; i < numRowsToRead; i++) {
rowIdMapping[i] = i;
}
newColumnarBatch.setNumRows(numRowsToRead);
if (deletes != null && deletes.hasEqDeletes()) {
arrowColumnVectors = ColumnarBatchUtil.removeExtraColumns(deletes, arrowColumnVectors);
}
ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
newColumnarBatch.setNumRows(numLiveRows);
return newColumnarBatch;
}
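For intuition, the rowIdMapping built in this method is consumed when rows are read back out of the batch: ColumnVectorWithFilter wraps the original vector and, presumably, redirects each lookup through the mapping. Below is a minimal standalone sketch of that redirection; the data values and the mapping are hypothetical, and this is not the actual ColumnVectorWithFilter implementation.

class RowIdMappingSketch {
  public static void main(String[] args) {
    // hypothetical column data for a batch of 8 rows; rows 2 and 6 are deleted
    int[] values = {10, 11, 12, 13, 14, 15, 16, 17};
    int[] rowIdMapping = {0, 1, 3, 4, 5, 7}; // live row id -> original row id
    int numLiveRows = rowIdMapping.length;

    for (int rowId = 0; rowId < numLiveRows; rowId++) {
      // a filtered vector answers getInt(rowId) by first mapping to the original position
      int value = values[rowIdMapping[rowId]];
      System.out.print(value + " "); // prints 10 11 13 14 15 17 (12 and 16 are skipped)
    }
  }
}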
@@ -134,159 +138,17 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder();
for (int i = 0; i < readers.length; i += 1) {
vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
vectorHolders[i] = readers[i].read(vectorHolders[i], batchSize);
int numRowsInVector = vectorHolders[i].numValues();
Preconditions.checkState(
numRowsInVector == numRowsToRead,
numRowsInVector == batchSize,
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
batchSize);
arrowColumnVectors[i] =
columnVectorBuilder
.withDeletedRows(rowIdMapping, isDeleted)
.build(vectorHolders[i], numRowsInVector);
arrowColumnVectors[i] = columnVectorBuilder.build(vectorHolders[i], numRowsInVector);
}
return arrowColumnVectors;
}
boolean hasEqDeletes() {
return deletes != null && deletes.hasEqDeletes();
}
int initRowIdMapping() {
Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping();
if (posDeleteRowIdMapping != null) {
rowIdMapping = posDeleteRowIdMapping.first();
return posDeleteRowIdMapping.second();
} else {
rowIdMapping = initEqDeleteRowIdMapping();
return numRowsToRead;
}
}
Pair<int[], Integer> posDelRowIdMapping() {
if (deletes != null && deletes.hasPosDeletes()) {
return buildPosDelRowIdMapping(deletes.deletedRowPositions());
} else {
return null;
}
}
/**
* Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we
* delete 2 rows in a batch with 8 rows in total:
*   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
*   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
*   Position delete 2, 6
*   [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
*   [F,F,T,F,F,F,T,F] -- After applying position deletes
*
* @param deletedRowPositions a set of deleted row positions
* @return the mapping array and the new num of rows in a batch, null if no row is deleted
*/
Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
if (deletedRowPositions == null) {
return null;
}
int[] posDelRowIdMapping = new int[numRowsToRead];
int originalRowId = 0;
int currentRowId = 0;
while (originalRowId < numRowsToRead) {
if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
posDelRowIdMapping[currentRowId] = originalRowId;
currentRowId++;
} else {
if (hasIsDeletedColumn) {
isDeleted[originalRowId] = true;
}
deletes.incrementDeleteCount();
}
originalRowId++;
}
if (currentRowId == numRowsToRead) {
// there is no delete in this batch
return null;
} else {
return Pair.of(posDelRowIdMapping, currentRowId);
}
}
int[] initEqDeleteRowIdMapping() {
int[] eqDeleteRowIdMapping = null;
if (hasEqDeletes()) {
eqDeleteRowIdMapping = new int[numRowsToRead];
for (int i = 0; i < numRowsToRead; i++) {
eqDeleteRowIdMapping[i] = i;
}
}
return eqDeleteRowIdMapping;
}
/**
* Filter out the equality deleted rows. Here is an example:
*   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
*   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
*   Position delete 2, 6
*   [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
*   [F,F,T,F,F,F,T,F] -- After applying position deletes
*   Equality delete 1 <= x <= 3
*   [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
*   [F,T,T,T,F,F,T,F] -- After applying equality deletes
*
* @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
*/
void applyEqDelete(ColumnarBatch columnarBatch) {
Iterator<InternalRow> it = columnarBatch.rowIterator();
int rowId = 0;
int currentRowId = 0;
while (it.hasNext()) {
InternalRow row = it.next();
if (deletes.eqDeletedRowFilter().test(row)) {
// the row is NOT deleted
// skip deleted rows by pointing to the next undeleted row Id
rowIdMapping[currentRowId] = rowIdMapping[rowId];
currentRowId++;
} else {
if (hasIsDeletedColumn) {
isDeleted[rowIdMapping[rowId]] = true;
}
deletes.incrementDeleteCount();
}
rowId++;
}
columnarBatch.setNumRows(currentRowId);
}
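To make the removed in-place compaction concrete, here is a small self-contained sketch that applies an equality-delete predicate on top of an existing row id mapping, mirroring the loop in applyEqDelete above; the column values and the predicate are made up for illustration.

import java.util.function.IntPredicate;

class EqDeleteCompactionSketch {
  public static void main(String[] args) {
    // state after position deletes 2 and 6 (the Javadoc example): 6 live rows
    int[] rowIdMapping = {0, 1, 3, 4, 5, 7, 0, 0};
    int numLiveRows = 6;
    // hypothetical column data; rows whose value is between 1 and 3 are equality-deleted
    int[] values = {0, 1, 2, 3, 4, 5, 6, 7};
    IntPredicate isLive = v -> v < 1 || v > 3;

    int currentRowId = 0;
    for (int rowId = 0; rowId < numLiveRows; rowId++) {
      int originalRow = rowIdMapping[rowId];
      if (isLive.test(values[originalRow])) {
        // keep the row by compacting it toward the front of the mapping
        rowIdMapping[currentRowId] = originalRow;
        currentRowId++;
      }
    }
    // rowIdMapping now begins with [0, 4, 5, 7] and currentRowId == 4, matching
    // "[0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]"
    System.out.println(currentRowId);
  }
}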
/**
* Removes extra columns added for processing equality delete filters that are not part of the
* final query output.
*
* <p>During query execution, additional columns may be included in the schema to evaluate
* equality delete filters. For example, if the table schema contains columns C1, C2, C3, C4,
* and C5, and the query is 'SELECT C5 FROM table' while equality delete filters are applied on
* C3 and C4, the processing schema includes C5, C3, and C4. These extra columns (C3 and C4) are
* needed to identify rows to delete but are not included in the final result.
*
* <p>This method removes these extra columns from the end of {@code arrowColumnVectors},
* ensuring only the expected columns remain.
*
* @param arrowColumnVectors the array of column vectors representing query result data
* @param columnarBatch the original {@code ColumnarBatch} containing query results
* @return a new {@code ColumnarBatch} with extra columns removed, or the original batch if no
* extra columns were found
*/
ColumnarBatch removeExtraColumns(
ColumnVector[] arrowColumnVectors, ColumnarBatch columnarBatch) {
int expectedColumnSize = deletes.expectedSchema().columns().size();
if (arrowColumnVectors.length > expectedColumnSize) {
ColumnVector[] newColumns = Arrays.copyOf(arrowColumnVectors, expectedColumnSize);
return new ColumnarBatch(newColumns, columnarBatch.numRows());
} else {
return columnarBatch;
}
}
}
}

ColumnarBatchUtil.java
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.data.vectorized;
import java.util.Arrays;
import java.util.function.Predicate;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatchRow;
public class ColumnarBatchUtil {
private ColumnarBatchUtil() {}
/**
* Build a row id mapping inside a batch to skip deleted rows. Here is an example:
*   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
*   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
*   Position delete 2, 6
*   Equality delete 1 &lt;= x &lt;= 3
*   [0,4,5,7,-,-,-,-] -- After applying position and equality deletes [Set Num records to 4]
*   [F,T,T,T,F,F,T,F] -- After applying position and equality deletes
*
* @param columnVectors the array of column vectors for the batch.
* @param deletes the delete filter containing delete information.
* @param rowStartPosInBatch the starting position of the row in the batch.
* @param batchSize the size of the batch.
* @return the mapping array and the new num of rows in a batch, null if no row is deleted
*/
public static Pair<int[], Integer> buildRowIdMapping(
ColumnVector[] columnVectors,
DeleteFilter<InternalRow> deletes,
long rowStartPosInBatch,
int batchSize) {
if (deletes == null) {
return null;
}
PositionDeleteIndex deletedPositions = deletes.deletedRowPositions();
Predicate<InternalRow> eqDeleteFilter = deletes.eqDeletedRowFilter();
ColumnarBatchRow row = new ColumnarBatchRow(columnVectors);
int[] rowIdMapping = new int[batchSize];
int liveRowId = 0;
for (int rowId = 0; rowId < batchSize; rowId++) {
long pos = rowStartPosInBatch + rowId;
row.rowId = rowId;
if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) {
deletes.incrementDeleteCount();
} else {
rowIdMapping[liveRowId] = rowId;
liveRowId++;
}
}
return liveRowId == batchSize ? null : Pair.of(rowIdMapping, liveRowId);
}
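As a rough, self-contained illustration of what this method computes, the Javadoc example can be worked through with plain arrays and predicates standing in for ColumnVector[], PositionDeleteIndex, and the equality-delete row filter; all values below are hypothetical.

import java.util.Arrays;
import java.util.function.IntPredicate;
import java.util.function.LongPredicate;

class BuildRowIdMappingSketch {
  public static void main(String[] args) {
    int batchSize = 8;
    long rowStartPosInBatch = 0L; // hypothetical file position of the first row in the batch
    LongPredicate positionDeleted = pos -> pos == 2 || pos == 6; // stands in for PositionDeleteIndex
    int[] values = {0, 1, 2, 3, 4, 5, 6, 7}; // hypothetical column data (value == row id)
    IntPredicate eqLive = v -> v < 1 || v > 3; // stands in for eqDeletedRowFilter(): delete 1 <= x <= 3

    int[] rowIdMapping = new int[batchSize];
    int liveRowId = 0;
    for (int rowId = 0; rowId < batchSize; rowId++) {
      long pos = rowStartPosInBatch + rowId;
      boolean deleted = positionDeleted.test(pos) || !eqLive.test(values[rowId]);
      if (!deleted) {
        rowIdMapping[liveRowId++] = rowId;
      }
    }
    // prints [0, 4, 5, 7] live=4, matching the Javadoc example above
    System.out.println(Arrays.toString(Arrays.copyOf(rowIdMapping, liveRowId)) + " live=" + liveRowId);
  }
}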
/**
* Constructs an array indicating whether each row in a batch is deleted based on the specified
* delete filters. This method processes the column vectors and applies the delete filters to
* determine the deleted status for each row starting from a specified row position within the
* batch.
*
* @param columnVectors the array of column vectors for the batch.
* @param deletes the delete filter containing information about which rows should be deleted.
* @param rowStartPosInBatch the starting position of the row in the batch, used to calculate the
* absolute position of the rows in the context of the entire dataset.
* @param batchSize the number of rows in the current batch.
* @return an array of boolean values to indicate if a row is deleted or not
*/
public static boolean[] buildIsDeleted(
ColumnVector[] columnVectors,
DeleteFilter<InternalRow> deletes,
long rowStartPosInBatch,
int batchSize) {
boolean[] isDeleted = new boolean[batchSize];
if (deletes == null) {
return isDeleted;
}
PositionDeleteIndex deletedPositions = deletes.deletedRowPositions();
Predicate<InternalRow> eqDeleteFilter = deletes.eqDeletedRowFilter();
ColumnarBatchRow row = new ColumnarBatchRow(columnVectors);
for (int rowId = 0; rowId < batchSize; rowId++) {
long pos = rowStartPosInBatch + rowId;
row.rowId = rowId;
if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) {
deletes.incrementDeleteCount();
isDeleted[rowId] = true;
}
}
return isDeleted;
}
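Under the same hypothetical deletes, buildIsDeleted flags rows instead of compacting them; a minimal sketch of the expected result, again using made-up data rather than the real DeleteFilter:

class BuildIsDeletedSketch {
  public static void main(String[] args) {
    boolean[] isDeleted = new boolean[8];
    for (int rowId = 0; rowId < isDeleted.length; rowId++) {
      boolean posDeleted = rowId == 2 || rowId == 6; // position deletes 2 and 6
      boolean eqDeleted = rowId >= 1 && rowId <= 3;  // equality delete 1 <= x <= 3 (value == row id)
      isDeleted[rowId] = posDeleted || eqDeleted;    // flag the row, keep the batch size unchanged
    }
    // isDeleted is [F,T,T,T,F,F,T,F]; DeletedColumnVector.setValue(isDeleted) backs the "_deleted" column
    System.out.println(java.util.Arrays.toString(isDeleted));
  }
}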
private static boolean isDeleted(
long pos,
InternalRow row,
PositionDeleteIndex deletedPositions,
Predicate<InternalRow> eqDeleteFilter) {
// use separate if statements to reduce the chance of speculative execution for equality tests
if (deletedPositions != null && deletedPositions.isDeleted(pos)) {
return true;
}
if (eqDeleteFilter != null && !eqDeleteFilter.test(row)) {
return true;
}
return false;
}
/**
* Removes extra column vectors added for processing equality delete filters that are not part of
* the final query output.
*
* <p>During query execution, additional columns may be included in the schema to evaluate
* equality delete filters. For example, if the table schema contains columns C1, C2, C3, C4, and
* C5, the query is 'SELECT C5 FROM table', and equality delete filters are applied on C3 and C4,
* the processing schema includes C5, C3, and C4. These extra columns (C3 and C4) are
* needed to identify rows to delete but are not included in the final result.
*
* <p>This method removes the extra column vectors from the end of column vectors array, ensuring
* only the expected column vectors remain.
*
* @param deletes the delete filter containing delete information.
* @param columnVectors the array of column vectors representing query result data
* @return a new column vectors array with extra column vectors removed, or the original column
* vectors array if no extra column vectors are found
*/
public static ColumnVector[] removeExtraColumns(
DeleteFilter<InternalRow> deletes, ColumnVector[] columnVectors) {
int expectedColumnSize = deletes.expectedSchema().columns().size();
if (columnVectors.length > expectedColumnSize) {
return Arrays.copyOf(columnVectors, expectedColumnSize);
} else {
return columnVectors;
}
}
}
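To see the trimming done by removeExtraColumns concretely, here is a tiny sketch of the Arrays.copyOf behaviour it relies on, using strings in place of ColumnVector instances and the C1..C5 scenario from the Javadoc above; the expected column count is assumed to come from deletes.expectedSchema().

import java.util.Arrays;

class RemoveExtraColumnsSketch {
  public static void main(String[] args) {
    // processing order: the projected column first, then columns needed only to evaluate eq-deletes
    String[] columnVectors = {"C5", "C3", "C4"};
    int expectedColumnSize = 1; // the query is 'SELECT C5 FROM table', so only C5 is expected

    String[] result =
        columnVectors.length > expectedColumnSize
            ? Arrays.copyOf(columnVectors, expectedColumnSize) // drop the trailing extra columns
            : columnVectors;

    System.out.println(Arrays.toString(result)); // [C5]
  }
}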

DeletedColumnVector.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.spark.data.vectorized;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.spark.sql.types.Decimal;
@@ -28,12 +27,14 @@ import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;
public class DeletedColumnVector extends ColumnVector {
private final boolean[] isDeleted;
private boolean[] isDeleted;
public DeletedColumnVector(Type type, boolean[] isDeleted) {
public DeletedColumnVector(Type type) {
super(SparkSchemaUtil.convert(type));
Preconditions.checkArgument(isDeleted != null, "Boolean array isDeleted cannot be null");
this.isDeleted = isDeleted;
}
public void setValue(boolean[] deleted) {
this.isDeleted = deleted;
}
@Override

IcebergArrowColumnVector.java
@@ -39,11 +39,17 @@ public class IcebergArrowColumnVector extends ColumnVector {
private final ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> accessor;
private final NullabilityHolder nullabilityHolder;
private final VectorHolder holder;
public IcebergArrowColumnVector(VectorHolder holder) {
super(SparkSchemaUtil.convert(holder.icebergType()));
this.nullabilityHolder = holder.nullabilityHolder();
this.accessor = ArrowVectorAccessors.getVectorAccessor(holder);
this.holder = holder;
}
public VectorHolder vector() {
return holder;
}
protected ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> accessor() {
