Skip to content

Commit 74fc46c

Browse files
authored
GCS client library migration in Java SDK - part 2b (#37592)
* Add copy() and remove() for GcsUtil V2. * Add tests and modify copy and remove to take strategies * Add deprecated annotations to V1 copy and remove. * Refactor MissingStrategy and OverwriteStrategy enums. Add rewriteHelper() and move(). * Add rename tests and refactor copy and remove tests. * Refactor rename * Add experimental annotations to the new copy, remove and rename * Remove unused import. * Fix style. * Trigger post commit java for the integration tests of GcsUtil. * Revise according to reviews.
1 parent e87bb29 commit 74fc46c

5 files changed

Lines changed: 525 additions & 2 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 4
3+
"modification": 6
44
}

sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,21 @@
3737
import java.nio.channels.WritableByteChannel;
3838
import java.util.Collection;
3939
import java.util.List;
40+
import java.util.Set;
4041
import java.util.concurrent.ExecutorService;
4142
import java.util.function.Supplier;
4243
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
4344
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult;
45+
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy;
46+
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy;
4447
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
4548
import org.apache.beam.sdk.io.fs.MoveOptions;
49+
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
4650
import org.apache.beam.sdk.options.DefaultValueFactory;
4751
import org.apache.beam.sdk.options.ExperimentalOptions;
4852
import org.apache.beam.sdk.options.PipelineOptions;
4953
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
54+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
5055
import org.checkerframework.checker.nullness.qual.Nullable;
5156

5257
public class GcsUtil {
@@ -433,12 +438,65 @@ public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
433438
delegate.copy(srcFilenames, destFilenames);
434439
}
435440

441+
/**
 * Experimental API. Convenience overload that forwards to the three-argument {@code copy} with
 * {@code OverwriteStrategy.SAFE_OVERWRITE}.
 *
 * @param srcPaths source paths, passed through unchanged
 * @param dstPaths destination paths, passed through unchanged
 * @throws IOException if the three-argument {@code copy} overload throws
 */
442+
public void copyV2(Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths) throws IOException {
443+
copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE);
444+
}
445+
446+
/**
 * Experimental API. Hands the copy request to {@code delegateV2} together with the caller's
 * overwrite strategy.
 *
 * @param srcPaths source paths, passed through unchanged
 * @param dstPaths destination paths, passed through unchanged
 * @param strategy how an already-existing destination should be handled
 * @throws IOException with the message "GcsUtil V2 not initialized." when {@code delegateV2} is
 *     null, or whatever {@code delegateV2.copy} throws
 */
447+
public void copy(
448+
Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, OverwriteStrategy strategy)
449+
throws IOException {
450+
if (delegateV2 != null) {
451+
delegateV2.copy(srcPaths, dstPaths, strategy);
452+
} else {
453+
throw new IOException("GcsUtil V2 not initialized.");
454+
}
455+
}
456+
436457
/**
 * String-path rename. Passes all three arguments to {@code delegate.rename} unchanged.
 *
 * @throws IOException if {@code delegate.rename} throws
 */
public void rename(
437458
Iterable<String> srcFilenames, Iterable<String> destFilenames, MoveOptions... moveOptions)
438459
throws IOException {
439460
delegate.rename(srcFilenames, destFilenames, moveOptions);
440461
}
441462

463+
/**
 * Experimental API. Translates {@link MoveOptions} into the strategy enums and forwards to the
 * four-argument {@code rename}.
 *
 * <ul>
 *   <li>{@code IGNORE_MISSING_FILES} present: {@code MissingStrategy.SKIP_IF_MISSING}; otherwise
 *       {@code MissingStrategy.FAIL_IF_MISSING}.
 *   <li>{@code SKIP_IF_DESTINATION_EXISTS} present: {@code OverwriteStrategy.SKIP_IF_EXISTS};
 *       otherwise {@code OverwriteStrategy.SAFE_OVERWRITE}.
 * </ul>
 *
 * @param srcPaths source paths, passed through unchanged
 * @param dstPaths destination paths, passed through unchanged
 * @param moveOptions options to translate; only the two listed above are inspected here
 * @throws IOException if the four-argument {@code rename} overload throws
 */
