Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions src/main/java/land/oras/CopyUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import land.oras.exception.OrasException;
import org.jspecify.annotations.NonNull;
import org.slf4j.Logger;
Expand Down Expand Up @@ -107,17 +108,20 @@ void copyLayers(
OCI<TargetRefType> target,
TargetRefType targetRef,
String contentType) {
for (Layer layer : source.collectLayers(sourceRef, contentType, true)) {
Objects.requireNonNull(layer.getDigest(), "Layer digest is required for streaming copy");
Objects.requireNonNull(layer.getSize(), "Layer size is required for streaming copy");
LOG.debug("Copying layer {}", layer.getDigest());
target.pushBlob(
targetRef.withDigest(layer.getDigest()),
layer.getSize(),
() -> source.fetchBlob(sourceRef.withDigest(layer.getDigest())),
layer.getAnnotations());
LOG.debug("Copied layer {}", layer.getDigest());
}
CompletableFuture.allOf(source.collectLayers(sourceRef, contentType, true).stream()
.map(layer -> {
Objects.requireNonNull(layer.getDigest(), "Layer digest is required for streaming copy");
Objects.requireNonNull(layer.getSize(), "Layer size is required for streaming copy");
return CompletableFuture.runAsync(
() -> target.pushBlob(
targetRef.withDigest(layer.getDigest()),
layer.getSize(),
() -> source.fetchBlob(sourceRef.withDigest(layer.getDigest())),
layer.getAnnotations()),
source.getExecutorService());
})
.toArray(CompletableFuture[]::new))
.join();
}

/**
Expand Down
157 changes: 86 additions & 71 deletions src/main/java/land/oras/OCI.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import land.oras.exception.OrasException;
import land.oras.utils.ArchiveUtils;
Expand Down Expand Up @@ -170,77 +173,15 @@ protected List<Layer> collectLayers(T ref, String contentType, boolean includeAl
* @return The layers
*/
protected final List<Layer> pushLayers(T ref, Annotations annotations, boolean withDigest, LocalPath... paths) {
List<Layer> layers = new ArrayList<>();
for (LocalPath path : paths) {
try {
// Create tar.gz archive for directory
if (Files.isDirectory(path.getPath())) {
SupportedCompression compression = SupportedCompression.fromMediaType(path.getMediaType());

// If source need to be packed first
boolean autoUnpack = compression.isAutoUnpack();
LocalPath tempSource = autoUnpack ? ArchiveUtils.tar(path) : path;
LocalPath tempArchive = ArchiveUtils.compress(tempSource, path.getMediaType());

if (withDigest) {
ref = ref.withDigest(ref.getAlgorithm().digest(tempArchive.getPath()));
}
try (InputStream is = Files.newInputStream(tempArchive.getPath())) {
String title = path.getPath().isAbsolute()
? path.getPath().getFileName().toString()
: path.getPath().toString();

// We store the filename, based on directory name if we don't auto unpack
if (!autoUnpack) {
title = "%s.%s".formatted(title, compression.getFileExtension());
}
LOG.debug("Uploading directory as archive with title: {}", title);

Map<String, String> layerAnnotations = annotations.hasFileAnnotations(title)
? annotations.getFileAnnotations(title)
: new LinkedHashMap<>(Map.of(Const.ANNOTATION_TITLE, title));

// Add oras digest/unpack
// For example zip can be packed application/zip but never unpacked by the runtime
// This is convenience method to pack zip layer as directories
if (compression.isAutoUnpack()) {
layerAnnotations.put(
Const.ANNOTATION_ORAS_CONTENT_DIGEST,
ref.getAlgorithm().digest(tempSource.getPath()));
layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "true");
} else {
layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "false");
}

Layer layer = pushBlob(ref, is)
.withMediaType(path.getMediaType())
.withAnnotations(layerAnnotations);
layers.add(layer);
LOG.info("Uploaded directory: {}", layer.getDigest());
}
Files.delete(tempArchive.getPath());
} else {
try (InputStream is = Files.newInputStream(path.getPath())) {
if (withDigest) {
ref = ref.withDigest(ref.getAlgorithm().digest(path.getPath()));
}
String title = path.getPath().getFileName().toString();
Map<String, String> layerAnnotations = annotations.hasFileAnnotations(title)
? annotations.getFileAnnotations(title)
: Map.of(Const.ANNOTATION_TITLE, title);

Layer layer = pushBlob(ref, is)
.withMediaType(path.getMediaType())
.withAnnotations(layerAnnotations);
layers.add(layer);
LOG.info("Uploaded: {}", layer.getDigest());
}
}
} catch (IOException e) {
throw new OrasException("Failed to push artifact", e);
}
try {
return Arrays.stream(paths)
.map(p -> CompletableFuture.supplyAsync(
() -> pushLayer(ref, annotations, withDigest, p), getExecutorService()))
.map(CompletableFuture::join)
.toList();
} catch (CompletionException e) {
throw new OrasException("Failed to push layers", e.getCause());
}
return layers;
}

/**
Expand Down Expand Up @@ -307,6 +248,12 @@ public final Manifest attachArtifact(T ref, ArtifactType artifactType, LocalPath
*/
public abstract Tags getTags(T ref);

/**
* Get the executor service for concurrent operations. This is used for concurrent pushing and pulling of layers.
* @return The executor service
*/
public abstract ExecutorService getExecutorService();

