diff --git a/src/main/java/land/oras/CopyUtils.java b/src/main/java/land/oras/CopyUtils.java index f6e05447..78749020 100644 --- a/src/main/java/land/oras/CopyUtils.java +++ b/src/main/java/land/oras/CopyUtils.java @@ -21,6 +21,7 @@ */ import java.util.Objects; +import java.util.concurrent.CompletableFuture; import land.oras.exception.OrasException; import org.jspecify.annotations.NonNull; import org.slf4j.Logger; @@ -107,17 +108,20 @@ void copyLayers( OCI target, TargetRefType targetRef, String contentType) { - for (Layer layer : source.collectLayers(sourceRef, contentType, true)) { - Objects.requireNonNull(layer.getDigest(), "Layer digest is required for streaming copy"); - Objects.requireNonNull(layer.getSize(), "Layer size is required for streaming copy"); - LOG.debug("Copying layer {}", layer.getDigest()); - target.pushBlob( - targetRef.withDigest(layer.getDigest()), - layer.getSize(), - () -> source.fetchBlob(sourceRef.withDigest(layer.getDigest())), - layer.getAnnotations()); - LOG.debug("Copied layer {}", layer.getDigest()); - } + CompletableFuture.allOf(source.collectLayers(sourceRef, contentType, true).stream() + .map(layer -> { + Objects.requireNonNull(layer.getDigest(), "Layer digest is required for streaming copy"); + Objects.requireNonNull(layer.getSize(), "Layer size is required for streaming copy"); + return CompletableFuture.runAsync( + () -> target.pushBlob( + targetRef.withDigest(layer.getDigest()), + layer.getSize(), + () -> source.fetchBlob(sourceRef.withDigest(layer.getDigest())), + layer.getAnnotations()), + source.getExecutorService()); + }) + .toArray(CompletableFuture[]::new)) + .join(); } /** diff --git a/src/main/java/land/oras/OCI.java b/src/main/java/land/oras/OCI.java index 8a849be4..5eff5633 100644 --- a/src/main/java/land/oras/OCI.java +++ b/src/main/java/land/oras/OCI.java @@ -27,11 +27,14 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; -import java.util.ArrayList; +import 
java.util.Arrays; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import land.oras.exception.OrasException; import land.oras.utils.ArchiveUtils; @@ -170,77 +173,15 @@ protected List collectLayers(T ref, String contentType, boolean includeAl * @return The layers */ protected final List pushLayers(T ref, Annotations annotations, boolean withDigest, LocalPath... paths) { - List layers = new ArrayList<>(); - for (LocalPath path : paths) { - try { - // Create tar.gz archive for directory - if (Files.isDirectory(path.getPath())) { - SupportedCompression compression = SupportedCompression.fromMediaType(path.getMediaType()); - - // If source need to be packed first - boolean autoUnpack = compression.isAutoUnpack(); - LocalPath tempSource = autoUnpack ? ArchiveUtils.tar(path) : path; - LocalPath tempArchive = ArchiveUtils.compress(tempSource, path.getMediaType()); - - if (withDigest) { - ref = ref.withDigest(ref.getAlgorithm().digest(tempArchive.getPath())); - } - try (InputStream is = Files.newInputStream(tempArchive.getPath())) { - String title = path.getPath().isAbsolute() - ? path.getPath().getFileName().toString() - : path.getPath().toString(); - - // We store the filename, based on directory name if we don't auto unpack - if (!autoUnpack) { - title = "%s.%s".formatted(title, compression.getFileExtension()); - } - LOG.debug("Uploading directory as archive with title: {}", title); - - Map layerAnnotations = annotations.hasFileAnnotations(title) - ? 
annotations.getFileAnnotations(title) - : new LinkedHashMap<>(Map.of(Const.ANNOTATION_TITLE, title)); - - // Add oras digest/unpack - // For example zip can be packed application/zip but never unpacked by the runtime - // This is convenience method to pack zip layer as directories - if (compression.isAutoUnpack()) { - layerAnnotations.put( - Const.ANNOTATION_ORAS_CONTENT_DIGEST, - ref.getAlgorithm().digest(tempSource.getPath())); - layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "true"); - } else { - layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "false"); - } - - Layer layer = pushBlob(ref, is) - .withMediaType(path.getMediaType()) - .withAnnotations(layerAnnotations); - layers.add(layer); - LOG.info("Uploaded directory: {}", layer.getDigest()); - } - Files.delete(tempArchive.getPath()); - } else { - try (InputStream is = Files.newInputStream(path.getPath())) { - if (withDigest) { - ref = ref.withDigest(ref.getAlgorithm().digest(path.getPath())); - } - String title = path.getPath().getFileName().toString(); - Map layerAnnotations = annotations.hasFileAnnotations(title) - ? annotations.getFileAnnotations(title) - : Map.of(Const.ANNOTATION_TITLE, title); - - Layer layer = pushBlob(ref, is) - .withMediaType(path.getMediaType()) - .withAnnotations(layerAnnotations); - layers.add(layer); - LOG.info("Uploaded: {}", layer.getDigest()); - } - } - } catch (IOException e) { - throw new OrasException("Failed to push artifact", e); - } + try { + return Arrays.stream(paths) + .map(p -> CompletableFuture.supplyAsync( + () -> pushLayer(ref, annotations, withDigest, p), getExecutorService())) + .map(CompletableFuture::join) + .toList(); + } catch (CompletionException e) { + throw new OrasException("Failed to push layers", e.getCause()); } - return layers; } /** @@ -307,6 +248,12 @@ public final Manifest attachArtifact(T ref, ArtifactType artifactType, LocalPath */ public abstract Tags getTags(T ref); + /** + * Get the executor service for concurrent operations. 
This is used for concurrent pushing and pulling of layers. + * @return The executor service + */ + public abstract ExecutorService getExecutorService(); + /** * Get the tags for a ref * @param ref The ref @@ -483,4 +430,72 @@ public Manifest attachArtifact(T ref, ArtifactType artifactType, Annotations ann SupportedAlgorithm.getDefault().digest(manifest.toJson().getBytes(StandardCharsets.UTF_8))), manifest); } + + protected Layer pushLayer(T ref, Annotations annotations, boolean withDigest, LocalPath path) { + try { + // Create tar.gz archive for directory + if (Files.isDirectory(path.getPath())) { + SupportedCompression compression = SupportedCompression.fromMediaType(path.getMediaType()); + + // If source need to be packed first + boolean autoUnpack = compression.isAutoUnpack(); + LocalPath tempSource = autoUnpack ? ArchiveUtils.tar(path) : path; + LocalPath tempArchive = ArchiveUtils.compress(tempSource, path.getMediaType()); + + if (withDigest) { + ref = ref.withDigest(ref.getAlgorithm().digest(tempArchive.getPath())); + } + try (InputStream is = Files.newInputStream(tempArchive.getPath())) { + String title = path.getPath().isAbsolute() + ? path.getPath().getFileName().toString() + : path.getPath().toString(); + + // We store the filename, based on directory name if we don't auto unpack + if (!autoUnpack) { + title = "%s.%s".formatted(title, compression.getFileExtension()); + } + LOG.debug("Uploading directory as archive with title: {}", title); + + Map layerAnnotations = annotations.hasFileAnnotations(title) + ? 
annotations.getFileAnnotations(title) + : new LinkedHashMap<>(Map.of(Const.ANNOTATION_TITLE, title)); + + // Add oras digest/unpack + // For example zip can be packed application/zip but never unpacked by the runtime + // This is convenience method to pack zip layer as directories + if (compression.isAutoUnpack()) { + layerAnnotations.put( + Const.ANNOTATION_ORAS_CONTENT_DIGEST, + ref.getAlgorithm().digest(tempSource.getPath())); + layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "true"); + } else { + layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "false"); + } + + Layer layer = + pushBlob(ref, is).withMediaType(path.getMediaType()).withAnnotations(layerAnnotations); + LOG.info("Uploaded directory: {}", layer.getDigest()); + Files.delete(tempArchive.getPath()); + return layer; + } + } else { + try (InputStream is = Files.newInputStream(path.getPath())) { + if (withDigest) { + ref = ref.withDigest(ref.getAlgorithm().digest(path.getPath())); + } + String title = path.getPath().getFileName().toString(); + Map layerAnnotations = annotations.hasFileAnnotations(title) + ? 
annotations.getFileAnnotations(title) + : Map.of(Const.ANNOTATION_TITLE, title); + + Layer layer = + pushBlob(ref, is).withMediaType(path.getMediaType()).withAnnotations(layerAnnotations); + LOG.info("Uploaded: {}", layer.getDigest()); + return layer; + } + } + } catch (IOException e) { + throw new OrasException("Failed to push artifact", e); + } + } } diff --git a/src/main/java/land/oras/OCILayout.java b/src/main/java/land/oras/OCILayout.java index 7c855bb5..7b741554 100644 --- a/src/main/java/land/oras/OCILayout.java +++ b/src/main/java/land/oras/OCILayout.java @@ -29,6 +29,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Supplier; import land.oras.exception.OrasException; import land.oras.utils.Const; @@ -45,6 +47,8 @@ public final class OCILayout extends OCI { @SuppressWarnings("all") private final String imageLayoutVersion = "1.0.0"; + private final ExecutorService executors = Executors.newSingleThreadExecutor(); + /** * Path on the file system of the OCI Layout */ @@ -63,6 +67,11 @@ public static OCILayout.Builder builder() { return OCILayout.Builder.builder(); } + @Override + public ExecutorService getExecutorService() { + return executors; + } + @Override public Manifest pushArtifact( LayoutRef ref, diff --git a/src/main/java/land/oras/Registry.java b/src/main/java/land/oras/Registry.java index 73d4041d..06c4be9b 100644 --- a/src/main/java/land/oras/Registry.java +++ b/src/main/java/land/oras/Registry.java @@ -33,6 +33,9 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Supplier; import land.oras.auth.AuthProvider; import land.oras.auth.AuthStoreAuthenticationProvider; @@ -55,6 +58,16 @@ @NullMarked public final class Registry 
extends OCI { + /** + * Max concurrent downloads and uploads for blobs + */ + private int maxConcurrentDownloads = 1; + + /** + * The executor service for parallel operations + */ + private ExecutorService executors; + /** * The registries configuration loaded from the environment */ @@ -118,6 +131,14 @@ private void setInsecure(boolean insecure) { this.insecure = insecure; } + /** + * Set the max concurrent downloads and uploads for blobs. Defaults to 1 (sequential transfers) + * @param maxConcurrentDownloads Max concurrent downloads and uploads for blobs + */ + private void setParallelism(int maxConcurrentDownloads) { + this.maxConcurrentDownloads = maxConcurrentDownloads; + } + /** * Return if this registry is insecure * @return True if insecure @@ -156,6 +177,11 @@ private void setSkipTlsVerify(boolean skipTlsVerify) { */ private Registry build() { client = HttpClient.Builder.builder().withSkipTlsVerify(skipTlsVerify).build(); + executors = Executors.newFixedThreadPool(maxConcurrentDownloads, r -> { + Thread t = new Thread(r); + t.setName("layer-transfer-worker-%d".formatted(t.getId())); + return t; + }); return this; } @@ -192,6 +218,11 @@ public Registry asInsecure() { return registry; } + @Override + public ExecutorService getExecutorService() { + return executors; + } + @Override public Tags getTags(ContainerRef containerRef) { ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this); @@ -374,45 +405,17 @@ public void pullArtifact(ContainerRef containerRef, Path path, boolean overwrite LOG.info("Skipped pulling layers without file name in '{}'", Const.ANNOTATION_TITLE); return; } - for (Layer layer : layers) { - if (!layer.getAnnotations().containsKey(Const.ANNOTATION_TITLE)) { - LOG.info("Skipped pulling layer without file name in '{}'", Const.ANNOTATION_TITLE); - continue; - } - try (InputStream is = fetchBlob(ref.withDigest(layer.getDigest()))) { - // Unpack or just copy blob - if 
(Boolean.parseBoolean(layer.getAnnotations().getOrDefault(Const.ANNOTATION_ORAS_UNPACK, "false"))) { - LOG.debug("Extracting blob to: {}", path); - - // Uncompress the tar.gz archive and verify digest if present - LocalPath tempArchive = ArchiveUtils.uncompress(is, layer.getMediaType()); - String expectedDigest = layer.getAnnotations().get(Const.ANNOTATION_ORAS_CONTENT_DIGEST); - if (expectedDigest != null) { - LOG.trace("Expected digest: {}", expectedDigest); - String actualDigest = ref.getAlgorithm().digest(tempArchive.getPath()); - LOG.trace("Actual digest: {}", actualDigest); - if (!expectedDigest.equals(actualDigest)) { - throw new OrasException( - "Digest mismatch: expected %s but got %s".formatted(expectedDigest, actualDigest)); - } - } - - // Extract the tar - ArchiveUtils.untar(Files.newInputStream(tempArchive.getPath()), path); - - } else { - Path targetPath = path.resolve(layer.getAnnotations().get(Const.ANNOTATION_TITLE)); - if (Files.exists(targetPath) && !overwrite) { - LOG.info("File already exists: {}", targetPath); - continue; - } - LOG.debug("Copying blob to: {}", targetPath); - Files.copy(is, targetPath, StandardCopyOption.REPLACE_EXISTING); - } - } catch (IOException e) { - throw new OrasException("Failed to pull artifact", e); - } + if (layers.stream().noneMatch(layer -> layer.getAnnotations().containsKey(Const.ANNOTATION_TITLE))) { + LOG.info("Skipped pulling artifact without file name in '{}'", Const.ANNOTATION_TITLE); + return; } + // Pull layers in parallel + CompletableFuture.allOf(layers.stream() + .filter(layer -> layer.getAnnotations().containsKey(Const.ANNOTATION_TITLE)) + .map(layer -> CompletableFuture.runAsync( + () -> pullLayer(ref, layer, path, overwrite), getExecutorService())) + .toArray(CompletableFuture[]::new)) + .join(); } @Override @@ -948,6 +951,43 @@ String getContentType(ContainerRef containerRef) { return getResolvedHeaders(containerRef).headers().get(Const.CONTENT_TYPE_HEADER.toLowerCase()); } + private void 
pullLayer(ContainerRef ref, Layer layer, Path path, boolean overwrite) { + Objects.requireNonNull(layer.getDigest()); + try (InputStream is = fetchBlob(ref.withDigest(layer.getDigest()))) { + // Unpack or just copy blob + if (Boolean.parseBoolean(layer.getAnnotations().getOrDefault(Const.ANNOTATION_ORAS_UNPACK, "false"))) { + LOG.debug("Extracting blob to: {}", path); + + // Uncompress the tar.gz archive and verify digest if present + LocalPath tempArchive = ArchiveUtils.uncompress(is, layer.getMediaType()); + String expectedDigest = layer.getAnnotations().get(Const.ANNOTATION_ORAS_CONTENT_DIGEST); + if (expectedDigest != null) { + LOG.trace("Expected digest: {}", expectedDigest); + String actualDigest = ref.getAlgorithm().digest(tempArchive.getPath()); + LOG.trace("Actual digest: {}", actualDigest); + if (!expectedDigest.equals(actualDigest)) { + throw new OrasException( + "Digest mismatch: expected %s but got %s".formatted(expectedDigest, actualDigest)); + } + } + + // Extract the tar + ArchiveUtils.untar(Files.newInputStream(tempArchive.getPath()), path); + + } else { + Path targetPath = path.resolve(layer.getAnnotations().get(Const.ANNOTATION_TITLE)); + if (Files.exists(targetPath) && !overwrite) { + LOG.info("File already exists: {}", targetPath); + return; + } + LOG.debug("Copying blob to: {}", targetPath); + Files.copy(is, targetPath, StandardCopyOption.REPLACE_EXISTING); + } + } catch (IOException e) { + throw new OrasException("Failed to pull artifact", e); + } + } + /** * Append digest to location header returned from upload post * @param location The location header from upload post @@ -1068,6 +1108,7 @@ public Builder from(Registry registry) { this.registry.setInsecure(registry.insecure); this.registry.setRegistry(registry.registry); this.registry.setSkipTlsVerify(registry.skipTlsVerify); + this.registry.setParallelism(registry.maxConcurrentDownloads); return this; } @@ -1144,6 +1185,16 @@ public Builder withAuthProvider(AuthProvider authProvider) { 
return this; } + /** + * Set the maximum number of concurrent downloads when pulling an artifact with multiple layers. Default is 1. + * @param parallelism The maximum number of parallel uploads/downloads + * @return The builder + */ + public Builder withParallelism(int parallelism) { + registry.setParallelism(parallelism); + return this; + } + /** * Set the insecure flag * @param insecure Insecure diff --git a/src/test/java/land/oras/DockerIoITCase.java b/src/test/java/land/oras/DockerIoITCase.java index f698f3b5..882e3800 100644 --- a/src/test/java/land/oras/DockerIoITCase.java +++ b/src/test/java/land/oras/DockerIoITCase.java @@ -106,11 +106,13 @@ void shouldPullOneBlob() { void shouldCopyTagToInternalRegistry() { // Source registry - Registry sourceRegistry = Registry.Builder.builder().defaults().build(); + Registry sourceRegistry = + Registry.Builder.builder().withParallelism(10).defaults().build(); // Copy to this internal registry Registry targetRegistry = Registry.Builder.builder() .defaults("myuser", "mypass") + .withParallelism(10) .withInsecure(true) .build();