464+
public void renameV2(
465+
Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, MoveOptions... moveOptions)
466+
throws IOException {
467+
// Collect the varargs into a set so each option can be looked up by membership.
Set<MoveOptions> moveOptionSet = Sets.newHashSet(moveOptions);
468+
final MissingStrategy srcMissing;
469+
final OverwriteStrategy dstOverwrite;
470+
471+
if (moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES)) {
472+
srcMissing = MissingStrategy.SKIP_IF_MISSING;
473+
} else {
474+
srcMissing = MissingStrategy.FAIL_IF_MISSING;
475+
}
476+
477+
if (moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS)) {
478+
dstOverwrite = OverwriteStrategy.SKIP_IF_EXISTS;
479+
} else {
480+
dstOverwrite = OverwriteStrategy.SAFE_OVERWRITE;
481+
}
482+
483+
rename(srcPaths, dstPaths, srcMissing, dstOverwrite);
484+
}
485+
486+
/**
 * Experimental API. Hands the rename to {@code delegateV2.move} with the caller's strategies.
 *
 * @param srcPaths source paths, passed through unchanged
 * @param dstPaths destination paths, passed through unchanged
 * @param srcMissing how a missing source should be handled
 * @param dstOverwrite how an already-existing destination should be handled
 * @throws IOException with the message "GcsUtil V2 not initialized." when {@code delegateV2} is
 *     null, or whatever {@code delegateV2.move} throws
 */
