Skip to content

Commit cb0695d

Browse files
committed
progress
1 parent 3acbd1d commit cb0695d

1 file changed

Lines changed: 58 additions & 1 deletion

File tree

indexer/src/upload.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ const MANIFEST_SHARD_RECORD_LIMIT: usize = 50_000;
2121
const MANIFEST_SHARD_BYTE_LIMIT: usize = 4 * 1024 * 1024;
2222
const UPLOAD_PARALLELISM: usize = 4;
2323

24+
const PROGRESS_STEP_PERCENT: u8 = 10;
25+
2426
#[derive(Debug)]
2527
struct ManifestShard {
2628
index: u64,
@@ -155,12 +157,17 @@ fn upload_content_blobs(
155157
);
156158
let workers = spawn_workers(rx, worker_func);
157159

160+
let mut processed = 0usize;
161+
let mut last_percent = 0u8;
158162
loop {
159163
let batch = stream.next_batch(1000)?;
160164
if batch.is_empty() {
161165
break;
162166
}
163167

168+
processed = processed.saturating_add(batch.len());
169+
maybe_log_progress("content blobs", processed, artifacts.content_blob_count(), &mut last_percent);
170+
164171
tx.send(batch)
165172
.map_err(|_| anyhow!("content blob upload worker dropped"))?;
166173
}
@@ -270,6 +277,8 @@ fn upload_unique_chunks(
270277
Ok(())
271278
});
272279
let workers = spawn_workers(rx, worker_func);
280+
let mut processed = 0usize;
281+
let mut last_percent = 0u8;
273282
for batch in needed_chunks.chunks(100) {
274283
let mut chunks = Vec::with_capacity(batch.len());
275284
for hash in batch {
@@ -282,6 +291,9 @@ fn upload_unique_chunks(
282291
});
283292
}
284293

294+
processed = processed.saturating_add(chunks.len());
295+
maybe_log_progress("unique chunks", processed, needed_chunks.len(), &mut last_percent);
296+
285297
tx.send(chunks)
286298
.map_err(|_| anyhow!("unique chunk upload worker dropped"))?;
287299
}
@@ -322,12 +334,17 @@ fn upload_chunk_mappings(
322334
Ok(())
323335
});
324336
let workers = spawn_workers(rx, worker_func);
337+
let mut processed = 0usize;
338+
let mut last_percent = 0u8;
325339
loop {
326340
let batch = stream.next_batch(1000)?;
327341
if batch.is_empty() {
328342
break;
329343
}
330344

345+
processed = processed.saturating_add(batch.len());
346+
maybe_log_progress("chunk mappings", processed, artifacts.chunk_mapping_count(), &mut last_percent);
347+
331348
tx.send(batch)
332349
.map_err(|_| anyhow!("chunk mapping upload worker dropped"))?;
333350
}
@@ -351,6 +368,7 @@ fn upload_manifest_shards(
351368
api_key,
352369
artifacts.file_pointers_path(),
353370
"file_pointer",
371+
artifacts.file_pointer_count(),
354372
)?;
355373

356374
if let Some(needed) = needed_hashes {
@@ -361,6 +379,7 @@ fn upload_manifest_shards(
361379
api_key,
362380
artifacts.symbol_records_path(),
363381
"symbol_record",
382+
Some(artifacts.symbol_record_count()),
364383
|line| {
365384
let record: SymbolRecord =
366385
serde_json::from_str(line).context("failed to parse symbol record")?;
@@ -377,6 +396,7 @@ fn upload_manifest_shards(
377396
api_key,
378397
artifacts.symbol_records_path(),
379398
"symbol_record",
399+
artifacts.symbol_record_count(),
380400
)?;
381401
}
382402

@@ -386,6 +406,7 @@ fn upload_manifest_shards(
386406
api_key,
387407
artifacts.symbol_namespaces_path(),
388408
"symbol_namespace",
409+
artifacts.symbol_namespace_count(),
389410
)?;
390411

391412
if let Some(needed) = needed_hashes {
@@ -396,6 +417,7 @@ fn upload_manifest_shards(
396417
api_key,
397418
artifacts.reference_records_path(),
398419
"reference_record",
420+
Some(artifacts.reference_record_count()),
399421
|line| {
400422
let record: ReferenceRecord = serde_json::from_str(line)
401423
.context("failed to parse reference record")?;
@@ -412,6 +434,7 @@ fn upload_manifest_shards(
412434
api_key,
413435
artifacts.reference_records_path(),
414436
"reference_record",
437+
artifacts.reference_record_count(),
415438
)?;
416439
}
417440

@@ -432,8 +455,17 @@ fn upload_record_store_shards(
432455
api_key: Option<&str>,
433456
path: &std::path::Path,
434457
section: &str,
458+
total_records: usize,
435459
) -> Result<()> {
436-
upload_filtered_record_store_shards(client, endpoints, api_key, path, section, |_| Ok(true))
460+
upload_filtered_record_store_shards(
461+
client,
462+
endpoints,
463+
api_key,
464+
path,
465+
section,
466+
Some(total_records),
467+
|_| Ok(true),
468+
)
437469
}
438470

439471
fn upload_filtered_record_store_shards<F>(
@@ -442,6 +474,7 @@ fn upload_filtered_record_store_shards<F>(
442474
api_key: Option<&str>,
443475
path: &std::path::Path,
444476
section: &str,
477+
total_records: Option<usize>,
445478
mut should_include: F,
446479
) -> Result<()>
447480
where
@@ -477,6 +510,8 @@ where
477510
let mut line = String::new();
478511
let mut shard_index: u64 = 0;
479512
let mut eof = false;
513+
let mut processed_records: usize = 0;
514+
let mut last_percent = 0u8;
480515

481516
while !eof {
482517
let mut shard_data = Vec::with_capacity(MANIFEST_SHARD_BYTE_LIMIT + 1024);
@@ -497,6 +532,11 @@ where
497532
continue;
498533
}
499534

535+
processed_records = processed_records.saturating_add(1);
536+
if let Some(total) = total_records {
537+
maybe_log_progress(section, processed_records, total, &mut last_percent);
538+
}
539+
500540
if !should_include(line.trim_end_matches(['\n', '\r']))? {
501541
continue;
502542
}
@@ -679,6 +719,23 @@ where
679719
WorkerGroup { handles }
680720
}
681721

722+
fn maybe_log_progress(label: &str, processed: usize, total: usize, last_percent: &mut u8) {
723+
if total == 0 {
724+
return;
725+
}
726+
727+
let mut percent = (processed.saturating_mul(100) / total) as u8;
728+
if percent > 100 {
729+
percent = 100;
730+
}
731+
let should_log = percent >= last_percent.saturating_add(PROGRESS_STEP_PERCENT) || percent == 100;
732+
733+
if should_log {
734+
*last_percent = percent;
735+
info!(label = label, percent, processed, total, "upload progress");
736+
}
737+
}
738+
682739
#[derive(Serialize)]
683740
struct ContentBlobUploadRequest {
684741
blobs: Vec<crate::models::ContentBlob>,

0 commit comments

Comments
 (0)