From 65badd37d59296d79cb61b0f9d0095153887c489 Mon Sep 17 00:00:00 2001 From: Mattias Karlsson Date: Thu, 30 Apr 2026 16:53:05 +0200 Subject: [PATCH 1/2] feat: add asynchronous schema tracking export with ZIP segments and job status API Introduce end-to-end export of change-tracking data for a configured sync table: full rows for inserts/updates and primary keys for deletes, written as streamed JSON-in-ZIP to blob storage with Azure Table coordination and queue-based orchestration. HTTP (Function auth): - POST .../schema/tracking/{tableId}/export with author, referenceId, purpose; persists blobs and exportjobs row, returns 202 with Location to status. - GET .../export/status/{*correlationId} for job detail (optional result SAS when result.json exists) or list when correlation segment is empty. Workers and storage: - Queues: exportjob dispatch; exportjob-{updated,inserted,deleted} segments; matching -done completion queues and -error dead-letter style correlation messages on segment failure. - Blobs: export container (request.json, job.json, response/*.zip, result.json). - Tables: exportjobs partition area_jobId_tableId (sanitized), row key v7 GUID. Implementation: - SchemaTrackingExportService for create, list, status, dispatch, segment processing, optimistic ETag merges, finalize with 7-day read SAS URIs. - SchemaTrackingExportStreamingZip streams SqlDataReader into one ZIP entry. - SqlStatementExtensions: CHANGETABLE segment SELECTs; Guid.CreateVersion7 for new identifiers where applicable. Also: README schema tracking export section; Azure.Data.Tables dependency; Directory.Packages.props bumps (Functions worker, OpenTelemetry, SqlClient, Cronos, etc.). --- README.md | 9 + src/Directory.Packages.props | 15 +- src/SqlBulkSyncFunction/Constants.cs | 70 ++ .../GetSyncJobConfig.SchemaTrackingExport.cs | 161 ++++ .../Functions/GetSyncJobConfig.cs | 3 +- .../Functions/ProcessExportJobQueues.cs | 157 ++++ .../Helpers/SchemaTrackingExportTableKeys.cs | 25 + .../Helpers/SqlStatementExtensions.cs | 68 +- .../Schema/Export/ExportJobCreateResult.cs | 33 + .../Schema/Export/ExportJobTableEntity.cs | 85 ++ .../Schema/Export/ExportJobTableMapper.cs | 42 + .../Schema/Export/SchemaTrackingExportJob.cs | 29 + .../Export/SchemaTrackingExportJobListItem.cs | 33 + .../SchemaTrackingExportJobListItemResult.cs | 36 + .../Export/SchemaTrackingExportJobResult.cs | 39 + .../Export/SchemaTrackingExportJobStatus.cs | 32 + .../Export/SchemaTrackingExportRequestBody.cs | 15 + .../Export/SchemaTrackingExportSegment.cs | 24 + src/SqlBulkSyncFunction/Program.cs | 10 + .../Services/SchemaTrackingExportService.cs | 779 ++++++++++++++++++ .../SchemaTrackingExportStreamingZip.cs | 110 +++ .../SqlBulkSyncFunction.csproj | 1 + 22 files changed, 1766 insertions(+), 10 deletions(-) create mode 100644 src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.SchemaTrackingExport.cs create mode 100644 src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs create mode 100644 src/SqlBulkSyncFunction/Helpers/SchemaTrackingExportTableKeys.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobCreateResult.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJob.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobStatus.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportRequestBody.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs create mode 100644 src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs create mode 100644 src/SqlBulkSyncFunction/Services/SchemaTrackingExportStreamingZip.cs diff --git a/README.md b/README.md index da32523..3148c5f 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,15 @@ The function is configured through Azure App Settings / Environment variables, y > > `Custom` schedule is default if no scheduled is specified in sync job configuration. +## Schema tracking export + +Asynchronous export of change-tracking data (full rows for inserts/updates, primary keys for deletes) uses the same storage account as `AzureWebJobsStorage`: blob container `export`, table `exportjobs`. + +HTTP routes (Function authorization level): + +- `POST /api/config/{area}/{id}/schema/tracking/{tableId}/export` — JSON body `{ "author", "referenceId", "purpose" }`; returns `202 Accepted` with `Location` pointing at status. +- `GET /api/config/{area}/{id}/schema/tracking/{tableId}/export/status/{*correlationId}` — same item shape as list; includes optional `result` when `response/result.json` exists. +- `GET /api/config/{area}/{id}/schema/tracking/{tableId}/export/status` — list jobs for that table partition. ## Development resources diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 18b7254..7e31499 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -9,16 +9,17 @@ - - - - + + + + - + - + + - + \ No newline at end of file diff --git a/src/SqlBulkSyncFunction/Constants.cs b/src/SqlBulkSyncFunction/Constants.cs index 860d692..e5268fd 100644 --- a/src/SqlBulkSyncFunction/Constants.cs +++ b/src/SqlBulkSyncFunction/Constants.cs @@ -21,7 +21,58 @@ public static class Queues /// Sync progress queue name. /// public const string SyncJobProgressQueue = "syncjobprogress"; + + /// + /// Main queue for schema tracking data export jobs (message body: full correlation id path). + /// + public const string ExportJob = "exportjob"; + + /// + /// Queue for export workers that build the updated-rows ZIP from change tracking. + /// + public const string ExportJobUpdated = "exportjob-updated"; + + /// + /// Queue for export workers that build the inserted-rows ZIP from change tracking. + /// + public const string ExportJobInserted = "exportjob-inserted"; + + /// + /// Queue for export workers that build the deleted-rows ZIP from change tracking. + /// + public const string ExportJobDeleted = "exportjob-deleted"; + + /// + /// Signaled when the updated ZIP has been written for an export job. + /// + public const string ExportJobUpdatedDone = ExportJobUpdated + "-done"; + + /// + /// Signaled when the inserted ZIP has been written for an export job. + /// + public const string ExportJobInsertedDone = ExportJobInserted + "-done"; + + /// + /// Signaled when the deleted ZIP has been written for an export job. + /// + public const string ExportJobDeletedDone = ExportJobDeleted + "-done"; + + /// + /// Updated segment only: message body is the correlation id when ProcessExportSegmentAsync throws during processing (SQL/ZIP/blob). Invalid queue bodies are rejected at the trigger with ArgumentException.ThrowIfNullOrEmpty. + /// + public const string ExportJobUpdatedError = ExportJobUpdated + "-error"; + + /// + /// Inserted segment only: correlation id when segment processing throws. + /// + public const string ExportJobInsertedError = ExportJobInserted + "-error"; + + /// + /// Deleted segment only: correlation id when segment processing throws. + /// + public const string ExportJobDeletedError = ExportJobDeleted + "-error"; } + /// /// NCRONTAB expressions and configuration keys for timer triggers. /// @@ -63,6 +114,22 @@ public static class Containers /// Blob container for per-job monitoring aggregates (written by the aggregation timer). /// public const string Monitor = "monitor"; + + /// + /// Blob container for schema tracking export requests, jobs, response ZIPs, and result metadata. + /// + public const string Export = "export"; + } + + /// + /// Azure Table Storage table names used by the function app. + /// + public static class Tables + { + /// + /// Table storing export job state (partition: area_id_tableId, row: export job guid). + /// + public const string ExportJobs = "exportjobs"; } /// @@ -72,5 +139,8 @@ public static class BlobContentTypes { /// JSON documents (UTF-8). public const string Json = "application/json; charset=utf-8"; + + /// ZIP archives. + public const string Zip = "application/zip"; } } diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.SchemaTrackingExport.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.SchemaTrackingExport.cs new file mode 100644 index 0000000..8ddc65f --- /dev/null +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.SchemaTrackingExport.cs @@ -0,0 +1,161 @@ +using System; +using System.Globalization; +using System.Linq; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.Functions.Worker; +using SqlBulkSyncFunction.Models.Schema.Export; + +namespace SqlBulkSyncFunction.Functions; + +#nullable enable + +public partial class GetSyncJobConfig +{ + private static readonly JsonSerializerOptions ExportJsonOptions = new() + { + PropertyNameCaseInsensitive = true, + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + /// + /// Accepts a schema tracking data export request, persists metadata to blob and table storage, and enqueues processing. + /// + [Function(nameof(GetSyncJobConfig) + nameof(PostSchemaTrackingExport))] + public async Task PostSchemaTrackingExport( + [HttpTrigger( + AuthorizationLevel.Function, + "post", + Route = "config/{area}/{id}/schema/tracking/{tableId}/export" + )] + HttpRequest req, + string area, + string id, + string tableId, + CancellationToken cancellationToken + ) + { + ArgumentNullException.ThrowIfNull(req); + + SchemaTrackingExportRequestBody? body; + try + { + body = await JsonSerializer + .DeserializeAsync(req.Body, ExportJsonOptions, cancellationToken) + .ConfigureAwait(false); + } + catch (JsonException) + { + return new BadRequestObjectResult("Invalid JSON body."); + } + + if (body == null) + { + return new BadRequestObjectResult("Body is required."); + } + + var result = await schemaTrackingExportService + .TryCreateExportJobAsync(area, id, tableId, body, cancellationToken) + .ConfigureAwait(false); + + if (result.Code == ExportJobCreateResultCode.ValidationFailed) + { + return new BadRequestObjectResult("author, referenceId, and purpose must be non-empty strings."); + } + + if (result.Code == ExportJobCreateResultCode.NotFound || result.Job == null) + { + return new NotFoundResult(); + } + + var location = BuildExportStatusLocation(req, area, id, tableId, result.Job.CorrelationId); + return new AcceptedResult(location: location, value: result.Job); + } + + /// + /// Returns detailed status for a single export job when is present in the path, + /// or lists export jobs for the table when the URL ends at .../export/status (catch-all binds empty). + /// + /// + /// A separate route without {*correlationId} would never win in the host: the catch-all matches the same URL with an empty remainder, + /// and returns null for an empty id, producing 404. List behavior is therefore handled here. + /// + [Function(nameof(GetSyncJobConfig) + nameof(GetSchemaTrackingExportStatus))] + public async Task GetSchemaTrackingExportStatus( + [HttpTrigger( + AuthorizationLevel.Function, + "get", + Route = "config/{area}/{id}/schema/tracking/{tableId}/export/status/{*correlationId}" + )] + HttpRequest req, + string area, + string id, + string tableId, + string correlationId, + CancellationToken cancellationToken + ) + { + ArgumentNullException.ThrowIfNull(req); + + if (string.IsNullOrWhiteSpace(correlationId)) + { + if (string.IsNullOrWhiteSpace(area) || + string.IsNullOrWhiteSpace(id) || + string.IsNullOrWhiteSpace(tableId) || + syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) != true || + jobConfig == null || + !StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig.Area) || + jobConfig.Tables == null || + !jobConfig.Tables.TryGetValue(tableId, out _)) + { + return new NotFoundResult(); + } + + var items = await schemaTrackingExportService + .ListExportJobsAsync(area, id, tableId, cancellationToken) + .ConfigureAwait(false); + + return new OkObjectResult(items); + } + + var status = await schemaTrackingExportService + .TryGetExportStatusAsync(area, id, tableId, correlationId, cancellationToken) + .ConfigureAwait(false); + + if (status == null) + { + return new NotFoundResult(); + } + + return new OkObjectResult(status); + } + + private static string BuildExportStatusLocation( + HttpRequest req, + string area, + string jobId, + string tableId, + string correlationId + ) + { + var encodedCorrelation = string.Join( + "/", + correlationId.Split('/', StringSplitOptions.RemoveEmptyEntries).Select(Uri.EscapeDataString) + ); + var pathBase = req.PathBase.Value?.TrimEnd('/') ?? string.Empty; + var apiRoot = string.IsNullOrEmpty(pathBase) ? "/api" : pathBase; + var path = string.Format( + CultureInfo.InvariantCulture, + "{0}/config/{1}/{2}/schema/tracking/{3}/export/status/{4}", + apiRoot, + Uri.EscapeDataString(area), + Uri.EscapeDataString(jobId), + Uri.EscapeDataString(tableId), + encodedCorrelation + ); + return string.Format(CultureInfo.InvariantCulture, "{0}://{1}{2}", req.Scheme, req.Host.Value, path); + } +} diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs index 56a89f4..a53fa19 100644 --- a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs @@ -8,7 +8,8 @@ namespace SqlBulkSyncFunction.Functions; public partial class GetSyncJobConfig( ILogger logger, IOptions syncJobsConfig, - ITokenCacheService tokenCacheService + ITokenCacheService tokenCacheService, + SchemaTrackingExportService schemaTrackingExportService ) { } diff --git a/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs b/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs new file mode 100644 index 0000000..8ebc2c2 --- /dev/null +++ b/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs @@ -0,0 +1,157 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Storage.Queues; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.Logging; +using SqlBulkSyncFunction.Models.Schema.Export; +using SqlBulkSyncFunction.Services; + +namespace SqlBulkSyncFunction.Functions; + +/// +/// Queue-triggered handlers for schema tracking export orchestration (dispatch, segment ZIPs, finalize). +/// +public sealed class ProcessExportJobQueues( + ILogger logger, + QueueServiceClient queueServiceClient, + SchemaTrackingExportService schemaTrackingExportService + ) +{ + /// + /// Fans out a new export job to the three segment queues. + /// + [Function(nameof(ProcessExportJobQueues) + nameof(DispatchExportJob))] + public async Task DispatchExportJob( + [QueueTrigger(Constants.Queues.ExportJob)] string correlationId, + CancellationToken cancellationToken + ) + { + ArgumentException.ThrowIfNullOrEmpty(correlationId); + + using (logger.BeginScope("CorrelationId={CorrelationId}", correlationId)) + { + await schemaTrackingExportService + .DispatchExportJobAsync(correlationId, cancellationToken) + .ConfigureAwait(false); + } + } + + /// + /// Builds the updated-rows ZIP for an export job. + /// + [Function(nameof(ProcessExportJobQueues) + nameof(ProcessExportUpdated))] + public Task ProcessExportUpdated( + [QueueTrigger(Constants.Queues.ExportJobUpdated)] string correlationId, + CancellationToken cancellationToken + ) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.Updated, cancellationToken); + + /// + /// Builds the inserted-rows ZIP for an export job. + /// + [Function(nameof(ProcessExportJobQueues) + nameof(ProcessExportInserted))] + public Task ProcessExportInserted( + [QueueTrigger(Constants.Queues.ExportJobInserted)] string correlationId, + CancellationToken cancellationToken + ) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.Inserted, cancellationToken); + + /// + /// Builds the deleted-rows ZIP (primary keys) for an export job. + /// + [Function(nameof(ProcessExportJobQueues) + nameof(ProcessExportDeleted))] + public Task ProcessExportDeleted( + [QueueTrigger(Constants.Queues.ExportJobDeleted)] string correlationId, + CancellationToken cancellationToken + ) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.Deleted, cancellationToken); + + /// + /// Records completion of the updated segment and finalizes the job when all segments are done. + /// + [Function(nameof(ProcessExportJobQueues) + nameof(OnExportUpdatedDone))] + public Task OnExportUpdatedDone( + [QueueTrigger(Constants.Queues.ExportJobUpdatedDone)] string correlationId, + CancellationToken cancellationToken + ) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.Updated, cancellationToken); + + /// + /// Records completion of the inserted segment and finalizes the job when all segments are done. + /// + [Function(nameof(ProcessExportJobQueues) + nameof(OnExportInsertedDone))] + public Task OnExportInsertedDone( + [QueueTrigger(Constants.Queues.ExportJobInsertedDone)] string correlationId, + CancellationToken cancellationToken + ) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.Inserted, cancellationToken); + + /// + /// Records completion of the deleted segment and finalizes the job when all segments are done. + /// + [Function(nameof(ProcessExportJobQueues) + nameof(OnExportDeletedDone))] + public Task OnExportDeletedDone( + [QueueTrigger(Constants.Queues.ExportJobDeletedDone)] string correlationId, + CancellationToken cancellationToken + ) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.Deleted, cancellationToken); + + private async Task ProcessSegmentAsync( + string correlationId, + SchemaTrackingExportSegment segment, + CancellationToken cancellationToken + ) + { + ArgumentException.ThrowIfNullOrEmpty(correlationId); + + using (logger.BeginScope("CorrelationId={CorrelationId}, Segment={Segment}", correlationId, segment)) + { + try + { + await schemaTrackingExportService + .ProcessExportSegmentAsync(correlationId, segment, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception ex) + { + logger.LogError( + ex, + "Export segment {Segment} failed for {CorrelationId}; sending message to segment error queue.", + segment, + correlationId + ); + await EnqueueSegmentErrorAsync(correlationId, segment, cancellationToken).ConfigureAwait(false); + } + } + } + + private async Task OnSegmentDoneAsync( + string correlationId, + SchemaTrackingExportSegment segment, + CancellationToken cancellationToken + ) + { + ArgumentException.ThrowIfNullOrEmpty(correlationId); + + using (logger.BeginScope("CorrelationId={CorrelationId}, Segment={Segment}", correlationId, segment)) + { + await schemaTrackingExportService + .OnExportSegmentDoneAsync(correlationId, segment, cancellationToken) + .ConfigureAwait(false); + } + } + + private async Task EnqueueSegmentErrorAsync( + string correlationId, + SchemaTrackingExportSegment segment, + CancellationToken cancellationToken + ) + { + var queueName = segment switch + { + SchemaTrackingExportSegment.Updated => Constants.Queues.ExportJobUpdatedError, + SchemaTrackingExportSegment.Inserted => Constants.Queues.ExportJobInsertedError, + SchemaTrackingExportSegment.Deleted => Constants.Queues.ExportJobDeletedError, + _ => throw new ArgumentOutOfRangeException(nameof(segment), segment, null) + }; + + var queue = queueServiceClient.GetQueueClient(queueName); + _ = await queue.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + _ = await queue.SendMessageAsync(correlationId, cancellationToken: cancellationToken).ConfigureAwait(false); + } +} diff --git a/src/SqlBulkSyncFunction/Helpers/SchemaTrackingExportTableKeys.cs b/src/SqlBulkSyncFunction/Helpers/SchemaTrackingExportTableKeys.cs new file mode 100644 index 0000000..a7b87a5 --- /dev/null +++ b/src/SqlBulkSyncFunction/Helpers/SchemaTrackingExportTableKeys.cs @@ -0,0 +1,25 @@ +using System.Globalization; +using SqlBulkSyncFunction.Services; + +namespace SqlBulkSyncFunction.Helpers; + +/// +/// Builds Azure Table Storage partition keys for export job entities. +/// +public static class SchemaTrackingExportTableKeys +{ + /// + /// Builds partition key {area}_{id}_{tableId} with per-segment sanitization compatible with Azure Table key rules. + /// + /// Job area segment. + /// Job configuration id segment. + /// Table mapping id segment. + /// Composite partition key string. + public static string GetPartitionKey(string area, string jobId, string tableId) + => string.Format( + CultureInfo.InvariantCulture, + "{0}_{1}_{2}", + SyncMonitoringAggregationService.SanitizeBlobPathSegment(area), + SyncMonitoringAggregationService.SanitizeBlobPathSegment(jobId), + SyncMonitoringAggregationService.SanitizeBlobPathSegment(tableId)); +} diff --git a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs index ef7f8ce..0611105 100644 --- a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs +++ b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using SqlBulkSyncFunction.Models.Schema; +using SqlBulkSyncFunction.Models.Schema.Export; namespace SqlBulkSyncFunction.Helpers; @@ -86,6 +87,69 @@ FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct """; } + /// + /// Builds a query for one export segment: insert/update rows join the live table for full column values; + /// delete rows return primary key columns from CHANGETABLE only. + /// SYS_CHANGE_OPERATION is not projected (segment already implies I/U/D) to reduce payload size. + /// + /// Fully qualified source table name. + /// Source columns (explicit list; no *). + /// Which change operation to return. + /// Parameterized SQL using @FromVersion. + public static string GetChangeTrackingExportSegmentSelectStatement( + string sourceTableName, + Column[] columns, + SchemaTrackingExportSegment segment + ) + { + var primaryKeyColumns = columns.Where(column => column.IsPrimary).ToArray(); + if (primaryKeyColumns.Length == 0) + { + throw new InvalidOperationException($"Missing primary key columns for table {sourceTableName}."); + } + + var operationFilter = segment switch + { + SchemaTrackingExportSegment.Updated => "N'U'", + SchemaTrackingExportSegment.Inserted => "N'I'", + SchemaTrackingExportSegment.Deleted => "N'D'", + _ => throw new ArgumentOutOfRangeException(nameof(segment)) + }; + + if (segment == SchemaTrackingExportSegment.Deleted) + { + var primarySelect = string.Join( + ",\r\n ", + primaryKeyColumns.Select(column => string.Concat("ct.", column.QuoteName, " AS ", column.QuoteName)) + ); + + return $""" + SELECT {primarySelect} + FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct + WHERE ct.SYS_CHANGE_OPERATION = {operationFilter} + """; + } + + var pkJoin = string.Join( + " AND\r\n ", + primaryKeyColumns.Select( + column => string.Concat("ct.", column.QuoteName, " = t.", column.QuoteName) + ) + ); + + var tableColumns = string.Join( + ",\r\n ", + columns.Select(column => string.Concat("t.", column.QuoteName, " AS ", column.QuoteName)) + ); + + return $""" + SELECT {tableColumns} + FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct + INNER JOIN {sourceTableName} AS t ON {pkJoin} + WHERE ct.SYS_CHANGE_OPERATION = {operationFilter} + """; + } + public static string GetDropStatement(this string tableName) => $""" IF OBJECT_ID('{tableName}') IS NOT NULL @@ -262,7 +326,7 @@ CONSTRAINT [PK_{3}] PRIMARY KEY CLUSTERED column => column.QuoteName ) ), - Guid.NewGuid() + Guid.CreateVersion7() ); return statement; } @@ -295,7 +359,7 @@ CONSTRAINT [PK_{2}] PRIMARY KEY CLUSTERED ) ) ), - Guid.NewGuid(), + Guid.CreateVersion7(), string.Join( ",\r\n ", tableSchema.Columns diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobCreateResult.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobCreateResult.cs new file mode 100644 index 0000000..5b69bc3 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobCreateResult.cs @@ -0,0 +1,33 @@ +using System; + +namespace SqlBulkSyncFunction.Models.Schema.Export; + +#nullable enable + +/// +/// Outcome of attempting to create a schema tracking export job. +/// +/// Whether creation succeeded or why it was rejected. +/// Populated when is . +public record ExportJobCreateResult(ExportJobCreateResultCode Code, SchemaTrackingExportJob? Job); + +/// +/// Result codes for . +/// +public enum ExportJobCreateResultCode +{ + /// + /// Job was persisted and enqueued. + /// + Created = 0, + + /// + /// Request body failed validation (missing or whitespace fields). + /// + ValidationFailed = 1, + + /// + /// Job configuration or table mapping was not found for the route. + /// + NotFound = 2 +} diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs new file mode 100644 index 0000000..f7791cd --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs @@ -0,0 +1,85 @@ +using System; +using Azure.Data.Tables; + +namespace SqlBulkSyncFunction.Models.Schema.Export; + +#nullable enable + +/// +/// Azure Table Storage entity for schema tracking export job coordination. +/// Partition key: area_id_tableId (sanitized segments). Row key: export job id (version 7 GUID, n format, no dashes). +/// +public sealed class ExportJobTableEntity : ITableEntity +{ + /// + public string PartitionKey { get; set; } = string.Empty; + + /// + public string RowKey { get; set; } = string.Empty; + + /// + public DateTimeOffset? Timestamp { get; set; } + + /// + public Azure.ETag ETag { get; set; } + + /// + /// Full correlation id path used as blob prefix and in queue messages. + /// + public string CorrelationId { get; set; } = string.Empty; + + /// + /// Sync job area (configuration). + /// + public string Area { get; set; } = string.Empty; + + /// + /// Sync job configuration id. + /// + public string JobId { get; set; } = string.Empty; + + /// + /// Configured table mapping id. + /// + public string TableId { get; set; } = string.Empty; + + /// + /// Reference id from the export request (denormalized for list queries without reading job.json). + /// + public string ReferenceId { get; set; } = string.Empty; + + /// + /// Serialized value name. + /// + public string Status { get; set; } = nameof(SchemaTrackingExportJobStatus.Pending); + + /// + /// Whether the updated ZIP segment finished successfully. + /// + public bool UpdatedDone { get; set; } + + /// + /// Whether the inserted ZIP segment finished successfully. + /// + public bool InsertedDone { get; set; } + + /// + /// Whether the deleted ZIP segment finished successfully. + /// + public bool DeletedDone { get; set; } + + /// + /// UTC time when the job was accepted. + /// + public DateTimeOffset CreatedUtc { get; set; } + + /// + /// UTC time when the job completed successfully; null if not completed. + /// + public DateTimeOffset? CompletedUtc { get; set; } + + /// + /// Optional failure message. + /// + public string? Error { get; set; } +} diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs new file mode 100644 index 0000000..0e78b46 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs @@ -0,0 +1,42 @@ +namespace SqlBulkSyncFunction.Models.Schema.Export; + +#nullable enable + +/// +/// Maps rows to API DTOs shared by list and single-job status endpoints. +/// +public static class ExportJobTableMapper +{ + /// + /// Maps a table row to ; is set only when loading result.json for detail responses. + /// + /// Persisted export job row. + /// Optional SAS ZIP URIs for status detail when result.json was loaded. + public static SchemaTrackingExportJobListItem ToListItem(ExportJobTableEntity entity, SchemaTrackingExportJobListItemResult? result = null) + => new( + CorrelationId: entity.CorrelationId, + ExportJobId: entity.RowKey, + Status: ParseStatus(entity.Status), + ReferenceId: entity.ReferenceId ?? string.Empty, + Created: entity.CreatedUtc, + Completed: entity.CompletedUtc, + UpdatedDone: entity.UpdatedDone, + InsertedDone: entity.InsertedDone, + DeletedDone: entity.DeletedDone, + Error: entity.Error, + Result: result + ); + + /// + /// Parses the persisted status string from table storage into . + /// + /// Raw value. + public static SchemaTrackingExportJobStatus ParseStatus(string? status) + => status switch + { + nameof(SchemaTrackingExportJobStatus.Running) => SchemaTrackingExportJobStatus.Running, + nameof(SchemaTrackingExportJobStatus.Completed) => SchemaTrackingExportJobStatus.Completed, + nameof(SchemaTrackingExportJobStatus.Failed) => SchemaTrackingExportJobStatus.Failed, + _ => SchemaTrackingExportJobStatus.Pending + }; +} diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJob.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJob.cs new file mode 100644 index 0000000..319a04e --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJob.cs @@ -0,0 +1,29 @@ +using System; + +namespace SqlBulkSyncFunction.Models.Schema.Export; + +#nullable enable + +/// +/// Metadata for an accepted schema tracking export job, stored as job.json under the export blob prefix. +/// +/// Configured sync job area. +/// Configured sync job identifier. +/// Configured table mapping identifier. +/// Blob path prefix segments for this job (date/hour/minute + version 7 id). +/// Version 7 GUID in n format (no dashes); same value as Azure Table row key. +/// Reference from the export request. +/// Author from the export request. +/// Purpose from the export request. +/// UTC creation time when the job was accepted. +public record SchemaTrackingExportJob( + string Area, + string Id, + string TableId, + string CorrelationId, + string ExportJobId, + string ReferenceId, + string Author, + string Purpose, + DateTimeOffset Created +); diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs new file mode 100644 index 0000000..fc2a011 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs @@ -0,0 +1,33 @@ +using System; + +namespace SqlBulkSyncFunction.Models.Schema.Export; + +#nullable enable + +/// +/// Export job row returned by GET .../export/status (list) and GET .../export/status/{correlationId} (detail); is set only on detail when finalized. +/// +/// Full correlation path used for blobs and deep links. +/// Export job id (n-format version 7 GUID); same as Azure Table row key. +/// Current job status. +/// Reference from the export request. +/// UTC creation time. +/// UTC completion time when finished; if not completed. +/// Whether the updated ZIP leg completed. +/// Whether the inserted ZIP leg completed. +/// Whether the deleted ZIP leg completed. +/// Optional error message when status is failed. +/// Populated for single-job status when response/result.json exists; otherwise . +public record SchemaTrackingExportJobListItem( + string CorrelationId, + string ExportJobId, + SchemaTrackingExportJobStatus Status, + string ReferenceId, + DateTimeOffset Created, + DateTimeOffset? Completed, + bool UpdatedDone, + bool InsertedDone, + bool DeletedDone, + string? Error, + SchemaTrackingExportJobListItemResult? Result = null +); diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs new file mode 100644 index 0000000..fefa0d6 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs @@ -0,0 +1,36 @@ +using System; + +namespace SqlBulkSyncFunction.Models.Schema.Export; + +#nullable enable + +/// +/// SAS download details exposed on for status responses. +/// Subset of (blob result.json still stores the full record). +/// +/// UTC time when the returned SAS URLs expire (aligned with SAS token). +/// Read SAS URI for updated.zip. +/// Read SAS URI for inserted.zip. +/// Read SAS URI for deleted.zip. +public record SchemaTrackingExportJobListItemResult( + DateTimeOffset SasExpires, + Uri UpdatedZipSasUri, + Uri InsertedZipSasUri, + Uri DeletedZipSasUri +) +{ + /// + /// Maps a deserialized to the API subset, or returns when is . + /// + /// Full result from result.json, or . + /// List-item result with SAS URIs only, or . + public static SchemaTrackingExportJobListItemResult? FromJobResult(SchemaTrackingExportJobResult? jobResult) + => jobResult == null + ? null + : new SchemaTrackingExportJobListItemResult( + jobResult.SasExpires, + jobResult.UpdatedZipSasUri, + jobResult.InsertedZipSasUri, + jobResult.DeletedZipSasUri + ); +} diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs new file mode 100644 index 0000000..ce330ca --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs @@ -0,0 +1,39 @@ +using System; + +namespace SqlBulkSyncFunction.Models.Schema.Export; + +#nullable enable + +/// +/// Final export outcome written to response/result.json, including time-limited read URLs for ZIP responses. +/// +/// Configured sync job area. +/// Configured sync job identifier. +/// Configured table mapping identifier. +/// Blob path prefix for this job. +/// Export job id (n format); same as Azure Table row key. +/// Reference from the export request. +/// Author from the export request. +/// Purpose from the export request. +/// UTC time when the job was accepted. +/// UTC time when all segments and result metadata finished. +/// UTC time when the returned SAS URLs expire (aligned with SAS token). +/// Read SAS URI for updated.zip. +/// Read SAS URI for inserted.zip. +/// Read SAS URI for deleted.zip. +public record SchemaTrackingExportJobResult( + string Area, + string Id, + string TableId, + string CorrelationId, + string ExportJobId, + string ReferenceId, + string Author, + string Purpose, + DateTimeOffset Created, + DateTimeOffset Completed, + DateTimeOffset SasExpires, + Uri UpdatedZipSasUri, + Uri InsertedZipSasUri, + Uri DeletedZipSasUri +); diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobStatus.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobStatus.cs new file mode 100644 index 0000000..91921f6 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobStatus.cs @@ -0,0 +1,32 @@ +using System.Text.Json.Serialization; + +namespace SqlBulkSyncFunction.Models.Schema.Export; + +#nullable enable + +/// +/// High-level lifecycle state for a schema tracking export job persisted in Azure Table Storage. +/// +[JsonConverter(typeof(JsonStringEnumConverter))] +public enum SchemaTrackingExportJobStatus +{ + /// + /// Job was accepted; work may not have started yet. + /// + Pending = 0, + + /// + /// Dispatcher has enqueued segment workers or segment work is in progress. + /// + Running = 1, + + /// + /// All segments completed and result metadata was written. + /// + Completed = 2, + + /// + /// A fatal error occurred during export or finalize. + /// + Failed = 3 +} diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportRequestBody.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportRequestBody.cs new file mode 100644 index 0000000..1e789c2 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportRequestBody.cs @@ -0,0 +1,15 @@ +namespace SqlBulkSyncFunction.Models.Schema.Export; + +#nullable enable + +/// +/// Request body accepted by POST .../schema/tracking/{tableId}/export to justify and trace a sensitive data export. +/// +/// Person or system initiating the export (required, non-whitespace). +/// External ticket or reference identifier (required, non-whitespace). +/// Business justification for exporting table data (required, non-whitespace). +public record SchemaTrackingExportRequestBody( + string Author, + string ReferenceId, + string Purpose +); diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs new file mode 100644 index 0000000..083b182 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs @@ -0,0 +1,24 @@ +namespace SqlBulkSyncFunction.Models.Schema.Export; + +#nullable enable + +/// +/// Identifies which change-tracking operation bucket an export worker or ZIP file belongs to. +/// +public enum SchemaTrackingExportSegment +{ + /// + /// Rows with SYS_CHANGE_OPERATION = N'U'. + /// + Updated = 0, + + /// + /// Rows with SYS_CHANGE_OPERATION = N'I'. + /// + Inserted = 1, + + /// + /// Rows with SYS_CHANGE_OPERATION = N'D' (primary key columns only). + /// + Deleted = 2 +} diff --git a/src/SqlBulkSyncFunction/Program.cs b/src/SqlBulkSyncFunction/Program.cs index 023d338..5272e1d 100644 --- a/src/SqlBulkSyncFunction/Program.cs +++ b/src/SqlBulkSyncFunction/Program.cs @@ -1,3 +1,5 @@ +using System; +using Azure.Data.Tables; using Azure.Monitor.OpenTelemetry.Exporter; using Microsoft.Azure.Functions.Worker.OpenTelemetry; using Microsoft.Extensions.Azure; @@ -33,6 +35,14 @@ .AddSingleton() .AddSingleton() .AddSingleton() + .AddSingleton( + static _ => + { + var connectionString = Environment.GetEnvironmentVariable("AzureWebJobsStorage") + ?? throw new InvalidOperationException("AzureWebJobsStorage is not set."); + return new TableServiceClient(connectionString); + }) + .AddSingleton() .AddAzureClients( static az => { var connectionString = System.Environment.GetEnvironmentVariable("AzureWebJobsStorage"); diff --git a/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs new file mode 100644 index 0000000..61611cf --- /dev/null +++ b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs @@ -0,0 +1,779 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Azure; +using Azure.Data.Tables; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using Azure.Storage.Queues; +using Azure.Storage.Sas; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using SqlBulkSyncFunction.Helpers; +using SqlBulkSyncFunction.Models.Job; +using SqlBulkSyncFunction.Models.Schema.Export; + +namespace SqlBulkSyncFunction.Services; + +#nullable enable + +/// +/// Coordinates schema tracking export jobs (blobs, queues, table state, and segment processing). +/// +public sealed class SchemaTrackingExportService( + ILogger logger, + IOptions syncJobsConfig, + ITokenCacheService tokenCacheService, + BlobServiceClient blobServiceClient, + QueueServiceClient queueServiceClient, + TableServiceClient tableServiceClient + ) +{ + private static readonly JsonSerializerOptions JsonOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + PropertyNameCaseInsensitive = true, + WriteIndented = false + }; + + private readonly BlobContainerClient _exportContainer = GetOrCreateBlobContainer(blobServiceClient, Constants.Containers.Export); + private readonly TableClient _exportJobsTable = GetOrCreateTable(tableServiceClient, Constants.Tables.ExportJobs); + + /// + /// Validates configuration, persists request and job blobs, creates the table row, and enqueues the main export queue. + /// + public async Task TryCreateExportJobAsync( + string area, + string jobId, + string tableId, + SchemaTrackingExportRequestBody request, + CancellationToken cancellationToken + ) + { + if (string.IsNullOrWhiteSpace(request.Author) || + string.IsNullOrWhiteSpace(request.ReferenceId) || + string.IsNullOrWhiteSpace(request.Purpose)) + { + return new ExportJobCreateResult(ExportJobCreateResultCode.ValidationFailed, null); + } + + if (!TryResolveJobTable(area, jobId, tableId)) + { + return new ExportJobCreateResult(ExportJobCreateResultCode.NotFound, null); + } + + var utcNow = DateTimeOffset.UtcNow; + var jobGuid = Guid.CreateVersion7(); + var exportJobId = jobGuid.ToString("n"); + var correlationId = FormattableString.Invariant( + $"{utcNow.Year:0000}/{utcNow.Month:00}/{utcNow.Day:00}/{utcNow.Hour:00}/{utcNow.Minute:00}/{exportJobId}"); + + var job = new SchemaTrackingExportJob( + Area: area, + Id: jobId, + TableId: tableId, + CorrelationId: correlationId, + ExportJobId: exportJobId, + ReferenceId: request.ReferenceId, + Author: request.Author, + Purpose: request.Purpose, + Created: utcNow + ); + + var requestBlob = _exportContainer.GetBlobClient($"{correlationId}/request.json"); + var jobBlob = _exportContainer.GetBlobClient($"{correlationId}/job.json"); + + _ = await requestBlob.UploadAsync( + BinaryData.FromObjectAsJson(request, JsonOptions), + new BlobUploadOptions + { + HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Json } + }, + cancellationToken + ).ConfigureAwait(false); + + _ = await jobBlob.UploadAsync( + BinaryData.FromObjectAsJson(job, JsonOptions), + new BlobUploadOptions + { + HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Json } + }, + cancellationToken + ).ConfigureAwait(false); + + var entity = new ExportJobTableEntity + { + PartitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(area, jobId, tableId), + RowKey = exportJobId, + CorrelationId = correlationId, + Area = area, + JobId = jobId, + TableId = tableId, + ReferenceId = request.ReferenceId, + Status = nameof(SchemaTrackingExportJobStatus.Pending), + CreatedUtc = utcNow, + UpdatedDone = false, + InsertedDone = false, + DeletedDone = false + }; + + _ = await _exportJobsTable.UpsertEntityAsync(entity, TableUpdateMode.Replace, cancellationToken).ConfigureAwait(false); + + var mainQueue = GetOrCreateQueue(Constants.Queues.ExportJob); + _ = await mainQueue.SendMessageAsync(correlationId, cancellationToken: cancellationToken).ConfigureAwait(false); + + return new ExportJobCreateResult(ExportJobCreateResultCode.Created, job); + } + + /// + /// Loads job and table state for a single correlation id under the given route context (same DTO as ). + /// + public async Task TryGetExportStatusAsync( + string area, + string jobId, + string tableId, + string correlationId, + CancellationToken cancellationToken + ) + { + if (!TryResolveJobTable(area, jobId, tableId)) + { + return null; + } + + var normalizedCorrelation = NormalizeCorrelationId(correlationId); + if (string.IsNullOrEmpty(normalizedCorrelation)) + { + return null; + } + + var job = await ReadJobBlobAsync(normalizedCorrelation, cancellationToken).ConfigureAwait(false); + if (job == null) + { + return null; + } + + if (!PartitionMatchesRoute(job, area, jobId, tableId)) + { + return null; + } + + var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(area, jobId, tableId); + var rowKey = job.ExportJobId; + + ExportJobTableEntity? entity; + try + { + var response = await _exportJobsTable + .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken) + .ConfigureAwait(false); + entity = response.Value; + } + catch (RequestFailedException ex) when (ex.Status == 404) + { + entity = null; + } + + var resultBlob = _exportContainer.GetBlobClient($"{normalizedCorrelation}/response/result.json"); + SchemaTrackingExportJobResult? result = null; + if (await resultBlob.ExistsAsync(cancellationToken).ConfigureAwait(false)) + { + try + { + var download = await resultBlob.DownloadContentAsync(cancellationToken).ConfigureAwait(false); + result = download.Value.Content.ToObjectFromJson(JsonOptions); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to deserialize result.json for {CorrelationId}", normalizedCorrelation); + } + } + + var listResult = SchemaTrackingExportJobListItemResult.FromJobResult(result); + + if (entity != null) + { + return ExportJobTableMapper.ToListItem(entity, listResult); + } + + return new SchemaTrackingExportJobListItem( + CorrelationId: job.CorrelationId, + ExportJobId: job.ExportJobId, + Status: ExportJobTableMapper.ParseStatus(null), + ReferenceId: job.ReferenceId, + Created: job.Created, + Completed: null, + UpdatedDone: false, + InsertedDone: false, + DeletedDone: false, + Error: null, + Result: listResult + ); + } + + /// + /// Lists export jobs for the partition derived from , , and . + /// + public async Task> ListExportJobsAsync( + string area, + string jobId, + string tableId, + CancellationToken cancellationToken + ) + { + if (!TryResolveJobTable(area, jobId, tableId)) + { + return []; + } + + var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(area, jobId, tableId); + var filter = FormattableString.Invariant($"PartitionKey eq '{EscapeODataString(partitionKey)}'"); + return await _exportJobsTable + .QueryAsync( + filter, + cancellationToken: cancellationToken + ) + .Select(static e => ExportJobTableMapper.ToListItem(e)) + .OrderByDescending(static x => x.Created) + .ToArrayAsync(cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Dispatches a main-queue message to the three segment queues. + /// + public async Task DispatchExportJobAsync(string correlationId, CancellationToken cancellationToken) + { + var normalized = NormalizeCorrelationId(correlationId); + var job = await ReadJobBlobAsync(normalized, cancellationToken).ConfigureAwait(false); + if (job == null) + { + logger.LogWarning("Dispatch skipped: missing job.json for {CorrelationId}", normalized); + return; + } + + var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(job.Area, job.Id, job.TableId); + var rowKey = job.ExportJobId; + await PatchEntityAsync( + partitionKey, + rowKey, + static e => e.Status = nameof(SchemaTrackingExportJobStatus.Running), + cancellationToken + ).ConfigureAwait(false); + + var qUpdated = GetOrCreateQueue(Constants.Queues.ExportJobUpdated); + var qInserted = GetOrCreateQueue(Constants.Queues.ExportJobInserted); + var qDeleted = GetOrCreateQueue(Constants.Queues.ExportJobDeleted); + + // Do not pass visibilityTimeout on enqueue: non-zero values hide the message from Dequeue/Peek until the timeout elapses (scheduled/delayed work), not a processing lease. + _ = await qUpdated.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false); + _ = await qInserted.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false); + _ = await qDeleted.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false); + } + + /// + /// Builds one segment ZIP from SQL change tracking and notifies the corresponding done queue. + /// + public async Task ProcessExportSegmentAsync( + string correlationId, + SchemaTrackingExportSegment segment, + CancellationToken cancellationToken + ) + { + var normalized = NormalizeCorrelationId(correlationId); + var job = await ReadJobBlobAsync(normalized, cancellationToken).ConfigureAwait(false); + if (job == null) + { + logger.LogWarning("Segment worker: missing job for {CorrelationId}", normalized); + return; + } + + if (!syncJobsConfig.Value.Jobs.TryGetValue(job.Id, out var jobConfig) || + !string.Equals(jobConfig.Area, job.Area, StringComparison.OrdinalIgnoreCase)) + { + await MarkJobFailedAsync(job, "Job configuration not found or area mismatch.", cancellationToken).ConfigureAwait(false); + return; + } + + var syncJob = jobConfig.ToSyncJob( + scheduleCorrelationId: null, + tokenCache: await tokenCacheService.GetTokenCache(jobConfig).ConfigureAwait(false), + timestamp: DateTimeOffset.UtcNow, + expires: DateTimeOffset.UtcNow.AddMinutes(4), + id: job.Id, + schedule: nameof(jobConfig.Manual), + seed: false + ); + + var table = (syncJob.Tables ?? []).FirstOrDefault( + t => string.Equals(t.Id, job.TableId, StringComparison.OrdinalIgnoreCase) + ); + if (table == null) + { + await MarkJobFailedAsync(job, "Table mapping not found.", cancellationToken).ConfigureAwait(false); + return; + } + + try + { + await using var sourceConn = new SqlConnection(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken }; + await using var targetConn = new SqlConnection(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken }; + await sourceConn.OpenAsync(cancellationToken).ConfigureAwait(false); + await targetConn.OpenAsync(cancellationToken).ConfigureAwait(false); + + targetConn.EnsureSyncSchemaAndTableExists( + FormattableString.Invariant($"config/{job.Id}/{job.Area}/schema/tracking/{job.TableId}"), + logger + ); + + var columns = sourceConn.GetColumns(table.Source); + var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns); + var targetVersion = targetConn.GetTargetVersion(table.Target); + var fromVersion = targetVersion.CurrentVersion < 0 ? 0L : targetVersion.CurrentVersion; + + var sql = SqlStatementExtensions.GetChangeTrackingExportSegmentSelectStatement(table.Source, columns, segment); + + await using var cmd = new SqlCommand(sql, sourceConn) + { + CommandTimeout = 3600 + }; + _ = cmd.Parameters.Add(new SqlParameter("@FromVersion", SqlDbType.BigInt) { Value = fromVersion }); + + await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken) + .ConfigureAwait(false); + + var (zipPath, jsonPath) = GetZipRelativePath(segment); + var zipBlob = _exportContainer.GetBlobClient($"{normalized}/{zipPath}"); + await using var uploadStream = await zipBlob.OpenWriteAsync( + true, + new BlobOpenWriteOptions + { + HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Zip } + }, + cancellationToken + ) + .ConfigureAwait(false); + + await SchemaTrackingExportStreamingZip.WriteReaderToZipAsync(reader, uploadStream, jsonPath, cancellationToken) + .ConfigureAwait(false); + + var doneQueueName = GetDoneQueueName(segment); + var doneQueue = GetOrCreateQueue(doneQueueName); + _ = await doneQueue.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + logger.LogError(ex, "Export segment {Segment} failed for {CorrelationId}", segment, normalized); + await MarkJobFailedAsync(job, ex.Message, cancellationToken).ConfigureAwait(false); + throw; + } + } + + /// + /// Records segment completion and finalizes the job when all segments are done. + /// + public async Task OnExportSegmentDoneAsync( + string correlationId, + SchemaTrackingExportSegment segment, + CancellationToken cancellationToken + ) + { + var normalized = NormalizeCorrelationId(correlationId); + var job = await ReadJobBlobAsync(normalized, cancellationToken).ConfigureAwait(false); + if (job == null) + { + logger.LogWarning( + "Segment done handler: missing job.json for {CorrelationId} (check queue message body / encoding vs blob path).", + normalized + ); + return; + } + + var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(job.Area, job.Id, job.TableId); + var rowKey = job.ExportJobId; + + for (var attempt = 0; attempt < 12; attempt++) + { + Response response; + try + { + response = await _exportJobsTable + .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + catch (RequestFailedException ex) when (ex.Status == 404) + { + logger.LogWarning("Finalize: missing table entity for {CorrelationId}", normalized); + return; + } + + var entity = response.Value; + if (IsLegDone(entity, segment)) + { + var alreadyRefreshed = await _exportJobsTable + .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken) + .ConfigureAwait(false); + await TryFinalizeIfCompleteAsync(job, alreadyRefreshed.Value, cancellationToken).ConfigureAwait(false); + return; + } + + SetLegDone(entity, segment, true); + + try + { + _ = await _exportJobsTable.UpdateEntityAsync( + entity, + response.Value.ETag, + TableUpdateMode.Merge, + cancellationToken + ).ConfigureAwait(false); + } + catch (RequestFailedException ex) when (ex.Status == 412) + { + continue; + } + + var refreshed = await _exportJobsTable + .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken) + .ConfigureAwait(false); + await TryFinalizeIfCompleteAsync(job, refreshed.Value, cancellationToken).ConfigureAwait(false); + return; + } + + logger.LogWarning("Finalize: exhausted optimistic retries for {CorrelationId}", normalized); + } + + private async Task TryFinalizeIfCompleteAsync( + SchemaTrackingExportJob job, + ExportJobTableEntity entity, + CancellationToken cancellationToken + ) + { + if (!entity.UpdatedDone || !entity.InsertedDone || !entity.DeletedDone) + { + return; + } + + var correlationId = NormalizeCorrelationId(job.CorrelationId); + var resultBlob = _exportContainer.GetBlobClient($"{correlationId}/response/result.json"); + if (await resultBlob.ExistsAsync(cancellationToken).ConfigureAwait(false)) + { + await MarkTableCompletedAsync(job, cancellationToken).ConfigureAwait(false); + return; + } + + var updatedZip = _exportContainer.GetBlobClient($"{correlationId}/response/updated.zip"); + var insertedZip = _exportContainer.GetBlobClient($"{correlationId}/response/inserted.zip"); + var deletedZip = _exportContainer.GetBlobClient($"{correlationId}/response/deleted.zip"); + + if (!await updatedZip.ExistsAsync(cancellationToken).ConfigureAwait(false) || + !await insertedZip.ExistsAsync(cancellationToken).ConfigureAwait(false) || + !await deletedZip.ExistsAsync(cancellationToken).ConfigureAwait(false)) + { + logger.LogWarning("Finalize: missing one or more ZIP blobs for {CorrelationId}", correlationId); + return; + } + + var sasExpires = DateTimeOffset.UtcNow.AddDays(7); + var updatedUri = GenerateReadSasUri(updatedZip, sasExpires); + var insertedUri = GenerateReadSasUri(insertedZip, sasExpires); + var deletedUri = GenerateReadSasUri(deletedZip, sasExpires); + + var completed = DateTimeOffset.UtcNow; + var result = new SchemaTrackingExportJobResult( + Area: job.Area, + Id: job.Id, + TableId: job.TableId, + CorrelationId: job.CorrelationId, + ExportJobId: job.ExportJobId, + ReferenceId: job.ReferenceId, + Author: job.Author, + Purpose: job.Purpose, + Created: job.Created, + Completed: completed, + SasExpires: sasExpires, + UpdatedZipSasUri: updatedUri, + InsertedZipSasUri: insertedUri, + DeletedZipSasUri: deletedUri + ); + + try + { + _ = await resultBlob.UploadAsync( + BinaryData.FromObjectAsJson(result, JsonOptions), + new BlobUploadOptions + { + HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Json }, + Conditions = new BlobRequestConditions { IfNoneMatch = ETag.All } + }, + cancellationToken + ).ConfigureAwait(false); + } + catch (RequestFailedException ex) when (ex.Status == 412) + { + logger.LogInformation("result.json already created for {CorrelationId}", correlationId); + } + + await MarkTableCompletedAsync(job, cancellationToken).ConfigureAwait(false); + } + + private async Task MarkTableCompletedAsync(SchemaTrackingExportJob job, CancellationToken cancellationToken) + { + var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(job.Area, job.Id, job.TableId); + var rowKey = job.ExportJobId; + for (var attempt = 0; attempt < 12; attempt++) + { + Response response; + try + { + response = await _exportJobsTable + .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + catch (RequestFailedException ex) when (ex.Status == 404) + { + return; + } + + var entity = response.Value; + if (string.Equals(entity.Status, nameof(SchemaTrackingExportJobStatus.Completed), StringComparison.Ordinal)) + { + return; + } + + entity.Status = nameof(SchemaTrackingExportJobStatus.Completed); + entity.CompletedUtc = DateTimeOffset.UtcNow; + entity.UpdatedDone = true; + entity.InsertedDone = true; + entity.DeletedDone = true; + + try + { + _ = await _exportJobsTable.UpdateEntityAsync( + entity, + response.Value.ETag, + TableUpdateMode.Merge, + cancellationToken + ).ConfigureAwait(false); + return; + } + catch (RequestFailedException ex) when (ex.Status == 412) + { + continue; + } + } + } + + private async Task MarkJobFailedAsync(SchemaTrackingExportJob job, string error, CancellationToken cancellationToken) + { + var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(job.Area, job.Id, job.TableId); + var rowKey = job.ExportJobId; + for (var attempt = 0; attempt < 8; attempt++) + { + try + { + var response = await _exportJobsTable + .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken) + .ConfigureAwait(false); + var entity = response.Value; + entity.Status = nameof(SchemaTrackingExportJobStatus.Failed); + entity.Error = error; + _ = await _exportJobsTable.UpdateEntityAsync( + entity, + response.Value.ETag, + TableUpdateMode.Merge, + cancellationToken + ).ConfigureAwait(false); + return; + } + catch (RequestFailedException ex) when (ex.Status == 404) + { + return; + } + catch (RequestFailedException ex) when (ex.Status == 412) + { + continue; + } + } + } + + private async Task PatchEntityAsync( + string partitionKey, + string rowKey, + Action patch, + CancellationToken cancellationToken + ) + { + for (var attempt = 0; attempt < 8; attempt++) + { + try + { + var response = await _exportJobsTable + .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken) + .ConfigureAwait(false); + var entity = response.Value; + patch(entity); + _ = await _exportJobsTable.UpdateEntityAsync( + entity, + response.Value.ETag, + TableUpdateMode.Merge, + cancellationToken + ).ConfigureAwait(false); + return; + } + catch (RequestFailedException ex) when (ex.Status == 404) + { + return; + } + catch (RequestFailedException ex) when (ex.Status == 412) + { + continue; + } + } + } + + private static Uri GenerateReadSasUri(BlobClient blob, DateTimeOffset expiresOn) + { + var sas = new BlobSasBuilder + { + Resource = "b", + BlobContainerName = blob.BlobContainerName, + BlobName = blob.Name, + StartsOn = DateTimeOffset.UtcNow.AddMinutes(-5), + ExpiresOn = expiresOn + }; + sas.SetPermissions(BlobSasPermissions.Read); + return blob.GenerateSasUri(sas); + } + + private static bool IsLegDone(ExportJobTableEntity entity, SchemaTrackingExportSegment segment) + => segment switch + { + SchemaTrackingExportSegment.Updated => entity.UpdatedDone, + SchemaTrackingExportSegment.Inserted => entity.InsertedDone, + SchemaTrackingExportSegment.Deleted => entity.DeletedDone, + _ => false + }; + + private static void SetLegDone(ExportJobTableEntity entity, SchemaTrackingExportSegment segment, bool value) + { + switch (segment) + { + case SchemaTrackingExportSegment.Updated: + entity.UpdatedDone = value; + break; + case SchemaTrackingExportSegment.Inserted: + entity.InsertedDone = value; + break; + case SchemaTrackingExportSegment.Deleted: + entity.DeletedDone = value; + break; + default: + throw new ArgumentOutOfRangeException(nameof(segment)); + } + } + + private static string GetDoneQueueName(SchemaTrackingExportSegment segment) + => segment switch + { + SchemaTrackingExportSegment.Updated => Constants.Queues.ExportJobUpdatedDone, + SchemaTrackingExportSegment.Inserted => Constants.Queues.ExportJobInsertedDone, + SchemaTrackingExportSegment.Deleted => Constants.Queues.ExportJobDeletedDone, + _ => throw new ArgumentOutOfRangeException(nameof(segment)) + }; + + /// + /// Blob path under the correlation prefix for the segment ZIP, and the single JSON entry name inside that ZIP. + /// + /// Export segment. + /// ZipPath matches finalize/SAS blob names (response/*.zip); JsonPath is the inner entry file name. + private static (string ZipPath, string JsonPath) GetZipRelativePath(SchemaTrackingExportSegment segment) + => segment switch + { + SchemaTrackingExportSegment.Updated => ("response/updated.zip", "updated.json"), + SchemaTrackingExportSegment.Inserted => ("response/inserted.zip", "inserted.json"), + SchemaTrackingExportSegment.Deleted => ("response/deleted.zip", "deleted.json"), + _ => throw new ArgumentOutOfRangeException(nameof(segment)) + }; + + private async Task ReadJobBlobAsync(string correlationId, CancellationToken cancellationToken) + { + var blob = _exportContainer.GetBlobClient($"{correlationId}/job.json"); + if (!await blob.ExistsAsync(cancellationToken).ConfigureAwait(false)) + { + return null; + } + + var content = await blob.DownloadContentAsync(cancellationToken).ConfigureAwait(false); + return content.Value.Content.ToObjectFromJson(JsonOptions); + } + + private bool TryResolveJobTable(string area, string jobId, string tableId) + { + var jobs = syncJobsConfig.Value.Jobs; + if (string.IsNullOrWhiteSpace(area) || + string.IsNullOrWhiteSpace(jobId) || + string.IsNullOrWhiteSpace(tableId) || + jobs == null || + !jobs.TryGetValue(jobId, out var jc) || + jc == null || + !string.Equals(jc.Area, area, StringComparison.OrdinalIgnoreCase) || + jc.Tables == null || + !jc.Tables.TryGetValue(tableId, out var sourceTableName) || + string.IsNullOrWhiteSpace(sourceTableName)) + { + return false; + } + + return true; + } + + private static bool PartitionMatchesRoute(SchemaTrackingExportJob job, string area, string jobId, string tableId) + => string.Equals(job.Area, area, StringComparison.OrdinalIgnoreCase) + && string.Equals(job.Id, jobId, StringComparison.OrdinalIgnoreCase) + && string.Equals(job.TableId, tableId, StringComparison.OrdinalIgnoreCase); + + private static string NormalizeCorrelationId(string correlationId) + { + if (string.IsNullOrWhiteSpace(correlationId)) + { + return string.Empty; + } + + var s = correlationId.Trim().Replace('\\', '/').Trim('/'); + if (s.Contains("..", StringComparison.Ordinal)) + { + return string.Empty; + } + + return s; + } + + private static string EscapeODataString(string value) => value.Replace("'", "''", StringComparison.Ordinal); + + private static BlobContainerClient GetOrCreateBlobContainer(BlobServiceClient client, string name) + { + var c = client.GetBlobContainerClient(name); + _ = c.CreateIfNotExists(PublicAccessType.None); + return c; + } + + private static TableClient GetOrCreateTable(TableServiceClient client, string tableName) + { + var t = client.GetTableClient(tableName); + _ = t.CreateIfNotExists(); + return t; + } + + private QueueClient GetOrCreateQueue(string queueName) + { + var q = queueServiceClient.GetQueueClient(queueName); + _ = q.CreateIfNotExists(); + return q; + } +} diff --git a/src/SqlBulkSyncFunction/Services/SchemaTrackingExportStreamingZip.cs b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportStreamingZip.cs new file mode 100644 index 0000000..0d0d6e4 --- /dev/null +++ b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportStreamingZip.cs @@ -0,0 +1,110 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Data.SqlClient; + +namespace SqlBulkSyncFunction.Services; + +#nullable enable + +/// +/// Streams SQL rows into a JSON array inside a single ZIP entry written to (e.g. Azure Blob OpenWriteAsync), avoiding a temp file and full in-memory buffering of the export. +/// +public static class SchemaTrackingExportStreamingZip +{ + private static readonly JsonSerializerOptions RowSerializerOptions = new() + { + WriteIndented = false, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }; + + /// + /// Reads forward-only from and writes a JSON array into one ZIP entry named on . + /// + /// Open data reader; disposed by caller. + /// Writable stream for the ZIP payload (e.g. block blob staged write). Must remain open until this method returns; caller disposes it. + /// Entry name inside the archive (e.g. inserted.json). + /// Cancellation token. + public static async Task WriteReaderToZipAsync( + SqlDataReader reader, + Stream zipDestination, + string zipEntryFileName, + CancellationToken cancellationToken + ) + { + ArgumentNullException.ThrowIfNull(reader); + ArgumentNullException.ThrowIfNull(zipDestination); + ArgumentException.ThrowIfNullOrWhiteSpace(zipEntryFileName); + if (!zipDestination.CanWrite) + { + throw new ArgumentException("Stream must be writable.", nameof(zipDestination)); + } + + using var bufferedStream = new BufferedStream(zipDestination, 8192); + using var zipArchive = new ZipArchive(bufferedStream, ZipArchiveMode.Create, leaveOpen: true); + await WriteReaderAsIndentedJsonArrayAsync(reader, zipArchive, zipEntryFileName, cancellationToken).ConfigureAwait(false); + await bufferedStream.FlushAsync(cancellationToken).ConfigureAwait(false); + } + + /// + /// Creates a single ZIP entry and streams a UTF-8 JSON array: opening bracket, one compact object per SQL row (commas between rows), closing bracket. + /// + /// Forward-only SQL reader; disposed by caller. + /// ZIP archive already opened in create mode. + /// Name of the entry inside the archive (e.g. inserted.json). + /// Cancellation token. + private static async Task WriteReaderAsIndentedJsonArrayAsync( + SqlDataReader reader, + ZipArchive zipArchive, + string zipEntryFileName, + CancellationToken cancellationToken + ) + { + // One deflated entry; JSON is built incrementally so the full export is not held in memory. + var entry = zipArchive.CreateEntry(zipEntryFileName, CompressionLevel.Optimal); + await using var entryStream = entry.Open(); + + // Outer JSON array: '[' then rows; each row is one JsonSerializer object (no extra array indent options). + entryStream.WriteByte(0x5b); // '[' + var rowBuffer = new Dictionary(StringComparer.OrdinalIgnoreCase); + var firstRow = true; + while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + // Human-friendly separation between top-level array elements (objects stay compact via RowSerializerOptions). + if (firstRow) + { + firstRow = false; + entryStream.Write("\n "u8); + } + else + { + entryStream.Write(",\n "u8); + } + + // Reuse one dictionary per row to avoid allocating a new map for every record. + rowBuffer.Clear(); + for (var i = 0; i < reader.FieldCount; i++) + { + // Skip SQL NULLs: omit properties entirely (smaller JSON than null fields). + if (reader.IsDBNull(i)) + { + continue; + } + + rowBuffer.Add(reader.GetName(i), reader.GetValue(i)); + } + + await JsonSerializer + .SerializeAsync(entryStream, rowBuffer, RowSerializerOptions, cancellationToken) + .ConfigureAwait(false); + } + + // Closing bracket: ']' with trailing newline for human readability. + entryStream.Write("\n]"u8); + } +} diff --git a/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj b/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj index 20a1160..14b5cca 100644 --- a/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj +++ b/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj @@ -23,6 +23,7 @@ + From 08b03484b0dc241c696bda47afca379fad531d5a Mon Sep 17 00:00:00 2001 From: Mattias Karlsson Date: Thu, 30 Apr 2026 16:58:05 +0200 Subject: [PATCH 2/2] Update .NET SDK to 10.0.203 --- global.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/global.json b/global.json index 5406078..380e804 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "10.0.201", + "version": "10.0.203", "rollForward": "latestFeature", "allowPrerelease": false }