diff --git a/pkg/bsql/entitlements.go b/pkg/bsql/entitlements.go index e366a2c1..d269ad4b 100644 --- a/pkg/bsql/entitlements.go +++ b/pkg/bsql/entitlements.go @@ -86,25 +86,46 @@ func (s *SQLSyncer) dynamicEntitlements(ctx context.Context, resource *v2.Resour return nil, "", nil, err } - npt, err := s.runQuery(ctx, pToken, s.config.Entitlements.Query, s.config.Entitlements.Pagination, queryVars, func(ctx context.Context, rowMap map[string]any) (bool, error) { - for _, mapping := range s.config.Entitlements.Map { - r, ok, err := s.mapEntitlement(ctx, resource, mapping, rowMap) - if err != nil { - return false, err - } - - if ok { - r.Resource = resource - ret = append(ret, r) + // Use size-aware query execution to prevent exceeding gRPC message size limits + result, err := s.runQueryWithSizeLimit(ctx, pToken, s.config.Entitlements.Query, s.config.Entitlements.Pagination, queryVars, + func(ctx context.Context, rowMap map[string]any) (bool, int64, error) { + var itemSize int64 + for _, mapping := range s.config.Entitlements.Map { + r, ok, err := s.mapEntitlement(ctx, resource, mapping, rowMap) + if err != nil { + return false, 0, err + } + + if ok { + r.Resource = resource + ret = append(ret, r) + itemSize += estimateEntitlementSize(r) + } } - } - return true, nil - }) + return true, itemSize, nil + }) if err != nil { return nil, "", nil, err } - return ret, npt, nil, nil + return ret, result.NextPageToken, nil, nil +} + +// estimateEntitlementSize provides a rough estimate of the serialized size of an entitlement. +func estimateEntitlementSize(e *v2.Entitlement) int64 { + if e == nil { + return 0 + } + var size int64 + size += int64(len(e.Id)) + size += int64(len(e.DisplayName)) + size += int64(len(e.Description)) + size += int64(len(e.Slug)) + // Add overhead for annotations, resource reference, and protobuf encoding + size += int64(len(e.Annotations) * 50) + size += int64(len(e.GrantableTo) * 50) + size += sizeEstimateOverhead + return size } func (s *SQLSyncer) mapEntitlement(ctx context.Context, resource *v2.Resource, mappings *EntitlementMapping, rowMap map[string]any) (*v2.Entitlement, bool, error) { diff --git a/pkg/bsql/grants.go b/pkg/bsql/grants.go index 38abcfa8..5db9d64e 100644 --- a/pkg/bsql/grants.go +++ b/pkg/bsql/grants.go @@ -82,24 +82,48 @@ func (s *SQLSyncer) listGrants(ctx context.Context, resource *v2.Resource, pToke return nil, "", err } - npt, err := s.runQuery(ctx, pToken, grantConfig.Query, grantConfig.Pagination, queryVars, func(ctx context.Context, rowMap map[string]any) (bool, error) { - for _, mapping := range grantConfig.Map { - g, ok, err := s.mapGrant(ctx, resource, mapping, rowMap) - if err != nil { - return false, err - } + // Use size-aware query execution to prevent exceeding gRPC message size limits + result, err := s.runQueryWithSizeLimit(ctx, pToken, grantConfig.Query, grantConfig.Pagination, queryVars, + func(ctx context.Context, rowMap map[string]any) (bool, int64, error) { + var itemSize int64 + for _, mapping := range grantConfig.Map { + g, ok, err := s.mapGrant(ctx, resource, mapping, rowMap) + if err != nil { + return false, 0, err + } - if ok { - ret = append(ret, g) + if ok { + ret = append(ret, g) + itemSize += estimateGrantSize(g) + } } - } - return true, nil - }) + return true, itemSize, nil + }) if err != nil { return nil, "", err } - return ret, npt, nil + return ret, result.NextPageToken, nil +} + +// estimateGrantSize provides a rough estimate of the serialized size of a grant. +func estimateGrantSize(g *v2.Grant) int64 { + if g == nil { + return 0 + } + var size int64 + size += int64(len(g.Id)) + if g.Entitlement != nil { + size += int64(len(g.Entitlement.Id)) + } + if g.Principal != nil && g.Principal.Id != nil { + size += int64(len(g.Principal.Id.Resource)) + size += int64(len(g.Principal.Id.ResourceType)) + } + // Add overhead for annotations and protobuf encoding + size += int64(len(g.Annotations) * 50) + size += sizeEstimateOverhead + return size } func (s *SQLSyncer) mapGrant(ctx context.Context, resource *v2.Resource, mapping *GrantMapping, rowMap map[string]any) (*v2.Grant, bool, error) { diff --git a/pkg/bsql/query.go b/pkg/bsql/query.go index 06ff670c..f2819772 100644 --- a/pkg/bsql/query.go +++ b/pkg/bsql/query.go @@ -26,11 +26,51 @@ const ( cursorKey = "cursor" limitKey = "limit" unquotedKey = "unquoted" + + // gRPC message size limits - we use a conservative threshold to avoid hitting the 4MB limit. + // The actual limit is 4MB, but we use 3MB to leave room for overhead and metadata. + maxResponseSizeBytes = 3 * 1024 * 1024 // 3MB conservative limit + sizeEstimateOverhead = 500 // Overhead per item for protobuf encoding ) var ErrQueryAffectedZeroRows = errors.New("query affected 0 rows, ending and rolling back") var ErrQueryAffectedMoreThanOneRow = errors.New("query affected more than one row, ending and rolling back") +// queryResult contains the result of a query execution along with size tracking information. +type queryResult struct { + // NextPageToken is the token for the next page of results + NextPageToken string + // TotalSize is the approximate total size of all results in bytes + TotalSize int64 + // ItemCount is the number of items returned + ItemCount int + // HitSizeLimit indicates whether the query stopped early due to size limits + HitSizeLimit bool +} + +// estimateRowSize provides a rough estimate of the serialized size of a row map. +// This is used to prevent exceeding gRPC message size limits. +func estimateRowSize(rowMap map[string]interface{}) int64 { + var size int64 + for k, v := range rowMap { + size += int64(len(k)) + switch val := v.(type) { + case string: + size += int64(len(val)) + case []byte: + size += int64(len(val)) + case nil: + // nil values have minimal overhead + default: + // For other types, estimate 8-32 bytes + size += 16 + } + } + // Add overhead for protobuf encoding, traits, and other fields + size += sizeEstimateOverhead + return size +} + type executor interface { ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) @@ -490,6 +530,141 @@ func (s *SQLSyncer) normalizeValue(val any) any { return val } +// runQueryWithSizeLimit executes a query with size-based early termination. +// It returns a queryResult containing pagination info and size tracking. +// The sizeEstimator function should return the estimated serialized size of the processed item. +func (s *SQLSyncer) runQueryWithSizeLimit( + ctx context.Context, + pToken *pagination.Token, + query string, + pOpts *Pagination, + vars map[string]any, + rowCallback func(context.Context, map[string]interface{}) (bool, int64, error), +) (*queryResult, error) { + l := ctxzap.Extract(ctx) + + q, qArgs, pCtx, err := s.prepareQuery(pToken, query, pOpts, vars) + if err != nil { + return nil, err + } + + l.Debug("running query with size limit", zap.String("query", q), zap.Any("args", qArgs)) + + rows, err := s.db.QueryContext(ctx, q, qArgs...) + if err != nil { + l.Error("failed to run query", zap.String("query", q), zap.Any("args", qArgs), zap.Error(err)) + return nil, err + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, err + } + + values := make([]interface{}, len(columns)) + scanArgs := make([]interface{}, len(values)) + for i := range values { + scanArgs[i] = &values[i] + } + + result := &queryResult{} + var lastRowID any + rowCount := 0 + hitPageLimit := false + + for rows.Next() { + rowCount++ + + // Check if we've exceeded the page limit (standard pagination). + if pCtx != nil && rowCount > int(pCtx.Limit) { + hitPageLimit = true + break + } + + if err := rows.Scan(scanArgs...); err != nil { + return nil, err + } + + foundPaginationKey := false + var currentRowID any + rowMap := make(map[string]interface{}) + for i, colName := range columns { + rowMap[colName] = values[i] + if pCtx != nil && pCtx.PrimaryKey == colName { + currentRowID = values[i] + foundPaginationKey = true + } + } + + if pCtx != nil && !foundPaginationKey { + return nil, errors.New("primary key not found in query results") + } + + // Check if adding this item would exceed size limits. + // We check BEFORE processing so that we can stop without data loss. + // The unprocessed row will be picked up by the next page. + if result.TotalSize > 0 && result.TotalSize+estimateRowSize(rowMap) > maxResponseSizeBytes { + result.HitSizeLimit = true + l.Info("stopping query early due to response size limit", + zap.Int64("current_size", result.TotalSize), + zap.Int("items_returned", result.ItemCount)) + break + } + + ok, itemSize, err := rowCallback(ctx, rowMap) + if err != nil { + return nil, err + } + + // Use the actual item size if provided, otherwise use the row estimate. + if itemSize > 0 { + result.TotalSize += itemSize + } else { + result.TotalSize += estimateRowSize(rowMap) + } + result.ItemCount++ + + // Only update lastRowID after the callback succeeds, so that + // the next page token points to the last *processed* row. + lastRowID = currentRowID + + // Post-callback size check: if accumulated size now exceeds the + // limit, stop before processing the next row. + if result.TotalSize > maxResponseSizeBytes { + result.HitSizeLimit = true + l.Info("stopping query after callback due to response size limit", + zap.Int64("current_size", result.TotalSize), + zap.Int("items_returned", result.ItemCount)) + break + } + + if !ok { + break + } + } + + if err := rows.Err(); err != nil { + return nil, err + } + + // Determine if we need a next page token. + // We need one if: we hit the page limit, OR we hit the size limit. + if pCtx != nil && (hitPageLimit || result.HitSizeLimit) { + // For offset strategy when hitting size limits, adjust the limit + // to reflect actual items processed so the next offset is correct. + if result.HitSizeLimit && pCtx.Strategy == offsetKey { + pCtx.Limit = int64(result.ItemCount) + } + result.NextPageToken, err = s.nextPageToken(pCtx, lastRowID) + if err != nil { + return nil, err + } + } + + return result, nil +} + func (s *SQLSyncer) runQuery( ctx context.Context, pToken *pagination.Token, diff --git a/pkg/bsql/resources.go b/pkg/bsql/resources.go index 4713c10f..1bd98e06 100644 --- a/pkg/bsql/resources.go +++ b/pkg/bsql/resources.go @@ -26,19 +26,41 @@ func (s *SQLSyncer) List(ctx context.Context, parentResourceID *v2.ResourceId, p return nil, "", nil, err } - npt, err := s.runQuery(ctx, pToken, s.config.List.Query, s.config.List.Pagination, queryVars, func(ctx context.Context, rowMap map[string]any) (bool, error) { - r, err := s.mapResource(ctx, rowMap) - if err != nil { - return false, err - } - ret = append(ret, r) - return true, nil - }) + // Use size-aware query execution to prevent exceeding gRPC message size limits + result, err := s.runQueryWithSizeLimit(ctx, pToken, s.config.List.Query, s.config.List.Pagination, queryVars, + func(ctx context.Context, rowMap map[string]any) (bool, int64, error) { + r, err := s.mapResource(ctx, rowMap) + if err != nil { + return false, 0, err + } + ret = append(ret, r) + // Estimate size of the serialized resource + size := estimateResourceSize(r) + return true, size, nil + }) if err != nil { return nil, "", nil, err } - return ret, npt, nil, nil + return ret, result.NextPageToken, nil, nil +} + +// estimateResourceSize provides a rough estimate of the serialized size of a resource. +func estimateResourceSize(r *v2.Resource) int64 { + if r == nil { + return 0 + } + var size int64 + size += int64(len(r.DisplayName)) + size += int64(len(r.Description)) + if r.Id != nil { + size += int64(len(r.Id.Resource)) + size += int64(len(r.Id.ResourceType)) + } + // Add overhead for annotations, traits, and protobuf encoding + size += int64(len(r.Annotations) * 100) + size += sizeEstimateOverhead + return size } func (s *SQLSyncer) fetchTraits() map[string]bool {