Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,13 @@ struct PAIMON_EXPORT Options {
/// "lookup.cache.high-priority-pool-ratio" - The fraction of cache memory that is reserved for
/// high-priority data like index, filter. Default value is 0.25.
static const char LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO[];
/// "lookup.cache-file-retention" - Retention time of cached files used for lookup.
/// Once a cached file has expired, the next access re-reads it from the DFS
/// to build an index on the local disk. Default value is 1 hour.
static const char LOOKUP_CACHE_FILE_RETENTION[];
/// "lookup.cache-max-disk-size" - Max disk size for lookup cache. Use this option
/// to limit the use of local disks. Default value is unlimited (INT64_MAX).
static const char LOOKUP_CACHE_MAX_DISK_SIZE[];
};

/// Commit identifier constant equal to the largest representable int64_t value.
/// NOTE(review): presumably the identifier used for batch-write commits - confirm against callers.
static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ set(PAIMON_CORE_SRCS
core/mergetree/merge_tree_writer.cpp
core/mergetree/write_buffer.cpp
core/mergetree/levels.cpp
core/mergetree/lookup_file.cpp
core/mergetree/lookup_levels.cpp
core/mergetree/lookup/remote_lookup_file_manager.cpp
core/migrate/file_meta_utils.cpp
Expand Down Expand Up @@ -478,6 +479,7 @@ if(PAIMON_BUILD_TESTS)
common/utils/uuid_test.cpp
common/utils/decimal_utils_test.cpp
common/utils/threadsafe_queue_test.cpp
common/utils/generic_lru_cache_test.cpp
STATIC_LINK_LIBS
paimon_shared
test_utils_static
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,7 @@ const char Options::LOOKUP_COMPACT[] = "lookup-compact";
// Option key strings for lookup compaction and the lookup cache. Each array holds the
// literal key text for the corresponding static member of Options.
const char Options::LOOKUP_COMPACT_MAX_INTERVAL[] = "lookup-compact.max-interval";
const char Options::LOOKUP_CACHE_MAX_MEMORY_SIZE[] = "lookup.cache-max-memory-size";
const char Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO[] = "lookup.cache.high-priority-pool-ratio";
// Keys for the file-retention and disk-size limits of the lookup cache.
const char Options::LOOKUP_CACHE_FILE_RETENTION[] = "lookup.cache-file-retention";
const char Options::LOOKUP_CACHE_MAX_DISK_SIZE[] = "lookup.cache-max-disk-size";

} // namespace paimon
11 changes: 9 additions & 2 deletions src/paimon/common/io/cache/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class Cache {
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
supplier) = 0;

virtual void Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) = 0;
virtual Status Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) = 0;

virtual void Invalidate(const std::shared_ptr<CacheKey>& key) = 0;

Expand All @@ -65,6 +65,13 @@ class CacheValue {
}
}

/// Compares two cache values for equality.
///
/// An object always equals itself; otherwise two values are equal when their
/// memory segments compare equal. Only the segment is compared - the eviction
/// callback is not part of equality.
///
/// @param other The value to compare against.
/// @return true if both refer to the same object or hold equal segments.
bool operator==(const CacheValue& other) const {
    // Identity short-circuit avoids comparing the segment with itself.
    if (this == &other) {
        return true;
    }
    return segment_ == other.segment_;
}

private:
MemorySegment segment_;
CacheCallback callback_;
Expand Down
125 changes: 24 additions & 101 deletions src/paimon/common/io/cache/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,128 +18,51 @@

