[WIP] Improve performance of querying system.jdbc.tables for Hive, Iceberg, and Delta #24110

Draft · wants to merge 4 commits into master
Changes from 1 commit
@@ -90,6 +90,7 @@ public class DeltaLakeConfig
private boolean queryPartitionFilterRequired;
private boolean deletionVectorsEnabled;
private boolean deltaLogFileSystemCacheDisabled;
private int metadataParallelism = 8;
Member: 8 seems pretty low. Did you test a bigger number of threads?


public Duration getMetadataCacheTtl()
{
@@ -566,4 +567,18 @@ public DeltaLakeConfig setDeltaLogFileSystemCacheDisabled(boolean deltaLogFileSy
this.deltaLogFileSystemCacheDisabled = deltaLogFileSystemCacheDisabled;
return this;
}

@Min(1)
public int getMetadataParallelism()
{
return metadataParallelism;
}

@ConfigDescription("Limits the parallelism of metadata enumeration calls")
@Config("delta.metadata.parallelism")
public DeltaLakeConfig setMetadataParallelism(int metadataParallelism)
{
this.metadataParallelism = metadataParallelism;
return this;
}
}
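
For operators, the new knob is set like any other catalog property; a minimal sketch, assuming a Delta Lake catalog defined in etc/catalog/delta.properties (the file name is illustrative):

    # caps concurrent metastore enumeration calls; default is 8, minimum 1 per @Min(1)
    delta.metadata.parallelism=32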
@@ -40,6 +40,7 @@
import io.trino.metastore.PrincipalPrivileges;
import io.trino.metastore.StorageFormat;
import io.trino.metastore.Table;
import io.trino.metastore.TableInfo;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.base.filter.UtcConstraintExtractor;
import io.trino.plugin.base.projection.ApplyProjectionUtil;
@@ -113,6 +114,7 @@
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RelationType;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SaveMode;
@@ -173,7 +175,10 @@
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -204,6 +209,7 @@
import static io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
import static io.trino.plugin.base.util.ExecutorUtil.processWithAdditionalThreads;
import static io.trino.plugin.deltalake.DataFileInfo.DataFileType.DATA;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.INCREMENTAL;
@@ -447,6 +453,7 @@ public class DeltaLakeMetadata
private final Map<SchemaTableName, TableUpdateInfo> tableUpdateInfos = new ConcurrentHashMap<>();
private final Map<SchemaTableName, Long> latestTableVersions = new ConcurrentHashMap<>();
private final Map<QueriedTable, TableSnapshot> queriedSnapshots = new ConcurrentHashMap<>();
private final Executor metadataFetchingExecutor;

private record QueriedTable(SchemaTableName schemaTableName, long version)
{
@@ -477,7 +484,8 @@ public DeltaLakeMetadata(
CachingExtendedStatisticsAccess statisticsAccess,
DeltaLakeTableMetadataScheduler metadataScheduler,
boolean useUniqueTableLocation,
boolean allowManagedTableRename)
boolean allowManagedTableRename,
Executor metadataFetchingExecutor)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
@@ -501,6 +509,7 @@ public DeltaLakeMetadata(
this.metadataScheduler = requireNonNull(metadataScheduler, "metadataScheduler is null");
this.useUniqueTableLocation = useUniqueTableLocation;
this.allowManagedTableRename = allowManagedTableRename;
this.metadataFetchingExecutor = requireNonNull(metadataFetchingExecutor, "metadataFetchingExecutor is null");
}

public TableSnapshot getSnapshot(ConnectorSession session, SchemaTableName table, String tableLocation, Optional<Long> atVersion)
@@ -798,12 +807,40 @@ private List<ColumnMetadata> getTableColumnMetadata(MetadataEntry metadataEntry,
@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
{
return streamTables(session, schemaName)
.map(TableInfo::tableName)
.collect(toImmutableList());
}

@Override
public Map<SchemaTableName, RelationType> getRelationTypes(ConnectorSession session, Optional<String> schemaName)
{
return streamTables(session, schemaName)
.collect(toImmutableMap(TableInfo::tableName, this::resolveRelationType, (ignore, second) -> second));
}

private Stream<TableInfo> streamTables(ConnectorSession session, Optional<String> optionalSchemaName)
{
List<Callable<List<TableInfo>>> tasks = optionalSchemaName.map(Collections::singletonList)
.orElseGet(() -> listSchemaNames(session))
.stream()
.map(schemaName -> (Callable<List<TableInfo>>) () -> metastore.getAllTables(schemaName))
.collect(toImmutableList());
try {
return processWithAdditionalThreads(tasks, metadataFetchingExecutor).stream()
.flatMap(Collection::stream);
}
catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}
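
processWithAdditionalThreads is imported from io.trino.plugin.base.util.ExecutorUtil. A hedged sketch of the contract streamTables appears to rely on — run every task, borrow threads from the bounded executor for extra parallelism while the calling thread helps, and return results in task order — could look like this (illustrative only, not the actual ExecutorUtil code):

    import com.google.common.collect.ImmutableList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executor;
    import java.util.concurrent.FutureTask;
    import static com.google.common.collect.ImmutableList.toImmutableList;

    static <T> List<T> processWithAdditionalThreadsSketch(List<Callable<T>> tasks, Executor executor)
            throws ExecutionException
    {
        List<FutureTask<T>> futures = tasks.stream()
                .map(FutureTask::new)
                .collect(toImmutableList());
        futures.forEach(executor::execute); // fan out; a BoundedExecutor caps how many run at once
        ImmutableList.Builder<T> results = ImmutableList.builder();
        for (FutureTask<T> future : futures) {
            future.run(); // no-op if a pool thread already claimed this task
            try {
                results.add(future.get()); // task failures surface as ExecutionException
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return results.build();
    }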

private RelationType resolveRelationType(TableInfo tableInfo)
Member: @piotrrzysko I would drop the question from the commit message. The question was:

Is this resolution necessary? Could we instead use the
existing mapping between ExtendedRelationType and RelationType that's
already encapsulated in RelationType?

I guess the question here is: is OTHER_VIEW or OTHER_MATERIALIZED_VIEW possible in Delta Lake? TRINO_MATERIALIZED_VIEW is not.
@raunaqmorarka @dain Do you know?

Member: btw, it is OK for me to have resolveRelationType here to keep the existing functionality.

{
if (tableInfo.extendedRelationType() == TableInfo.ExtendedRelationType.TRINO_VIEW) {
return RelationType.VIEW;
}
return RelationType.TABLE;
}
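
The alternative floated in the review thread would reuse a mapping defined on the enum itself; a hedged sketch, assuming ExtendedRelationType exposes such a conversion (the toRelationType() name is an assumption, not confirmed by this diff):

    private RelationType resolveRelationType(TableInfo tableInfo)
    {
        // hypothetical one-liner; it would also map OTHER_VIEW to VIEW,
        // which is exactly the behavior difference the reviewers are debating
        return tableInfo.extendedRelationType().toRelationType();
    }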

@Override
@@ -14,6 +14,7 @@
package io.trino.plugin.deltalake;

import com.google.inject.Inject;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.json.JsonCodec;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
@@ -33,6 +34,8 @@
import io.trino.spi.type.TypeManager;

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.createPerTransactionCache;
import static java.util.Objects.requireNonNull;
@@ -58,7 +61,7 @@ public class DeltaLakeMetadataFactory
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;
private final DeltaLakeTableMetadataScheduler metadataScheduler;

private final Executor metadataFetchingExecutor;
private final boolean allowManagedTableRename;
private final String trinoVersion;

@@ -79,7 +82,8 @@ public DeltaLakeMetadataFactory(
CachingExtendedStatisticsAccess statisticsAccess,
@AllowDeltaLakeManagedTableRename boolean allowManagedTableRename,
NodeVersion nodeVersion,
DeltaLakeTableMetadataScheduler metadataScheduler)
DeltaLakeTableMetadataScheduler metadataScheduler,
ExecutorService executorService)
{
this.hiveMetastoreFactory = requireNonNull(hiveMetastoreFactory, "hiveMetastore is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
@@ -102,6 +106,7 @@ public DeltaLakeMetadataFactory(
this.allowManagedTableRename = allowManagedTableRename;
this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
this.metadataScheduler = requireNonNull(metadataScheduler, "metadataScheduler is null");
this.metadataFetchingExecutor = new BoundedExecutor(executorService, deltaLakeConfig.getMetadataParallelism());
}
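
On the reviewer's sizing question: io.airlift.concurrent.BoundedExecutor does not own threads; it only throttles how many tasks from this view run on the shared pool at once, so several such views can share one ExecutorService safely. A minimal sketch of that behavior (pool and task are illustrative):

    import io.airlift.concurrent.BoundedExecutor;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    ExecutorService shared = Executors.newCachedThreadPool(); // shared, unbounded pool
    Executor bounded = new BoundedExecutor(shared, 8);        // mirrors the default parallelism of 8
    for (int i = 0; i < 100; i++) {
        int schema = i;
        bounded.execute(() -> System.out.println("enumerating schema " + schema)); // stand-in task
    }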

public DeltaLakeMetadata create(ConnectorIdentity identity)
@@ -141,6 +146,7 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
statisticsAccess,
metadataScheduler,
useUniqueTableLocation,
allowManagedTableRename,
metadataFetchingExecutor);
}
}
@@ -16,6 +16,7 @@
import io.trino.metastore.Database;
import io.trino.metastore.PrincipalPrivileges;
import io.trino.metastore.Table;
import io.trino.metastore.TableInfo;
import io.trino.spi.connector.SchemaTableName;

import java.util.List;
@@ -27,7 +28,7 @@ public interface DeltaLakeMetastore

Optional<Database> getDatabase(String databaseName);

List<String> getAllTables(String databaseName);
List<TableInfo> getAllTables(String databaseName);

Optional<Table> getRawMetastoreTable(String databaseName, String tableName);

Expand Up @@ -17,14 +17,14 @@
import io.trino.metastore.HiveMetastore;
import io.trino.metastore.PrincipalPrivileges;
import io.trino.metastore.Table;
import io.trino.metastore.TableInfo;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.SchemaTableName;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.PATH_PROPERTY;
import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
@@ -58,11 +58,9 @@ public Optional<Database> getDatabase(String databaseName)
}

@Override
public List<TableInfo> getAllTables(String databaseName)
{
return delegate.getTables(databaseName);
}

@Override
@@ -74,7 +74,8 @@ public void testDefaults()
.setProjectionPushdownEnabled(true)
.setQueryPartitionFilterRequired(false)
.setDeletionVectorsEnabled(false)
.setDeltaLogFileSystemCacheDisabled(false)
.setMetadataParallelism(8));
}

@Test
@@ -116,6 +117,7 @@ public void testExplicitPropertyMappings()
.put("delta.query-partition-filter-required", "true")
.put("delta.deletion-vectors-enabled", "true")
.put("delta.fs.cache.disable-transaction-log-caching", "true")
.put("delta.metadata.parallelism", "10")
.buildOrThrow();

DeltaLakeConfig expected = new DeltaLakeConfig()
@@ -153,7 +155,8 @@
.setProjectionPushdownEnabled(false)
.setQueryPartitionFilterRequired(true)
.setDeletionVectorsEnabled(true)
.setDeltaLogFileSystemCacheDisabled(true)
.setMetadataParallelism(10);

assertFullMapping(properties, expected);
}
@@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
import io.airlift.units.DataSize;
@@ -65,6 +64,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
@@ -234,7 +234,8 @@ public Stream<AddFileEntry> getActiveFiles(
new CachingExtendedStatisticsAccess(new MetaDirStatisticsAccess(HDFS_FILE_SYSTEM_FACTORY, new JsonCodecFactory().jsonCodec(ExtendedStatistics.class))),
true,
new NodeVersion("test_version"),
new DeltaLakeTableMetadataScheduler(new TestingNodeManager(), TESTING_TYPE_MANAGER, new DeltaLakeFileMetastoreTableOperationsProvider(hiveMetastoreFactory), Integer.MAX_VALUE, new DeltaLakeConfig()),
newDirectExecutorService());

ConnectorSession session = testingConnectorSessionWithConfig(deltaLakeConfig);
DeltaLakeTransactionManager deltaLakeTransactionManager = new DeltaLakeTransactionManager(metadataFactory);
@@ -243,7 +244,7 @@
return new DeltaLakeSplitManager(
typeManager,
transactionLogAccess,
newDirectExecutorService(),
deltaLakeConfig,
HDFS_FILE_SYSTEM_FACTORY,
deltaLakeTransactionManager,