/**
* Get the tags for a ref
* @param ref The ref
Expand Down Expand Up @@ -483,4 +430,72 @@ public Manifest attachArtifact(T ref, ArtifactType artifactType, Annotations ann
SupportedAlgorithm.getDefault().digest(manifest.toJson().getBytes(StandardCharsets.UTF_8))),
manifest);
}

/**
 * Push a single local path as a layer. Directories are first packed/compressed
 * into a temporary archive according to the path's media type; plain files are
 * streamed as-is.
 * @param ref The ref to push to; when {@code withDigest} is true the computed digest is set on it first
 * @param annotations Artifact annotations; per-file annotations override the default title annotation
 * @param withDigest Whether to digest the content and push by digest
 * @param path The local file or directory to push
 * @return The pushed layer with media type and annotations applied
 * @throws OrasException if reading, archiving or pushing fails
 */
protected Layer pushLayer(T ref, Annotations annotations, boolean withDigest, LocalPath path) {
    try {
        // Create tar.gz archive for directory
        if (Files.isDirectory(path.getPath())) {
            SupportedCompression compression = SupportedCompression.fromMediaType(path.getMediaType());

            // If source need to be packed first
            boolean autoUnpack = compression.isAutoUnpack();
            LocalPath tempSource = autoUnpack ? ArchiveUtils.tar(path) : path;
            LocalPath tempArchive = ArchiveUtils.compress(tempSource, path.getMediaType());
            try {
                if (withDigest) {
                    ref = ref.withDigest(ref.getAlgorithm().digest(tempArchive.getPath()));
                }
                String title = path.getPath().isAbsolute()
                        ? path.getPath().getFileName().toString()
                        : path.getPath().toString();

                // We store the filename, based on directory name if we don't auto unpack
                if (!autoUnpack) {
                    title = "%s.%s".formatted(title, compression.getFileExtension());
                }
                LOG.debug("Uploading directory as archive with title: {}", title);

                Map<String, String> layerAnnotations = annotations.hasFileAnnotations(title)
                        ? annotations.getFileAnnotations(title)
                        : new LinkedHashMap<>(Map.of(Const.ANNOTATION_TITLE, title));

                // Add oras digest/unpack
                // For example zip can be packed application/zip but never unpacked by the runtime
                // This is convenience method to pack zip layer as directories
                if (compression.isAutoUnpack()) {
                    layerAnnotations.put(
                            Const.ANNOTATION_ORAS_CONTENT_DIGEST,
                            ref.getAlgorithm().digest(tempSource.getPath()));
                    layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "true");
                } else {
                    layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "false");
                }

                try (InputStream is = Files.newInputStream(tempArchive.getPath())) {
                    Layer layer =
                            pushBlob(ref, is).withMediaType(path.getMediaType()).withAnnotations(layerAnnotations);
                    LOG.info("Uploaded directory: {}", layer.getDigest());
                    return layer;
                }
            } finally {
                // Delete only AFTER the input stream is closed (deleting an open
                // file fails on Windows) and also on failure, so temp archives
                // never leak when archiving or pushing throws.
                Files.deleteIfExists(tempArchive.getPath());
                if (autoUnpack) {
                    // tempSource is the intermediate tar created above; presumably
                    // safe to remove once compressed — NOTE(review): the previous
                    // code left it behind, confirm nothing reads it afterwards.
                    Files.deleteIfExists(tempSource.getPath());
                }
            }
        } else {
            try (InputStream is = Files.newInputStream(path.getPath())) {
                if (withDigest) {
                    ref = ref.withDigest(ref.getAlgorithm().digest(path.getPath()));
                }
                String title = path.getPath().getFileName().toString();
                Map<String, String> layerAnnotations = annotations.hasFileAnnotations(title)
                        ? annotations.getFileAnnotations(title)
                        : Map.of(Const.ANNOTATION_TITLE, title);

                Layer layer =
                        pushBlob(ref, is).withMediaType(path.getMediaType()).withAnnotations(layerAnnotations);
                LOG.info("Uploaded: {}", layer.getDigest());
                return layer;
            }
        }
    } catch (IOException e) {
        throw new OrasException("Failed to push artifact", e);
    }
}
}
9 changes: 9 additions & 0 deletions src/main/java/land/oras/OCILayout.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import land.oras.exception.OrasException;
import land.oras.utils.Const;
Expand All @@ -45,6 +47,8 @@ public final class OCILayout extends OCI<LayoutRef> {
@SuppressWarnings("all")
private final String imageLayoutVersion = "1.0.0";

private final ExecutorService executors = Executors.newSingleThreadExecutor();

/**
* Path on the file system of the OCI Layout
*/
Expand All @@ -63,6 +67,11 @@ public static OCILayout.Builder builder() {
return OCILayout.Builder.builder();
}

/**
 * {@inheritDoc}
 * <p>Backed by a single-thread executor ({@code Executors.newSingleThreadExecutor()}),
 * so concurrent layer operations against this OCI layout execute one at a time.
 * NOTE(review): confirm sequential execution is intended for filesystem layouts.</p>
 */
@Override
public ExecutorService getExecutorService() {
return executors;
}

@Override
public Manifest pushArtifact(
LayoutRef ref,
Expand Down
Loading