Skip to content

Commit bb18bf5

Browse files
committed
feat(compact): Support global LookupFileCache for compact lookup mode
1 parent 1322133 commit bb18bf5

24 files changed

Lines changed: 2029 additions & 233 deletions

include/paimon/defs.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,13 @@ struct PAIMON_EXPORT Options {
414414
/// "lookup.cache.high-priority-pool-ratio" - The fraction of cache memory that is reserved for
415415
/// high-priority data like index, filter. Default value is 0.25.
416416
static const char LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO[];
417+
/// "lookup.cache-file-retention" - The cached files retention time for lookup.
418+
/// After the file expires, if there is a need for access, it will be re-read from the DFS
419+
/// to build an index on the local disk. Default value is 1 hour.
420+
static const char LOOKUP_CACHE_FILE_RETENTION[];
421+
/// "lookup.cache-max-disk-size" - Max disk size for lookup cache, you can use this option
422+
/// to limit the use of local disks. Default value is unlimited (INT64_MAX).
423+
static const char LOOKUP_CACHE_MAX_DISK_SIZE[];
417424
};
418425

419426
static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();

src/paimon/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,7 @@ if(PAIMON_BUILD_TESTS)
478478
common/utils/uuid_test.cpp
479479
common/utils/decimal_utils_test.cpp
480480
common/utils/threadsafe_queue_test.cpp
481+
common/utils/generic_lru_cache_test.cpp
481482
STATIC_LINK_LIBS
482483
paimon_shared
483484
test_utils_static

src/paimon/common/defs.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,7 @@ const char Options::LOOKUP_COMPACT[] = "lookup-compact";
117117
const char Options::LOOKUP_COMPACT_MAX_INTERVAL[] = "lookup-compact.max-interval";
118118
const char Options::LOOKUP_CACHE_MAX_MEMORY_SIZE[] = "lookup.cache-max-memory-size";
119119
const char Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO[] = "lookup.cache.high-priority-pool-ratio";
120+
const char Options::LOOKUP_CACHE_FILE_RETENTION[] = "lookup.cache-file-retention";
121+
const char Options::LOOKUP_CACHE_MAX_DISK_SIZE[] = "lookup.cache-max-disk-size";
120122

121123
} // namespace paimon

src/paimon/common/io/cache/cache.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ class Cache {
3939
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
4040
supplier) = 0;
4141

42-
virtual void Put(const std::shared_ptr<CacheKey>& key,
43-
const std::shared_ptr<CacheValue>& value) = 0;
42+
virtual Status Put(const std::shared_ptr<CacheKey>& key,
43+
const std::shared_ptr<CacheValue>& value) = 0;
4444

4545
virtual void Invalidate(const std::shared_ptr<CacheKey>& key) = 0;
4646

@@ -65,6 +65,10 @@ class CacheValue {
6565
}
6666
}
6767

68+
bool operator==(const CacheValue& other) const {
69+
return segment_ == other.segment_;
70+
}
71+
6872
private:
6973
MemorySegment segment_;
7074
CacheCallback callback_;

src/paimon/common/io/cache/lru_cache.cpp

Lines changed: 24 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -18,128 +18,51 @@
1818

1919
namespace paimon {
2020

21-
LruCache::LruCache(int64_t max_weight) : max_weight_(max_weight), current_weight_(0) {}
21+
LruCache::LruCache(int64_t max_weight)
22+
: inner_cache_(InnerCache::Options{
23+
.max_weight = max_weight,
24+
.expire_after_access_ms = -1,
25+
.weigh_func = [](const std::shared_ptr<CacheKey>& /*key*/,
26+
const std::shared_ptr<CacheValue>& value) -> int64_t {
27+
return value ? value->GetSegment().Size() : 0;
28+
},
29+
.removal_callback =
30+
[](const std::shared_ptr<CacheKey>& key, const std::shared_ptr<CacheValue>& value,
31+
auto cause) {
32+
if (value) {
33+
value->OnEvict(key);
34+
}
35+
}}) {}
2236

2337
Result<std::shared_ptr<CacheValue>> LruCache::Get(
2438
const std::shared_ptr<CacheKey>& key,
2539
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)> supplier) {
26-
{
27-
std::unique_lock<std::shared_mutex> write_lock(mutex_);
28-
auto cached = FindAndPromote(key);
29-
if (cached) {
30-
return cached.value();
31-
}
32-
}
33-
// Cache miss: load via supplier (outside lock)
34-
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<CacheValue> value, supplier(key));
35-
if (GetWeight(value) > max_weight_) {
36-
return value;
37-
}
38-
39-
std::unique_lock<std::shared_mutex> write_lock(mutex_);
40-
// Another thread may have inserted the key while we were loading
41-
auto cached = FindAndPromote(key);
42-
if (cached) {
43-
return cached.value();
44-
}
45-
46-
Insert(key, value);
47-
EvictIfNeeded();
48-
return value;
40+
return inner_cache_.Get(key, std::move(supplier));
4941
}
5042