namespace paimon {

LruCache::LruCache(int64_t max_weight) : max_weight_(max_weight), current_weight_(0) {}
/// Constructs a block cache whose total weight is capped at `max_weight`.
///
/// All storage is delegated to the inner generic cache, configured here with:
///  - a weigher that charges each entry the size of its memory segment
///    (a null value weighs 0), and
///  - a removal callback that forwards to the value's OnEvict hook.
///
/// @param max_weight Upper bound on the summed entry weights.
LruCache::LruCache(int64_t max_weight)
    : inner_cache_(InnerCache::Options{
          .max_weight = max_weight,
          // NOTE(review): -1 presumably disables access-based expiration - confirm
          // against GenericLruCache.
          .expire_after_access_ms = -1,
          .weigh_func = [](const std::shared_ptr<CacheKey>& /*key*/,
                           const std::shared_ptr<CacheValue>& cached) -> int64_t {
              if (!cached) {
                  return 0;
              }
              return cached->GetSegment().Size();
          },
          .removal_callback =
              [](const std::shared_ptr<CacheKey>& removed_key,
                 const std::shared_ptr<CacheValue>& removed_value, auto /*cause*/) {
                  // Null values carry no callback to notify.
                  if (!removed_value) {
                      return;
                  }
                  removed_value->OnEvict(removed_key);
              }}) {}

Result<std::shared_ptr<CacheValue>> LruCache::Get(
const std::shared_ptr<CacheKey>& key,
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)> supplier) {
{
std::unique_lock<std::shared_mutex> write_lock(mutex_);
auto cached = FindAndPromote(key);
if (cached) {
return cached.value();
}
}
// Cache miss: load via supplier (outside lock)
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<CacheValue> value, supplier(key));
if (GetWeight(value) > max_weight_) {
return value;
}

std::unique_lock<std::shared_mutex> write_lock(mutex_);
// Another thread may have inserted the key while we were loading
auto cached = FindAndPromote(key);
if (cached) {
return cached.value();
}

Insert(key, value);
EvictIfNeeded();
return value;
return inner_cache_.Get(key, std::move(supplier));
}

void LruCache::Put(const std::shared_ptr<CacheKey>& key, const std::shared_ptr<CacheValue>& value) {
if (GetWeight(value) > max_weight_) {
return;
}
std::unique_lock<std::shared_mutex> write_lock(mutex_);

auto it = lru_map_.find(key);
if (it != lru_map_.end()) {
// Update existing entry: adjust weight
current_weight_ -= GetWeight(it->second->second);
it->second->second = value;
current_weight_ += GetWeight(value);
lru_list_.splice(lru_list_.begin(), lru_list_, it->second);
} else {
Insert(key, value);
}

EvictIfNeeded();
Status LruCache::Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) {
return inner_cache_.Put(key, value);
}

void LruCache::Invalidate(const std::shared_ptr<CacheKey>& key) {
std::unique_lock<std::shared_mutex> write_lock(mutex_);

auto it = lru_map_.find(key);
if (it != lru_map_.end()) {
RemoveEntry(it->second);
}
inner_cache_.Invalidate(key);
}

void LruCache::InvalidateAll() {
std::unique_lock<std::shared_mutex> write_lock(mutex_);

while (!lru_list_.empty()) {
RemoveEntry(std::prev(lru_list_.end()));
}
current_weight_ = 0;
inner_cache_.InvalidateAll();
}

size_t LruCache::Size() const {
std::shared_lock<std::shared_mutex> read_lock(mutex_);
return lru_map_.size();
return inner_cache_.Size();
}

int64_t LruCache::GetCurrentWeight() const {
std::shared_lock<std::shared_mutex> read_lock(mutex_);
return current_weight_;
return inner_cache_.GetCurrentWeight();
}

/// Returns the value stored in max_weight_, the weight limit that eviction compares against.
int64_t LruCache::GetMaxWeight() const {
return max_weight_;
}

std::optional<std::shared_ptr<CacheValue>> LruCache::FindAndPromote(
const std::shared_ptr<CacheKey>& key) {
auto it = lru_map_.find(key);
if (it != lru_map_.end()) {
lru_list_.splice(lru_list_.begin(), lru_list_, it->second);
return it->second->second;
}
return std::nullopt;
return inner_cache_.GetMaxWeight();
}

/// Adds an entry at the most-recently-used position and accounts for its weight.
///
/// Performs no lookup and no eviction of its own. The call sites visible in this
/// file check for an existing key before calling and evict afterwards.
///
/// @param key   Key of the new entry.
/// @param value Value of the new entry; its weight is added to the running total.
void LruCache::Insert(const std::shared_ptr<CacheKey>& key,
                      const std::shared_ptr<CacheValue>& value) {
    // The list front is the most-recently-used slot; the map remembers the node's
    // iterator so later lookups can find it without scanning the list.
    lru_map_[key] = lru_list_.emplace(lru_list_.begin(), key, value);
    current_weight_ += GetWeight(value);
}

