Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ The function is configured through Azure App Settings / Environment variables, y
>
> `Custom` schedule is default if no scheduled is specified in sync job configuration.

## Schema tracking export

Asynchronous export of change-tracking data (full rows for inserts/updates, primary keys for deletes) uses the same storage account as `AzureWebJobsStorage`: blob container `export`, table `exportjobs`.

HTTP routes (Function authorization level):

- `POST /api/config/{area}/{id}/schema/tracking/{tableId}/export` — JSON body `{ "author", "referenceId", "purpose" }`; returns `202 Accepted` with `Location` pointing at status.
- `GET /api/config/{area}/{id}/schema/tracking/{tableId}/export/status/{*correlationId}` — same item shape as list; includes optional `result` when `response/result.json` exists.
- `GET /api/config/{area}/{id}/schema/tracking/{tableId}/export/status` — list jobs for that table partition.

## Development resources

Expand Down
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": {
"version": "10.0.201",
"version": "10.0.203",
"rollForward": "latestFeature",
"allowPrerelease": false
}
Expand Down
15 changes: 8 additions & 7 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.3.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="2.1.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.OpenApi" Version="1.6.0" />
<PackageVersion Include="Azure.Monitor.OpenTelemetry.Exporter" Version="1.7.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.OpenTelemetry" Version="1.1.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.15.2" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.15.2" />
<PackageVersion Include="Azure.Monitor.OpenTelemetry.Exporter" Version="1.8.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.OpenTelemetry" Version="1.2.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.15.3" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.15.3" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Timer" Version="4.3.1" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="6.8.1" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="2.51.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="2.52.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.7" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="7.0.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="7.0.1" />
<PackageVersion Include="Azure.Data.Tables" Version="12.11.0" />
<PackageVersion Include="NuGetizer" Version="1.4.7" />
<PackageVersion Include="Cronos" Version="0.12.0" />
<PackageVersion Include="Cronos" Version="0.13.0" />
</ItemGroup>
</Project>
70 changes: 70 additions & 0 deletions src/SqlBulkSyncFunction/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,58 @@ public static class Queues
/// Sync progress queue name.
/// </summary>
public const string SyncJobProgressQueue = "syncjobprogress";

/// <summary>
/// Main queue for schema tracking data export jobs (message body: full correlation id path).
/// </summary>
public const string ExportJob = "exportjob";

/// <summary>
/// Queue for export workers that build the updated-rows ZIP from change tracking.
/// </summary>
public const string ExportJobUpdated = "exportjob-updated";

/// <summary>
/// Queue for export workers that build the inserted-rows ZIP from change tracking.
/// </summary>
public const string ExportJobInserted = "exportjob-inserted";

/// <summary>
/// Queue for export workers that build the deleted-rows ZIP from change tracking.
/// </summary>
public const string ExportJobDeleted = "exportjob-deleted";

/// <summary>
/// Signaled when the updated ZIP has been written for an export job.
/// </summary>
public const string ExportJobUpdatedDone = ExportJobUpdated + "-done";

/// <summary>
/// Signaled when the inserted ZIP has been written for an export job.
/// </summary>
public const string ExportJobInsertedDone = ExportJobInserted + "-done";

/// <summary>
/// Signaled when the deleted ZIP has been written for an export job.
/// </summary>
public const string ExportJobDeletedDone = ExportJobDeleted + "-done";

/// <summary>
/// Updated segment only: message body is the correlation id when <c>ProcessExportSegmentAsync</c> throws during processing (SQL/ZIP/blob). Invalid queue bodies are rejected at the trigger with <c>ArgumentException.ThrowIfNullOrEmpty</c>.
/// </summary>
public const string ExportJobUpdatedError = ExportJobUpdated + "-error";

/// <summary>
/// Inserted segment only: correlation id when segment processing throws.
/// </summary>
public const string ExportJobInsertedError = ExportJobInserted + "-error";

/// <summary>
/// Deleted segment only: correlation id when segment processing throws.
/// </summary>
public const string ExportJobDeletedError = ExportJobDeleted + "-error";
}

/// <summary>
/// NCRONTAB expressions and configuration keys for <see cref="Functions.ProcessGlobalChangeTrackingSchedule"/> timer triggers.
/// </summary>
Expand Down Expand Up @@ -63,6 +114,22 @@ public static class Containers
/// Blob container for per-job monitoring aggregates (written by the aggregation timer).
/// </summary>
public const string Monitor = "monitor";

/// <summary>
/// Blob container for schema tracking export requests, jobs, response ZIPs, and result metadata.
/// </summary>
public const string Export = "export";
}

/// <summary>
/// Azure Table Storage table names used by the function app.
/// </summary>
public static class Tables
{
/// <summary>
/// Table storing export job state (partition: area_id_tableId, row: export job guid).
/// </summary>
public const string ExportJobs = "exportjobs";
}

