From 43d587a5e54f6b02ac4e36364298e7aaf987bce1 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 7 Apr 2026 18:01:39 +0000 Subject: [PATCH 1/2] feat: auto-reduce page size when approaching gRPC message size limits Add size-aware query execution that tracks approximate result sizes and automatically stops fetching early when approaching the 4MB gRPC message size limit. This prevents "message too big" errors by: - Adding runQueryWithSizeLimit() that estimates serialized size during query - Stopping early when cumulative size approaches 3MB (conservative threshold) - Returning with a next page token so subsequent calls continue from where we stopped - Adding size estimation functions for resources, entitlements, and grants Fixes CXH-1300 https://claude.ai/code/session_01Gg185kHrjNU2aznuRQ7CsG --- pkg/bsql/entitlements.go | 49 ++++++++---- pkg/bsql/grants.go | 48 +++++++++--- pkg/bsql/query.go | 160 +++++++++++++++++++++++++++++++++++++++ pkg/bsql/resources.go | 40 +++++++--- 4 files changed, 262 insertions(+), 35 deletions(-) diff --git a/pkg/bsql/entitlements.go b/pkg/bsql/entitlements.go index e366a2c1..d269ad4b 100644 --- a/pkg/bsql/entitlements.go +++ b/pkg/bsql/entitlements.go @@ -86,25 +86,46 @@ func (s *SQLSyncer) dynamicEntitlements(ctx context.Context, resource *v2.Resour return nil, "", nil, err } - npt, err := s.runQuery(ctx, pToken, s.config.Entitlements.Query, s.config.Entitlements.Pagination, queryVars, func(ctx context.Context, rowMap map[string]any) (bool, error) { - for _, mapping := range s.config.Entitlements.Map { - r, ok, err := s.mapEntitlement(ctx, resource, mapping, rowMap) - if err != nil { - return false, err - } - - if ok { - r.Resource = resource - ret = append(ret, r) + // Use size-aware query execution to prevent exceeding gRPC message size limits + result, err := s.runQueryWithSizeLimit(ctx, pToken, s.config.Entitlements.Query, s.config.Entitlements.Pagination, queryVars, + func(ctx context.Context, rowMap map[string]any) (bool, int64, error) { + var itemSize int64 + for _, mapping := range s.config.Entitlements.Map { + r, ok, err := s.mapEntitlement(ctx, resource, mapping, rowMap) + if err != nil { + return false, 0, err + } + + if ok { + r.Resource = resource + ret = append(ret, r) + itemSize += estimateEntitlementSize(r) + } } - } - return true, nil - }) + return true, itemSize, nil + }) if err != nil { return nil, "", nil, err } - return ret, npt, nil, nil + return ret, result.NextPageToken, nil, nil +} + +// estimateEntitlementSize provides a rough estimate of the serialized size of an entitlement. +func estimateEntitlementSize(e *v2.Entitlement) int64 { + if e == nil { + return 0 + } + var size int64 + size += int64(len(e.Id)) + size += int64(len(e.DisplayName)) + size += int64(len(e.Description)) + size += int64(len(e.Slug)) + // Add overhead for annotations, resource reference, and protobuf encoding + size += int64(len(e.Annotations) * 50) + size += int64(len(e.GrantableTo) * 50) + size += sizeEstimateOverhead + return size } func (s *SQLSyncer) mapEntitlement(ctx context.Context, resource *v2.Resource, mappings *EntitlementMapping, rowMap map[string]any) (*v2.Entitlement, bool, error) { diff --git a/pkg/bsql/grants.go b/pkg/bsql/grants.go index 38abcfa8..d2b0fa05 100644 --- a/pkg/bsql/grants.go +++ b/pkg/bsql/grants.go @@ -82,24 +82,48 @@ func (s *SQLSyncer) listGrants(ctx context.Context, resource *v2.Resource, pToke return nil, "", err } - npt, err := s.runQuery(ctx, pToken, grantConfig.Query, grantConfig.Pagination, queryVars, func(ctx context.Context, rowMap map[string]any) (bool, error) { - for _, mapping := range grantConfig.Map { - g, ok, err := s.mapGrant(ctx, resource, mapping, rowMap) - if err != nil { - return false, err - } + // Use size-aware query execution to prevent exceeding gRPC message size limits + result, err := s.runQueryWithSizeLimit(ctx, pToken, grantConfig.Query, grantConfig.Pagination, queryVars, + func(ctx context.Context, rowMap map[string]any) (bool, int64, error) { + var itemSize int64 + for _, mapping := range grantConfig.Map { + g, ok, err := s.mapGrant(ctx, resource, mapping, rowMap) + if err != nil { + return false, 0, err + } - if ok { - ret = append(ret, g) + if ok { + ret = append(ret, g) + itemSize += estimateGrantSize(g) + } } - } - return true, nil - }) + return true, itemSize, nil + }) if err != nil { return nil, "", err } - return ret, npt, nil + return ret, result.NextPageToken, nil +} + +// estimateGrantSize provides a rough estimate of the serialized size of a grant. +func estimateGrantSize(g *v2.Grant) int64 { + if g == nil { + return 0 + } + var size int64 + size += int64(len(g.Id)) + if g.Entitlement != nil { + size += int64(len(g.Entitlement.Id)) + } + if g.Principal != nil { + size += int64(len(g.Principal.Id.Resource)) + size += int64(len(g.Principal.Id.ResourceType)) + } + // Add overhead for annotations and protobuf encoding + size += int64(len(g.Annotations) * 50) + size += sizeEstimateOverhead + return size } func (s *SQLSyncer) mapGrant(ctx context.Context, resource *v2.Resource, mapping *GrantMapping, rowMap map[string]any) (*v2.Grant, bool, error) { diff --git a/pkg/bsql/query.go b/pkg/bsql/query.go index 06ff670c..380972cd 100644 --- a/pkg/bsql/query.go +++ b/pkg/bsql/query.go @@ -26,11 +26,53 @@ const ( cursorKey = "cursor" limitKey = "limit" unquotedKey = "unquoted" + + // gRPC message size limits - we use a conservative threshold to avoid hitting the 4MB limit + // The actual limit is 4MB, but we use 3.5MB to leave room for overhead and metadata + maxResponseSizeBytes = 3 * 1024 * 1024 // 3MB conservative limit + sizeEstimateOverhead = 500 // Overhead per item for protobuf encoding + minPageSizeForSizeLimit = 1 // Minimum page size when reducing due to size limits + pageSizeReductionDivisor = 2 // How much to reduce page size when hitting limits ) var ErrQueryAffectedZeroRows = errors.New("query affected 0 rows, ending and rolling back") var ErrQueryAffectedMoreThanOneRow = errors.New("query affected more than one row, ending and rolling back") +// queryResult contains the result of a query execution along with size tracking information. +type queryResult struct { + // NextPageToken is the token for the next page of results + NextPageToken string + // TotalSize is the approximate total size of all results in bytes + TotalSize int64 + // ItemCount is the number of items returned + ItemCount int + // HitSizeLimit indicates whether the query stopped early due to size limits + HitSizeLimit bool +} + +// estimateRowSize provides a rough estimate of the serialized size of a row map. +// This is used to prevent exceeding gRPC message size limits. +func estimateRowSize(rowMap map[string]interface{}) int64 { + var size int64 + for k, v := range rowMap { + size += int64(len(k)) + switch val := v.(type) { + case string: + size += int64(len(val)) + case []byte: + size += int64(len(val)) + case nil: + // nil values have minimal overhead + default: + // For other types, estimate 8-32 bytes + size += 16 + } + } + // Add overhead for protobuf encoding, traits, and other fields + size += sizeEstimateOverhead + return size +} + type executor interface { ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) @@ -490,6 +532,124 @@ func (s *SQLSyncer) normalizeValue(val any) any { return val } +// runQueryWithSizeLimit executes a query with size-based early termination. +// It returns a queryResult containing pagination info and size tracking. +// The sizeEstimator function should return the estimated serialized size of the processed item. +func (s *SQLSyncer) runQueryWithSizeLimit( + ctx context.Context, + pToken *pagination.Token, + query string, + pOpts *Pagination, + vars map[string]any, + rowCallback func(context.Context, map[string]interface{}) (bool, int64, error), +) (*queryResult, error) { + l := ctxzap.Extract(ctx) + + q, qArgs, pCtx, err := s.prepareQuery(pToken, query, pOpts, vars) + if err != nil { + return nil, err + } + + l.Debug("running query with size limit", zap.String("query", q), zap.Any("args", qArgs)) + + rows, err := s.db.QueryContext(ctx, q, qArgs...) + if err != nil { + l.Error("failed to run query", zap.String("query", q), zap.Any("args", qArgs), zap.Error(err)) + return nil, err + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, err + } + + values := make([]interface{}, len(columns)) + scanArgs := make([]interface{}, len(values)) + for i := range values { + scanArgs[i] = &values[i] + } + + result := &queryResult{} + var lastRowID any + rowCount := 0 + hitPageLimit := false + + for rows.Next() { + rowCount++ + + // Check if we've exceeded the page limit (standard pagination) + if pCtx != nil && rowCount > int(pCtx.Limit) { + hitPageLimit = true + break + } + + if err := rows.Scan(scanArgs...); err != nil { + return nil, err + } + + foundPaginationKey := false + rowMap := make(map[string]interface{}) + for i, colName := range columns { + rowMap[colName] = values[i] + if pCtx != nil && pCtx.PrimaryKey == colName { + lastRowID = values[i] + foundPaginationKey = true + } + } + + if pCtx != nil && !foundPaginationKey { + return nil, errors.New("primary key not found in query results") + } + + // Estimate the size of this row before processing + rowSize := estimateRowSize(rowMap) + + // Check if adding this item would exceed size limits + // We check BEFORE processing to avoid returning partial results + if result.TotalSize > 0 && result.TotalSize+rowSize > maxResponseSizeBytes { + result.HitSizeLimit = true + l.Info("stopping query early due to response size limit", + zap.Int64("current_size", result.TotalSize), + zap.Int64("row_size", rowSize), + zap.Int("items_returned", result.ItemCount)) + break + } + + ok, itemSize, err := rowCallback(ctx, rowMap) + if err != nil { + return nil, err + } + + // Use the actual item size if provided, otherwise use the row size estimate + if itemSize > 0 { + result.TotalSize += itemSize + } else { + result.TotalSize += rowSize + } + result.ItemCount++ + + if !ok { + break + } + } + + if err := rows.Err(); err != nil { + return nil, err + } + + // Determine if we need a next page token + // We need one if: we hit the page limit, OR we hit the size limit + if pCtx != nil && (hitPageLimit || result.HitSizeLimit) { + result.NextPageToken, err = s.nextPageToken(pCtx, lastRowID) + if err != nil { + return nil, err + } + } + + return result, nil +} + func (s *SQLSyncer) runQuery( ctx context.Context, pToken *pagination.Token, diff --git a/pkg/bsql/resources.go b/pkg/bsql/resources.go index 4713c10f..1bd98e06 100644 --- a/pkg/bsql/resources.go +++ b/pkg/bsql/resources.go @@ -26,19 +26,41 @@ func (s *SQLSyncer) List(ctx context.Context, parentResourceID *v2.ResourceId, p return nil, "", nil, err } - npt, err := s.runQuery(ctx, pToken, s.config.List.Query, s.config.List.Pagination, queryVars, func(ctx context.Context, rowMap map[string]any) (bool, error) { - r, err := s.mapResource(ctx, rowMap) - if err != nil { - return false, err - } - ret = append(ret, r) - return true, nil - }) + // Use size-aware query execution to prevent exceeding gRPC message size limits + result, err := s.runQueryWithSizeLimit(ctx, pToken, s.config.List.Query, s.config.List.Pagination, queryVars, + func(ctx context.Context, rowMap map[string]any) (bool, int64, error) { + r, err := s.mapResource(ctx, rowMap) + if err != nil { + return false, 0, err + } + ret = append(ret, r) + // Estimate size of the serialized resource + size := estimateResourceSize(r) + return true, size, nil + }) if err != nil { return nil, "", nil, err } - return ret, npt, nil, nil + return ret, result.NextPageToken, nil, nil +} + +// estimateResourceSize provides a rough estimate of the serialized size of a resource. +func estimateResourceSize(r *v2.Resource) int64 { + if r == nil { + return 0 + } + var size int64 + size += int64(len(r.DisplayName)) + size += int64(len(r.Description)) + if r.Id != nil { + size += int64(len(r.Id.Resource)) + size += int64(len(r.Id.ResourceType)) + } + // Add overhead for annotations, traits, and protobuf encoding + size += int64(len(r.Annotations) * 100) + size += sizeEstimateOverhead + return size } func (s *SQLSyncer) fetchTraits() map[string]bool { From fc6c302762e4a44f1b6ff6ee7d15960628d5f7fb Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 8 Apr 2026 15:32:04 +0000 Subject: [PATCH 2/2] fix: address PR review feedback for size-aware pagination - Fix misleading comment (3.5MB -> 3MB) and remove unused constants - Fix critical data loss: track lastRowID separately from currentRowID so unprocessed rows aren't skipped on cursor-based pagination - Fix offset pagination: adjust limit to actual item count when size limit is hit, so next offset is computed correctly - Add post-callback size check for consistent size enforcement - Guard against nil pointer dereference in estimateGrantSize - Fix godot lint issue (comment period) https://claude.ai/code/session_01Gg185kHrjNU2aznuRQ7CsG --- pkg/bsql/grants.go | 2 +- pkg/bsql/query.go | 53 +++++++++++++++++++++++++++++----------------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/pkg/bsql/grants.go b/pkg/bsql/grants.go index d2b0fa05..5db9d64e 100644 --- a/pkg/bsql/grants.go +++ b/pkg/bsql/grants.go @@ -116,7 +116,7 @@ func estimateGrantSize(g *v2.Grant) int64 { if g.Entitlement != nil { size += int64(len(g.Entitlement.Id)) } - if g.Principal != nil { + if g.Principal != nil && g.Principal.Id != nil { size += int64(len(g.Principal.Id.Resource)) size += int64(len(g.Principal.Id.ResourceType)) } diff --git a/pkg/bsql/query.go b/pkg/bsql/query.go index 380972cd..f2819772 100644 --- a/pkg/bsql/query.go +++ b/pkg/bsql/query.go @@ -27,12 +27,10 @@ const ( limitKey = "limit" unquotedKey = "unquoted" - // gRPC message size limits - we use a conservative threshold to avoid hitting the 4MB limit - // The actual limit is 4MB, but we use 3.5MB to leave room for overhead and metadata - maxResponseSizeBytes = 3 * 1024 * 1024 // 3MB conservative limit - sizeEstimateOverhead = 500 // Overhead per item for protobuf encoding - minPageSizeForSizeLimit = 1 // Minimum page size when reducing due to size limits - pageSizeReductionDivisor = 2 // How much to reduce page size when hitting limits + // gRPC message size limits - we use a conservative threshold to avoid hitting the 4MB limit. + // The actual limit is 4MB, but we use 3MB to leave room for overhead and metadata. + maxResponseSizeBytes = 3 * 1024 * 1024 // 3MB conservative limit + sizeEstimateOverhead = 500 // Overhead per item for protobuf encoding ) var ErrQueryAffectedZeroRows = errors.New("query affected 0 rows, ending and rolling back") @@ -578,7 +576,7 @@ func (s *SQLSyncer) runQueryWithSizeLimit( for rows.Next() { rowCount++ - // Check if we've exceeded the page limit (standard pagination) + // Check if we've exceeded the page limit (standard pagination). if pCtx != nil && rowCount > int(pCtx.Limit) { hitPageLimit = true break @@ -589,11 +587,12 @@ func (s *SQLSyncer) runQueryWithSizeLimit( } foundPaginationKey := false + var currentRowID any rowMap := make(map[string]interface{}) for i, colName := range columns { rowMap[colName] = values[i] if pCtx != nil && pCtx.PrimaryKey == colName { - lastRowID = values[i] + currentRowID = values[i] foundPaginationKey = true } } @@ -602,16 +601,13 @@ func (s *SQLSyncer) runQueryWithSizeLimit( return nil, errors.New("primary key not found in query results") } - // Estimate the size of this row before processing - rowSize := estimateRowSize(rowMap) - - // Check if adding this item would exceed size limits - // We check BEFORE processing to avoid returning partial results - if result.TotalSize > 0 && result.TotalSize+rowSize > maxResponseSizeBytes { + // Check if adding this item would exceed size limits. + // We check BEFORE processing so that we can stop without data loss. + // The unprocessed row will be picked up by the next page. + if result.TotalSize > 0 && result.TotalSize+estimateRowSize(rowMap) > maxResponseSizeBytes { result.HitSizeLimit = true l.Info("stopping query early due to response size limit", zap.Int64("current_size", result.TotalSize), - zap.Int64("row_size", rowSize), zap.Int("items_returned", result.ItemCount)) break } @@ -621,14 +617,28 @@ func (s *SQLSyncer) runQueryWithSizeLimit( return nil, err } - // Use the actual item size if provided, otherwise use the row size estimate + // Use the actual item size if provided, otherwise use the row estimate. if itemSize > 0 { result.TotalSize += itemSize } else { - result.TotalSize += rowSize + result.TotalSize += estimateRowSize(rowMap) } result.ItemCount++ + // Only update lastRowID after the callback succeeds, so that + // the next page token points to the last *processed* row. + lastRowID = currentRowID + + // Post-callback size check: if accumulated size now exceeds the + // limit, stop before processing the next row. + if result.TotalSize > maxResponseSizeBytes { + result.HitSizeLimit = true + l.Info("stopping query after callback due to response size limit", + zap.Int64("current_size", result.TotalSize), + zap.Int("items_returned", result.ItemCount)) + break + } + if !ok { break } @@ -638,9 +648,14 @@ func (s *SQLSyncer) runQueryWithSizeLimit( return nil, err } - // Determine if we need a next page token - // We need one if: we hit the page limit, OR we hit the size limit + // Determine if we need a next page token. + // We need one if: we hit the page limit, OR we hit the size limit. if pCtx != nil && (hitPageLimit || result.HitSizeLimit) { + // For offset strategy when hitting size limits, adjust the limit + // to reflect actual items processed so the next offset is correct. + if result.HitSizeLimit && pCtx.Strategy == offsetKey { + pCtx.Limit = int64(result.ItemCount) + } result.NextPageToken, err = s.nextPageToken(pCtx, lastRowID) if err != nil { return nil, err