[FLINK-37182][tests] Save partitionedFile as a class member in the PartitionedFileWriteReadTest.

pull/22926/merge
Ao Li 2 days ago committed by Junrui Lee
parent 085f37bfe6
commit abbfc66c90

@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.IOUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@ -58,6 +59,15 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
*/
class PartitionedFileWriteReadTest {
private @TempDir Path tempPath;
// We need a reference to the PartitionedFile to call deleteQuietly() after the test
private PartitionedFile partitionedFile;
@AfterEach
void tearDown() {
if (partitionedFile != null) {
partitionedFile.deleteQuietly();
}
}
@Test
void testWriteAndReadPartitionedFile() throws Exception {
@ -77,18 +87,17 @@ class PartitionedFileWriteReadTest {
}
int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
PartitionedFile partitionedFile =
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex,
random.nextBoolean(),
writeOrder);
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex,
random.nextBoolean(),
writeOrder);
FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
@ -111,7 +120,6 @@ class PartitionedFileWriteReadTest {
}
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
assertThat(buffersWritten[subpartition]).hasSameSizeAs(buffersRead[subpartition]);
@ -144,23 +152,20 @@ class PartitionedFileWriteReadTest {
randomSubpartitionOrder
? DataBufferTest.getRandomSubpartitionOrder(numSubpartitions)
: new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
PartitionedFile nonBroadcastPartitionedFile =
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex,
broadcastRegion,
writeOrder);
FileChannel dataFileChannel =
openFileChannel(nonBroadcastPartitionedFile.getDataFilePath());
FileChannel indexFileChannel =
openFileChannel(nonBroadcastPartitionedFile.getIndexFilePath());
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex,
broadcastRegion,
writeOrder);
FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
verifyReadablePosition(
0,
@ -168,7 +173,7 @@ class PartitionedFileWriteReadTest {
writeOrder[0],
dataFileChannel,
indexFileChannel,
nonBroadcastPartitionedFile,
partitionedFile,
regionStat,
broadcastRegion);
@ -178,7 +183,7 @@ class PartitionedFileWriteReadTest {
writeOrder[0],
dataFileChannel,
indexFileChannel,
nonBroadcastPartitionedFile,
partitionedFile,
regionStat,
broadcastRegion);
@ -188,10 +193,9 @@ class PartitionedFileWriteReadTest {
writeOrder[0],
dataFileChannel,
indexFileChannel,
nonBroadcastPartitionedFile,
partitionedFile,
regionStat,
broadcastRegion);
nonBroadcastPartitionedFile.deleteQuietly();
}
private void verifyReadablePosition(
@ -280,18 +284,17 @@ class PartitionedFileWriteReadTest {
}
int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
PartitionedFile partitionedFile =
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex / 2,
false,
writeOrder);
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(numSubpartitions, writeOrder),
subpartitionIndex -> subpartitionIndex / 2,
false,
writeOrder);
FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
@ -315,7 +318,6 @@ class PartitionedFileWriteReadTest {
}
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
for (int subpartition = 0; subpartition < numSubpartitions; subpartition += 2) {
assertThat(buffersWritten[subpartition / 2])
@ -329,7 +331,7 @@ class PartitionedFileWriteReadTest {
}
}
private PartitionedFile createPartitionedFile(
private void createPartitionedFile(
int numSubpartitions,
int bufferSize,
int numBuffers,
@ -391,7 +393,7 @@ class PartitionedFileWriteReadTest {
}
}
}
return fileWriter.finish();
partitionedFile = fileWriter.finish();
}
private static long getTotalBytes(List<BufferWithSubpartition> bufferWithSubpartitions) {
@ -459,7 +461,7 @@ class PartitionedFileWriteReadTest {
}
}
}
PartitionedFile partitionedFile = fileWriter.finish();
partitionedFile = fileWriter.finish();
FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
@ -486,7 +488,6 @@ class PartitionedFileWriteReadTest {
assertThat(subpartitionBuffers[subpartition]).isEmpty();
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
}
@Test
@ -516,7 +517,7 @@ class PartitionedFileWriteReadTest {
}
}
}
PartitionedFile partitionedFile = fileWriter.finish();
partitionedFile = fileWriter.finish();
FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
@ -549,7 +550,6 @@ class PartitionedFileWriteReadTest {
}
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
}
private void assertBufferEquals(Buffer expected, Buffer actual) {
@ -591,7 +591,7 @@ class PartitionedFileWriteReadTest {
.isInstanceOf(IllegalStateException.class);
} finally {
partitionedFileWriter.finish().deleteQuietly();
partitionedFile = partitionedFileWriter.finish();
}
}
@ -621,7 +621,7 @@ class PartitionedFileWriteReadTest {
int bufferSize = 1024;
int numSubpartitions = 2;
int targetSubpartition = 1;
PartitionedFile partitionedFile = createEmptyPartitionedFile();
createEmptyPartitionedFile();
List<Buffer>[] buffersRead = new List[numSubpartitions];
for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
@ -646,7 +646,6 @@ class PartitionedFileWriteReadTest {
buffer -> addReadBuffer(buffer, buffersRead[targetSubpartition]));
assertThat(buffersRead[targetSubpartition]).isEmpty();
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
}
/**
@ -671,22 +670,21 @@ class PartitionedFileWriteReadTest {
}
int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
PartitionedFile partitionedFile =
createPartitionedFile(
createPartitionedFile(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(
numSubpartitions,
bufferSize,
numBuffers,
numRegions,
buffersWritten,
regionStat,
createPartitionedFileWriter(
numSubpartitions,
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
writeOrder),
subpartitionIndex -> subpartitionIndex,
random.nextBoolean(),
writeOrder);
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
writeOrder),
subpartitionIndex -> subpartitionIndex,
random.nextBoolean(),
writeOrder);
FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
@ -721,7 +719,6 @@ class PartitionedFileWriteReadTest {
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
}
private FileChannel openFileChannel(Path path) throws IOException {
@ -733,9 +730,9 @@ class PartitionedFileWriteReadTest {
return Collections.singletonList(new BufferWithSubpartition(buffer, subpartitionIndex));
}
private PartitionedFile createEmptyPartitionedFile() throws IOException {
private void createEmptyPartitionedFile() throws IOException {
PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2, new int[0]);
return partitionedFileWriter.finish();
partitionedFile = partitionedFileWriter.finish();
}
private PartitionedFileWriter createPartitionedFileWriter(
@ -762,7 +759,7 @@ class PartitionedFileWriteReadTest {
private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException {
PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(1, new int[0]);
partitionedFileWriter.finish().deleteQuietly();
partitionedFile = partitionedFileWriter.finish();
return partitionedFileWriter;
}

Loading…
Cancel
Save