diff --git a/examples/example.yml b/examples/example.yml index dee124de..31e8c01a 100644 --- a/examples/example.yml +++ b/examples/example.yml @@ -1,6 +1,6 @@ --- # Connector Configuration Reference -# =============================== +# =================================== # This is a reference configuration demonstrating all available options # and their purposes in a connector configuration file. @@ -10,7 +10,7 @@ app_name: Example Application # Connection Configuration # ---------------------- # Specifies how to connect to the data source. Supports various connection methods. -# +# # RECOMMENDED: Use structured fields! connect: scheme: "mysql" @@ -32,135 +32,164 @@ connect: # connect: # dsn: "mysql://${DB_USER}:${DB_PASS}@${DB_HOST}:3306/${DB_NAME}?parseTime=true" +# Incremental Sync Settings (Optional) +# ------------------------------------ +# Global settings for event-feed-based incremental sync. +# When any resource type or grants query has incremental_sync configured, the connector +# automatically exposes an EventFeed to ConductorOne for efficient change detection. +incremental_sync: + default_lookback: 3h # How far back to look on first run; any Go duration string + # Resource Types # ------------- # Defines the resources that can be synchronized from the data source. # Each resource type represents a distinct entity type (e.g., users, groups, roles). resource_types: + # Example User Resource - # ------------------- + # --------------------- user: - name: "User" # Display name for this resource type + name: "User" description: "Represents a user account in the system" + # Targeted Sync — Get Query + # ------------------------- + # Fetches a single resource by ID. Required when incremental_sync is configured + # on this resource type. Uses the same list.map for column mapping. + get: + query: | + SELECT id, username, email, created_at, status, department, updated_at + FROM users + WHERE id = ? 
+ # List Configuration - # ---------------- - # Defines how to retrieve a list of resources + # ------------------ + # Defines how to retrieve a paginated list of resources. list: - # SQL query to fetch resources. Supports multiple query types: - # - Direct SQL queries - # - Stored procedure calls - # - Complex joins and subqueries query: | - SELECT + SELECT id, username, email, created_at, status, - department + department, + updated_at FROM users - WHERE status = 'active' - AND id > ? + WHERE id > ? ORDER BY id ASC LIMIT ? - - # Mapping Configuration - # ------------------- - # Defines how to transform raw data into standardized resource objects map: - # Required Fields - # -------------- - # These fields are required for all resources - id: ".id" # Maps the 'id' column to the resource ID - display_name: ".username" # Human-readable name - description: "string(.department) + ' department user'" # Can use CEL expressions - - # Optional Traits - # -------------- - # Custom attributes specific to this resource type + id: ".id" + display_name: ".username" + description: "string(.department) + ' department user'" traits: user: - # The trait name defines the schema emails: - # Array fields - - ".email" # Direct field mapping - - "lowercase(.email)" # CEL transformation - status: ".status" # Simple field mapping + - ".email" + status: ".status" profile: department: ".department" joined_date: ".created_at" # Complex CEL transformation example full_name: "titleCase(.first_name) + ' ' + titleCase(.last_name)" + pagination: + strategy: "cursor" + primary_key: "id" - # Pagination Configuration - # ---------------------- - # Defines how to handle large result sets + # Resource Incremental Sync + # ------------------------- + # Detects new/modified resources via timestamp filtering. Emits a ResourceChangeEvent + # per row; C1 then calls the get query above to re-fetch the full resource. + # ? is automatically injected with the cursor timestamp for this source. 
+ incremental_sync: + query: | + SELECT id, username, email, created_at, status, department, updated_at + FROM users + WHERE updated_at > ? + ORDER BY updated_at ASC + LIMIT ? OFFSET ? + cursor_column: updated_at # Column whose max value advances this source's cursor pagination: - strategy: "cursor" # Options: "cursor", "offset" - primary_key: "id" # Column used for pagination tracking + strategy: offset + primary_key: id # Static Entitlements - # ------------------ - # Pre-defined permissions that can be granted + # ------------------- + # Pre-defined permissions that can be granted. static_entitlements: - - id: "access" # Unique identifier for this entitlement + - id: "access" display_name: "Basic Access" description: "Provides basic access to the application" - purpose: "access" # Purpose: "access", "assignment", "permission" + purpose: "assignment" grantable_to: - # Resource types that can receive this entitlement - "user" - "service_account" - # Provisioning Configuration - # ------------------------ - # Defines how to implement entitlement changes provisioning: vars: - # Variables available in provisioning queries user_id: "principal.ID" access_level: "'basic'" - - # Grant Operations - # --------------- grant: - # SQL statements to execute when granting queries: - | INSERT INTO user_access (user_id, level) VALUES (?, ?) - - # Revoke Operations - # ---------------- revoke: - # SQL statements to execute when revoking queries: - | DELETE FROM user_access WHERE user_id = ? + # Grants Query Configuration - # ------------------------ - # Defines how to discover existing entitlements + # -------------------------- + # Defines how to discover existing entitlement assignments. grants: - query: | - SELECT + SELECT + id AS grant_id, user_id, access_level, - granted_at + updated_at FROM user_access LIMIT ? OFFSET ? 
- - # Grant Mapping - # ------------ - # Defines how to interpret grant query results map: - - skip_if: ".access_level != 'basic'" # CEL condition to filter results + - skip_if: ".access_level != 'basic'" principal_id: ".user_id" principal_type: "user" entitlement_id: "access" - # Grants Pagination - # ---------------- pagination: strategy: "offset" - primary_key: "user_id" -# Example: groups, roles, applications, etc. + primary_key: "grant_id" + + # Grants Incremental Sync + # ----------------------- + # Detects new and revoked grants without a full re-sync. + # The parent map is reused for row-to-grant mapping. + # NOTE: CEL expressions in map must not reference resource trait fields + # (display_name, description, etc.) in this context — only row columns are available. + incremental_sync: + resource_id: ".user_id" # CEL expr: extracts resource ID from each row + # (required — incremental queries span ALL resources) + changes_query: | + SELECT id AS grant_id, user_id, access_level, updated_at + FROM user_access + WHERE deleted_at IS NULL + AND updated_at > ? + ORDER BY updated_at ASC + LIMIT ? OFFSET ? + changes_cursor_column: updated_at + + # Optional: only configure when soft-delete is available in the schema. + revokes_query: | + SELECT id AS grant_id, user_id, access_level, deleted_at + FROM user_access + WHERE deleted_at > ? + ORDER BY deleted_at ASC + LIMIT ? OFFSET ? + revokes_cursor_column: deleted_at + + pagination: + strategy: offset + primary_key: grant_id # Must be present in both changes_query and revokes_query SELECTs + +# Example: add groups, roles, applications, etc. following the same pattern. 
diff --git a/examples/mysql-test.yml b/examples/mysql-test.yml index 47f98707..194affb7 100644 --- a/examples/mysql-test.yml +++ b/examples/mysql-test.yml @@ -11,6 +11,9 @@ connect: user: "${DB_USER}" password: "${DB_PASSWORD}" +incremental_sync: + default_lookback: 3h + actions: update_user_attributes: name: Update User Attributes @@ -208,6 +211,77 @@ resource_types: attr_employee_number: ".attr_employee_number" attr_employment_type: ".attr_employment_type" + # Fetches a single user by ID (required when incremental_sync is configured). + get: + query: | + SELECT + u.id, + u.username, + u.email, + u.employee_id, + u.status, + u.account_type, + u.created_at, + CASE + WHEN u.last_login IS NULL THEN '' + ELSE DATE_FORMAT(u.last_login, '%Y-%m-%dT%H:%i:%sZ') + END as last_login, + u.manager_id, + m.username as manager_username, + m.email as manager_email, + u.attr_first_name, + u.attr_middle_name, + u.attr_last_name, + u.attr_display_name, + u.attr_job_title, + u.attr_department, + u.attr_division, + u.attr_company, + u.attr_employee_number, + u.attr_employment_type + FROM users u + LEFT JOIN users m ON u.manager_id = m.id + WHERE u.username = ? + + # Detects new and modified users via updated_at. + incremental_sync: + query: | + SELECT + u.id, + u.username, + u.email, + u.employee_id, + u.status, + u.account_type, + u.created_at, + CASE + WHEN u.last_login IS NULL THEN '' + ELSE DATE_FORMAT(u.last_login, '%Y-%m-%dT%H:%i:%sZ') + END as last_login, + u.manager_id, + m.username as manager_username, + m.email as manager_email, + u.attr_first_name, + u.attr_middle_name, + u.attr_last_name, + u.attr_display_name, + u.attr_job_title, + u.attr_department, + u.attr_division, + u.attr_company, + u.attr_employee_number, + u.attr_employment_type, + u.updated_at + FROM users u + LEFT JOIN users m ON u.manager_id = m.id + WHERE u.updated_at > ? + ORDER BY u.updated_at ASC + LIMIT ? OFFSET ? 
+ cursor_column: updated_at + pagination: + strategy: offset + primary_key: id + # Account provisioning configuration with password support account_provisioning: schema: @@ -312,6 +386,26 @@ resource_types: profile: role_id: ".id" + # Fetches a single role by ID (required when incremental_sync is configured). + get: + query: | + SELECT id, role_name + FROM roles + WHERE role_name = ? + + # Detects new and modified roles via updated_at. + incremental_sync: + query: | + SELECT id, role_name, updated_at + FROM roles + WHERE updated_at > ? + ORDER BY updated_at ASC + LIMIT ? OFFSET ? + cursor_column: updated_at + pagination: + strategy: offset + primary_key: id + # Define a static entitlement for role membership static_entitlements: - id: "member" @@ -341,3 +435,37 @@ resource_types: principal_id: ".username" principal_type: "user" entitlement_id: "member" + + # Detects new role assignments and soft-deleted revokes. + incremental_sync: + resource_id: ".role_name" + changes_query: | + SELECT + ur.id AS grant_id, + u.username, + r.role_name, + ur.updated_at + FROM user_roles ur + JOIN users u ON u.id = ur.user_id + JOIN roles r ON r.id = ur.role_id + WHERE ur.deleted_at IS NULL + AND ur.updated_at > ? + ORDER BY ur.updated_at ASC + LIMIT ? OFFSET ? + changes_cursor_column: updated_at + revokes_query: | + SELECT + ur.id AS grant_id, + u.username, + r.role_name, + ur.deleted_at + FROM user_roles ur + JOIN users u ON u.id = ur.user_id + JOIN roles r ON r.id = ur.role_id + WHERE ur.deleted_at > ? + ORDER BY ur.deleted_at ASC + LIMIT ? OFFSET ? 
+ revokes_cursor_column: deleted_at + pagination: + strategy: offset + primary_key: grant_id diff --git a/examples/oracle-test.yml b/examples/oracle-test.yml index 49c1fc66..65f4a45c 100644 --- a/examples/oracle-test.yml +++ b/examples/oracle-test.yml @@ -10,6 +10,9 @@ connect: user: "${DB_USER}" password: "${DB_PASSWORD}" +incremental_sync: + default_lookback: 3h + actions: enable_user: name: Enable User @@ -188,6 +191,68 @@ resource_types: attr_company: ".attr_company" attr_employee_number: ".attr_employee_number" attr_employment_type: ".attr_employment_type" + # Fetches a single user by ID (required when incremental_sync is configured). + get: + query: | + SELECT + id as "id", + username as "username", + email as "email", + employee_id as "employee_id", + status as "status", + account_type as "account_type", + created_at as "created_at", + last_login as "last_login", + manager_id as "manager_id", + attr_first_name as "attr_first_name", + attr_middle_name as "attr_middle_name", + attr_last_name as "attr_last_name", + attr_display_name as "attr_display_name", + attr_job_title as "attr_job_title", + attr_department as "attr_department", + attr_division as "attr_division", + attr_company as "attr_company", + attr_employee_number as "attr_employee_number", + attr_employment_type as "attr_employment_type" + FROM users + WHERE username = ? + + # Detects new and modified users via updated_at. + # Note: Oracle role and privilege types are sourced from system views (DBA_ROLES, + # DBA_SYS_PRIVS) which have no modification timestamps; incremental sync is not + # applicable to those resource types. 
+ incremental_sync: + query: | + SELECT + id as "id", + username as "username", + email as "email", + employee_id as "employee_id", + status as "status", + account_type as "account_type", + created_at as "created_at", + last_login as "last_login", + manager_id as "manager_id", + attr_first_name as "attr_first_name", + attr_middle_name as "attr_middle_name", + attr_last_name as "attr_last_name", + attr_display_name as "attr_display_name", + attr_job_title as "attr_job_title", + attr_department as "attr_department", + attr_division as "attr_division", + attr_company as "attr_company", + attr_employee_number as "attr_employee_number", + attr_employment_type as "attr_employment_type", + updated_at as "updated_at" + FROM users + WHERE updated_at > ? + ORDER BY updated_at + OFFSET ? ROWS FETCH NEXT ? ROWS ONLY + cursor_column: updated_at + pagination: + strategy: offset + primary_key: id + account_provisioning: # Schema definition for account creation form schema: diff --git a/examples/postgres-test.yml b/examples/postgres-test.yml index f8a5aacb..aa8d238a 100644 --- a/examples/postgres-test.yml +++ b/examples/postgres-test.yml @@ -10,6 +10,9 @@ connect: user: "${DB_USER}" password: "${DB_PASSWORD}" +incremental_sync: + default_lookback: 3h + actions: enable_user: name: Enable User @@ -232,6 +235,71 @@ resource_types: attr_employee_number: ".attr_employee_number" attr_employment_type: ".attr_employment_type" + # Fetches a single user by ID (required when incremental_sync is configured). 
+ get: + query: | + SELECT + u.id, + u.username, + u.email, + u.employee_id, + u.status, + u.account_type, + u.created_at, + u.last_login, + u.manager_id, + m.username as manager_username, + m.email as manager_email, + u.attr_first_name, + u.attr_middle_name, + u.attr_last_name, + u.attr_display_name, + u.attr_job_title, + u.attr_department, + u.attr_division, + u.attr_company, + u.attr_employee_number, + u.attr_employment_type + FROM users u + LEFT JOIN users m ON u.manager_id = m.id + WHERE u.username = ? + + # Detects new and modified users via updated_at. + incremental_sync: + query: | + SELECT + u.id, + u.username, + u.email, + u.employee_id, + u.status, + u.account_type, + u.created_at, + u.last_login, + u.manager_id, + m.username as manager_username, + m.email as manager_email, + u.attr_first_name, + u.attr_middle_name, + u.attr_last_name, + u.attr_display_name, + u.attr_job_title, + u.attr_department, + u.attr_division, + u.attr_company, + u.attr_employee_number, + u.attr_employment_type, + u.updated_at + FROM users u + LEFT JOIN users m ON u.manager_id = m.id + WHERE u.updated_at > ? + ORDER BY u.updated_at ASC + LIMIT ? OFFSET ? + cursor_column: updated_at + pagination: + strategy: offset + primary_key: id + # Account provisioning configuration with password support account_provisioning: schema: @@ -354,6 +422,26 @@ resource_types: profile: role_id: ".id" + # Fetches a single role by ID (required when incremental_sync is configured). + get: + query: | + SELECT id, role_name + FROM roles + WHERE role_name = ? + + # Detects new and modified roles via updated_at. + incremental_sync: + query: | + SELECT id, role_name, updated_at + FROM roles + WHERE updated_at > ? + ORDER BY updated_at ASC + LIMIT ? OFFSET ? 
+ cursor_column: updated_at + pagination: + strategy: offset + primary_key: id + # Define a static entitlement for role membership static_entitlements: - id: "member" @@ -429,6 +517,40 @@ resource_types: principal_type: "user" entitlement_id: "member" + # Detects new role assignments and soft-deleted revokes. + incremental_sync: + resource_id: ".role_name" + changes_query: | + SELECT + ur.id AS grant_id, + u.username, + r.role_name, + ur.updated_at + FROM user_roles ur + JOIN users u ON u.id = ur.user_id + JOIN roles r ON r.id = ur.role_id + WHERE ur.deleted_at IS NULL + AND ur.updated_at > ? + ORDER BY ur.updated_at ASC + LIMIT ? OFFSET ? + changes_cursor_column: updated_at + revokes_query: | + SELECT + ur.id AS grant_id, + u.username, + r.role_name, + ur.deleted_at + FROM user_roles ur + JOIN users u ON u.id = ur.user_id + JOIN roles r ON r.id = ur.role_id + WHERE ur.deleted_at > ? + ORDER BY ur.deleted_at ASC + LIMIT ? OFFSET ? + revokes_cursor_column: deleted_at + pagination: + strategy: offset + primary_key: grant_id + feature: name: "Feature" description: "A Feature in the App" diff --git a/examples/sap-hana-test.yml b/examples/sap-hana-test.yml index 8f8b2b2f..3e6189ea 100644 --- a/examples/sap-hana-test.yml +++ b/examples/sap-hana-test.yml @@ -7,6 +7,9 @@ user: "${DB_USER}" password: "${DB_PASSWORD}" + incremental_sync: + default_lookback: 3h + resource_types: user: name: "User" @@ -58,6 +61,57 @@ created_at: .CREATE_TIME last_login: .LAST_SUCCESSFUL_CONNECT + # Fetches a single user by ID (required when incremental_sync is configured). + get: + query: | + SELECT + USER_ID, + "SYS"."USERS".USER_NAME AS USER_NAME, + CASE + WHEN USER_DEACTIVATED = 'TRUE' THEN 'inactive' + ELSE 'active' + END as STATUS, + CREATE_TIME, + LAST_SUCCESSFUL_CONNECT, + COMMENTS, + VALUE AS EMAIL_ADDRESS + FROM "SYS"."USERS" + LEFT JOIN "USER_PARAMETERS" + ON "SYS"."USERS".USER_NAME = "USER_PARAMETERS".USER_NAME + AND "USER_PARAMETERS".PARAMETER = 'EMAIL ADDRESS' + WHERE USER_ID = ? 
+ + # Detects newly created users via CREATE_TIME. + # Note: SYS.USERS is a system catalog view. CREATE_TIME records when the user was + # created; there is no general-purpose updated_at column, so this source detects + # new accounts only. Profile changes (password, deactivation) are not captured. + # SYS.USERGROUPS similarly has no modification timestamp; incremental sync is not + # configured for the group resource type. + incremental_sync: + query: | + SELECT + USER_ID, + "SYS"."USERS".USER_NAME AS USER_NAME, + CASE + WHEN USER_DEACTIVATED = 'TRUE' THEN 'inactive' + ELSE 'active' + END as STATUS, + CREATE_TIME, + LAST_SUCCESSFUL_CONNECT, + COMMENTS, + VALUE AS EMAIL_ADDRESS + FROM "SYS"."USERS" + LEFT JOIN "USER_PARAMETERS" + ON "SYS"."USERS".USER_NAME = "USER_PARAMETERS".USER_NAME + AND "USER_PARAMETERS".PARAMETER = 'EMAIL ADDRESS' + WHERE CREATE_TIME > ? + ORDER BY CREATE_TIME ASC + LIMIT ? OFFSET ? + cursor_column: CREATE_TIME + pagination: + strategy: offset + primary_key: USER_ID + group: name: "Group" description: "A group within the SAP HANA database" diff --git a/examples/sqlserver-test.yml b/examples/sqlserver-test.yml index 13f1bd0d..fb1841b9 100644 --- a/examples/sqlserver-test.yml +++ b/examples/sqlserver-test.yml @@ -11,6 +11,9 @@ connect: user: "${DB_USER}" password: "${DB_PASSWORD}" +incremental_sync: + default_lookback: 3h + actions: update_user_attributes: name: Update User Attributes @@ -186,6 +189,77 @@ resource_types: attr_department: ".attr_department" attr_employee_number: ".attr_employee_number != null ? string(.attr_employee_number) : ''" + # Fetches a single user by ID (required when incremental_sync is configured). 
+ get: + query: | + SELECT + u.UserID as id, + u.Username as username, + u.Email as email, + u.EmployeeID as employee_id, + CASE + WHEN u.IsActive = 1 THEN 'active' + ELSE 'inactive' + END as status, + u.AccountType as account_type, + u.CreatedAt as created_at, + CASE + WHEN u.LastLogin IS NULL THEN '' + ELSE FORMAT(u.LastLogin, 'yyyy-MM-ddTHH:mm:ssZ') + END as last_login, + u.ManagerID as manager_id, + m.Username as manager_username, + m.Email as manager_email, + ed.attr_first_name, + ed.attr_middle_name, + ed.attr_last_name, + ed.JobTitle as attr_job_title, + ed.Department as attr_department, + ed.EmployeeNumber as attr_employee_number + FROM Users u + LEFT JOIN Users m ON u.ManagerID = m.UserID + LEFT JOIN EmployeeData ed ON u.UserID = ed.UserID + WHERE u.Username = ? + + # Detects new and modified users via UpdatedAt. + incremental_sync: + query: | + SELECT + u.UserID as id, + u.Username as username, + u.Email as email, + u.EmployeeID as employee_id, + CASE + WHEN u.IsActive = 1 THEN 'active' + ELSE 'inactive' + END as status, + u.AccountType as account_type, + u.CreatedAt as created_at, + CASE + WHEN u.LastLogin IS NULL THEN '' + ELSE FORMAT(u.LastLogin, 'yyyy-MM-ddTHH:mm:ssZ') + END as last_login, + u.ManagerID as manager_id, + m.Username as manager_username, + m.Email as manager_email, + ed.attr_first_name, + ed.attr_middle_name, + ed.attr_last_name, + ed.JobTitle as attr_job_title, + ed.Department as attr_department, + ed.EmployeeNumber as attr_employee_number, + u.UpdatedAt as updated_at + FROM Users u + LEFT JOIN Users m ON u.ManagerID = m.UserID + LEFT JOIN EmployeeData ed ON u.UserID = ed.UserID + WHERE u.UpdatedAt > ? + ORDER BY u.UpdatedAt + OFFSET ? ROWS FETCH NEXT ? 
ROWS ONLY + cursor_column: updated_at + pagination: + strategy: offset + primary_key: id + # Account provisioning configuration for creating new user accounts account_provisioning: # Schema definition for required and optional fields during user creation @@ -291,6 +365,26 @@ resource_types: profile: role_name: ".role_name" + # Fetches a single role by ID (required when incremental_sync is configured). + get: + query: | + SELECT RoleID as id, RoleName as role_name, Description as description + FROM Roles + WHERE RoleName = ? + + # Detects new and modified roles via UpdatedAt. + incremental_sync: + query: | + SELECT RoleID as id, RoleName as role_name, Description as description, UpdatedAt as updated_at + FROM Roles + WHERE UpdatedAt > ? + ORDER BY UpdatedAt + OFFSET ? ROWS FETCH NEXT ? ROWS ONLY + cursor_column: updated_at + pagination: + strategy: offset + primary_key: id + # Static entitlements that can be granted for roles static_entitlements: - id: "member" @@ -345,3 +439,38 @@ resource_types: pagination: strategy: "offset" primary_key: "role_name" + + # Detects new role assignments and soft-deleted revokes. + incremental_sync: + resource_id: ".role_name" + changes_query: | + SELECT + ur.UserRoleID AS grant_id, + u.Username AS username, + r.RoleName AS role_name, + ur.UpdatedAt AS updated_at + FROM UserRoles ur + INNER JOIN Users u ON ur.UserID = u.UserID + INNER JOIN Roles r ON ur.RoleID = r.RoleID + WHERE ur.IsDeleted = 0 + AND ur.UpdatedAt > ? + ORDER BY ur.UpdatedAt + OFFSET ? ROWS FETCH NEXT ? ROWS ONLY + changes_cursor_column: updated_at + revokes_query: | + SELECT + ur.UserRoleID AS grant_id, + u.Username AS username, + r.RoleName AS role_name, + ur.DeletedAt AS deleted_at + FROM UserRoles ur + INNER JOIN Users u ON ur.UserID = u.UserID + INNER JOIN Roles r ON ur.RoleID = r.RoleID + WHERE ur.IsDeleted = 1 + AND ur.DeletedAt > ? + ORDER BY ur.DeletedAt + OFFSET ? ROWS FETCH NEXT ? 
ROWS ONLY + revokes_cursor_column: deleted_at + pagination: + strategy: offset + primary_key: grant_id diff --git a/examples/vertica-test.yml b/examples/vertica-test.yml index 77ce94e0..b6904124 100644 --- a/examples/vertica-test.yml +++ b/examples/vertica-test.yml @@ -6,6 +6,9 @@ connect: user: "${DB_USER}" password: "${DB_PASSWORD}" +incremental_sync: + default_lookback: 3h + resource_types: user: name: "User" @@ -51,6 +54,49 @@ resource_types: manager_id: ".manager_id" manager_username: ".manager_username" + # Fetches a single user by ID (required when incremental_sync is configured). + get: + query: | + SELECT + u.id, + u.username, + u.email, + u.employee_id, + u.status, + u.account_type, + u.created_at, + u.last_login, + u.manager_id, + m.username as manager_username + FROM users u + LEFT JOIN users m ON u.manager_id = m.id + WHERE u.username = ? + + # Detects new and modified users via updated_at. + incremental_sync: + query: | + SELECT + u.id, + u.username, + u.email, + u.employee_id, + u.status, + u.account_type, + u.created_at, + u.last_login, + u.manager_id, + m.username as manager_username, + u.updated_at + FROM users u + LEFT JOIN users m ON u.manager_id = m.id + WHERE u.updated_at > ? + ORDER BY u.updated_at ASC + LIMIT ? OFFSET ? + cursor_column: updated_at + pagination: + strategy: offset + primary_key: id + role: name: "Role" description: "A role within the Vertica system" @@ -73,6 +119,26 @@ resource_types: profile: role_id: ".id" + # Fetches a single role by ID (required when incremental_sync is configured). + get: + query: | + SELECT id, role_name + FROM roles + WHERE role_name = ? + + # Detects new and modified roles via updated_at. + incremental_sync: + query: | + SELECT id, role_name, updated_at + FROM roles + WHERE updated_at > ? + ORDER BY updated_at ASC + LIMIT ? OFFSET ? 
+ cursor_column: updated_at + pagination: + strategy: offset + primary_key: id + static_entitlements: - id: "member" display_name: "'Member'" @@ -100,3 +166,37 @@ resource_types: principal_id: ".username" principal_type: "user" entitlement_id: "member" + + # Detects new role assignments and soft-deleted revokes. + incremental_sync: + resource_id: ".role_name" + changes_query: | + SELECT + ur.id AS grant_id, + u.username, + r.role_name, + ur.updated_at + FROM user_roles ur + JOIN users u ON u.id = ur.user_id + JOIN roles r ON r.id = ur.role_id + WHERE ur.deleted_at IS NULL + AND ur.updated_at > ? + ORDER BY ur.updated_at ASC + LIMIT ? OFFSET ? + changes_cursor_column: updated_at + revokes_query: | + SELECT + ur.id AS grant_id, + u.username, + r.role_name, + ur.deleted_at + FROM user_roles ur + JOIN users u ON u.id = ur.user_id + JOIN roles r ON r.id = ur.role_id + WHERE ur.deleted_at > ? + ORDER BY ur.deleted_at ASC + LIMIT ? OFFSET ? + revokes_cursor_column: deleted_at + pagination: + strategy: offset + primary_key: grant_id diff --git a/examples/wordpress-test.yml b/examples/wordpress-test.yml index 0a8e1dc3..38f02255 100644 --- a/examples/wordpress-test.yml +++ b/examples/wordpress-test.yml @@ -10,6 +10,9 @@ connect: user: "${DB_USER}" password: "${DB_PASSWORD}" +incremental_sync: + default_lookback: 3h + resource_types: user: name: "User" @@ -42,6 +45,37 @@ resource_types: pagination: strategy: "offset" primary_key: "user_id" + # Fetches a single user by ID (required when incremental_sync is configured). + get: + query: | + SELECT + u.ID AS user_id, + u.user_login AS username, + u.user_email AS email, + u.user_registered AS created_at + FROM wp_users u + WHERE u.ID = ? + + # Detects newly registered users via user_registered. + # Note: wp_users does not have a general-purpose updated_at column. + # This source detects new user registrations only; profile updates + # are not captured by this cursor. 
+ incremental_sync: + query: | + SELECT + u.ID AS user_id, + u.user_login AS username, + u.user_email AS email, + u.user_registered AS created_at + FROM wp_users u + WHERE u.user_registered > ? + ORDER BY u.user_registered ASC + LIMIT ? OFFSET ? + cursor_column: created_at + pagination: + strategy: offset + primary_key: user_id + account_provisioning: schema: - name: "username" @@ -80,6 +114,11 @@ resource_types: queries: - "INSERT INTO wp_users (user_login, user_email, user_pass) VALUES (?, ?, MD5(?))" + # Note: incremental sync is not configured for WordPress roles because: + # - Role IDs are derived from serialized PHP arrays (no direct DB lookup for get:). + # - Role assignments use a DELETE + INSERT pattern with no soft-delete column, + # so there is no revoke cursor to track. Grant changes can only be detected via + # a full re-sync of the wp_usermeta table. role: name: "Role" description: "A role within the wordpress system that can be assigned to a user" diff --git a/pkg/bcel/bcel.go b/pkg/bcel/bcel.go index 794d3090..32d39dd7 100644 --- a/pkg/bcel/bcel.go +++ b/pkg/bcel/bcel.go @@ -42,6 +42,16 @@ func NewEnv(ctx context.Context) (*Env, error) { }, nil } +// Compile verifies that expr is a valid CEL expression. Returns an error if the expression +// cannot be parsed or type-checked against the environment's declared variables. +func (t *Env) Compile(expr string) error { + _, issues := t.celEnv.Compile(preprocessExpressions(expr)) + if issues != nil && issues.Err() != nil { + return issues.Err() + } + return nil +} + func (t *Env) Evaluate(ctx context.Context, expr string, inputs map[string]any) (any, error) { expr = preprocessExpressions(expr) diff --git a/pkg/bsql/config.go b/pkg/bsql/config.go index e0ebcaa6..cbba8a9f 100644 --- a/pkg/bsql/config.go +++ b/pkg/bsql/config.go @@ -33,12 +33,30 @@ type Config struct { // Actions defines the set of actions configured in the connector. 
Actions map[string]ActionConfig `yaml:"actions" json:"actions"` + + // IncrementalSync holds global settings for incremental sync behavior. + IncrementalSync *IncrementalSyncConfig `yaml:"incremental_sync,omitempty" json:"incremental_sync,omitempty"` } func (c Config) HasActions() bool { return len(c.Actions) > 0 } +// HasIncrementalSync returns true if any resource type or grants query has incremental sync configured. +func (c Config) HasIncrementalSync() bool { + for _, rt := range c.ResourceTypes { + if rt.IncrementalSync != nil { + return true + } + for _, gq := range rt.Grants { + if gq.IncrementalSync != nil { + return true + } + } + } + return false +} + // DatabaseConfig contains settings required to connect to the database. // You can specify either a complete DSN, or use structured fields, or a combination. // Structured fields override corresponding parts of the DSN when both are provided. @@ -100,6 +118,12 @@ type ResourceType struct { // CredentialRotation defines the configuration for credential rotation CredentialRotation *CredentialRotation `yaml:"credential_rotation,omitempty" json:"credential_rotation,omitempty"` + + // Get defines how to fetch a single resource by ID. Required when IncrementalSync is set. + Get *GetQuery `yaml:"get,omitempty" json:"get,omitempty"` + + // IncrementalSync defines settings for detecting changed resources via the event feed. + IncrementalSync *ResourceIncrementalSync `yaml:"incremental_sync,omitempty" json:"incremental_sync,omitempty"` } // ListQuery defines the structure for configuring resource list queries. @@ -327,6 +351,9 @@ type GrantsQuery struct { // Map contains mappings to interpret each row of the query result as a grant. Map []*GrantMapping `yaml:"map" json:"map"` + + // IncrementalSync defines settings for detecting changed grants via the event feed. 
+ IncrementalSync *GrantsIncrementalSync `yaml:"incremental_sync,omitempty" json:"incremental_sync,omitempty"` } // GrantMapping defines how query results are mapped to an entitlement grant. @@ -438,6 +465,61 @@ type CredentialRotation struct { Update *AccountCreationConfig `yaml:"update" json:"update"` } +// IncrementalSyncConfig holds global settings for incremental sync behavior. +type IncrementalSyncConfig struct { + // DefaultLookback is the maximum time window to look back when initializing a cursor for the + // first time. Accepts any Go time.Duration string (e.g., "3h", "30m", "24h"). Defaults to 3h. + DefaultLookback string `yaml:"default_lookback" json:"default_lookback"` +} + +// GetQuery defines how to fetch a single resource by its ID. +// Required for resource types that configure incremental_sync. +type GetQuery struct { + // Query is the SQL statement to fetch one resource. Must contain ?. + Query string `yaml:"query" json:"query"` + // Vars provides optional variables for the query. + Vars map[string]string `yaml:"vars,omitempty" json:"vars,omitempty"` +} + +// ResourceIncrementalSync defines incremental sync configuration for a resource type. +// When set, the resource type supports emitting ResourceChangeEvents from the event feed. +type ResourceIncrementalSync struct { + // Query is the SQL statement to detect changed resources. Must contain ?. + Query string `yaml:"query" json:"query"` + // CursorColumn is the column whose max value is used to advance the cursor for this source. + CursorColumn string `yaml:"cursor_column" json:"cursor_column"` + // ResourceId is a CEL expression that extracts the resource ID from each row returned by Query. + // When set, this expression is evaluated against the incremental query's columns. + // When not set, falls back to the list mapping's id expression (list.map.id), which requires + // the incremental query to return all columns that expression references. 
+ ResourceId string `yaml:"resource_id,omitempty" json:"resource_id,omitempty"` + // Pagination defines the pagination strategy. Defaults to offset if not set. + Pagination *Pagination `yaml:"pagination,omitempty" json:"pagination,omitempty"` + // Vars provides optional variables for the query. + Vars map[string]string `yaml:"vars,omitempty" json:"vars,omitempty"` +} + +// GrantsIncrementalSync defines incremental sync configuration for a grants query. +// When set, the grants query supports emitting CreateGrantEvent and optionally CreateRevokeEvent. +type GrantsIncrementalSync struct { + // ResourceId is a CEL expression that extracts the resource ID from each row. + // Required because incremental queries span all resources, unlike full sync which is per-resource. + ResourceId string `yaml:"resource_id" json:"resource_id"` + // ChangesQuery is the SQL statement to detect new or modified grants. Must contain ?. + ChangesQuery string `yaml:"changes_query" json:"changes_query"` + // ChangesCursorColumn is the column whose max value advances the changes cursor for this source. + ChangesCursorColumn string `yaml:"changes_cursor_column" json:"changes_cursor_column"` + // RevokesQuery is the SQL statement to detect revoked grants (soft-delete pattern). Optional. + // Must contain ? if set. + RevokesQuery string `yaml:"revokes_query,omitempty" json:"revokes_query,omitempty"` + // RevokesCursorColumn is the column whose max value advances the revokes cursor. + RevokesCursorColumn string `yaml:"revokes_cursor_column,omitempty" json:"revokes_cursor_column,omitempty"` + // Pagination defines the pagination strategy. Defaults to offset if not set. + Pagination *Pagination `yaml:"pagination,omitempty" json:"pagination,omitempty"` + // Vars provides optional variables for the queries. 
+ Vars map[string]string `yaml:"vars,omitempty" json:"vars,omitempty"` +} + type ActionConfig struct { Name string `yaml:"name" json:"name" validate:"required"` Description string `yaml:"description,omitempty" json:"description,omitempty" validate:"omitempty"` diff --git a/pkg/bsql/event_feed.go b/pkg/bsql/event_feed.go new file mode 100644 index 00000000..03d09b83 --- /dev/null +++ b/pkg/bsql/event_feed.go @@ -0,0 +1,450 @@ +package bsql + +import ( + "context" + "fmt" + "strconv" + "time" + + "google.golang.org/protobuf/types/known/timestamppb" + + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" + "github.com/conductorone/baton-sdk/pkg/annotations" + "github.com/conductorone/baton-sdk/pkg/connectorbuilder" + "github.com/conductorone/baton-sdk/pkg/pagination" + sdkGrant "github.com/conductorone/baton-sdk/pkg/types/grant" +) + +// SQLEventFeed implements connectorbuilder.EventFeed for the baton-sql connector. +type SQLEventFeed struct { + config Config + syncers map[string]*SQLSyncer // keyed by resource type ID +} + +// NewSQLEventFeed creates a SQLEventFeed from the connector config and resource syncers. +func NewSQLEventFeed(config Config, rawSyncers []connectorbuilder.ResourceSyncer) *SQLEventFeed { + syncers := make(map[string]*SQLSyncer) + for _, s := range rawSyncers { + var syncer *SQLSyncer + switch v := s.(type) { + case *SQLSyncer: + syncer = v + case *userSyncer: + syncer = v.SQLSyncer + } + if syncer != nil && syncer.resourceType != nil { + syncers[syncer.resourceType.Id] = syncer + } + } + return &SQLEventFeed{config: config, syncers: syncers} +} + +// EventFeedMetadata returns metadata describing this event feed. 
+func (f *SQLEventFeed) EventFeedMetadata(_ context.Context) *v2.EventFeedMetadata { + return v2.EventFeedMetadata_builder{ + Id: "sql_event_feed", + SupportedEventTypes: []v2.EventType{ + v2.EventType_EVENT_TYPE_RESOURCE_CHANGE, + v2.EventType_EVENT_TYPE_CREATE_GRANT, + v2.EventType_EVENT_TYPE_CREATE_REVOKE, + }, + }.Build() +} + +// ListEvents returns one page of events from the current source in the cursor. +// earliestEvent is intentionally ignored: each source tracks its own committed cursor via +// sinceForSource, and overriding it with a cross-source lower bound would re-emit already- +// processed events from sources that have advanced past that point. +func (f *SQLEventFeed) ListEvents( + ctx context.Context, + earliestEvent *timestamppb.Timestamp, + pToken *pagination.StreamToken, +) ([]*v2.Event, *pagination.StreamState, annotations.Annotations, error) { + cursor, err := unmarshalCursor(pToken.Cursor) + if err != nil { + return nil, nil, nil, err + } + + sources := getSources(f.config) + if len(sources) == 0 { + return nil, &pagination.StreamState{HasMore: false}, nil, nil + } + + if cursor.CurrentSourceIdx >= len(sources) { + cursor.CurrentSourceIdx = 0 + } + + source := sources[cursor.CurrentSourceIdx] + + // Determine `since` for this page. If mid-pagination, use CurrentSince so the WHERE clause + // stays constant across all pages of the same scan cycle. + var since time.Time + if cursor.CurrentSince != "" { + if t, parseErr := time.Parse(time.RFC3339Nano, cursor.CurrentSince); parseErr == nil { + since = t + } + } + if since.IsZero() { + since = f.sinceForSource(cursor, source.Key) + } + + // Record CurrentSince on the first page of a new cycle (before processing, so a crash + // mid-page still restarts from the correct since on next invocation). 
+ if cursor.CurrentSince == "" { + cursor.CurrentSince = since.UTC().Format(time.RFC3339Nano) + } + + syncer, hasSyncer := f.syncers[source.ResourceType] + if !hasSyncer { + state, stateErr := f.commitAndAdvance(cursor, sources, "", time.Time{}) + if stateErr != nil { + return nil, nil, nil, stateErr + } + return nil, state, nil, nil + } + + var events []*v2.Event + var nextPageToken string + var maxSeen time.Time + + switch source.Kind { + case incSyncSourceKindResource: + events, nextPageToken, maxSeen, err = f.processResourceChangePage(ctx, syncer, source, since, pToken.Size, cursor.CurrentPageToken) + case incSyncSourceKindGrantChanges: + events, nextPageToken, maxSeen, err = f.processGrantPage(ctx, syncer, source, since, pToken.Size, cursor.CurrentPageToken, false) + case incSyncSourceKindGrantRevokes: + events, nextPageToken, maxSeen, err = f.processGrantPage(ctx, syncer, source, since, pToken.Size, cursor.CurrentPageToken, true) + default: + err = fmt.Errorf("baton-sql: unknown incremental sync source kind: %s", source.Kind) + } + if err != nil { + return nil, nil, nil, err + } + + state, err := f.commitAndAdvance(cursor, sources, nextPageToken, maxSeen) + if err != nil { + return nil, nil, nil, err + } + return events, state, nil, nil +} + +// commitAndAdvance updates cursor state after processing a page and builds the StreamState. +// maxSeen is accumulated into cursor.CurrentMaxSeen; on source exhaustion the committed cursor advances. +func (f *SQLEventFeed) commitAndAdvance( + cursor *eventFeedCursor, + sources []incSyncSource, + nextPageToken string, + maxSeen time.Time, +) (*pagination.StreamState, error) { + source := sources[cursor.CurrentSourceIdx] + + // Accumulate max seen across pages of the same cycle. 
+ if !maxSeen.IsZero() { + existing := time.Time{} + if cursor.CurrentMaxSeen != "" { + if t, err := time.Parse(time.RFC3339Nano, cursor.CurrentMaxSeen); err == nil { + existing = t + } + } + if maxSeen.After(existing) { + cursor.CurrentMaxSeen = maxSeen.UTC().Format(time.RFC3339Nano) + } + } + + hasMore := true + if nextPageToken != "" { + // More pages remain for this source. + cursor.CurrentPageToken = nextPageToken + } else { + // Source exhausted: commit accumulated max seen, then advance to next source. + if cursor.CurrentMaxSeen != "" { + cursor.SourceCursors[source.Key] = cursor.CurrentMaxSeen + } + cursor.CurrentSince = "" + cursor.CurrentMaxSeen = "" + cursor.CurrentPageToken = "" + cursor.CurrentSourceIdx++ + if cursor.CurrentSourceIdx >= len(sources) { + cursor.CurrentSourceIdx = 0 + hasMore = false + } + } + + cursorStr, err := marshalCursor(cursor) + if err != nil { + return nil, err + } + return &pagination.StreamState{Cursor: cursorStr, HasMore: hasMore}, nil +} + +// sinceForSource returns the starting timestamp for the given source key. +// Each source independently tracks its own committed cursor; callers must not override this +// with cross-source values (e.g. earliestEvent) as that would skip unprocessed events. +func (f *SQLEventFeed) sinceForSource(cursor *eventFeedCursor, key string) time.Time { + if ts, ok := cursor.SourceCursors[key]; ok { + if t, err := time.Parse(time.RFC3339Nano, ts); err == nil { + return t + } + } + return time.Now().UTC().Add(-defaultLookback(f.config.IncrementalSync)) +} + +// processResourceChangePage runs one page of the resource incremental query and returns ResourceChangeEvents. 
+func (f *SQLEventFeed) processResourceChangePage( + ctx context.Context, + s *SQLSyncer, + source incSyncSource, + since time.Time, + pageSize int, + pageToken string, +) ([]*v2.Event, string, time.Time, error) { + rc := source.ResConfig + + vars, err := s.PrepareQueryVars(ctx, nil, rc.Vars) + if err != nil { + return nil, "", time.Time{}, fmt.Errorf("baton-sql: failed to prepare vars for resource incremental sync: %w", err) + } + vars[sinceKey] = since + + pToken := &pagination.Token{Token: pageToken} + if pageSize > 0 { + pToken.Size = pageSize + } + + var events []*v2.Event + var maxSeen time.Time + + resourceIDExpr := s.config.List.Map.Id + if rc.ResourceId != "" { + resourceIDExpr = rc.ResourceId + } + + npt, err := s.runQuery(ctx, pToken, rc.Query, rc.Pagination, vars, func(ctx context.Context, rowMap map[string]any) (bool, error) { + inputs := s.env.SyncInputs(rowMap) + resourceID, evalErr := s.env.EvaluateString(ctx, resourceIDExpr, inputs) + if evalErr != nil { + return false, fmt.Errorf("baton-sql: failed to evaluate resource ID in resource incremental sync: %w", evalErr) + } + + rowTimestamp := since + if ts, ok := rowMap[rc.CursorColumn]; ok { + t, parseErr := toTime(ts) + if parseErr != nil { + return false, fmt.Errorf("baton-sql: failed to parse cursor column %q value %v in resource incremental sync: %w", rc.CursorColumn, ts, parseErr) + } + rowTimestamp = t + if t.After(maxSeen) { + maxSeen = t + } + } + + rowKey := grantRowKey(rowMap, rc.Pagination) + event := v2.Event_builder{ + Id: fmt.Sprintf("resource:%s:%s:%s:%s", source.ResourceType, resourceID, rowTimestamp.UTC().Format(time.RFC3339Nano), rowKey), + OccurredAt: timestamppb.New(rowTimestamp), + ResourceChangeEvent: v2.ResourceChangeEvent_builder{ + ResourceId: &v2.ResourceId{ + ResourceType: source.ResourceType, + Resource: resourceID, + }, + }.Build(), + }.Build() + events = append(events, event) + return true, nil + }) + if err != nil { + return nil, "", time.Time{}, fmt.Errorf("baton-sql: 
failed to list resource changes: %w", err) + } + + return events, npt, maxSeen, nil +} + +// processGrantPage runs one page of a grant changes or revokes query and returns grant/revoke events. +func (f *SQLEventFeed) processGrantPage( + ctx context.Context, + s *SQLSyncer, + source incSyncSource, + since time.Time, + pageSize int, + pageToken string, + isRevoke bool, +) ([]*v2.Event, string, time.Time, error) { + gc := source.GrantConfig + + query := gc.ChangesQuery + cursorCol := gc.ChangesCursorColumn + if isRevoke { + query = gc.RevokesQuery + cursorCol = gc.RevokesCursorColumn + } + + vars, err := s.PrepareQueryVars(ctx, nil, gc.Vars) + if err != nil { + return nil, "", time.Time{}, fmt.Errorf("baton-sql: failed to prepare vars for grant incremental sync: %w", err) + } + vars[sinceKey] = since + + pToken := &pagination.Token{Token: pageToken} + if pageSize > 0 { + pToken.Size = pageSize + } + + var events []*v2.Event + var maxSeen time.Time + + npt, err := s.runQuery(ctx, pToken, query, gc.Pagination, vars, func(ctx context.Context, rowMap map[string]any) (bool, error) { + rowTimestamp := since + if ts, ok := rowMap[cursorCol]; ok { + t, parseErr := toTime(ts) + if parseErr != nil { + return false, fmt.Errorf("baton-sql: failed to parse cursor column %q value %v in grant incremental sync: %w", cursorCol, ts, parseErr) + } + rowTimestamp = t + if t.After(maxSeen) { + maxSeen = t + } + } + + inputs := s.env.SyncInputs(rowMap) + resourceID, evalErr := s.env.EvaluateString(ctx, gc.ResourceId, inputs) + if evalErr != nil { + return false, fmt.Errorf("baton-sql: failed to evaluate resource_id in grant incremental sync: %w", evalErr) + } + + minimalResource := &v2.Resource{ + Id: &v2.ResourceId{ResourceType: source.ResourceType, Resource: resourceID}, + } + + for _, mapping := range source.GrantMap { + grant, ok, mapErr := f.mapGrantFromRow(ctx, s, minimalResource, mapping, rowMap) + if mapErr != nil { + return false, mapErr + } + if !ok { + continue + } + + 
principalID := grant.GetPrincipal().GetId().GetResource() + tsStr := rowTimestamp.UTC().Format(time.RFC3339Nano) + rowKey := grantRowKey(rowMap, gc.Pagination) + + var event *v2.Event + if isRevoke { + event = v2.Event_builder{ + Id: fmt.Sprintf("revoke:%s:%s:%s:%s:%s", source.ResourceType, resourceID, principalID, tsStr, rowKey), + OccurredAt: timestamppb.New(rowTimestamp), + CreateRevokeEvent: v2.CreateRevokeEvent_builder{ + Entitlement: grant.GetEntitlement(), + Principal: grant.GetPrincipal(), + }.Build(), + }.Build() + } else { + event = v2.Event_builder{ + Id: fmt.Sprintf("grant:%s:%s:%s:%s:%s", source.ResourceType, resourceID, principalID, tsStr, rowKey), + OccurredAt: timestamppb.New(rowTimestamp), + CreateGrantEvent: v2.CreateGrantEvent_builder{ + Entitlement: grant.GetEntitlement(), + Principal: grant.GetPrincipal(), + }.Build(), + }.Build() + } + events = append(events, event) + } + return true, nil + }) + if err != nil { + return nil, "", time.Time{}, fmt.Errorf("baton-sql: failed to list grant changes: %w", err) + } + + return events, npt, maxSeen, nil +} + +// mapGrantFromRow maps a row to a grant for the incremental path. +// Uses a minimal resource with only ID set — trait fields are unavailable in incremental sync. +// CEL expressions in the grant mapping must not reference resource trait fields (display_name, etc.). 
+func (f *SQLEventFeed) mapGrantFromRow( + ctx context.Context, + s *SQLSyncer, + minimalResource *v2.Resource, + mapping *GrantMapping, + rowMap map[string]any, +) (*v2.Grant, bool, error) { + inputs := s.env.SyncInputsWithResource(rowMap, minimalResource) + + if mapping.SkipIf != "" { + skip, err := s.env.EvaluateBool(ctx, mapping.SkipIf, inputs) + if err != nil { + return nil, false, err + } + if skip { + return nil, false, nil + } + } + + principalID, err := s.env.EvaluateString(ctx, mapping.PrincipalId, inputs) + if err != nil { + return nil, false, err + } + if principalID == "" { + return nil, false, nil + } + + entitlementID, err := s.env.EvaluateString(ctx, mapping.Entitlement, inputs) + if err != nil { + return nil, false, err + } + if entitlementID == "" { + return nil, false, nil + } + + principal := &v2.ResourceId{ + ResourceType: mapping.PrincipalType, + Resource: principalID, + } + + return sdkGrant.NewGrant(minimalResource, entitlementID, principal), true, nil +} + +// grantRowKey returns the string representation of the pagination primary key value for a row. +// It is used to make grant/revoke event IDs unique when multiple rows share the same +// (resource, principal, timestamp) triple — e.g. bulk imports with second-precision timestamps. +func grantRowKey(rowMap map[string]any, p *Pagination) string { + if p == nil || p.PrimaryKey == "" { + return "" + } + v, ok := rowMap[p.PrimaryKey] + if !ok { + return "" + } + + switch n := v.(type) { + case int64: + return strconv.FormatInt(n, 10) + case float64: + return strconv.FormatInt(int64(n), 10) + default: + return fmt.Sprintf("%v", v) + } +} + +// toTime converts a database column value to time.Time. 
+func toTime(v any) (time.Time, error) { + switch t := v.(type) { + case time.Time: + return t, nil + case []byte: + pt, err := parseTime(string(t)) + if err != nil { + return time.Time{}, err + } + return *pt, nil + case string: + pt, err := parseTime(t) + if err != nil { + return time.Time{}, err + } + return *pt, nil + default: + return time.Time{}, fmt.Errorf("unsupported time type %T", v) + } +} diff --git a/pkg/bsql/event_feed_test.go b/pkg/bsql/event_feed_test.go new file mode 100644 index 00000000..f052f26c --- /dev/null +++ b/pkg/bsql/event_feed_test.go @@ -0,0 +1,878 @@ +package bsql + +import ( + "database/sql" + "fmt" + "strings" + "testing" + "time" + + _ "github.com/glebarez/go-sqlite" + "github.com/stretchr/testify/require" + + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" + "github.com/conductorone/baton-sdk/pkg/pagination" + "github.com/conductorone/baton-sql/pkg/bcel" + "github.com/conductorone/baton-sql/pkg/database" +) + +// newEventTestDB creates an in-memory SQLite database for event feed tests. +func newEventTestDB(t *testing.T) *sql.DB { + t.Helper() + db, err := sql.Open("sqlite", ":memory:") + require.NoError(t, err) + t.Cleanup(func() { _ = db.Close() }) + return db +} + +// newEventTestSyncer creates a minimal SQLSyncer backed by a SQLite DB. +// resourceIDExpr is the CEL expression used as the default resource ID (List.Map.Id fallback). +func newEventTestSyncer(t *testing.T, db *sql.DB, resourceIDExpr string) *SQLSyncer { + t.Helper() + celEnv, err := bcel.NewEnv(t.Context()) + require.NoError(t, err) + return &SQLSyncer{ + db: db, + dbEngine: database.SQLite, + config: ResourceType{ + List: &ListQuery{ + Map: &ResourceMapping{Id: resourceIDExpr}, + }, + }, + env: celEnv, + } +} + +// TestGrantRowKey covers all branches of the rowKey extraction helper. 
+func TestGrantRowKey(t *testing.T) { + tests := []struct { + name string + row map[string]any + pag *Pagination + want string + }{ + {"nil pagination", map[string]any{"id": "1"}, nil, ""}, + {"empty PrimaryKey", map[string]any{"id": "1"}, &Pagination{}, ""}, + {"key missing from row", map[string]any{"name": "x"}, &Pagination{PrimaryKey: "id"}, ""}, + {"int64 value", map[string]any{"id": int64(42)}, &Pagination{PrimaryKey: "id"}, "42"}, + {"float64 value", map[string]any{"id": float64(99)}, &Pagination{PrimaryKey: "id"}, "99"}, + {"string value", map[string]any{"id": "abc-123"}, &Pagination{PrimaryKey: "id"}, "abc-123"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.want, grantRowKey(tt.row, tt.pag)) + }) + } +} + +// TestGetSources verifies deterministic source enumeration. +func TestGetSources(t *testing.T) { + resSync := &ResourceIncrementalSync{Query: "SELECT 1", CursorColumn: "updated_at"} + grantSync := &GrantsIncrementalSync{ + ResourceId: "cols.id", + ChangesQuery: "SELECT 1", + ChangesCursorColumn: "updated_at", + } + grantSyncWithRevokes := &GrantsIncrementalSync{ + ResourceId: "cols.id", + ChangesQuery: "SELECT 1", + ChangesCursorColumn: "updated_at", + RevokesQuery: "SELECT 1", + RevokesCursorColumn: "deleted_at", + } + + t.Run("empty config produces no sources", func(t *testing.T) { + sources := getSources(Config{ResourceTypes: map[string]ResourceType{ + "user": {Name: "User"}, + }}) + require.Empty(t, sources) + }) + + t.Run("resource-only incremental sync", func(t *testing.T) { + sources := getSources(Config{ResourceTypes: map[string]ResourceType{ + "user": {IncrementalSync: resSync}, + }}) + require.Len(t, sources, 1) + require.Equal(t, "user:resource", sources[0].Key) + require.Equal(t, incSyncSourceKindResource, sources[0].Kind) + require.Equal(t, "user", sources[0].ResourceType) + }) + + t.Run("grant changes without revokes produces one source", func(t *testing.T) { + sources := 
getSources(Config{ResourceTypes: map[string]ResourceType{ + "role": {Grants: []*GrantsQuery{{IncrementalSync: grantSync}}}, + }}) + require.Len(t, sources, 1) + require.Equal(t, "role:grants:0:changes", sources[0].Key) + require.Equal(t, incSyncSourceKindGrantChanges, sources[0].Kind) + }) + + t.Run("grant changes with revokes produces two sources", func(t *testing.T) { + sources := getSources(Config{ResourceTypes: map[string]ResourceType{ + "role": {Grants: []*GrantsQuery{{IncrementalSync: grantSyncWithRevokes}}}, + }}) + require.Len(t, sources, 2) + require.Equal(t, incSyncSourceKindGrantChanges, sources[0].Kind) + require.Equal(t, "role:grants:0:revokes", sources[1].Key) + require.Equal(t, incSyncSourceKindGrantRevokes, sources[1].Kind) + }) + + t.Run("multiple resource types sorted lexicographically", func(t *testing.T) { + sources := getSources(Config{ResourceTypes: map[string]ResourceType{ + "role": {IncrementalSync: resSync}, + "group": {IncrementalSync: resSync}, + "user": {IncrementalSync: resSync}, + }}) + require.Len(t, sources, 3) + require.Equal(t, "group", sources[0].ResourceType) + require.Equal(t, "role", sources[1].ResourceType) + require.Equal(t, "user", sources[2].ResourceType) + }) + + t.Run("resource and grants both present", func(t *testing.T) { + sources := getSources(Config{ResourceTypes: map[string]ResourceType{ + "group": { + IncrementalSync: resSync, + Grants: []*GrantsQuery{{IncrementalSync: grantSyncWithRevokes}}, + }, + }}) + require.Len(t, sources, 3) + require.Equal(t, incSyncSourceKindResource, sources[0].Kind) + require.Equal(t, incSyncSourceKindGrantChanges, sources[1].Kind) + require.Equal(t, incSyncSourceKindGrantRevokes, sources[2].Kind) + }) +} + +// TestCommitAndAdvance exercises cursor state transitions. 
+func TestCommitAndAdvance(t *testing.T) { + src := func(key string) incSyncSource { return incSyncSource{Key: key} } + + t.Run("nextPageToken keeps current source with hasMore=true", func(t *testing.T) { + f := &SQLEventFeed{} + cursor := &eventFeedCursor{SourceCursors: make(map[string]string)} + sources := []incSyncSource{src("a"), src("b")} + + state, err := f.commitAndAdvance(cursor, sources, "page2", time.Time{}) + require.NoError(t, err) + require.True(t, state.HasMore) + require.Equal(t, 0, cursor.CurrentSourceIdx) + require.Equal(t, "page2", cursor.CurrentPageToken) + }) + + t.Run("exhausted source advances index and commits maxSeen", func(t *testing.T) { + f := &SQLEventFeed{} + cursor := &eventFeedCursor{SourceCursors: make(map[string]string)} + sources := []incSyncSource{src("src-a"), src("src-b")} + ts := time.Date(2025, 3, 1, 12, 0, 0, 0, time.UTC) + + state, err := f.commitAndAdvance(cursor, sources, "", ts) + require.NoError(t, err) + require.True(t, state.HasMore) + require.Equal(t, 1, cursor.CurrentSourceIdx) + require.Equal(t, ts.UTC().Format(time.RFC3339Nano), cursor.SourceCursors["src-a"]) + require.Empty(t, cursor.CurrentSince) + require.Empty(t, cursor.CurrentMaxSeen) + require.Empty(t, cursor.CurrentPageToken) + }) + + t.Run("last source exhausted sets hasMore=false and wraps index", func(t *testing.T) { + f := &SQLEventFeed{} + cursor := &eventFeedCursor{SourceCursors: make(map[string]string)} + sources := []incSyncSource{src("only")} + + state, err := f.commitAndAdvance(cursor, sources, "", time.Time{}) + require.NoError(t, err) + require.False(t, state.HasMore) + require.Equal(t, 0, cursor.CurrentSourceIdx) + }) + + t.Run("zero maxSeen does not update committed cursor", func(t *testing.T) { + f := &SQLEventFeed{} + cursor := &eventFeedCursor{SourceCursors: make(map[string]string)} + sources := []incSyncSource{src("src-a")} + + _, err := f.commitAndAdvance(cursor, sources, "", time.Time{}) + require.NoError(t, err) + require.Empty(t, 
cursor.SourceCursors["src-a"]) + }) + + t.Run("maxSeen accumulates across pages and only advances when newer", func(t *testing.T) { + f := &SQLEventFeed{} + cursor := &eventFeedCursor{SourceCursors: make(map[string]string)} + sources := []incSyncSource{src("src-a"), src("src-b")} + ts1 := time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC) + ts2 := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + + _, err := f.commitAndAdvance(cursor, sources, "p2", ts1) + require.NoError(t, err) + require.Equal(t, ts1.UTC().Format(time.RFC3339Nano), cursor.CurrentMaxSeen) + + _, err = f.commitAndAdvance(cursor, sources, "p3", ts2) + require.NoError(t, err) + require.Equal(t, ts2.UTC().Format(time.RFC3339Nano), cursor.CurrentMaxSeen) + + // Older timestamp does not regress CurrentMaxSeen; source exhausted here + _, err = f.commitAndAdvance(cursor, sources, "", ts1) + require.NoError(t, err) + require.Equal(t, ts2.UTC().Format(time.RFC3339Nano), cursor.SourceCursors["src-a"]) + }) +} + +// TestSinceForSource verifies committed-cursor lookup and default-lookback fallback. 
+func TestSinceForSource(t *testing.T) { + t.Run("returns committed cursor when present", func(t *testing.T) { + ts := time.Date(2025, 3, 1, 8, 0, 0, 0, time.UTC) + f := &SQLEventFeed{config: Config{}} + cursor := &eventFeedCursor{ + SourceCursors: map[string]string{"user:resource": ts.UTC().Format(time.RFC3339Nano)}, + } + got := f.sinceForSource(cursor, "user:resource") + require.True(t, got.Equal(ts)) + }) + + t.Run("falls back to default lookback when no committed cursor", func(t *testing.T) { + f := &SQLEventFeed{config: Config{}} + cursor := &eventFeedCursor{SourceCursors: map[string]string{}} + before := time.Now().UTC().Add(-defaultLookbackDuration) + got := f.sinceForSource(cursor, "user:resource") + require.False(t, got.Before(before.Add(-time.Second))) + require.False(t, got.After(time.Now().UTC())) + }) + + t.Run("uses configured DefaultLookback duration", func(t *testing.T) { + f := &SQLEventFeed{config: Config{ + IncrementalSync: &IncrementalSyncConfig{DefaultLookback: "30m"}, + }} + cursor := &eventFeedCursor{SourceCursors: map[string]string{}} + before := time.Now().UTC().Add(-30 * time.Minute) + got := f.sinceForSource(cursor, "user:resource") + require.False(t, got.Before(before.Add(-time.Second))) + require.False(t, got.After(time.Now().UTC())) + }) +} + +// --- SQLite integration tests --- + +// since is a fixed reference point used across integration tests. +var testSince = time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + +// ts returns a UTC timestamp t days after testSince. +func ts(days int) time.Time { + return testSince.AddDate(0, 0, days) +} + +// tsStr returns the MySQL-format string ("2006-01-02 15:04:05") for t days after testSince. +// Insert these into SQLite TEXT columns: glebarez parses this format back to time.Time on read, +// and parseTime also handles this format — so both the happy path and error detection work. 
+func tsStr(days int) string { + return ts(days).UTC().Format("2006-01-02 15:04:05") +} + +func TestProcessResourceChangePage(t *testing.T) { + const schema = `CREATE TABLE resources (id TEXT NOT NULL, updated_at TEXT NOT NULL)` + + t.Run("emits ResourceChangeEvent per matching row", func(t *testing.T) { + db := newEventTestDB(t) + _, err := db.ExecContext(t.Context(),schema) + require.NoError(t, err) + _, err = db.ExecContext(t.Context(),`INSERT INTO resources VALUES (?,?),(?,?)`, "user-1", tsStr(1), "user-2", tsStr(2)) + require.NoError(t, err) + + s := newEventTestSyncer(t, db, "cols.id") + f := &SQLEventFeed{} + source := incSyncSource{ + Kind: incSyncSourceKindResource, + ResourceType: "user", + ResConfig: &ResourceIncrementalSync{ + Query: "SELECT id, updated_at FROM resources WHERE updated_at > ?", + CursorColumn: "updated_at", + }, + } + + events, npt, maxSeen, err := f.processResourceChangePage(t.Context(), s, source, testSince, 0, "") + require.NoError(t, err) + require.Len(t, events, 2) + require.Empty(t, npt) + require.True(t, maxSeen.Equal(ts(2))) + + for _, ev := range events { + require.NotNil(t, ev.GetResourceChangeEvent(), "expected ResourceChangeEvent, got %v", ev) + require.True(t, strings.HasPrefix(ev.Id, "resource:user:"), "unexpected event ID prefix: %s", ev.Id) + } + }) + + t.Run("event ID includes rowKey when PrimaryKey is configured", func(t *testing.T) { + db := newEventTestDB(t) + _, err := db.ExecContext(t.Context(),schema) + require.NoError(t, err) + _, err = db.ExecContext(t.Context(),`INSERT INTO resources VALUES (?,?)`, "user-42", tsStr(1)) + require.NoError(t, err) + + s := newEventTestSyncer(t, db, "cols.id") + f := &SQLEventFeed{} + source := incSyncSource{ + Kind: incSyncSourceKindResource, + ResourceType: "user", + ResConfig: &ResourceIncrementalSync{ + Query: "SELECT id, updated_at FROM resources WHERE updated_at > ?", + CursorColumn: "updated_at", + Pagination: &Pagination{Strategy: "offset", PrimaryKey: "id"}, + }, + } + + 
events, _, _, err := f.processResourceChangePage(t.Context(), s, source, testSince, 0, "") + require.NoError(t, err) + require.Len(t, events, 1) + require.True(t, strings.HasSuffix(events[0].Id, ":user-42"), + "expected ID ending in :user-42, got %s", events[0].Id) + }) + + t.Run("custom ResourceId expression overrides List.Map.Id", func(t *testing.T) { + db := newEventTestDB(t) + _, err := db.ExecContext(t.Context(),`CREATE TABLE things (ext_id TEXT, row_id TEXT, updated_at TEXT)`) + require.NoError(t, err) + _, err = db.ExecContext(t.Context(),`INSERT INTO things VALUES (?,?,?)`, "ext-99", "row-1", tsStr(1)) + require.NoError(t, err) + + s := newEventTestSyncer(t, db, "cols.row_id") // default would pick row_id + f := &SQLEventFeed{} + source := incSyncSource{ + Kind: incSyncSourceKindResource, + ResourceType: "thing", + ResConfig: &ResourceIncrementalSync{ + Query: "SELECT ext_id, row_id, updated_at FROM things WHERE updated_at > ?", + CursorColumn: "updated_at", + ResourceId: "cols.ext_id", // override: use ext_id instead + }, + } + + events, _, _, err := f.processResourceChangePage(t.Context(), s, source, testSince, 0, "") + require.NoError(t, err) + require.Len(t, events, 1) + require.Equal(t, "ext-99", events[0].GetResourceChangeEvent().GetResourceId().GetResource()) + require.Contains(t, events[0].Id, "ext-99") + }) + + t.Run("no rows returns empty events and zero maxSeen", func(t *testing.T) { + db := newEventTestDB(t) + _, err := db.ExecContext(t.Context(),schema) + require.NoError(t, err) + + s := newEventTestSyncer(t, db, "cols.id") + f := &SQLEventFeed{} + source := incSyncSource{ + Kind: incSyncSourceKindResource, + ResourceType: "user", + ResConfig: &ResourceIncrementalSync{ + Query: "SELECT id, updated_at FROM resources WHERE updated_at > ?", + CursorColumn: "updated_at", + }, + } + + events, npt, maxSeen, err := f.processResourceChangePage(t.Context(), s, source, testSince, 0, "") + require.NoError(t, err) + require.Empty(t, events) + 
require.Empty(t, npt) + require.True(t, maxSeen.IsZero()) + }) + + t.Run("unparseable cursor column value returns error", func(t *testing.T) { + db := newEventTestDB(t) + _, err := db.ExecContext(t.Context(),schema) + require.NoError(t, err) + // "not-a-timestamp" lexicographically > "2025-..." so the WHERE clause passes it through. + _, err = db.ExecContext(t.Context(),`INSERT INTO resources VALUES (?,?)`, "user-bad", "not-a-timestamp") + require.NoError(t, err) + + s := newEventTestSyncer(t, db, "cols.id") + f := &SQLEventFeed{} + source := incSyncSource{ + Kind: incSyncSourceKindResource, + ResourceType: "user", + ResConfig: &ResourceIncrementalSync{ + Query: "SELECT id, updated_at FROM resources WHERE updated_at > ?", + CursorColumn: "updated_at", + }, + } + + _, _, _, err = f.processResourceChangePage(t.Context(), s, source, testSince, 0, "") + require.Error(t, err) + require.Contains(t, err.Error(), "updated_at") + }) +} + +func TestProcessGrantPage(t *testing.T) { + const schema = ` +CREATE TABLE memberships ( + id INTEGER PRIMARY KEY, + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + role TEXT NOT NULL, + updated_at TEXT NOT NULL, + deleted_at TEXT +)` + + setupDB := func(t *testing.T) *sql.DB { + t.Helper() + db := newEventTestDB(t) + _, err := db.ExecContext(t.Context(),schema) + require.NoError(t, err) + return db + } + + grantMap := []*GrantMapping{ + {PrincipalId: "cols.user_id", PrincipalType: "user", Entitlement: "cols.role"}, + } + changesConfig := &GrantsIncrementalSync{ + ResourceId: "cols.group_id", + ChangesQuery: "SELECT id, group_id, user_id, role, updated_at FROM memberships WHERE updated_at > ?", + ChangesCursorColumn: "updated_at", + } + + t.Run("emits CreateGrantEvent per row", func(t *testing.T) { + db := setupDB(t) + _, err := db.ExecContext(t.Context(), + `INSERT INTO memberships(group_id, user_id, role, updated_at) VALUES (?,?,?,?),(?,?,?,?)`, + "group-1", "user-a", "member", tsStr(1), + "group-1", "user-b", "member", tsStr(2), + ) 
+ require.NoError(t, err) + + s := newEventTestSyncer(t, db, "cols.id") + f := &SQLEventFeed{} + source := incSyncSource{ + Kind: incSyncSourceKindGrantChanges, + ResourceType: "group", + GrantConfig: changesConfig, + GrantMap: grantMap, + } + + events, npt, maxSeen, err := f.processGrantPage(t.Context(), s, source, testSince, 0, "", false) + require.NoError(t, err) + require.Len(t, events, 2) + require.Empty(t, npt) + require.True(t, maxSeen.Equal(ts(2))) + + for _, ev := range events { + require.NotNil(t, ev.GetCreateGrantEvent(), "expected CreateGrantEvent, got %v", ev) + require.True(t, strings.HasPrefix(ev.Id, "grant:group:group-1:"), "unexpected event ID: %s", ev.Id) + } + }) + + t.Run("emits CreateRevokeEvent when isRevoke=true", func(t *testing.T) { + db := setupDB(t) + _, err := db.ExecContext(t.Context(), + `INSERT INTO memberships(group_id, user_id, role, updated_at, deleted_at) VALUES (?,?,?,?,?)`, + "group-1", "user-a", "member", tsStr(1), tsStr(1), + ) + require.NoError(t, err) + + s := newEventTestSyncer(t, db, "cols.id") + f := &SQLEventFeed{} + source := incSyncSource{ + Kind: incSyncSourceKindGrantRevokes, + ResourceType: "group", + GrantConfig: &GrantsIncrementalSync{ + ResourceId: "cols.group_id", + RevokesQuery: "SELECT id, group_id, user_id, role, deleted_at, updated_at FROM memberships WHERE deleted_at > ?", + RevokesCursorColumn: "deleted_at", + }, + GrantMap: grantMap, + } + + events, _, _, err := f.processGrantPage(t.Context(), s, source, testSince, 0, "", true) + require.NoError(t, err) + require.Len(t, events, 1) + require.NotNil(t, events[0].GetCreateRevokeEvent()) + require.True(t, strings.HasPrefix(events[0].Id, "revoke:group:"), "unexpected event ID: %s", events[0].Id) + }) + + t.Run("event ID format is grant:rtType:resourceID:principalID:timestamp:rowKey", func(t *testing.T) { + db := setupDB(t) + _, err := db.ExecContext(t.Context(), + `INSERT INTO memberships(id, group_id, user_id, role, updated_at) VALUES (?,?,?,?,?)`, + 
int64(7), "grp-1", "usr-1", "admin", tsStr(166), // well after testSince + ) + require.NoError(t, err) + + s := newEventTestSyncer(t, db, "cols.id") + f := &SQLEventFeed{} + source := incSyncSource{ + Kind: incSyncSourceKindGrantChanges, + ResourceType: "group", + GrantConfig: &GrantsIncrementalSync{ + ResourceId: "cols.group_id", + ChangesQuery: "SELECT id, group_id, user_id, role, updated_at FROM memberships WHERE updated_at > ?", + ChangesCursorColumn: "updated_at", + Pagination: &Pagination{Strategy: "offset", PrimaryKey: "id"}, + }, + GrantMap: grantMap, + } + + events, _, _, err := f.processGrantPage(t.Context(), s, source, testSince, 0, "", false) + require.NoError(t, err) + require.Len(t, events, 1) + + parts := strings.Split(events[0].Id, ":") + // Format: grant:group:grp-1:usr-1:: + require.Equal(t, "grant", parts[0]) + require.Equal(t, "group", parts[1]) + require.Equal(t, "grp-1", parts[2]) + require.Equal(t, "usr-1", parts[3]) + require.NotEmpty(t, parts[4]) // timestamp + require.Equal(t, "7", parts[len(parts)-1]) // rowKey = primary key value + }) + + t.Run("SkipIf expression skips matching rows", func(t *testing.T) { + db := setupDB(t) + _, err := db.ExecContext(t.Context(), + `INSERT INTO memberships(group_id, user_id, role, updated_at) VALUES (?,?,?,?),(?,?,?,?)`, + "group-1", "skip-me", "member", tsStr(1), + "group-1", "keep-me", "member", tsStr(2), + ) + require.NoError(t, err) + + s := newEventTestSyncer(t, db, "cols.id") + f := &SQLEventFeed{} + source := incSyncSource{ + Kind: incSyncSourceKindGrantChanges, + ResourceType: "group", + GrantConfig: changesConfig, + GrantMap: []*GrantMapping{ + { + PrincipalId: "cols.user_id", + PrincipalType: "user", + Entitlement: "cols.role", + SkipIf: "cols.user_id == 'skip-me'", + }, + }, + } + + events, _, _, err := f.processGrantPage(t.Context(), s, source, testSince, 0, "", false) + require.NoError(t, err) + require.Len(t, events, 1) + require.Equal(t, "keep-me", 
events[0].GetCreateGrantEvent().GetPrincipal().GetId().GetResource()) + }) + + t.Run("unparseable cursor column returns error", func(t *testing.T) { + db := setupDB(t) + // "not-a-timestamp" passes the WHERE clause due to string ordering. + _, err := db.ExecContext(t.Context(), + `INSERT INTO memberships(group_id, user_id, role, updated_at) VALUES (?,?,?,?)`, + "group-1", "user-a", "member", "not-a-timestamp", + ) + require.NoError(t, err) + + s := newEventTestSyncer(t, db, "cols.id") + f := &SQLEventFeed{} + source := incSyncSource{ + Kind: incSyncSourceKindGrantChanges, + ResourceType: "group", + GrantConfig: changesConfig, + GrantMap: grantMap, + } + + _, _, _, err = f.processGrantPage(t.Context(), s, source, testSince, 0, "", false) + require.Error(t, err) + require.Contains(t, err.Error(), "updated_at") + }) +} + +// newListEventsSyncer builds a SQLSyncer for use in ListEvents integration tests. +func newListEventsSyncer(t *testing.T, db *sql.DB, rt ResourceType) *SQLSyncer { + t.Helper() + celEnv, err := bcel.NewEnv(t.Context()) + require.NoError(t, err) + return &SQLSyncer{ + db: db, + dbEngine: database.SQLite, + config: rt, + env: celEnv, + } +} + +// TestListEvents covers the main ListEvents entry point end-to-end. 
+func TestListEvents(t *testing.T) {
+	t.Run("no incremental sync sources returns hasMore=false", func(t *testing.T) {
+		feed := &SQLEventFeed{config: Config{ResourceTypes: map[string]ResourceType{
+			"user": {Name: "User"},
+		}}}
+		got, st, _, err := feed.ListEvents(t.Context(), nil, &pagination.StreamToken{})
+		require.NoError(t, err)
+		require.Empty(t, got)
+		require.False(t, st.HasMore)
+	})
+
+	t.Run("missing syncer skips source and returns hasMore=false", func(t *testing.T) {
+		cfg := Config{ResourceTypes: map[string]ResourceType{
+			"user": {IncrementalSync: &ResourceIncrementalSync{
+				Query:        "SELECT id, updated_at FROM t WHERE updated_at > ?",
+				CursorColumn: "updated_at",
+			}},
+		}}
+		feed := &SQLEventFeed{config: cfg, syncers: map[string]*SQLSyncer{}}
+
+		got, st, _, err := feed.ListEvents(t.Context(), nil, &pagination.StreamToken{})
+		require.NoError(t, err)
+		require.Empty(t, got)
+		require.False(t, st.HasMore)
+	})
+
+	t.Run("returns resource change events from database", func(t *testing.T) {
+		conn := newEventTestDB(t)
+		_, err := conn.ExecContext(t.Context(), `CREATE TABLE res (id TEXT NOT NULL, updated_at TEXT NOT NULL)`)
+		require.NoError(t, err)
+		_, err = conn.ExecContext(t.Context(), `INSERT INTO res VALUES (?,?),(?,?)`, "r1", tsStr(1), "r2", tsStr(2))
+		require.NoError(t, err)
+
+		resType := ResourceType{
+			List: &ListQuery{Map: &ResourceMapping{Id: "cols.id"}},
+			IncrementalSync: &ResourceIncrementalSync{
+				Query:        "SELECT id, updated_at FROM res WHERE updated_at > ?",
+				CursorColumn: "updated_at",
+			},
+		}
+		sy := newListEventsSyncer(t, conn, resType)
+		cfg := Config{ResourceTypes: map[string]ResourceType{"res": resType}}
+		feed := &SQLEventFeed{config: cfg, syncers: map[string]*SQLSyncer{"res": sy}}
+
+		// Seed the committed cursor for "res:resource" with testSince so both inserted rows are newer than `since`.
+		seed, err := marshalCursor(&eventFeedCursor{
+			SourceCursors: map[string]string{"res:resource": testSince.UTC().Format(time.RFC3339Nano)},
+		})
+		require.NoError(t, err)
+
+		got, st, _, err := feed.ListEvents(t.Context(), nil, &pagination.StreamToken{Cursor: seed})
+		require.NoError(t, err)
+		require.Len(t, got, 2)
+		require.False(t, st.HasMore)
+		for _, e := range got {
+			require.NotNil(t, e.GetResourceChangeEvent())
+		}
+	})
+
+	t.Run("CurrentSince is pinned across pages of the same scan cycle", func(t *testing.T) {
+		conn := newEventTestDB(t)
+		_, err := conn.ExecContext(t.Context(), `CREATE TABLE items (id TEXT NOT NULL, updated_at TEXT NOT NULL)`)
+		require.NoError(t, err)
+		for n := 1; n <= 3; n++ {
+			_, err = conn.ExecContext(t.Context(), `INSERT INTO items VALUES (?,?)`, fmt.Sprintf("i%d", n), tsStr(n))
+			require.NoError(t, err)
+		}
+
+		resType := ResourceType{
+			List: &ListQuery{Map: &ResourceMapping{Id: "cols.id"}},
+			IncrementalSync: &ResourceIncrementalSync{
+				Query:        "SELECT id, updated_at FROM items WHERE updated_at > ? ORDER BY id LIMIT ? OFFSET ?",
+				CursorColumn: "updated_at",
+				Pagination:   &Pagination{Strategy: "offset", PrimaryKey: "id", PageSize: 1},
+			},
+		}
+		sy := newListEventsSyncer(t, conn, resType)
+		cfg := Config{ResourceTypes: map[string]ResourceType{"item": resType}}
+		feed := &SQLEventFeed{config: cfg, syncers: map[string]*SQLSyncer{"item": sy}}
+
+		seed, err := marshalCursor(&eventFeedCursor{
+			SourceCursors: map[string]string{"item:resource": testSince.UTC().Format(time.RFC3339Nano)},
+		})
+		require.NoError(t, err)
+
+		// Page 1: three rows with PageSize 1, so more pages must remain.
+		_, first, _, err := feed.ListEvents(t.Context(), nil, &pagination.StreamToken{Cursor: seed})
+		require.NoError(t, err)
+		require.True(t, first.HasMore)
+
+		firstCur, err := unmarshalCursor(first.Cursor)
+		require.NoError(t, err)
+		require.NotEmpty(t, firstCur.CurrentSince)
+
+		// Page 2: resume from page 1's cursor; CurrentSince must not move within the cycle.
+		_, second, _, err := feed.ListEvents(t.Context(), nil, &pagination.StreamToken{Cursor: first.Cursor})
+		require.NoError(t, err)
+
+		secondCur, err := unmarshalCursor(second.Cursor)
+		require.NoError(t, err)
+		require.Equal(t, firstCur.CurrentSince, secondCur.CurrentSince)
+	})
+
+	t.Run("out-of-bounds CurrentSourceIdx is clamped to 0", func(t *testing.T) {
+		cfg := Config{ResourceTypes: map[string]ResourceType{
+			"user": {IncrementalSync: &ResourceIncrementalSync{
+				Query:        "SELECT id, updated_at FROM t WHERE updated_at > ?",
+				CursorColumn: "updated_at",
+			}},
+		}}
+		feed := &SQLEventFeed{config: cfg, syncers: map[string]*SQLSyncer{}}
+
+		stale, err := marshalCursor(&eventFeedCursor{
+			SourceCursors:    map[string]string{},
+			CurrentSourceIdx: 999,
+		})
+		require.NoError(t, err)
+
+		_, st, _, err := feed.ListEvents(t.Context(), nil, &pagination.StreamToken{Cursor: stale})
+		require.NoError(t, err)
+		// Index 999 is reset to 0; the only source has no syncer, so it is skipped and the scan wraps with hasMore=false.
+		require.False(t, st.HasMore)
+	})
+}
+
+// TestMarshalUnmarshalCursor covers cursor serialization round-trips and error paths.
+func TestMarshalUnmarshalCursor(t *testing.T) {
+	t.Run("roundtrip preserves all fields", func(t *testing.T) {
+		want := &eventFeedCursor{
+			SourceCursors:    map[string]string{"user:resource": "2025-01-01T00:00:00Z"},
+			CurrentSourceIdx: 2,
+			CurrentPageToken: "tok",
+			CurrentSince:     "2025-01-01T00:00:00Z",
+			CurrentMaxSeen:   "2025-01-02T00:00:00Z",
+		}
+		encoded, err := marshalCursor(want)
+		require.NoError(t, err)
+		require.NotEmpty(t, encoded)
+
+		decoded, err := unmarshalCursor(encoded)
+		require.NoError(t, err)
+		require.Equal(t, want.CurrentSourceIdx, decoded.CurrentSourceIdx)
+		require.Equal(t, want.CurrentPageToken, decoded.CurrentPageToken)
+		require.Equal(t, want.CurrentSince, decoded.CurrentSince)
+		require.Equal(t, want.CurrentMaxSeen, decoded.CurrentMaxSeen)
+		require.Equal(t, want.SourceCursors, decoded.SourceCursors)
+	})
+
+	t.Run("empty string returns initialized empty cursor", func(t *testing.T) {
+		decoded, err := unmarshalCursor("")
+		require.NoError(t, err)
+		require.NotNil(t, decoded.SourceCursors)
+		require.Equal(t, 0, decoded.CurrentSourceIdx)
+	})
+
+	t.Run("malformed JSON returns error with baton-sql prefix", func(t *testing.T) {
+		_, err := unmarshalCursor("{not-json")
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "baton-sql")
+	})
+
+	t.Run("nil cursor marshals to empty string", func(t *testing.T) {
+		encoded, err := marshalCursor(nil)
+		require.NoError(t, err)
+		require.Empty(t, encoded)
+	})
+
+	t.Run("null SourceCursors is repaired to empty map on unmarshal", func(t *testing.T) {
+		raw := `{"source_cursors":null,"current_source_idx":0,"current_page_token":""}`
+		decoded, err := unmarshalCursor(raw)
+		require.NoError(t, err)
+		require.NotNil(t, decoded.SourceCursors)
+	})
+}
+
+// TestToTime covers all type branches of the toTime converter.
+func TestToTime(t *testing.T) {
+	want := time.Date(2025, 3, 1, 12, 0, 0, 0, time.UTC)
+
+	t.Run("time.Time passthrough", func(t *testing.T) {
+		parsed, err := toTime(want)
+		require.NoError(t, err)
+		require.True(t, parsed.Equal(want))
+	})
+
+	t.Run("string in MySQL format", func(t *testing.T) {
+		parsed, err := toTime(want.UTC().Format(time.DateTime))
+		require.NoError(t, err)
+		require.True(t, parsed.Equal(want))
+	})
+
+	t.Run("[]byte in MySQL format", func(t *testing.T) {
+		parsed, err := toTime([]byte(want.UTC().Format(time.DateTime)))
+		require.NoError(t, err)
+		require.True(t, parsed.Equal(want))
+	})
+
+	t.Run("unsupported type returns error", func(t *testing.T) {
+		_, err := toTime(12345)
+		require.Error(t, err)
+	})
+}
+
+// TestDefaultLookback covers all branches of the defaultLookback helper.
+func TestDefaultLookback(t *testing.T) {
+	t.Run("nil config returns default duration", func(t *testing.T) {
+		require.Equal(t, defaultLookbackDuration, defaultLookback(nil))
+	})
+
+	t.Run("empty DefaultLookback returns default duration", func(t *testing.T) {
+		require.Equal(t, defaultLookbackDuration, defaultLookback(&IncrementalSyncConfig{}))
+	})
+
+	t.Run("invalid duration string returns default duration", func(t *testing.T) {
+		require.Equal(t, defaultLookbackDuration, defaultLookback(&IncrementalSyncConfig{DefaultLookback: "not-a-duration"}))
+	})
+
+	t.Run("valid duration string is parsed", func(t *testing.T) {
+		require.Equal(t, 30*time.Minute, defaultLookback(&IncrementalSyncConfig{DefaultLookback: "30m"}))
+	})
+}
+
+// TestMapGrantFromRow covers the skip branches of the grant-row mapping helper.
+func TestMapGrantFromRow(t *testing.T) {
+	env, err := bcel.NewEnv(t.Context())
+	require.NoError(t, err)
+
+	syncer := &SQLSyncer{env: env}
+	feed := &SQLEventFeed{}
+	group := &v2.Resource{
+		Id: &v2.ResourceId{ResourceType: "group", Resource: "grp-1"},
+	}
+
+	t.Run("empty principalID expression skips row", func(t *testing.T) {
+		gm := &GrantMapping{
+			PrincipalId:   `""`,
+			PrincipalType: "user",
+			Entitlement:   `"member"`,
+		}
+		got, ok, err := feed.mapGrantFromRow(t.Context(), syncer, group, gm, map[string]any{})
+		require.NoError(t, err)
+		require.False(t, ok)
+		require.Nil(t, got)
+	})
+
+	t.Run("empty entitlementID expression skips row", func(t *testing.T) {
+		gm := &GrantMapping{
+			PrincipalId:   `"user-1"`,
+			PrincipalType: "user",
+			Entitlement:   `""`,
+		}
+		got, ok, err := feed.mapGrantFromRow(t.Context(), syncer, group, gm, map[string]any{})
+		require.NoError(t, err)
+		require.False(t, ok)
+		require.Nil(t, got)
+	})
+
+	t.Run("SkipIf=true skips row", func(t *testing.T) {
+		gm := &GrantMapping{
+			SkipIf:        "true",
+			PrincipalId:   `"user-1"`,
+			PrincipalType: "user",
+			Entitlement:   `"member"`,
+		}
+		got, ok, err := feed.mapGrantFromRow(t.Context(), syncer, group, gm, map[string]any{})
+		require.NoError(t, err)
+		require.False(t, ok)
+		require.Nil(t, got)
+	})
+
+	t.Run("valid mapping returns grant with correct principal", func(t *testing.T) {
+		gm := &GrantMapping{
+			PrincipalId:   `"user-42"`,
+			PrincipalType: "user",
+			Entitlement:   `"member"`,
+		}
+		got, ok, err := feed.mapGrantFromRow(t.Context(), syncer, group, gm, map[string]any{})
+		require.NoError(t, err)
+		require.True(t, ok)
+		require.NotNil(t, got)
+		require.Equal(t, "user-42", got.GetPrincipal().GetId().GetResource())
+		require.Equal(t, "user", got.GetPrincipal().GetId().GetResourceType())
+	})
+}
diff --git a/pkg/bsql/event_pagination.go b/pkg/bsql/event_pagination.go
new file mode 100644
index 00000000..c7c89a70
--- /dev/null
+++ b/pkg/bsql/event_pagination.go
@@ -0,0 +1,129 @@
+package bsql
+
+import (
+	"encoding/json"
+	"fmt"
+	"sort"
+	"time"
+)
+
+const defaultLookbackDuration = 3 * time.Hour
+
+// eventFeedCursor is the JSON-encoded pagination state handed back and forth across ListEvents() calls.
+//
+// SourceCursors stores one committed "since" timestamp per source key; it is meant to move forward only
+// after every page of that source has been processed. CurrentSince and CurrentMaxSeen are scratch state
+// for the scan cycle in progress, so that `since` does not shift between pages of one cycle.
+type eventFeedCursor struct {
+	SourceCursors    map[string]string `json:"source_cursors"`
+	CurrentSourceIdx int               `json:"current_source_idx"`
+	CurrentPageToken string            `json:"current_page_token"`
+	// CurrentSince is the since value used by the current scan cycle (set on its first page).
+	CurrentSince string `json:"current_since,omitempty"`
+	// CurrentMaxSeen is the largest cursor-column timestamp observed so far in the current cycle.
+	CurrentMaxSeen string `json:"current_max_seen,omitempty"`
+}
+
+// marshalCursor encodes c as JSON; a nil cursor encodes as the empty string.
+func marshalCursor(c *eventFeedCursor) (string, error) {
+	if c == nil {
+		return "", nil
+	}
+	encoded, err := json.Marshal(c)
+	if err != nil {
+		return "", fmt.Errorf("baton-sql: failed to marshal event feed cursor: %w", err)
+	}
+	return string(encoded), nil
+}
+
+// unmarshalCursor decodes a cursor string. An empty string, or JSON without a source_cursors map,
+// still yields a cursor whose SourceCursors map is non-nil and safe to write to.
+func unmarshalCursor(s string) (*eventFeedCursor, error) {
+	cur := &eventFeedCursor{}
+	if s != "" {
+		if err := json.Unmarshal([]byte(s), cur); err != nil {
+			return nil, fmt.Errorf("baton-sql: failed to unmarshal event feed cursor: %w", err)
+		}
+	}
+	if cur.SourceCursors == nil {
+		cur.SourceCursors = make(map[string]string)
+	}
+	return cur, nil
+}
+
+// incSyncSourceKind names the kind of incremental query a source runs.
+type incSyncSourceKind string
+
+const (
+	incSyncSourceKindResource     incSyncSourceKind = "resource"
+	incSyncSourceKindGrantChanges incSyncSourceKind = "grant_changes"
+	incSyncSourceKindGrantRevokes incSyncSourceKind = "grant_revokes"
+)
+
+// incSyncSource is one incremental-sync query to scan, as enumerated by getSources.
+type incSyncSource struct {
+	Key          string
+	Kind         incSyncSourceKind
+	ResourceType string // resource type ID
+	GrantIdx     int    // index into rt.Grants (grant event sources only)
+	ResConfig    *ResourceIncrementalSync
+	GrantConfig  *GrantsIncrementalSync
+	GrantMap     []*GrantMapping
+}
+
+// getSources lists every configured incremental source in a deterministic order: resource type IDs
+// sorted lexicographically and, within each type, resource first, then each grant's changes and revokes.
+func getSources(config Config) []incSyncSource {
+	ids := make([]string, 0, len(config.ResourceTypes))
+	for id := range config.ResourceTypes {
+		ids = append(ids, id)
+	}
+	sort.Strings(ids)
+
+	var sources []incSyncSource
+	for _, rtID := range ids {
+		rtCfg := config.ResourceTypes[rtID]
+		if rtCfg.IncrementalSync != nil {
+			sources = append(sources, incSyncSource{
+				Key:          fmt.Sprintf("%s:resource", rtID),
+				Kind:         incSyncSourceKindResource,
+				ResourceType: rtID,
+				ResConfig:    rtCfg.IncrementalSync,
+			})
+		}
+		for i, gq := range rtCfg.Grants {
+			inc := gq.IncrementalSync
+			if inc == nil {
+				continue
+			}
+			// The changes and revokes sources differ only in Key and Kind; append copies the struct value.
+			src := incSyncSource{
+				Key:          fmt.Sprintf("%s:grants:%d:changes", rtID, i),
+				Kind:         incSyncSourceKindGrantChanges,
+				ResourceType: rtID,
+				GrantIdx:     i,
+				GrantConfig:  inc,
+				GrantMap:     gq.Map,
+			}
+			sources = append(sources, src)
+			if inc.RevokesQuery == "" {
+				continue
+			}
+			src.Key = fmt.Sprintf("%s:grants:%d:revokes", rtID, i)
+			src.Kind = incSyncSourceKindGrantRevokes
+			sources = append(sources, src)
+		}
+	}
+	return sources
+}
+
+// defaultLookback returns the configured lookback duration, defaulting to 3 hours.
+func defaultLookback(config *IncrementalSyncConfig) time.Duration {
+	if config != nil && config.DefaultLookback != "" {
+		d, err := time.ParseDuration(config.DefaultLookback)
+		if err == nil && d >= 0 { // unparseable or negative values fall back to the default; a lookback cannot point forward
+			return d
+		}
+	}
+	return defaultLookbackDuration
+}
diff --git a/pkg/bsql/query.go b/pkg/bsql/query.go
index 4c05568c..3e300267 100644
--- a/pkg/bsql/query.go
+++ b/pkg/bsql/query.go
@@ -26,6 +26,10 @@ const (
 	cursorKey   = "cursor"
 	limitKey    = "limit"
 	unquotedKey = "unquoted"
+	// sinceKey and idKey are reserved placeholders injected by the incremental sync path.
+	// Users must not define vars with these names.
+	sinceKey = "since"
+	idKey    = "id"
 )
 
 var ErrQueryAffectedZeroRows = errors.New("query affected 0 rows, ending and rolling back")
diff --git a/pkg/bsql/sql_syncer.go b/pkg/bsql/sql_syncer.go
index dae8011f..2b9cee6a 100644
--- a/pkg/bsql/sql_syncer.go
+++ b/pkg/bsql/sql_syncer.go
@@ -6,9 +6,13 @@ import (
 	"fmt"
 
 	v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
+	"github.com/conductorone/baton-sdk/pkg/annotations"
 	"github.com/conductorone/baton-sdk/pkg/connectorbuilder"
+	"github.com/conductorone/baton-sdk/pkg/pagination"
 	"github.com/conductorone/baton-sql/pkg/bcel"
 	"github.com/conductorone/baton-sql/pkg/database"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 const (
@@ -18,6 +22,8 @@ const (
 	roleTraitType = "role"
 )
 
+var _ connectorbuilder.ResourceTargetedSyncer = (*SQLSyncer)(nil)
+
 type SQLSyncer struct {
 	resourceType *v2.ResourceType
 	db           *sql.DB
@@ -31,6 +37,44 @@ func (s *SQLSyncer) ResourceType(ctx context.Context) *v2.ResourceType {
 	return s.resourceType
 }
 
+// ResourceTypeID returns the resource type ID for this syncer.
+func (s *SQLSyncer) ResourceTypeID() string {
+	if s.resourceType == nil {
+		return ""
+	}
+	return s.resourceType.Id
+}
+
+// Get implements ResourceTargetedSyncer, fetching a single resource by its ID.
+func (s *SQLSyncer) Get(ctx context.Context, resourceId *v2.ResourceId, parentResourceId *v2.ResourceId) (*v2.Resource, annotations.Annotations, error) {
+	if s.config.Get == nil {
+		return nil, nil, fmt.Errorf("baton-sql: get not configured for resource type %s", resourceId.GetResourceType())
+	}
+
+	vars, err := s.PrepareQueryVars(ctx, nil, s.config.Get.Vars)
+	if err != nil {
+		return nil, nil, fmt.Errorf("baton-sql: failed to prepare vars for get query: %w", err)
+	}
+	vars[idKey] = resourceId.GetResource()
+
+	var result *v2.Resource
+	_, err = s.runQuery(ctx, &pagination.Token{}, s.config.Get.Query, nil, vars, func(ctx context.Context, row map[string]interface{}) (bool, error) {
+		r, err := s.mapResource(ctx, row)
+		if err != nil {
+			return false, err
+		}
+		result = r
+		return false, nil // stop after first row
+	})
+	if err != nil {
+		return nil, nil, fmt.Errorf("baton-sql: failed to execute get query for %s/%s: %w", resourceId.GetResourceType(), resourceId.GetResource(), err)
+	}
+	if result == nil {
+		return nil, nil, status.Errorf(codes.NotFound, "baton-sql: resource %s/%s not found", resourceId.GetResourceType(), resourceId.GetResource())
+	}
+	return result, nil, nil
+}
+
 func (c Config) GetSQLSyncers(ctx context.Context, db *sql.DB, dbEngine database.DbEngine, celEnv *bcel.Env) ([]connectorbuilder.ResourceSyncer, error) {
 	var ret []connectorbuilder.ResourceSyncer
 	for rtID, rtConfig := range c.ResourceTypes {
@@ -101,6 +145,12 @@ func (s *SQLSyncer) Validate(ctx context.Context) error {
 		}
 	}
 
+	if s.fullConfig.IncrementalSync != nil {
+		if err := s.validateInternal(ctx, s.fullConfig.IncrementalSync); err != nil {
+			return fmt.Errorf("validation error for incremental_sync: %w", err)
+		}
+	}
+
 	if err := s.validateInternal(ctx, s.config.List); err != nil {
 		return s.validateFormatErr("list", err)
 	}
@@ -127,6 +177,21 @@ func (s *SQLSyncer) Validate(ctx context.Context) error {
 		}
 	}
 
+	if s.config.Get != nil {
+		if err := s.validateInternal(ctx, s.config.Get); err != nil {
+			return s.validateFormatErr("get", err)
+		}
+	}
+
+	if s.config.IncrementalSync != nil {
+		if s.config.Get == nil {
+			return s.validateFormatErr("incremental_sync", fmt.Errorf("get query is required when incremental_sync is configured"))
+		}
+		if err := s.validateInternal(ctx, s.config.IncrementalSync); err != nil {
+			return s.validateFormatErr("incremental_sync", err)
+		}
+	}
+
 	if s.config.AccountProvisioning != nil {
 		if err := s.validateInternal(ctx, s.config.AccountProvisioning); err != nil {
 			return s.validateFormatErr("account_provisioning", err)
diff --git a/pkg/bsql/validate.go b/pkg/bsql/validate.go
index 42572181..17c5dcda 100644
--- a/pkg/bsql/validate.go
+++ b/pkg/bsql/validate.go
@@ -4,8 +4,19 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 )
 
+// queryUsesVar reports whether key appears in the vars slice returned by queryVars.
+func queryUsesVar(vars []string, key string) bool {
+	for _, v := range vars {
+		if v == key {
+			return true
+		}
+	}
+	return false
+}
+
 func validateVarsInQuery(s *SQLSyncer, query string, vars map[string]string) error {
 	if query == "" {
 		return fmt.Errorf("query is required")
@@ -22,7 +33,7 @@ func validateVarsInQuery(s *SQLSyncer, query string, vars map[string]string) err
 
 	for _, v := range usedVars {
 		if _, ok := vars[v]; !ok {
-			if v == limitKey || v == offsetKey || v == cursorKey {
+			if v == limitKey || v == offsetKey || v == cursorKey || v == sinceKey || v == idKey {
 				continue
 			}
 			return fmt.Errorf("query uses variable '%s' which is not defined in vars", v)
@@ -91,7 +102,15 @@ func (l *EntitlementMapping) staticValidate(ctx context.Context, s *SQLSyncer) e
 }
 
 func (l *GrantsQuery) staticValidate(ctx context.Context, s *SQLSyncer) error {
-	return validateVarsInQuery(s, l.Query, l.Vars)
+	if err := validateVarsInQuery(s, l.Query, l.Vars); err != nil {
+		return err
+	}
+	if l.IncrementalSync != nil {
+		if err := l.IncrementalSync.staticValidate(ctx, s); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 func (l *AccountProvisioning) staticValidate(ctx context.Context, s *SQLSyncer) error {
@@ -155,6 +174,109 @@ func (l *CredentialRotation) staticValidate(ctx context.Context, s *SQLSyncer) e
 	return nil
 }
 
+// staticValidate checks the get query's vars, requires the id placeholder, and rejects reserved var names.
+func (l *GetQuery) staticValidate(ctx context.Context, s *SQLSyncer) error {
+	if err := validateVarsInQuery(s, l.Query, l.Vars); err != nil {
+		return err
+	}
+	usedVars, err := s.queryVars(l.Query)
+	if err != nil {
+		return err
+	}
+	if !queryUsesVar(usedVars, idKey) {
+		return fmt.Errorf("get query must contain ?<%s> placeholder", idKey)
+	}
+	for k := range l.Vars {
+		if k == sinceKey || k == idKey {
+			return fmt.Errorf("vars must not use reserved key %q", k)
+		}
+	}
+	return nil
+}
+
+// staticValidate requires cursor_column and the since placeholder, rejects reserved var names, and compiles resource_id when set.
+func (l *ResourceIncrementalSync) staticValidate(ctx context.Context, s *SQLSyncer) error {
+	if l.CursorColumn == "" {
+		return errors.New("incremental_sync.cursor_column is required")
+	}
+	if err := validateVarsInQuery(s, l.Query, l.Vars); err != nil {
+		return err
+	}
+	usedVars, err := s.queryVars(l.Query)
+	if err != nil {
+		return err
+	}
+	if !queryUsesVar(usedVars, sinceKey) {
+		return fmt.Errorf("incremental_sync.query must contain ?<%s> placeholder", sinceKey)
+	}
+	for k := range l.Vars {
+		if k == sinceKey || k == idKey {
+			return fmt.Errorf("vars must not use reserved key %q", k)
+		}
+	}
+	if l.ResourceId != "" {
+		if err := s.env.Compile(l.ResourceId); err != nil {
+			return fmt.Errorf("incremental_sync.resource_id: invalid CEL expression: %w", err)
+		}
+	}
+	return nil
+}
+
+// staticValidate checks the changes query and, when revokes_query is set, the revokes query: each needs its cursor column and the since placeholder.
+func (l *GrantsIncrementalSync) staticValidate(ctx context.Context, s *SQLSyncer) error {
+	if l.ResourceId == "" {
+		return errors.New("incremental_sync.resource_id is required")
+	}
+	if l.ChangesCursorColumn == "" {
+		return errors.New("incremental_sync.changes_cursor_column is required")
+	}
+	if err := validateVarsInQuery(s, l.ChangesQuery, l.Vars); err != nil {
+		return fmt.Errorf("incremental_sync.changes_query: %w", err)
+	}
+	usedVars, err := s.queryVars(l.ChangesQuery)
+	if err != nil {
+		return err
+	}
+	if !queryUsesVar(usedVars, sinceKey) {
+		return fmt.Errorf("incremental_sync.changes_query must contain ?<%s> placeholder", sinceKey)
+	}
+	if l.RevokesQuery != "" {
+		if l.RevokesCursorColumn == "" {
+			return errors.New("incremental_sync.revokes_cursor_column is required when revokes_query is set")
+		}
+		if err := validateVarsInQuery(s, l.RevokesQuery, l.Vars); err != nil {
+			return fmt.Errorf("incremental_sync.revokes_query: %w", err)
+		}
+		usedVars, err = s.queryVars(l.RevokesQuery)
+		if err != nil {
+			return err
+		}
+		if !queryUsesVar(usedVars, sinceKey) {
+			return fmt.Errorf("incremental_sync.revokes_query must contain ?<%s> placeholder", sinceKey)
+		}
+	}
+	for k := range l.Vars {
+		if k == sinceKey || k == idKey {
+			return fmt.Errorf("vars must not use reserved key %q", k)
+		}
+	}
+	return nil
+}
+
+// staticValidate checks that default_lookback, when set, parses as a Go duration and is not negative.
+func (l *IncrementalSyncConfig) staticValidate(_ context.Context, _ *SQLSyncer) error {
+	if l.DefaultLookback != "" {
+		d, err := time.ParseDuration(l.DefaultLookback)
+		if err != nil {
+			return fmt.Errorf("invalid incremental_sync.default_lookback %q: %w", l.DefaultLookback, err)
+		}
+		if d < 0 {
+			return fmt.Errorf("invalid incremental_sync.default_lookback %q: must not be negative", l.DefaultLookback)
+		}
+	}
+	return nil
+}
+
 func (l *ActionConfig) staticValidate(ctx context.Context, s *SQLSyncer) error {
 	availableVars := make(map[string]string)
 	for k, v := range l.Vars {
diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go
index 4a9b6131..cc6f807d 100644
--- a/pkg/connector/connector.go
+++ b/pkg/connector/connector.go
@@ -9,6 +9,8 @@ import (
 	v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
 	"github.com/conductorone/baton-sdk/pkg/annotations"
 	"github.com/conductorone/baton-sdk/pkg/connectorbuilder"
+	"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
+	"go.uber.org/zap"
 
 	"github.com/conductorone/baton-sql/pkg/bcel"
 	"github.com/conductorone/baton-sql/pkg/bsql"
@@ -43,6 +45,23 @@ func (c *Connector) ResourceSyncers(ctx context.Context) []connectorbuilder.Reso
 	return syncers
 }
 
+// EventFeeds implements EventProviderV2. Returns the SQL event feed when incremental sync is configured.
+// GetSQLSyncers is called here separately from ResourceSyncers, producing distinct syncer instances.
+// This is safe because syncers are stateless — they hold pointers to the shared *sql.DB and *bcel.Env.
+// The SDK calls EventFeeds exactly once at initialization, so there is no ongoing allocation overhead.
+func (c *Connector) EventFeeds(ctx context.Context) []connectorbuilder.EventFeed {
+	if !c.config.HasIncrementalSync() {
+		return nil
+	}
+	syncers, err := c.config.GetSQLSyncers(ctx, c.db, c.dbEngine, c.celEnv)
+	if err != nil {
+		l := ctxzap.Extract(ctx)
+		l.Error("baton-sql: failed to create syncers for event feed; incremental sync will be unavailable", zap.Error(err))
+		return nil
+	}
+	return []connectorbuilder.EventFeed{bsql.NewSQLEventFeed(*c.config, syncers)}
+}
+
 // Asset takes an input AssetRef and attempts to fetch it using the connector's authenticated http client
 // It streams a response, always starting with a metadata object, following by chunked payloads for the asset.
 func (c *Connector) Asset(ctx context.Context, asset *v2.AssetRef) (string, io.ReadCloser, error) {