487+
public void rename(
488+
Iterable<GcsPath> srcPaths,
489+
Iterable<GcsPath> dstPaths,
490+
MissingStrategy srcMissing,
491+
OverwriteStrategy dstOverwrite)
492+
throws IOException {
493+
if (delegateV2 != null) {
494+
delegateV2.move(srcPaths, dstPaths, srcMissing, dstOverwrite);
495+
} else {
496+
throw new IOException("GcsUtil V2 not initialized.");
497+
}
498+
}
499+
442500
@VisibleForTesting
443501
@SuppressWarnings("JdkObsolete") // for LinkedList
444502
java.util.LinkedList<GcsUtilV1.RewriteOp> makeRewriteOps(
@@ -469,6 +527,20 @@ public void remove(Collection<String> filenames) throws IOException {
469527
delegate.remove(filenames);
470528
}
471529

530+
/**
 * Experimental API. Convenience overload that forwards to the two-argument {@code remove} with
 * {@code MissingStrategy.SKIP_IF_MISSING}.
 *
 * @param paths paths to remove, passed through unchanged
 * @throws IOException if the two-argument {@code remove} overload throws
 */
531+
public void removeV2(Iterable<GcsPath> paths) throws IOException {
532+
remove(paths, MissingStrategy.SKIP_IF_MISSING);
533+
}
534+
535+
/**
 * Experimental API. Hands the removal to {@code delegateV2.remove} with the caller's strategy.
 *
 * @param paths paths to remove, passed through unchanged
 * @param strategy how a missing object should be handled
 * @throws IOException with the message "GcsUtil V2 not initialized." when {@code delegateV2} is
 *     null, or whatever {@code delegateV2.remove} throws
 */
536+
public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws IOException {
537+
if (delegateV2 != null) {
538+
delegateV2.remove(paths, strategy);
539+
} else {
540+
throw new IOException("GcsUtil V2 not initialized.");
541+
}
542+
}
543+
472544
@SuppressFBWarnings("NM_CLASS_NOT_EXCEPTION")
473545
public static class StorageObjectOrIOException {
474546
final GcsUtilV1.StorageObjectOrIOException delegate;

sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,24 @@
1818
package org.apache.beam.sdk.extensions.gcp.util;
1919

2020
import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp;
21+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2122
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2223

2324
import com.google.api.gax.paging.Page;
2425
import com.google.auto.value.AutoValue;
2526
import com.google.cloud.storage.Blob;
27+
import com.google.cloud.storage.BlobId;
28+
import com.google.cloud.storage.BlobInfo;
2629
import com.google.cloud.storage.Bucket;
2730
import com.google.cloud.storage.BucketInfo;
31+
import com.google.cloud.storage.CopyWriter;
2832
import com.google.cloud.storage.Storage;
2933
import com.google.cloud.storage.Storage.BlobField;
3034
import com.google.cloud.storage.Storage.BlobGetOption;
3135
import com.google.cloud.storage.Storage.BlobListOption;
3236
import com.google.cloud.storage.Storage.BucketField;
3337
import com.google.cloud.storage.Storage.BucketGetOption;
38+
import com.google.cloud.storage.Storage.CopyRequest;
3439
import com.google.cloud.storage.StorageBatch;
3540
import com.google.cloud.storage.StorageBatchResult;
3641
import com.google.cloud.storage.StorageException;
@@ -71,18 +76,27 @@ public GcsUtilV2 create(PipelineOptions options) {
7176
/** Maximum number of requests permitted in a GCS batch request. */
7277
private static final int MAX_REQUESTS_PER_BATCH = 100;
7378

79+
/**
80+
* Limit the amount of data, in megabytes, Cloud Storage will attempt to copy before responding to an individual
81+
* request. If you see Read Timeout errors, try reducing this value.
82+
*/
83+
private static final long MEGABYTES_COPIED_PER_CHUNK = 2048L;
84+
7485
/**
 * Builds the {@code storage} client for the project id read from the pipeline's {@code
 * GcpOptions}.
 *
 * @param options pipeline options, viewed as {@code GcpOptions} to obtain the project
 */
GcsUtilV2(PipelineOptions options) {
7586
String projectId = options.as(GcpOptions.class).getProject();
7687
storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
7788
}
7889

7990
@SuppressWarnings({
80-
"nullness" // For Creating AccessDeniedException and FileAlreadyExistsException with null.
91+
"nullness" // For Creating AccessDeniedException FileNotFoundException, and
92+
// FileAlreadyExistsException with null.
8193
})
8294
private IOException translateStorageException(GcsPath gcsPath, StorageException e) {
8395
switch (e.getCode()) {
8496
case 403:
8597
return new AccessDeniedException(gcsPath.toString(), null, e.getMessage());
98+
case 404:
99+
return new FileNotFoundException(e.getMessage());
86100
case 409:
87101
return new FileAlreadyExistsException(gcsPath.toString(), null, e.getMessage());
88102
default:
@@ -259,6 +273,151 @@ public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
259273
return results;
260274
}
261275

276+
/** Selects how {@code remove} and {@code rewriteHelper} react when an object is not found. */
public enum MissingStrategy {
277+
// The operation throws (remove raises FileNotFoundException for an undeleted path).
FAIL_IF_MISSING,
278+
// The operation logs a warning and carries on with the next path.
SKIP_IF_MISSING,
279+
}
280+
281+
/**
 * Deletes the given objects using batch requests of at most {@code MAX_REQUESTS_PER_BATCH}
 * deletes each.
 *
 * <p>Partitions are submitted one after another. After each submit the per-object results are
 * inspected in order; the first exception thrown ends the method, so later partitions are never
 * submitted.
 *
 * @param paths objects to delete
 * @param strategy what to do when a delete reports {@code false}: {@code FAIL_IF_MISSING} throws,
 *     any other value logs a warning and continues
 * @throws FileNotFoundException if a delete reports {@code false} under {@code FAIL_IF_MISSING}
 * @throws IOException the result of {@code translateStorageException} when reading a result
 *     raises a {@code StorageException}
 */
public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws IOException {
282+
for (List<GcsPath> pathPartition :
283+
Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
284+
285+
// Create a new empty batch every time
286+
StorageBatch batch = storage.batch();
287+
List<StorageBatchResult<Boolean>> batchResultFutures = new ArrayList<>();
288+
289+
// Results are appended in path order, so index i below maps back to pathPartition.get(i).
for (GcsPath path : pathPartition) {
290+
batchResultFutures.add(batch.delete(path.getBucket(), path.getObject()));
291+
}
292+
batch.submit();
293+
294+
for (int i = 0; i < batchResultFutures.size(); i++) {
295+
StorageBatchResult<Boolean> future = batchResultFutures.get(i);
296+
try {
297+
Boolean deleted = future.get();
298+
// A false result is reported to the caller as "file does not exist".
if (!deleted) {
299+
if (strategy == MissingStrategy.FAIL_IF_MISSING) {
300+
throw new FileNotFoundException(
301+
String.format(
302+
"The specified file does not exist: %s", pathPartition.get(i).toString()));
303+
} else {
304+
LOG.warn("Ignoring failed deletion on file {}.", pathPartition.get(i).toString());
305+
}
306+
}
307+
} catch (StorageException e) {
308+
throw translateStorageException(pathPartition.get(i), e);
309+
}
310+
}
311+
}
312+
}
313+
314+
/** Selects how {@code rewriteHelper} treats a destination object that already exists. */
public enum OverwriteStrategy {
315+
FAIL_IF_EXISTS, // Fail if target exists
316+
SKIP_IF_EXISTS, // Skip if target exists
317+
SAFE_OVERWRITE, // Overwrite only if the generation matches (atomic)
318+
ALWAYS_OVERWRITE // Overwrite regardless of state
319+
}
320+
321+
/**
 * Shared implementation behind {@code copy} and {@code move}: copies {@code srcPaths[i]} to
 * {@code dstPaths[i]} one pair at a time and, when {@code deleteSrc} is set, deletes the source
 * after its copy completes.
 *
 * <p>Pairs are processed sequentially. The first exception thrown ends the method, so pairs
 * handled earlier stay copied (and deleted, for a move) while later pairs are untouched.
 *
 * @param srcPaths source objects
 * @param dstPaths destination objects; must contain as many elements as {@code srcPaths}
 * @param deleteSrc whether to delete each source after a successful copy
 * @param srcMissing with {@code SKIP_IF_MISSING}, a 404 from the copy/delete step is logged and
 *     the pair is skipped; otherwise it is rethrown in translated form
 * @param dstOverwrite how to treat a destination that already exists
 * @throws IllegalArgumentException (via {@code checkArgument}) if the two lists differ in size
 * @throws FileAlreadyExistsException if the destination exists under {@code FAIL_IF_EXISTS}
 * @throws IOException the result of {@code translateStorageException} for any other {@code
 *     StorageException}
 */
private void rewriteHelper(
322+
Iterable<GcsPath> srcPaths,
323+
Iterable<GcsPath> dstPaths,
324+
boolean deleteSrc,
325+
MissingStrategy srcMissing,
326+
OverwriteStrategy dstOverwrite)
327+
throws IOException {
328+
// Materialize both iterables so they can be size-checked and paired by index.
List<GcsPath> srcList = Lists.newArrayList(srcPaths);
329+
List<GcsPath> dstList = Lists.newArrayList(dstPaths);
330+
checkArgument(
331+
srcList.size() == dstList.size(),
332+
"Number of source files %s must equal number of destination files %s",
333+
srcList.size(),
334+
dstList.size());
335+
336+
for (int i = 0; i < srcList.size(); i++) {
337+
GcsPath srcPath = srcList.get(i);
338+
GcsPath dstPath = dstList.get(i);
339+
BlobId srcId = BlobId.of(srcPath.getBucket(), srcPath.getObject());
340+
BlobId dstId = BlobId.of(dstPath.getBucket(), dstPath.getObject());
341+
342+
CopyRequest.Builder copyRequestBuilder =
343+
CopyRequest.newBuilder()
344+
.setSource(srcId)
345+
.setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK);
346+
347+
if (dstOverwrite == OverwriteStrategy.ALWAYS_OVERWRITE) {
348+
// No precondition on the target: the destination is not looked up first.
copyRequestBuilder.setTarget(dstId);
349+
} else {
350+
// FAIL_IF_EXISTS, SKIP_IF_EXISTS and SAFE_OVERWRITE require checking the target blob
351+
BlobInfo existingTarget;
352+
try {
353+
existingTarget = storage.get(dstId);
354+
} catch (StorageException e) {
355+
throw translateStorageException(dstPath, e);
356+
}
357+
358+
if (existingTarget == null) {
359+
// Target absent at lookup time: require that it is still absent when the copy runs.
copyRequestBuilder.setTarget(dstId, Storage.BlobTargetOption.doesNotExist());
360+
} else {
361+
switch (dstOverwrite) {
362+
case SKIP_IF_EXISTS:
363+
LOG.warn("Ignoring rewriting from {} to {} because target exists.", srcPath, dstPath);
364+
continue; // Skip to next file in for-loop
365+
366+
case SAFE_OVERWRITE:
367+
// Overwrite only the generation that was just observed.
copyRequestBuilder.setTarget(
368+
dstId, Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()));
369+
break;
370+
371+
case FAIL_IF_EXISTS:
372+
throw new FileAlreadyExistsException(
373+
srcPath.toString(),
374+
dstPath.toString(),
375+
"Target object already exists and strategy is FAIL_IF_EXISTS");
376+
default:
377+
throw new IllegalStateException("Unknown OverwriteStrategy: " + dstOverwrite);
378+
}
379+
}
380+
}
381+
382+
try {
383+
CopyWriter copyWriter = storage.copy(copyRequestBuilder.build());
384+
// getResult() is called only for its effect; the returned value is discarded.
copyWriter.getResult();
385+
386+
// NOTE(review): if srcPath and dstPath name the same object, this delete would remove the
// object that was just written; confirm callers never pass identical source/destination.
if (deleteSrc) {
387+
if (!storage.delete(srcId)) {
388+
// This may happen if the source file is deleted by another process after copy.
389+
LOG.warn(
390+
"Source file {} could not be deleted after move to {}. It may not have existed.",
391+
srcPath,
392+
dstPath);
393+
}
394+
}
395+
} catch (StorageException e) {
396+
// NOTE(review): every 404 raised in the try block is treated as "source missing";
// presumably a missing destination bucket would also land here. TODO confirm.
if (e.getCode() == 404 && srcMissing == MissingStrategy.SKIP_IF_MISSING) {
397+
LOG.warn(
398+
"Ignoring rewriting from {} to {} because source does not exist.", srcPath, dstPath);
399+
continue;
400+
}
401+
throw translateStorageException(srcPath, e);
402+
}
403+
}
404+
}
405+
406+
/**
 * Copies each source to its corresponding destination without deleting the source. Calls {@code
 * rewriteHelper} with {@code deleteSrc = false} and {@code MissingStrategy.FAIL_IF_MISSING}.
 *
 * @param srcPaths source objects
 * @param dstPaths destination objects
 * @param strategy how an already-existing destination should be handled
 * @throws IOException if {@code rewriteHelper} throws
 */
public void copy(
407+
Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, OverwriteStrategy strategy)
408+
throws IOException {
409+
rewriteHelper(srcPaths, dstPaths, false, MissingStrategy.FAIL_IF_MISSING, strategy);
410+
}
411+
412+
/**
 * Moves each source to its corresponding destination. Calls {@code rewriteHelper} with {@code
 * deleteSrc = true}, so each source is deleted after its copy.
 *
 * @param srcPaths source objects
 * @param dstPaths destination objects
 * @param srcMissing how a missing source should be handled
 * @param dstOverwrite how an already-existing destination should be handled
 * @throws IOException if {@code rewriteHelper} throws
 */
public void move(
413+
Iterable<GcsPath> srcPaths,
414+
Iterable<GcsPath> dstPaths,
415+
MissingStrategy srcMissing,
416+
OverwriteStrategy dstOverwrite)
417+
throws IOException {
418+
rewriteHelper(srcPaths, dstPaths, true, srcMissing, dstOverwrite);
419+
}
420+
262421
/** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */
263422
public Bucket getBucket(GcsPath path, BucketGetOption... options) throws IOException {
264423
String bucketName = path.getBucket();

0 commit comments

Comments
 (0)