Commit ee3627b
Fix incorrect partitionValues_parsed with id & name column mapping in Delta Lake
ebyhr committed Nov 14, 2024
1 parent 00a538f commit ee3627b
Showing 6 changed files with 115 additions and 2 deletions.
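Background: with column_mapping_mode = 'id' or 'name', Delta Lake stores data under generated physical column names rather than the user-visible logical names, so the partitionValues_parsed struct in a checkpoint must name its fields by the physical name. A minimal sketch (not part of the commit) of the effect of the one-line fix below; the UUID-style physical name is invented for illustration:

import io.trino.spi.type.RowType;

import java.util.List;

import static io.trino.spi.type.IntegerType.INTEGER;

class PartitionValuesParsedSketch
{
    static RowType partitionValuesParsedType(boolean columnMappingEnabled)
    {
        // Logical (user-visible) name vs. a generated physical name under
        // column_mapping_mode = 'id' or 'name' (this UUID is invented).
        String logicalName = "part";
        String physicalName = "col-5f422f40-de70-45b2-88ab-1d5c90e94db1";

        // The fix: before this commit the checkpoint schema and writer used
        // the logical name even with id/name mapping; switching to the
        // physical name makes the struct field match what column-mapping-aware
        // readers resolve against. With mapping disabled the two names agree.
        String fieldName = columnMappingEnabled ? physicalName : logicalName;
        return RowType.from(List.of(RowType.field(fieldName, INTEGER)));
    }
}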
lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java
@@ -53,6 +53,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.parquet.ParquetTypeUtils.constructField;
import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
@@ -103,6 +104,16 @@ public static ParquetWriter createParquetWriter(OutputStream outputStream, Parqu
Optional.empty());
}

public static ParquetReader createParquetReader(
ParquetDataSource input,
ParquetMetadata parquetMetadata,
List<Type> types,
List<String> columnNames)
throws IOException
{
return createParquetReader(input, parquetMetadata, new ParquetReaderOptions(), newSimpleAggregatedMemoryContext(), types, columnNames, TupleDomain.all());
}

public static ParquetReader createParquetReader(
ParquetDataSource input,
ParquetMetadata parquetMetadata,
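A sketch (not part of the commit) of the new convenience overload in use, mirroring the call added to TestDeltaLakeBasic below; the checkpoint file and row type are supplied by the caller:

import com.google.common.collect.ImmutableList;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.FileParquetDataSource;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.ParquetReader;
import io.trino.spi.Page;
import io.trino.spi.type.RowType;

import java.io.File;
import java.io.IOException;
import java.util.Optional;

import static io.trino.parquet.ParquetTestUtils.createParquetReader;

class CheckpointReadSketch
{
    static long countAddEntries(File checkpointFile, RowType addEntryType)
            throws IOException
    {
        ParquetDataSource dataSource = new FileParquetDataSource(checkpointFile, new ParquetReaderOptions());
        ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
        long rows = 0;
        // The new overload defaults the reader options, memory context, and predicate.
        try (ParquetReader reader = createParquetReader(dataSource, parquetMetadata, ImmutableList.of(addEntryType), ImmutableList.of("add"))) {
            for (Page page = reader.nextPage(); page != null; page = reader.nextPage()) {
                rows += page.getPositionCount();
            }
        }
        return rows;
    }
}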
7 changes: 7 additions & 0 deletions plugin/trino-delta-lake/pom.xml
@@ -396,6 +396,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parquet</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parser</artifactId>
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java
@@ -183,7 +183,7 @@ public RowType getAddEntryType(
List<DeltaLakeColumnHandle> partitionColumns = extractPartitionColumns(metadataEntry, protocolEntry, typeManager);
if (!partitionColumns.isEmpty()) {
List<RowType.Field> partitionValuesParsed = partitionColumns.stream()
.map(column -> RowType.field(column.columnName(), typeManager.getType(getTypeSignature(DeltaHiveTypeTranslator.toHiveType(column.type())))))
.map(column -> RowType.field(column.basePhysicalColumnName(), typeManager.getType(getTypeSignature(DeltaHiveTypeTranslator.toHiveType(column.type())))))
.collect(toImmutableList());
addFields.add(RowType.field("partitionValues_parsed", RowType.from(partitionValuesParsed)));
}
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java
@@ -161,7 +161,7 @@ public void write(CheckpointEntries entries, TrinoOutputFile outputFile)
}
List<DeltaLakeColumnHandle> partitionColumns = extractPartitionColumns(entries.metadataEntry(), entries.protocolEntry(), typeManager);
List<RowType.Field> partitionValuesParsedFieldTypes = partitionColumns.stream()
.map(column -> RowType.field(column.columnName(), column.type()))
.map(column -> RowType.field(column.basePhysicalColumnName(), column.type()))
.collect(toImmutableList());
for (AddFileEntry addFileEntry : entries.addFileEntries()) {
writeAddFileEntry(pageBuilder, addEntryType, addFileEntry, entries.metadataEntry(), entries.protocolEntry(), partitionColumns, partitionValuesParsedFieldTypes, writeStatsAsJson, writeStatsAsStruct);
plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
@@ -26,18 +26,26 @@
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
import io.trino.filesystem.local.LocalInputFile;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.metadata.FileMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.FileParquetDataSource;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.ParquetReader;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimeZoneKey;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.MaterializedRow;
@@ -58,6 +66,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -71,13 +80,16 @@
import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.parquet.ParquetTestUtils.createParquetReader;
import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsMetadata;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;
import static org.assertj.core.api.Assertions.assertThat;
@@ -270,6 +282,72 @@ private void testAddNestedColumnWithColumnMappingMode(String columnMappingMode)
.containsPattern("(delta\\.columnMapping\\.physicalName.*?){11}");
}

@Test // regression test for https://github.com/trinodb/trino/issues/24121
void testPartitionValuesParsedCheckpoint()
throws Exception
{
testPartitionValuesParsedCheckpoint(ColumnMappingMode.ID);
testPartitionValuesParsedCheckpoint(ColumnMappingMode.NAME);
testPartitionValuesParsedCheckpoint(ColumnMappingMode.NONE);
}

private void testPartitionValuesParsedCheckpoint(ColumnMappingMode columnMappingMode)
throws Exception
{
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_checkpoint",
"(x int, part int) WITH (checkpoint_interval = 3, column_mapping_mode = '" + columnMappingMode + "', partitioned_by = ARRAY['part'])")) {
assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 10)", 1);
assertUpdate("INSERT INTO " + table.getName() + " VALUES (2, 20)", 1);
assertUpdate("INSERT INTO " + table.getName() + " VALUES (3, 30)", 1);

Path tableLocation = Path.of(getTableLocation(table.getName()).replace("file://", ""));
Path checkpoint = tableLocation.resolve("_delta_log/00000000000000000003.checkpoint.parquet");

MetadataEntry metadataEntry = loadMetadataEntry(0, tableLocation);
ProtocolEntry protocolEntry = loadProtocolEntry(0, tableLocation);

DeltaLakeColumnHandle partitionColumn = extractPartitionColumns(metadataEntry, protocolEntry, TESTING_TYPE_MANAGER).stream().collect(onlyElement());
String physicalColumnName = partitionColumn.basePhysicalColumnName();
if (columnMappingMode == ColumnMappingMode.ID || columnMappingMode == ColumnMappingMode.NAME) {
assertThat(physicalColumnName).matches(PHYSICAL_COLUMN_NAME_PATTERN);
}
else {
assertThat(physicalColumnName).isEqualTo("part");
}

int partitionValuesParsedFieldPosition = 6;
RowType addEntryType = new CheckpointSchemaManager(TESTING_TYPE_MANAGER).getAddEntryType(metadataEntry, protocolEntry, _ -> true, true, true, true);

RowType.Field partitionValuesParsedField = addEntryType.getFields().get(partitionValuesParsedFieldPosition);
assertThat(partitionValuesParsedField.getName().orElseThrow()).matches("partitionValues_parsed");
RowType partitionValuesParsedType = (RowType) partitionValuesParsedField.getType();
assertThat(partitionValuesParsedType.getFields().stream().collect(onlyElement()).getName().orElseThrow()).isEqualTo(physicalColumnName);

ParquetMetadata parquetMetadata = MetadataReader.readFooter(
new TrinoParquetDataSource(new LocalInputFile(checkpoint.toFile()), new ParquetReaderOptions(), new FileFormatDataSourceStats()),
Optional.empty());
ParquetDataSource dataSource = new FileParquetDataSource(checkpoint.toFile(), new ParquetReaderOptions());
try (ParquetReader reader = createParquetReader(dataSource, parquetMetadata, ImmutableList.of(addEntryType), List.of("add"))) {
List<Integer> actual = new ArrayList<>();
Page page = reader.nextPage();
while (page != null) {
Block block = page.getBlock(0);
for (int i = 0; i < block.getPositionCount(); i++) {
List<?> add = (List<?>) addEntryType.getObjectValue(SESSION, block, i);
if (add == null) {
continue;
}
actual.add((Integer) ((List<?>) add.get(partitionValuesParsedFieldPosition)).stream().collect(onlyElement()));
}
page = reader.nextPage();
}
assertThat(actual).containsExactlyInAnyOrder(10, 20, 30);
}
}
}

/**
* @see deltalake.column_mapping_mode_id
* @see deltalake.column_mapping_mode_name
@@ -2136,6 +2214,16 @@ private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocat
return transactionLog.getMetaData();
}

private static ProtocolEntry loadProtocolEntry(long entryNumber, Path tableLocation)
throws IOException
{
TrinoFileSystem fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION);
DeltaLakeTransactionLogEntry transactionLog = getEntriesFromJson(entryNumber, tableLocation.resolve("_delta_log").toString(), fileSystem).orElseThrow().stream()
.filter(log -> log.getProtocol() != null)
.collect(onlyElement());
return transactionLog.getProtocol();
}

private String getTableLocation(String tableName)
{
Pattern locationPattern = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL);
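PHYSICAL_COLUMN_NAME_PATTERN, referenced in the test above, is defined elsewhere in TestDeltaLakeBasic and is not part of this diff. A plausible shape, assuming generated physical names follow Delta's "col-" + UUID convention (the exact regex is an assumption):

import java.util.regex.Pattern;

class PhysicalNamePatternSketch
{
    // Assumption: physical names look like "col-" followed by a UUID;
    // the real constant in TestDeltaLakeBasic may differ.
    static final Pattern PHYSICAL_COLUMN_NAME_PATTERN =
            Pattern.compile("col-[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}");
}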
7 changes: 7 additions & 0 deletions pom.xml
@@ -1307,6 +1307,13 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parquet</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parquet</artifactId>
