Skip to content

Commit 3acbd1d

Browse files
committed
incremental upload
1 parent b899f45 commit 3acbd1d

3 files changed

Lines changed: 205 additions & 17 deletions

File tree

backend/src/main.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,16 @@ struct ChunkNeedResponse {
134134
missing: Vec<String>,
135135
}
136136

137+
/// Request body for the blobs "need" endpoint: content-blob hashes the
/// client is offering to upload.
#[derive(Debug, Deserialize)]
struct ContentNeedRequest {
    hashes: Vec<String>,
}

/// Response body: the subset of offered hashes not yet stored server-side,
/// i.e. the ones the client must upload.
#[derive(Debug, Serialize)]
struct ContentNeedResponse {
    missing: Vec<String>,
}
146+
137147
#[derive(Debug, Deserialize)]
138148
struct UniqueChunkUploadRequest {
139149
chunks: Vec<UniqueChunk>,
@@ -239,10 +249,12 @@ async fn main() -> Result<()> {
239249
.route("/api/v1/chunks/need", post(chunks_need))
240250
.route("/api/v1/chunks/upload", post(chunks_upload))
241251
.route("/api/v1/mappings/upload", post(mappings_upload))
252+
.route("/api/v1/blobs/need", post(blobs_need))
242253
.route("/api/v1/index/blobs/upload", post(blobs_upload))
243254
.route("/api/v1/index/chunks/need", post(chunks_need))
244255
.route("/api/v1/index/chunks/upload", post(chunks_upload))
245256
.route("/api/v1/index/mappings/upload", post(mappings_upload))
257+
.route("/api/v1/index/blobs/need", post(blobs_need))
246258
.route("/api/v1/manifest/shard", post(manifest_shard))
247259
.route("/api/v1/index/manifest/shard", post(manifest_shard))
248260
// Manifest upload routes
@@ -380,6 +392,33 @@ async fn chunks_need(
380392
Ok(Json(ChunkNeedResponse { missing }))
381393
}
382394

395+
async fn blobs_need(
396+
State(state): State<AppState>,
397+
Json(payload): Json<ContentNeedRequest>,
398+
) -> ApiResult<Json<ContentNeedResponse>> {
399+
if payload.hashes.is_empty() {
400+
return Ok(Json(ContentNeedResponse {
401+
missing: Vec::new(),
402+
}));
403+
}
404+
405+
let existing: Vec<(String,)> =
406+
sqlx::query_as("SELECT hash FROM content_blobs WHERE hash = ANY($1)")
407+
.bind(&payload.hashes)
408+
.fetch_all(&state.pool)
409+
.await
410+
.map_err(ApiErrorKind::from)?;
411+
412+
let present: HashSet<String> = existing.into_iter().map(|row| row.0).collect();
413+
let missing: Vec<String> = payload
414+
.hashes
415+
.into_iter()
416+
.filter(|h| !present.contains(h))
417+
.collect();
418+
419+
Ok(Json(ContentNeedResponse { missing }))
420+
}
421+
383422
async fn chunks_upload(
384423
State(state): State<AppState>,
385424
Json(payload): Json<UniqueChunkUploadRequest>,

indexer/src/cli.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ pub struct IndexArgs {
6060
/// API key used when uploading to the backend (sent as a Bearer token).
6161
#[arg(long)]
6262
pub upload_api_key: Option<String>,
63+
/// Upload all symbol and reference records, even if content hashes already exist.
64+
#[arg(long, action = ArgAction::SetTrue)]
65+
pub full_symbol_upload: bool,
6366
/// Mark this branch as the live branch for the repository.
6467
#[arg(long = "live", action = ArgAction::SetTrue, conflicts_with = "not_live")]
6568
pub live: bool,
@@ -110,7 +113,10 @@ fn run_index(args: IndexArgs) -> Result<()> {
110113

111114
if let Some(url) = args.upload_url.as_deref() {
112115
info!(%url, "uploading index to backend");
113-
upload::upload_index(url, args.upload_api_key.as_deref(), &artifacts)?;
116+
let options = upload::UploadOptions {
117+
incremental_symbols: !args.full_symbol_upload,
118+
};
119+
upload::upload_index_with_options(url, args.upload_api_key.as_deref(), &artifacts, &options)?;
114120
}
115121

116122
info!(repo = repository, output = ?output_dir, files = artifacts.file_pointer_count(), "indexing complete");

indexer/src/upload.rs

Lines changed: 159 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
1414
use tracing::info;
1515
use zstd::stream::Encoder;
1616

17-
use crate::models::{ChunkMapping, IndexArtifacts, UniqueChunk};
17+
use crate::models::{ChunkMapping, IndexArtifacts, ReferenceRecord, SymbolRecord, UniqueChunk};
1818

1919
const REQUEST_TIMEOUT: Duration = Duration::from_secs(600);
2020
const MANIFEST_SHARD_RECORD_LIMIT: usize = 50_000;
@@ -28,13 +28,46 @@ struct ManifestShard {
2828
}
2929

3030
pub fn upload_index(url: &str, api_key: Option<&str>, artifacts: &IndexArtifacts) -> Result<()> {
31+
upload_index_with_options(url, api_key, artifacts, &UploadOptions::default())
32+
}
33+
34+
/// Tuning knobs for `upload_index_with_options`.
///
/// Derives `Debug`/`Clone`/`Copy` so callers can log, duplicate, and pass
/// the options freely (the struct is a single `bool`).
#[derive(Debug, Clone, Copy)]
pub struct UploadOptions {
    /// When true (the default), first ask the backend which content hashes
    /// it is missing and upload only the symbol/reference records for those
    /// hashes; when false, upload every record unconditionally.
    pub incremental_symbols: bool,
}

impl Default for UploadOptions {
    /// Incremental symbol upload is enabled by default.
    fn default() -> Self {
        Self {
            incremental_symbols: true,
        }
    }
}
45+
46+
pub fn upload_index_with_options(
47+
url: &str,
48+
api_key: Option<&str>,
49+
artifacts: &IndexArtifacts,
50+
options: &UploadOptions,
51+
) -> Result<()> {
3152
let client = Client::builder()
3253
.timeout(REQUEST_TIMEOUT)
3354
.build()
3455
.context("failed to build HTTP client")?;
3556

3657
let endpoints = Arc::new(Endpoints::new(url));
3758

59+
let needed_hashes = if options.incremental_symbols {
60+
let content_hashes = collect_content_hashes(artifacts)?;
61+
Some(request_needed_content_hashes(
62+
&client,
63+
&endpoints,
64+
api_key,
65+
&content_hashes,
66+
)?)
67+
} else {
68+
None
69+
};
70+
3871
// 1. Upload all content blob metadata
3972
upload_content_blobs(&client, &endpoints, api_key, artifacts)?;
4073

@@ -60,14 +93,15 @@ pub fn upload_index(url: &str, api_key: Option<&str>, artifacts: &IndexArtifacts
6093

6194
// 5. Upload manifest shards per section
6295
info!("uploading manifest shards");
63-
upload_manifest_shards(&client, &endpoints, api_key, artifacts)?;
96+
upload_manifest_shards(&client, &endpoints, api_key, artifacts, needed_hashes.as_ref())?;
6497

6598
Ok(())
6699
}
67100

68101
#[derive(Clone)]
69102
struct Endpoints {
70103
blobs_upload: String,
104+
blobs_need: String,
71105
chunks_need: String,
72106
chunks_upload: String,
73107
mappings_upload: String,
@@ -79,6 +113,7 @@ impl Endpoints {
79113
let trimmed = base.trim_end_matches('/');
80114
Self {
81115
blobs_upload: format!("{}/blobs/upload", trimmed),
116+
blobs_need: format!("{}/blobs/need", trimmed),
82117
chunks_need: format!("{}/chunks/need", trimmed),
83118
chunks_upload: format!("{}/chunks/upload", trimmed),
84119
mappings_upload: format!("{}/mappings/upload", trimmed),
@@ -160,6 +195,47 @@ fn request_needed_chunks(
160195
Ok(response.missing.into_iter().collect())
161196
}
162197

198+
fn request_needed_content_hashes(
199+
client: &Client,
200+
endpoints: &Arc<Endpoints>,
201+
api_key: Option<&str>,
202+
content_hashes: &[String],
203+
) -> Result<HashSet<String>> {
204+
if content_hashes.is_empty() {
205+
return Ok(HashSet::new());
206+
}
207+
208+
info!(
209+
count = content_hashes.len(),
210+
"checking for needed content hashes"
211+
);
212+
213+
let request = ContentNeedRequest {
214+
hashes: content_hashes.to_vec(),
215+
};
216+
217+
let response: ContentNeedResponse =
218+
post_json(client, &endpoints.blobs_need, api_key, &request)?
219+
.json()
220+
.context("failed to deserialize content need response")?;
221+
222+
info!(needed = response.missing.len(), "found content hashes to upload");
223+
Ok(response.missing.into_iter().collect())
224+
}
225+
226+
/// Stream every content blob out of the artifacts and collect its hash.
///
/// Blobs are read in fixed-size batches; an empty batch marks the end of
/// the stream.
fn collect_content_hashes(artifacts: &IndexArtifacts) -> Result<Vec<String>> {
    const BATCH_SIZE: usize = 1000;

    let mut stream = artifacts.content_blobs_stream()?;
    let mut hashes = Vec::new();
    loop {
        let batch = stream.next_batch(BATCH_SIZE)?;
        if batch.is_empty() {
            return Ok(hashes);
        }
        hashes.extend(batch.into_iter().map(|blob| blob.hash));
    }
}
238+
163239
fn upload_unique_chunks(
164240
client: &Client,
165241
endpoints: &Arc<Endpoints>,
@@ -267,6 +343,7 @@ fn upload_manifest_shards(
267343
endpoints: &Arc<Endpoints>,
268344
api_key: Option<&str>,
269345
artifacts: &IndexArtifacts,
346+
needed_hashes: Option<&HashSet<String>>,
270347
) -> Result<()> {
271348
upload_record_store_shards(
272349
client,
@@ -276,13 +353,32 @@ fn upload_manifest_shards(
276353
"file_pointer",
277354
)?;
278355

279-
upload_record_store_shards(
280-
client,
281-
endpoints,
282-
api_key,
283-
artifacts.symbol_records_path(),
284-
"symbol_record",
285-
)?;
356+
if let Some(needed) = needed_hashes {
357+
if !needed.is_empty() {
358+
upload_filtered_record_store_shards(
359+
client,
360+
endpoints,
361+
api_key,
362+
artifacts.symbol_records_path(),
363+
"symbol_record",
364+
|line| {
365+
let record: SymbolRecord =
366+
serde_json::from_str(line).context("failed to parse symbol record")?;
367+
Ok(needed.contains(&record.content_hash))
368+
},
369+
)?;
370+
} else {
371+
info!("no new content hashes; skipping symbol record upload");
372+
}
373+
} else {
374+
upload_record_store_shards(
375+
client,
376+
endpoints,
377+
api_key,
378+
artifacts.symbol_records_path(),
379+
"symbol_record",
380+
)?;
381+
}
286382

287383
upload_record_store_shards(
288384
client,
@@ -292,13 +388,32 @@ fn upload_manifest_shards(
292388
"symbol_namespace",
293389
)?;
294390

295-
upload_record_store_shards(
296-
client,
297-
endpoints,
298-
api_key,
299-
artifacts.reference_records_path(),
300-
"reference_record",
301-
)?;
391+
if let Some(needed) = needed_hashes {
392+
if !needed.is_empty() {
393+
upload_filtered_record_store_shards(
394+
client,
395+
endpoints,
396+
api_key,
397+
artifacts.reference_records_path(),
398+
"reference_record",
399+
|line| {
400+
let record: ReferenceRecord = serde_json::from_str(line)
401+
.context("failed to parse reference record")?;
402+
Ok(needed.contains(&record.content_hash))
403+
},
404+
)?;
405+
} else {
406+
info!("no new content hashes; skipping reference record upload");
407+
}
408+
} else {
409+
upload_record_store_shards(
410+
client,
411+
endpoints,
412+
api_key,
413+
artifacts.reference_records_path(),
414+
"reference_record",
415+
)?;
416+
}
302417

303418
upload_branch_heads(client, endpoints, api_key, &artifacts.branches)?;
304419

@@ -318,6 +433,20 @@ fn upload_record_store_shards(
318433
path: &std::path::Path,
319434
section: &str,
320435
) -> Result<()> {
436+
upload_filtered_record_store_shards(client, endpoints, api_key, path, section, |_| Ok(true))
437+
}
438+
439+
fn upload_filtered_record_store_shards<F>(
440+
client: &Client,
441+
endpoints: &Arc<Endpoints>,
442+
api_key: Option<&str>,
443+
path: &std::path::Path,
444+
section: &str,
445+
mut should_include: F,
446+
) -> Result<()>
447+
where
448+
F: FnMut(&str) -> Result<bool>,
449+
{
321450
if !path.exists() {
322451
return Ok(());
323452
}
@@ -368,6 +497,10 @@ fn upload_record_store_shards(
368497
continue;
369498
}
370499

500+
if !should_include(line.trim_end_matches(['\n', '\r']))? {
501+
continue;
502+
}
503+
371504
shard_data.extend_from_slice(line.as_bytes());
372505
records += 1;
373506
}
@@ -561,6 +694,16 @@ struct ChunkNeedResponse {
561694
missing: Vec<String>,
562695
}
563696

697+
/// Client-side wire format for the blobs "need" request: content hashes
/// offered to the backend.
#[derive(Serialize)]
struct ContentNeedRequest {
    hashes: Vec<String>,
}

/// Client-side wire format of the backend's reply: hashes the backend does
/// not yet have and therefore needs uploaded.
#[derive(Deserialize)]
struct ContentNeedResponse {
    missing: Vec<String>,
}
706+
564707
#[derive(Serialize)]
565708
struct UniqueChunkUploadRequest {
566709
chunks: Vec<UniqueChunk>,

0 commit comments

Comments (0)