Skip to content

Commit b203f53

Browse files
authored
Move FileIO close from RecordWriter to RecordWriterManager (#37782)
* Move FileIO close from RecordWriter to RecordWriterManager
* fix
* clarify FileIO ownership comments and verify close
1 parent 9b915fd commit b203f53

3 files changed

Lines changed: 145 additions & 46 deletions

File tree

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,8 @@
3131
import org.apache.iceberg.encryption.EncryptedOutputFile;
3232
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
3333
import org.apache.iceberg.io.DataWriter;
34-
import org.apache.iceberg.io.FileIO;
3534
import org.apache.iceberg.io.OutputFile;
3635
import org.apache.iceberg.parquet.Parquet;
37-
import org.checkerframework.checker.nullness.qual.Nullable;
3836
import org.slf4j.Logger;
3937
import org.slf4j.LoggerFactory;
4038

@@ -47,7 +45,6 @@ class RecordWriter {
4745
private final Table table;
4846
private final String absoluteFilename;
4947
private final FileFormat fileFormat;
50-
private @Nullable FileIO io;
5148

5249
RecordWriter(
5350
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
@@ -74,11 +71,9 @@ class RecordWriter {
7471
}
7572
OutputFile outputFile;
7673
EncryptionKeyMetadata keyMetadata;
77-
// Keep FileIO open for the lifetime of this writer to avoid
78-
// premature shutdown of underlying client pools (e.g., S3),
79-
// which manifests as "Connection pool shut down" (Issue #36438).
80-
this.io = table.io();
81-
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
74+
// table.io() may return a shared FileIO instance.
75+
// FileIO lifecycle is managed by RecordWriterManager.close().
76+
OutputFile tmpFile = table.io().newOutputFile(absoluteFilename);
8277
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
8378
outputFile = encryptedOutputFile.encryptingOutputFile();
8479
keyMetadata = encryptedOutputFile.keyMetadata();
@@ -135,20 +130,6 @@ public void close() throws IOException {
135130
fileFormat, table.name(), absoluteFilename),
136131
e);
137132
} finally {
138-
// Always attempt to close FileIO and decrement metrics
139-
if (io != null) {
140-
try {
141-
io.close();
142-
} catch (Exception ioCloseError) {
143-
if (closeError != null) {
144-
closeError.addSuppressed(ioCloseError);
145-
} else {
146-
closeError = new IOException("Failed to close FileIO", ioCloseError);
147-
}
148-
} finally {
149-
io = null;
150-
}
151-
}
152133
activeIcebergWriters.dec();
153134
}
154135

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
import java.time.format.DateTimeFormatter;
3030
import java.time.temporal.ChronoUnit;
3131
import java.util.ArrayList;
32+
import java.util.HashSet;
3233
import java.util.List;
3334
import java.util.Map;
35+
import java.util.Set;
3436
import java.util.UUID;
3537
import java.util.concurrent.TimeUnit;
3638
import org.apache.beam.sdk.schemas.Schema;
@@ -58,6 +60,7 @@
5860
import org.apache.iceberg.data.Record;
5961
import org.apache.iceberg.exceptions.AlreadyExistsException;
6062
import org.apache.iceberg.exceptions.NoSuchTableException;
63+
import org.apache.iceberg.io.FileIO;
6164
import org.apache.iceberg.transforms.Transforms;
6265
import org.checkerframework.checker.nullness.qual.Nullable;
6366
import org.slf4j.Logger;
@@ -403,33 +406,50 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
403406
*/
404407
@Override
405408
public void close() throws IOException {
406-
for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
407-
windowedDestinationAndState : destinations.entrySet()) {
408-
DestinationState state = windowedDestinationAndState.getValue();
409+
try {
410+
for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
411+
windowedDestinationAndState : destinations.entrySet()) {
412+
DestinationState state = windowedDestinationAndState.getValue();
409413

410-
// removing writers from the state's cache will trigger the logic to collect each writer's
411-
// data file.
412-
state.writers.invalidateAll();
413-
// first check for any exceptions swallowed by the cache
414-
if (!state.exceptions.isEmpty()) {
415-
IllegalStateException exception =
416-
new IllegalStateException(
417-
String.format("Encountered %s failed writer(s).", state.exceptions.size()));
418-
for (Exception e : state.exceptions) {
419-
exception.addSuppressed(e);
414+
// removing writers from the state's cache will trigger the logic to collect each writer's
415+
// data file.
416+
state.writers.invalidateAll();
417+
// first check for any exceptions swallowed by the cache
418+
if (!state.exceptions.isEmpty()) {
419+
IllegalStateException exception =
420+
new IllegalStateException(
421+
String.format("Encountered %s failed writer(s).", state.exceptions.size()));
422+
for (Exception e : state.exceptions) {
423+
exception.addSuppressed(e);
424+
}
425+
throw exception;
420426
}
421-
throw exception;
422-
}
423427

424-
if (state.dataFiles.isEmpty()) {
425-
continue;
426-
}
428+
if (state.dataFiles.isEmpty()) {
429+
continue;
430+
}
427431

428-
totalSerializableDataFiles.put(
429-
windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles));
430-
state.dataFiles.clear();
432+
totalSerializableDataFiles.put(
433+
windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles));
434+
state.dataFiles.clear();
435+
}
436+
} finally {
437+
// Close unique FileIO instances now that all writers are done.
438+
// table.io() may return a shared FileIO; we deduplicate by identity
439+
// so we close each underlying connection pool exactly once.
440+
Set<FileIO> closedIOs = new HashSet<>();
441+
for (DestinationState state : destinations.values()) {
442+
FileIO io = state.table.io();
443+
if (io != null && closedIOs.add(io)) {
444+
try {
445+
io.close();
446+
} catch (Exception e) {
447+
LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e);
448+
}
449+
}
450+
}
451+
destinations.clear();
431452
}
432-
destinations.clear();
433453
checkArgument(
434454
openWriters == 0,
435455
"Expected all data writers to be closed, but found %s data writer(s) still open",

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.iceberg.catalog.Catalog;
6666
import org.apache.iceberg.catalog.Namespace;
6767
import org.apache.iceberg.catalog.TableIdentifier;
68+
import org.apache.iceberg.data.Record;
6869
import org.apache.iceberg.hadoop.HadoopCatalog;
6970
import org.apache.iceberg.io.FileIO;
7071
import org.apache.iceberg.io.InputFile;
@@ -957,7 +958,7 @@ public void testDefaultMetrics() throws IOException {
957958
}
958959

959960
@Test
960-
public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
961+
public void testRecordWriterDoesNotCloseSharedFileIO() throws IOException {
961962
TableIdentifier tableId =
962963
TableIdentifier.of(
963964
"default",
@@ -980,7 +981,104 @@ public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
980981
writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
981982
writer.close();
982983

983-
assertTrue("FileIO should be closed after writer close", trackingFileIO.closed);
984+
// RecordWriter must NOT close FileIO — it may be a shared instance.
985+
assertFalse("RecordWriter.close() must not close the shared FileIO", trackingFileIO.closed);
986+
}
987+
988+
/**
989+
* Verifies that when multiple writers share the same FileIO, closing any writer does not close
990+
* the shared FileIO — that is the responsibility of RecordWriterManager.close().
991+
*/
992+
@Test
993+
public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOException {
994+
// Create two tables that share the same FileIO (simulating dynamic destinations
995+
// backed by the same catalog)
996+
TableIdentifier tableId1 =
997+
TableIdentifier.of(
998+
"default",
999+
"table_batch_close_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6));
1000+
TableIdentifier tableId2 =
1001+
TableIdentifier.of(
1002+
"default",
1003+
"table_batch_close_b_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6));
1004+
Table table1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
1005+
Table table2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
1006+
1007+
// Both tables share the same CloseTrackingFileIO — mirrors how some catalogs
1008+
// return a shared FileIO instance across tables
1009+
CloseTrackingFileIO sharedFileIO = new CloseTrackingFileIO(table1.io());
1010+
Table spyTable1 = Mockito.spy(table1);
1011+
Table spyTable2 = Mockito.spy(table2);
1012+
Mockito.doReturn(sharedFileIO).when(spyTable1).io();
1013+
Mockito.doReturn(sharedFileIO).when(spyTable2).io();
1014+
1015+
PartitionKey pk1 = new PartitionKey(spyTable1.spec(), spyTable1.schema());
1016+
PartitionKey pk2 = new PartitionKey(spyTable2.spec(), spyTable2.schema());
1017+
1018+
RecordWriter writer1 = new RecordWriter(spyTable1, FileFormat.PARQUET, "file1.parquet", pk1);
1019+
RecordWriter writer2 = new RecordWriter(spyTable2, FileFormat.PARQUET, "file2.parquet", pk2);
1020+
1021+
Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
1022+
Record record = IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row);
1023+
1024+
writer1.write(record);
1025+
writer2.write(record);
1026+
1027+
writer1.close();
1028+
assertFalse("FileIO must remain open between batch writer closes", sharedFileIO.closed);
1029+
1030+
writer2.close();
1031+
assertFalse("FileIO must remain open after all writers close", sharedFileIO.closed);
1032+
1033+
// Both writers produced valid data files
1034+
assertNotNull(writer1.getDataFile());
1035+
assertNotNull(writer2.getDataFile());
1036+
}
1037+
1038+
/**
1039+
* Verifies that RecordWriterManager.close() flushes data files from multiple destinations and
1040+
* closes the shared FileIO.
1041+
*/
1042+
@Test
1043+
public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws IOException {
1044+
String tableName1 =
1045+
"table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
1046+
String tableName2 =
1047+
"table_mgr_io_b_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
1048+
TableIdentifier tableId1 = TableIdentifier.of("default", tableName1);
1049+
TableIdentifier tableId2 = TableIdentifier.of("default", tableName2);
1050+
1051+
Table realTable1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
1052+
Table realTable2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
1053+
1054+
CloseTrackingFileIO sharedTrackingIO = new CloseTrackingFileIO(realTable1.io());
1055+
Table spyTable1 = Mockito.spy(realTable1);
1056+
Table spyTable2 = Mockito.spy(realTable2);
1057+
Mockito.doReturn(sharedTrackingIO).when(spyTable1).io();
1058+
Mockito.doReturn(sharedTrackingIO).when(spyTable2).io();
1059+
1060+
Catalog spyCatalog = Mockito.spy(catalog);
1061+
Mockito.doReturn(spyTable1).when(spyCatalog).loadTable(tableId1);
1062+
Mockito.doReturn(spyTable2).when(spyCatalog).loadTable(tableId2);
1063+
1064+
WindowedValue<IcebergDestination> dest1 = getWindowedDestination(tableName1, null);
1065+
WindowedValue<IcebergDestination> dest2 = getWindowedDestination(tableName2, null);
1066+
1067+
RecordWriterManager writerManager =
1068+
new RecordWriterManager(spyCatalog, "test_file_name", 1000, 3);
1069+
1070+
Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
1071+
assertTrue(writerManager.write(dest1, row));
1072+
assertTrue(writerManager.write(dest2, row));
1073+
assertEquals(2, writerManager.openWriters);
1074+
1075+
writerManager.close();
1076+
1077+
Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> dataFiles =
1078+
writerManager.getSerializableDataFiles();
1079+
assertTrue("Should have data files for dest1", dataFiles.containsKey(dest1));
1080+
assertTrue("Should have data files for dest2", dataFiles.containsKey(dest2));
1081+
assertTrue("Shared FileIO should be closed", sharedTrackingIO.closed);
9841082
}
9851083

9861084
private static final class CloseTrackingFileIO implements FileIO {

0 commit comments

Comments (0)