/// Removes the entry at `list_it` from both the list and the map, subtracts its
/// weight from the running total, and then notifies the value through OnEvict.
///
/// @param list_it Iterator to the list node to remove.
void LruCache::RemoveEntry(LruList::iterator list_it) {
    // Copy key and value out of the node first: they are still needed after the
    // node has been erased.
    auto [removed_key, removed_value] = *list_it;

    lru_map_.erase(removed_key);
    lru_list_.erase(list_it);
    current_weight_ -= GetWeight(removed_value);

    // A null value has nothing to notify.
    if (!removed_value) {
        return;
    }
    removed_value->OnEvict(removed_key);
}

/// Removes entries from the back of the list (the end opposite to where Insert
/// places new entries) until the total weight is no longer above the maximum
/// or the list is empty.
void LruCache::EvictIfNeeded() {
    for (;;) {
        const bool over_budget = current_weight_ > max_weight_;
        if (!over_budget || lru_list_.empty()) {
            break;
        }
        // The last list node is the eviction victim.
        RemoveEntry(std::prev(lru_list_.end()));
    }
}

/// Computes the weight charged for a cached value.
///
/// @param value The cached value; may be null.
/// @return 0 for a null value, otherwise the size of the value's memory segment.
int64_t LruCache::GetWeight(const std::shared_ptr<CacheValue>& value) {
    return value ? value->GetSegment().Size() : 0;
}
} // namespace paimon
49 changes: 15 additions & 34 deletions src/paimon/common/io/cache/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,25 @@
*/

#pragma once

#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>

#include "paimon/common/io/cache/cache.h"
#include "paimon/common/io/cache/cache_key.h"
#include "paimon/common/memory/memory_segment.h"
#include "paimon/common/utils/generic_lru_cache.h"
#include "paimon/result.h"

namespace paimon {
/// LRU Cache implementation with weight-based eviction.
/// Uses std::list + unordered_map for O(1) get/put/evict:
/// list stores entries in LRU order (most recently used at front)
/// map stores key -> list::iterator for O(1) lookup
/// capacity is measured in bytes (sum of MemorySegment sizes)
/// when an entry is evicted, its CacheCallback is invoked to notify the upper layer
/// @note Thread-safe: all public methods are protected by mutex (read-write lock).

/// LRU Cache implementation with weight-based eviction for block cache.
///
/// Wraps GenericLruCache with CacheKey/CacheValue types. Capacity is measured
/// in bytes (sum of MemorySegment sizes). When an entry is evicted, its
/// CacheCallback is invoked to notify the upper layer.
///
/// @note Thread-safe: all public methods are protected by the underlying GenericLruCache lock.
class LruCache : public Cache {
public:
explicit LruCache(int64_t max_weight);
Expand All @@ -48,8 +43,8 @@ class LruCache : public Cache {
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
supplier) override;

void Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) override;
Status Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) override;

void Invalidate(const std::shared_ptr<CacheKey>& key) override;

Expand All @@ -62,24 +57,10 @@ class LruCache : public Cache {
int64_t GetMaxWeight() const;

private:
using LruEntry = std::pair<std::shared_ptr<CacheKey>, std::shared_ptr<CacheValue>>;
using LruList = std::list<LruEntry>;
using LruMap = std::unordered_map<std::shared_ptr<CacheKey>, LruList::iterator, CacheKeyHash,
CacheKeyEqual>;

std::optional<std::shared_ptr<CacheValue>> FindAndPromote(const std::shared_ptr<CacheKey>& key);
void Insert(const std::shared_ptr<CacheKey>& key, const std::shared_ptr<CacheValue>& value);
void RemoveEntry(LruList::iterator list_it);

void EvictIfNeeded();

static int64_t GetWeight(const std::shared_ptr<CacheValue>& value);
using InnerCache = GenericLruCache<std::shared_ptr<CacheKey>, std::shared_ptr<CacheValue>,
CacheKeyHash, CacheKeyEqual>;

int64_t max_weight_;
int64_t current_weight_;
LruList lru_list_;
LruMap lru_map_;
mutable std::shared_mutex mutex_;
InnerCache inner_cache_;
};

} // namespace paimon
Loading
Loading