/// <summary>
Expand All @@ -72,5 +139,8 @@ public static class BlobContentTypes
{
/// <summary>JSON documents (UTF-8).</summary>
public const string Json = "application/json; charset=utf-8";

/// <summary>ZIP archives.</summary>
public const string Zip = "application/zip";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
using System;
using System.Globalization;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using SqlBulkSyncFunction.Models.Schema.Export;

namespace SqlBulkSyncFunction.Functions;

#nullable enable

public partial class GetSyncJobConfig
{
private static readonly JsonSerializerOptions ExportJsonOptions = new()
{
PropertyNameCaseInsensitive = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};

/// <summary>
/// Accepts a schema tracking data export request, persists metadata to blob and table storage, and enqueues processing.
/// </summary>
[Function(nameof(GetSyncJobConfig) + nameof(PostSchemaTrackingExport))]
public async Task<IActionResult> PostSchemaTrackingExport(
[HttpTrigger(
AuthorizationLevel.Function,
"post",
Route = "config/{area}/{id}/schema/tracking/{tableId}/export"
)]
HttpRequest req,
string area,
string id,
string tableId,
CancellationToken cancellationToken
)
{
ArgumentNullException.ThrowIfNull(req);

SchemaTrackingExportRequestBody? body;
try
{
body = await JsonSerializer
.DeserializeAsync<SchemaTrackingExportRequestBody>(req.Body, ExportJsonOptions, cancellationToken)
.ConfigureAwait(false);
}
catch (JsonException)
{
return new BadRequestObjectResult("Invalid JSON body.");
}

if (body == null)
{
return new BadRequestObjectResult("Body is required.");
}

var result = await schemaTrackingExportService
.TryCreateExportJobAsync(area, id, tableId, body, cancellationToken)
.ConfigureAwait(false);

if (result.Code == ExportJobCreateResultCode.ValidationFailed)
{
return new BadRequestObjectResult("author, referenceId, and purpose must be non-empty strings.");
}

if (result.Code == ExportJobCreateResultCode.NotFound || result.Job == null)
{
return new NotFoundResult();
}

var location = BuildExportStatusLocation(req, area, id, tableId, result.Job.CorrelationId);
return new AcceptedResult(location: location, value: result.Job);
}

/// <summary>
/// Returns detailed status for a single export job when <paramref name="correlationId"/> is present in the path,
/// or lists export jobs for the table when the URL ends at <c>.../export/status</c> (catch-all binds empty).
/// </summary>
/// <remarks>
/// A separate route without <c>{*correlationId}</c> would never win in the host: the catch-all matches the same URL with an empty remainder,
/// and <see cref="SchemaTrackingExportService.TryGetExportStatusAsync"/> returns null for an empty id, producing 404. List behavior is therefore handled here.
/// </remarks>
[Function(nameof(GetSyncJobConfig) + nameof(GetSchemaTrackingExportStatus))]
public async Task<IActionResult> GetSchemaTrackingExportStatus(
[HttpTrigger(
AuthorizationLevel.Function,
"get",
Route = "config/{area}/{id}/schema/tracking/{tableId}/export/status/{*correlationId}"
)]
HttpRequest req,
string area,
string id,
string tableId,
string correlationId,
CancellationToken cancellationToken
)
{
ArgumentNullException.ThrowIfNull(req);

if (string.IsNullOrWhiteSpace(correlationId))
{
if (string.IsNullOrWhiteSpace(area) ||
string.IsNullOrWhiteSpace(id) ||
string.IsNullOrWhiteSpace(tableId) ||
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) != true ||
jobConfig == null ||
!StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig.Area) ||
jobConfig.Tables == null ||
!jobConfig.Tables.TryGetValue(tableId, out _))
{
return new NotFoundResult();
}

var items = await schemaTrackingExportService
.ListExportJobsAsync(area, id, tableId, cancellationToken)
.ConfigureAwait(false);

return new OkObjectResult(items);
}

var status = await schemaTrackingExportService
.TryGetExportStatusAsync(area, id, tableId, correlationId, cancellationToken)
.ConfigureAwait(false);

if (status == null)
{
return new NotFoundResult();
}

return new OkObjectResult(status);
}

private static string BuildExportStatusLocation(
HttpRequest req,
string area,
string jobId,
string tableId,
string correlationId
)
{
var encodedCorrelation = string.Join(
"/",
correlationId.Split('/', StringSplitOptions.RemoveEmptyEntries).Select(Uri.EscapeDataString)
);
var pathBase = req.PathBase.Value?.TrimEnd('/') ?? string.Empty;
var apiRoot = string.IsNullOrEmpty(pathBase) ? "/api" : pathBase;
var path = string.Format(
CultureInfo.InvariantCulture,
"{0}/config/{1}/{2}/schema/tracking/{3}/export/status/{4}",
apiRoot,
Uri.EscapeDataString(area),
Uri.EscapeDataString(jobId),
Uri.EscapeDataString(tableId),
encodedCorrelation
);
return string.Format(CultureInfo.InvariantCulture, "{0}://{1}{2}", req.Scheme, req.Host.Value, path);
}
}
3 changes: 2 additions & 1 deletion src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ namespace SqlBulkSyncFunction.Functions;
public partial class GetSyncJobConfig(
ILogger<GetSyncJobConfig> logger,
IOptions<SyncJobsConfig> syncJobsConfig,
ITokenCacheService tokenCacheService
ITokenCacheService tokenCacheService,
SchemaTrackingExportService schemaTrackingExportService
)
{
}
Loading
Loading