Skip to content

Commit 994b9f5

Browse files
authored
feat(compact): support remote lookup file & add DropFileCallback in Levels (#214)
1 parent 84ddd16 commit 994b9f5

82 files changed

Lines changed: 2092 additions & 96 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

include/paimon/defs.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,13 @@ struct PAIMON_EXPORT Options {
384384
/// "lookup.cache.bloom.filter.fpp" - Define the default false positive probability for lookup
385385
/// cache bloom filters. Default value is 0.05.
386386
static const char LOOKUP_CACHE_BLOOM_FILTER_FPP[];
387+
/// "lookup.remote-file.enabled" - Whether to enable the remote file for lookup.
388+
/// Default value is false.
389+
static const char LOOKUP_REMOTE_FILE_ENABLED[];
390+
/// "lookup.remote-file.level-threshold" - Level threshold of lookup to generate remote lookup
391+
/// files. Level files below this threshold will not generate remote lookup files.
392+
/// Default value is INT32_MIN.
393+
static const char LOOKUP_REMOTE_LEVEL_THRESHOLD[];
387394
/// "lookup.cache-spill-compression" - Spill compression for lookup cache, currently zstd, none,
388395
/// lz4 are supported. Default value is zstd.
389396
/// Noted that java paimon also supports lzo which paimon-cpp does not support for now.

src/paimon/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ set(PAIMON_CORE_SRCS
244244
core/mergetree/write_buffer.cpp
245245
core/mergetree/levels.cpp
246246
core/mergetree/lookup_levels.cpp
247+
core/mergetree/lookup/remote_lookup_file_manager.cpp
247248
core/migrate/file_meta_utils.cpp
248249
core/operation/data_evolution_file_store_scan.cpp
249250
core/operation/data_evolution_split_read.cpp
@@ -596,6 +597,7 @@ if(PAIMON_BUILD_TESTS)
596597
core/mergetree/compact/merge_tree_compact_manager_factory_test.cpp
597598
core/mergetree/compact/merge_tree_compact_rewriter_test.cpp
598599
core/mergetree/lookup/persist_processor_test.cpp
600+
core/mergetree/lookup/remote_lookup_file_manager_test.cpp
599601
core/mergetree/drop_delete_reader_test.cpp
600602
core/mergetree/merge_tree_writer_test.cpp
601603
core/mergetree/write_buffer_test.cpp

src/paimon/common/defs.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ const char Options::COMPACT_OFFPEAK_END_HOUR[] = "compaction.offpeak.end.hour";
9898
const char Options::COMPACTION_OFFPEAK_RATIO[] = "compaction.offpeak-ratio";
9999
const char Options::LOOKUP_CACHE_BLOOM_FILTER_ENABLED[] = "lookup.cache.bloom.filter.enabled";
100100
const char Options::LOOKUP_CACHE_BLOOM_FILTER_FPP[] = "lookup.cache.bloom.filter.fpp";
101+
const char Options::LOOKUP_REMOTE_FILE_ENABLED[] = "lookup.remote-file.enabled";
102+
const char Options::LOOKUP_REMOTE_LEVEL_THRESHOLD[] = "lookup.remote-file.level-threshold";
101103
const char Options::LOOKUP_CACHE_SPILL_COMPRESSION[] = "lookup.cache-spill-compression";
102104
const char Options::SPILL_COMPRESSION_ZSTD_LEVEL[] = "spill-compression.zstd-level";
103105
const char Options::CACHE_PAGE_SIZE[] = "cache-page-size";

src/paimon/core/core_options.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,8 @@ struct CoreOptions::Impl {
420420
int32_t compact_off_peak_ratio = 0;
421421
bool lookup_cache_bloom_filter = true;
422422
double lookup_cache_bloom_filter_fpp = 0.05;
423+
bool lookup_remote_file_enabled = false;
424+
int32_t lookup_remote_level_threshold = INT32_MIN;
423425
CompressOptions lookup_compress_options{"zstd", 1};
424426
int64_t cache_page_size = 64 * 1024; // 64KB
425427
std::map<int32_t, std::shared_ptr<FileFormat>> file_format_per_level;
@@ -676,6 +678,14 @@ Result<CoreOptions> CoreOptions::FromMap(
676678
PAIMON_RETURN_NOT_OK(parser.Parse<double>(Options::LOOKUP_CACHE_BLOOM_FILTER_FPP,
677679
&impl->lookup_cache_bloom_filter_fpp));
678680

681+
// Parse lookup.remote-file.enabled
682+
PAIMON_RETURN_NOT_OK(
683+
parser.Parse<bool>(Options::LOOKUP_REMOTE_FILE_ENABLED, &impl->lookup_remote_file_enabled));
684+
685+
// Parse lookup.remote-file.level-threshold
686+
PAIMON_RETURN_NOT_OK(parser.Parse<int32_t>(Options::LOOKUP_REMOTE_LEVEL_THRESHOLD,
687+
&impl->lookup_remote_level_threshold));
688+
679689
// Parse lookup.cache-spill-compression
680690
std::string lookup_compress_options_compression_str;
681691
PAIMON_RETURN_NOT_OK(parser.ParseString(Options::LOOKUP_CACHE_SPILL_COMPRESSION,
@@ -1187,6 +1197,14 @@ const CompressOptions& CoreOptions::GetLookupCompressOptions() const {
11871197
return impl_->lookup_compress_options;
11881198
}
11891199

1200+
bool CoreOptions::LookupRemoteFileEnabled() const {
1201+
return impl_->lookup_remote_file_enabled;
1202+
}
1203+
1204+
int32_t CoreOptions::GetLookupRemoteLevelThreshold() const {
1205+
return impl_->lookup_remote_level_threshold;
1206+
}
1207+
11901208
int32_t CoreOptions::GetCachePageSize() const {
11911209
return static_cast<int32_t>(impl_->cache_page_size);
11921210
}

src/paimon/core/core_options.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ class PAIMON_EXPORT CoreOptions {
154154
bool LookupCacheBloomFilterEnabled() const;
155155
double GetLookupCacheBloomFilterFpp() const;
156156

157+
bool LookupRemoteFileEnabled() const;
158+
int32_t GetLookupRemoteLevelThreshold() const;
159+
157160
const CompressOptions& GetLookupCompressOptions() const;
158161
int32_t GetCachePageSize() const;
159162

src/paimon/core/core_options_test.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ TEST(CoreOptionsTest, TestDefaultValue) {
128128
ASSERT_EQ(10, core_options.GetLookupCompactMaxInterval());
129129
ASSERT_EQ(256 * 1024 * 1024, core_options.GetLookupCacheMaxMemory());
130130
ASSERT_EQ(0.25, core_options.GetLookupCacheHighPrioPoolRatio());
131+
ASSERT_FALSE(core_options.LookupRemoteFileEnabled());
132+
ASSERT_EQ(core_options.GetLookupRemoteLevelThreshold(), INT32_MIN);
131133
}
132134

133135
TEST(CoreOptionsTest, TestFromMap) {
@@ -214,7 +216,9 @@ TEST(CoreOptionsTest, TestFromMap) {
214216
{Options::FILE_FORMAT_PER_LEVEL, "0:AVRO,3:parquet"},
215217
{Options::FILE_COMPRESSION_PER_LEVEL, "0:lz4,3:none"},
216218
{Options::LOOKUP_CACHE_MAX_MEMORY_SIZE, "1MB"},
217-
{Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, "0.35"}};
219+
{Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, "0.35"},
220+
{Options::LOOKUP_REMOTE_FILE_ENABLED, "True"},
221+
{Options::LOOKUP_REMOTE_LEVEL_THRESHOLD, "2"}};
218222

219223
ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options));
220224
auto fs = core_options.GetFileSystem();
@@ -327,6 +331,8 @@ TEST(CoreOptionsTest, TestFromMap) {
327331
ASSERT_EQ(6 * 1024 * 1024, core_options.GetCachePageSize());
328332
ASSERT_EQ(1024 * 1024, core_options.GetLookupCacheMaxMemory());
329333
ASSERT_EQ(0.35, core_options.GetLookupCacheHighPrioPoolRatio());
334+
ASSERT_TRUE(core_options.LookupRemoteFileEnabled());
335+
ASSERT_EQ(core_options.GetLookupRemoteLevelThreshold(), 2);
330336
}
331337

332338
TEST(CoreOptionsTest, TestInvalidCase) {

src/paimon/core/io/data_file_meta.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ Result<std::shared_ptr<DataFileMeta>> DataFileMeta::Upgrade(int32_t new_level) c
8585
first_row_id, write_cols);
8686
}
8787

88+
std::shared_ptr<DataFileMeta> DataFileMeta::CopyWithExtraFiles(
89+
const std::vector<std::optional<std::string>>& new_extra_files) const {
90+
return std::make_shared<DataFileMeta>(
91+
file_name, file_size, row_count, min_key, max_key, key_stats, value_stats,
92+
min_sequence_number, max_sequence_number, schema_id, level, new_extra_files, creation_time,
93+
delete_row_count, embedded_index, file_source, value_stats_cols, external_path,
94+
first_row_id, write_cols);
95+
}
96+
8897
DataFileMeta::DataFileMeta(
8998
const std::string& _file_name, int64_t _file_size, int64_t _row_count,
9099
const BinaryRow& _min_key, const BinaryRow& _max_key, const SimpleStats& _key_stats,

src/paimon/core/io/data_file_meta.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ struct DataFileMeta {
7777

7878
Result<std::shared_ptr<DataFileMeta>> Upgrade(int32_t new_level) const;
7979

80+
/// Create a copy of this DataFileMeta with the given extra files.
81+
std::shared_ptr<DataFileMeta> CopyWithExtraFiles(
82+
const std::vector<std::optional<std::string>>& new_extra_files) const;
83+
8084
std::optional<int64_t> AddRowCount() const {
8185
return delete_row_count == std::nullopt ? std::optional<int64_t>()
8286
: row_count - delete_row_count.value();

src/paimon/core/mergetree/compact/changelog_merge_tree_rewriter.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,8 @@ Result<CompactResult> ChangelogMergeTreeRewriter::RewriteOrProduceChangelog(
115115
if (rewrite_compact_file) {
116116
NotifyRewriteCompactBefore(before);
117117
}
118+
PAIMON_ASSIGN_OR_RAISE(after, NotifyRewriteCompactAfter(after));
118119
write_guard.Release();
119-
120-
after = NotifyRewriteCompactAfter(after);
121120
return CompactResult(before, after);
122121
}
123122

src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,17 @@ LookupMergeTreeCompactRewriter<T>::LookupMergeTreeCompactRewriter(
3535
std::unique_ptr<MergeFileSplitRead>&& merge_file_split_read,
3636
MergeFunctionWrapperFactory merge_function_wrapper_factory,
3737
const std::shared_ptr<MemoryPool>& pool,
38-
const std::shared_ptr<CancellationController>& cancellation_controller)
38+
const std::shared_ptr<CancellationController>& cancellation_controller,
39+
std::unique_ptr<RemoteLookupFileManager<T>>&& remote_lookup_file_manager)
3940
: ChangelogMergeTreeRewriter(
4041
max_level, /*force_drop_delete=*/dv_maintainer != nullptr, partition, bucket, schema_id,
4142
trimmed_primary_keys, options, data_schema, write_schema,
4243
DeletionVector::CreateFactory(dv_maintainer), path_factory_cache,
4344
std::move(merge_file_split_read), std::move(merge_function_wrapper_factory), pool,
4445
cancellation_controller),
4546
lookup_levels_(std::move(lookup_levels)),
46-
dv_maintainer_(dv_maintainer) {}
47+
dv_maintainer_(dv_maintainer),
48+
remote_lookup_file_manager_(std::move(remote_lookup_file_manager)) {}
4749

4850
template <typename T>
4951
Result<std::unique_ptr<LookupMergeTreeCompactRewriter<T>>>
@@ -54,7 +56,8 @@ LookupMergeTreeCompactRewriter<T>::Create(
5456
const BinaryRow& partition, const std::shared_ptr<TableSchema>& table_schema,
5557
const std::shared_ptr<FileStorePathFactoryCache>& path_factory_cache,
5658
const CoreOptions& options, const std::shared_ptr<MemoryPool>& pool,
57-
const std::shared_ptr<CancellationController>& cancellation_controller) {
59+
const std::shared_ptr<CancellationController>& cancellation_controller,
60+
std::unique_ptr<RemoteLookupFileManager<T>>&& remote_lookup_file_manager) {
5861
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> trimmed_primary_keys,
5962
table_schema->TrimmedPrimaryKeys());
6063
auto data_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields());
@@ -86,7 +89,7 @@ LookupMergeTreeCompactRewriter<T>::Create(
8689
std::move(lookup_levels), dv_maintainer, max_level, partition, bucket, table_schema->Id(),
8790
trimmed_primary_keys, options, data_schema, write_schema, path_factory_cache,
8891
std::move(merge_file_split_read), std::move(merge_function_wrapper_factory), pool,
89-
cancellation_controller));
92+
cancellation_controller, std::move(remote_lookup_file_manager)));
9093
}
9194

9295
template <typename T>
@@ -169,11 +172,21 @@ void LookupMergeTreeCompactRewriter<T>::NotifyRewriteCompactBefore(
169172
}
170173

171174
template <typename T>
172-
std::vector<std::shared_ptr<DataFileMeta>>
175+
Result<std::vector<std::shared_ptr<DataFileMeta>>>
173176
LookupMergeTreeCompactRewriter<T>::NotifyRewriteCompactAfter(
174177
const std::vector<std::shared_ptr<DataFileMeta>>& files) {
175-
// TODO(xinyu.lxy): support remoteLookupFileManager
176-
return files;
178+
if (!remote_lookup_file_manager_) {
179+
return files;
180+
}
181+
182+
std::vector<std::shared_ptr<DataFileMeta>> result;
183+
result.reserve(files.size());
184+
for (const auto& file : files) {
185+
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<DataFileMeta> new_meta,
186+
remote_lookup_file_manager_->GenRemoteLookupFile(file));
187+
result.push_back(std::move(new_meta));
188+
}
189+
return result;
177190
}
178191

179192
template class LookupMergeTreeCompactRewriter<KeyValue>;

0 commit comments

Comments
 (0)