diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java index 1a328457a90..e9c10ca7b92 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.util.IOUtils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -58,6 +59,15 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; */ class PartitionedFileWriteReadTest { private @TempDir Path tempPath; + // We need a reference to the PartitionedFile to call deleteQuietly() after the test + private PartitionedFile partitionedFile; + + @AfterEach + void tearDown() { + if (partitionedFile != null) { + partitionedFile.deleteQuietly(); + } + } @Test void testWriteAndReadPartitionedFile() throws Exception { @@ -77,18 +87,17 @@ class PartitionedFileWriteReadTest { } int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions); - PartitionedFile partitionedFile = - createPartitionedFile( - numSubpartitions, - bufferSize, - numBuffers, - numRegions, - buffersWritten, - regionStat, - createPartitionedFileWriter(numSubpartitions, writeOrder), - subpartitionIndex -> subpartitionIndex, - random.nextBoolean(), - writeOrder); + createPartitionedFile( + numSubpartitions, + bufferSize, + numBuffers, + numRegions, + buffersWritten, + regionStat, + createPartitionedFileWriter(numSubpartitions, writeOrder), + subpartitionIndex -> subpartitionIndex, + random.nextBoolean(), + writeOrder); FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath()); FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath()); @@ -111,7 +120,6 @@ class PartitionedFileWriteReadTest { } } IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel); - partitionedFile.deleteQuietly(); for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) { assertThat(buffersWritten[subpartition]).hasSameSizeAs(buffersRead[subpartition]); @@ -144,23 +152,20 @@ class PartitionedFileWriteReadTest { randomSubpartitionOrder ? DataBufferTest.getRandomSubpartitionOrder(numSubpartitions) : new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - PartitionedFile nonBroadcastPartitionedFile = - createPartitionedFile( - numSubpartitions, - bufferSize, - numBuffers, - numRegions, - buffersWritten, - regionStat, - createPartitionedFileWriter(numSubpartitions, writeOrder), - subpartitionIndex -> subpartitionIndex, - broadcastRegion, - writeOrder); - - FileChannel dataFileChannel = - openFileChannel(nonBroadcastPartitionedFile.getDataFilePath()); - FileChannel indexFileChannel = - openFileChannel(nonBroadcastPartitionedFile.getIndexFilePath()); + createPartitionedFile( + numSubpartitions, + bufferSize, + numBuffers, + numRegions, + buffersWritten, + regionStat, + createPartitionedFileWriter(numSubpartitions, writeOrder), + subpartitionIndex -> subpartitionIndex, + broadcastRegion, + writeOrder); + + FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath()); + FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath()); verifyReadablePosition( 0, @@ -168,7 +173,7 @@ class PartitionedFileWriteReadTest { writeOrder[0], dataFileChannel, indexFileChannel, - nonBroadcastPartitionedFile, + partitionedFile, regionStat, broadcastRegion); @@ -178,7 +183,7 @@ class PartitionedFileWriteReadTest { writeOrder[0], dataFileChannel, indexFileChannel, - nonBroadcastPartitionedFile, + partitionedFile, regionStat, broadcastRegion); @@ -188,10 +193,9 @@ class PartitionedFileWriteReadTest { writeOrder[0], dataFileChannel, indexFileChannel, - nonBroadcastPartitionedFile, + partitionedFile, regionStat, broadcastRegion); - nonBroadcastPartitionedFile.deleteQuietly(); } private void verifyReadablePosition( @@ -280,18 +284,17 @@ class PartitionedFileWriteReadTest { } int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions); - PartitionedFile partitionedFile = - createPartitionedFile( - numSubpartitions, - bufferSize, - numBuffers, - numRegions, - buffersWritten, - regionStat, - createPartitionedFileWriter(numSubpartitions, writeOrder), - subpartitionIndex -> subpartitionIndex / 2, - false, - writeOrder); + createPartitionedFile( + numSubpartitions, + bufferSize, + numBuffers, + numRegions, + buffersWritten, + regionStat, + createPartitionedFileWriter(numSubpartitions, writeOrder), + subpartitionIndex -> subpartitionIndex / 2, + false, + writeOrder); FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath()); FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath()); @@ -315,7 +318,6 @@ class PartitionedFileWriteReadTest { } } IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel); - partitionedFile.deleteQuietly(); for (int subpartition = 0; subpartition < numSubpartitions; subpartition += 2) { assertThat(buffersWritten[subpartition / 2]) @@ -329,7 +331,7 @@ class PartitionedFileWriteReadTest { } } - private PartitionedFile createPartitionedFile( + private void createPartitionedFile( int numSubpartitions, int bufferSize, int numBuffers, @@ -391,7 +393,7 @@ class PartitionedFileWriteReadTest { } } } - return fileWriter.finish(); + partitionedFile = fileWriter.finish(); } private static long getTotalBytes(List bufferWithSubpartitions) { @@ -459,7 +461,7 @@ class PartitionedFileWriteReadTest { } } } - PartitionedFile partitionedFile = fileWriter.finish(); + partitionedFile = fileWriter.finish(); FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath()); FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath()); @@ -486,7 +488,6 @@ class PartitionedFileWriteReadTest { assertThat(subpartitionBuffers[subpartition]).isEmpty(); } IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel); - partitionedFile.deleteQuietly(); } @Test @@ -516,7 +517,7 @@ class PartitionedFileWriteReadTest { } } } - PartitionedFile partitionedFile = fileWriter.finish(); + partitionedFile = fileWriter.finish(); FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath()); FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath()); @@ -549,7 +550,6 @@ class PartitionedFileWriteReadTest { } } IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel); - partitionedFile.deleteQuietly(); } private void assertBufferEquals(Buffer expected, Buffer actual) { @@ -591,7 +591,7 @@ class PartitionedFileWriteReadTest { .isInstanceOf(IllegalStateException.class); } finally { - partitionedFileWriter.finish().deleteQuietly(); + partitionedFile = partitionedFileWriter.finish(); } } @@ -621,7 +621,7 @@ class PartitionedFileWriteReadTest { int bufferSize = 1024; int numSubpartitions = 2; int targetSubpartition = 1; - PartitionedFile partitionedFile = createEmptyPartitionedFile(); + createEmptyPartitionedFile(); List[] buffersRead = new List[numSubpartitions]; for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) { @@ -646,7 +646,6 @@ class PartitionedFileWriteReadTest { buffer -> addReadBuffer(buffer, buffersRead[targetSubpartition])); assertThat(buffersRead[targetSubpartition]).isEmpty(); IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel); - partitionedFile.deleteQuietly(); } /** @@ -671,22 +670,21 @@ class PartitionedFileWriteReadTest { } int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions); - PartitionedFile partitionedFile = - createPartitionedFile( + createPartitionedFile( + numSubpartitions, + bufferSize, + numBuffers, + numRegions, + buffersWritten, + regionStat, + createPartitionedFileWriter( numSubpartitions, - bufferSize, - numBuffers, - numRegions, - buffersWritten, - regionStat, - createPartitionedFileWriter( - numSubpartitions, - PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions, - PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions, - writeOrder), - subpartitionIndex -> subpartitionIndex, - random.nextBoolean(), - writeOrder); + PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions, + PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions, + writeOrder), + subpartitionIndex -> subpartitionIndex, + random.nextBoolean(), + writeOrder); FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath()); FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath()); @@ -721,7 +719,6 @@ class PartitionedFileWriteReadTest { } IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel); - partitionedFile.deleteQuietly(); } private FileChannel openFileChannel(Path path) throws IOException { @@ -733,9 +730,9 @@ class PartitionedFileWriteReadTest { return Collections.singletonList(new BufferWithSubpartition(buffer, subpartitionIndex)); } - private PartitionedFile createEmptyPartitionedFile() throws IOException { + private void createEmptyPartitionedFile() throws IOException { PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2, new int[0]); - return partitionedFileWriter.finish(); + partitionedFile = partitionedFileWriter.finish(); } private PartitionedFileWriter createPartitionedFileWriter( @@ -762,7 +759,7 @@ class PartitionedFileWriteReadTest { private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException { PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(1, new int[0]); - partitionedFileWriter.finish().deleteQuietly(); + partitionedFile = partitionedFileWriter.finish(); return partitionedFileWriter; }