51-
void LruCache::Put(const std::shared_ptr<CacheKey>& key, const std::shared_ptr<CacheValue>& value) {
52-
if (GetWeight(value) > max_weight_) {
53-
return;
54-
}
55-
std::unique_lock<std::shared_mutex> write_lock(mutex_);
56-
57-
auto it = lru_map_.find(key);
58-
if (it != lru_map_.end()) {
59-
// Update existing entry: adjust weight
60-
current_weight_ -= GetWeight(it->second->second);
61-
it->second->second = value;
62-
current_weight_ += GetWeight(value);
63-
lru_list_.splice(lru_list_.begin(), lru_list_, it->second);
64-
} else {
65-
Insert(key, value);
66-
}
67-
68-
EvictIfNeeded();
43+
Status LruCache::Put(const std::shared_ptr<CacheKey>& key,
44+
const std::shared_ptr<CacheValue>& value) {
45+
return inner_cache_.Put(key, value);
6946
}
7047

7148
void LruCache::Invalidate(const std::shared_ptr<CacheKey>& key) {
72-
std::unique_lock<std::shared_mutex> write_lock(mutex_);
73-
74-
auto it = lru_map_.find(key);
75-
if (it != lru_map_.end()) {
76-
RemoveEntry(it->second);
77-
}
49+
inner_cache_.Invalidate(key);
7850
}
7951

8052
void LruCache::InvalidateAll() {
81-
std::unique_lock<std::shared_mutex> write_lock(mutex_);
82-
83-
while (!lru_list_.empty()) {
84-
RemoveEntry(std::prev(lru_list_.end()));
85-
}
86-
current_weight_ = 0;
53+
inner_cache_.InvalidateAll();
8754
}
8855

8956
size_t LruCache::Size() const {
90-
std::shared_lock<std::shared_mutex> read_lock(mutex_);
91-
return lru_map_.size();
57+
return inner_cache_.Size();
9258
}
9359

9460
int64_t LruCache::GetCurrentWeight() const {
95-
std::shared_lock<std::shared_mutex> read_lock(mutex_);
96-
return current_weight_;
61+
return inner_cache_.GetCurrentWeight();
9762
}
9863

9964
int64_t LruCache::GetMaxWeight() const {
100-
return max_weight_;
101-
}
102-
103-
std::optional<std::shared_ptr<CacheValue>> LruCache::FindAndPromote(
104-
const std::shared_ptr<CacheKey>& key) {
105-
auto it = lru_map_.find(key);
106-
if (it != lru_map_.end()) {
107-
lru_list_.splice(lru_list_.begin(), lru_list_, it->second);
108-
return it->second->second;
109-
}
110-
return std::nullopt;
65+
return inner_cache_.GetMaxWeight();
11166
}
11267

113-
void LruCache::Insert(const std::shared_ptr<CacheKey>& key,
114-
const std::shared_ptr<CacheValue>& value) {
115-
// Insert at front of LRU list
116-
lru_list_.emplace_front(key, value);
117-
lru_map_[key] = lru_list_.begin();
118-
current_weight_ += GetWeight(value);
119-
}
120-
121-
void LruCache::RemoveEntry(LruList::iterator list_it) {
122-
auto entry_key = list_it->first;
123-
auto entry_value = list_it->second;
124-
current_weight_ -= GetWeight(entry_value);
125-
lru_map_.erase(entry_key);
126-
lru_list_.erase(list_it);
127-
128-
if (entry_value) {
129-
entry_value->OnEvict(entry_key);
130-
}
131-
}
132-
133-
void LruCache::EvictIfNeeded() {
134-
while (current_weight_ > max_weight_ && !lru_list_.empty()) {
135-
RemoveEntry(std::prev(lru_list_.end()));
136-
}
137-
}
138-
139-
int64_t LruCache::GetWeight(const std::shared_ptr<CacheValue>& value) {
140-
if (!value) {
141-
return 0;
142-
}
143-
return value->GetSegment().Size();
144-
}
14568
} // namespace paimon

src/paimon/common/io/cache/lru_cache.h

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,25 @@
1515
*/
1616

1717
#pragma once
18+
1819
#include <cstdint>
1920
#include <functional>
20-
#include <list>
2121
#include <memory>
22-
#include <mutex>
23-
#include <optional>
24-
#include <shared_mutex>
25-
#include <string>
26-
#include <unordered_map>
27-
#include <utility>
2822

2923
#include "paimon/common/io/cache/cache.h"
3024
#include "paimon/common/io/cache/cache_key.h"
31-
#include "paimon/common/memory/memory_segment.h"
25+
#include "paimon/common/utils/generic_lru_cache.h"
3226
#include "paimon/result.h"
3327

3428
namespace paimon {
35-
/// LRU Cache implementation with weight-based eviction.
36-
/// Uses std::list + unordered_map for O(1) get/put/evict:
37-
/// list stores entries in LRU order (most recently used at front)
38-
/// map stores key -> list::iterator for O(1) lookup
39-
/// capacity is measured in bytes (sum of MemorySegment sizes)
40-
/// when an entry is evicted, its CacheCallback is invoked to notify the upper layer
41-
/// @note Thread-safe: all public methods are protected by mutex (read-write lock).
29+
30+
/// LRU Cache implementation with weight-based eviction for block cache.
31+
///
32+
/// Wraps GenericLruCache with CacheKey/CacheValue types. Capacity is measured
33+
/// in bytes (sum of MemorySegment sizes). When an entry is evicted, its
34+
/// CacheCallback is invoked to notify the upper layer.
35+
///
36+
/// @note Thread-safe: all public methods are protected by the underlying GenericLruCache lock.
4237
class LruCache : public Cache {
4338
public:
4439
explicit LruCache(int64_t max_weight);
@@ -48,8 +43,8 @@ class LruCache : public Cache {
4843
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
4944
supplier) override;
5045

51-
void Put(const std::shared_ptr<CacheKey>& key,
52-
const std::shared_ptr<CacheValue>& value) override;
46+
Status Put(const std::shared_ptr<CacheKey>& key,
47+
const std::shared_ptr<CacheValue>& value) override;
5348

5449
void Invalidate(const std::shared_ptr<CacheKey>& key) override;
5550

@@ -62,24 +57,10 @@ class LruCache : public Cache {
6257
int64_t GetMaxWeight() const;
6358

6459
private:
65-
using LruEntry = std::pair<std::shared_ptr<CacheKey>, std::shared_ptr<CacheValue>>;
66-
using LruList = std::list<LruEntry>;
67-
using LruMap = std::unordered_map<std::shared_ptr<CacheKey>, LruList::iterator, CacheKeyHash,
68-
CacheKeyEqual>;
69-
70-
std::optional<std::shared_ptr<CacheValue>> FindAndPromote(const std::shared_ptr<CacheKey>& key);
71-
void Insert(const std::shared_ptr<CacheKey>& key, const std::shared_ptr<CacheValue>& value);
72-
void RemoveEntry(LruList::iterator list_it);
73-
74-
void EvictIfNeeded();
75-
76-
static int64_t GetWeight(const std::shared_ptr<CacheValue>& value);
60+
using InnerCache = GenericLruCache<std::shared_ptr<CacheKey>, std::shared_ptr<CacheValue>,
61+
CacheKeyHash, CacheKeyEqual>;
7762

78-
int64_t max_weight_;
79-
int64_t current_weight_;
80-
LruList lru_list_;
81-
LruMap lru_map_;
82-
mutable std::shared_mutex mutex_;
63+
InnerCache inner_cache_;
8364
};
8465

8566
} // namespace paimon

0 commit comments

Comments
 (0)