From d9e85205a06cda629241510cb8e4e1425b85f3fa Mon Sep 17 00:00:00 2001 From: Andrew Gaul Date: Wed, 9 Oct 2024 19:39:12 -0700 Subject: [PATCH] Add NIO.2 BlobStore This will enable multiple backends, e.g., jimfs (in-memory), filesystem, and possibly stranger things like Hadoop. Currently only configured to use jimfs. Fixes #697. --- .github/workflows/ci-main.yml | 5 + README.md | 1 + pom.xml | 5 + src/main/java/org/gaul/s3proxy/Quirks.java | 3 +- .../java/org/gaul/s3proxy/S3ProxyHandler.java | 6 +- .../s3proxy/nio2blob/Nio2BlobApiMetadata.java | 80 ++ .../nio2blob/Nio2BlobProviderMetadata.java | 75 ++ .../gaul/s3proxy/nio2blob/Nio2BlobStore.java | 958 ++++++++++++++++++ .../nio2blob/Nio2BlobStoreContextModule.java | 31 + src/main/resources/checkstyle.xml | 1 - .../java/org/gaul/s3proxy/AwsSdkTest.java | 12 + .../resources/s3proxy-transient-nio2.conf | 16 + 12 files changed, 1190 insertions(+), 3 deletions(-) create mode 100644 src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobApiMetadata.java create mode 100644 src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobProviderMetadata.java create mode 100644 src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobStore.java create mode 100644 src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobStoreContextModule.java create mode 100644 src/test/resources/s3proxy-transient-nio2.conf diff --git a/.github/workflows/ci-main.yml b/.github/workflows/ci-main.yml index 2fec08a..39a79d2 100644 --- a/.github/workflows/ci-main.yml +++ b/.github/workflows/ci-main.yml @@ -75,6 +75,11 @@ jobs: run: | mvn test + - name: Maven Test with transient-nio2 + run: | + # TODO: run other test classes + mvn test -Ds3proxy.test.conf=s3proxy-transient-nio2.conf -Dtest=AwsSdkTest + - name: Install Azurite run: npx --yes --loglevel info azurite --version - name: Start Azurite diff --git a/README.md b/README.md index ac3891a..40d9eb3 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,7 @@ Maven Central hosts S3Proxy artifacts and the wiki has * rackspace-cloudfiles-uk and rackspace-cloudfiles-us * s3 (all implementations) * transient (in-memory storage) +* transient-nio2 (in-memory storage, preview) See the wiki for [examples of configurations](https://github.com/gaul/s3proxy/wiki/Storage-backend-examples). diff --git a/pom.xml b/pom.xml index 47cc19f..d73ee12 100644 --- a/pom.xml +++ b/pom.xml @@ -424,6 +424,11 @@ guava 32.0.0-jre + + com.google.jimfs + jimfs + 1.3.0 + javax.xml.bind jaxb-api diff --git a/src/main/java/org/gaul/s3proxy/Quirks.java b/src/main/java/org/gaul/s3proxy/Quirks.java index 0bccba2..53bdc9b 100644 --- a/src/main/java/org/gaul/s3proxy/Quirks.java +++ b/src/main/java/org/gaul/s3proxy/Quirks.java @@ -98,7 +98,8 @@ final class Quirks { "filesystem", "google-cloud-storage", "openstack-swift", - "transient" + "transient", + "transient-nio2" ); /** Blobstores with opaque ETags. */ diff --git a/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java b/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java index 6d2de45..398361b 100644 --- a/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java +++ b/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java @@ -2265,7 +2265,11 @@ public class S3ProxyHandler { String uploadId) throws IOException, S3Exception { BlobMetadata metadata; PutOptions options; - if (Quirks.MULTIPART_REQUIRES_STUB.contains(getBlobStoreType( + if ("transient-nio2".equals(getBlobStoreType(blobStore))) { + // TODO: transient-nio2 does not yet support blob access + metadata = blobStore.blobMetadata(containerName, uploadId); + options = new PutOptions(); + } else if (Quirks.MULTIPART_REQUIRES_STUB.contains(getBlobStoreType( blobStore))) { metadata = blobStore.blobMetadata(containerName, uploadId); BlobAccess access = blobStore.getBlobAccess(containerName, diff --git a/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobApiMetadata.java b/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobApiMetadata.java new file mode 100644 index 0000000..966076f --- /dev/null +++ b/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobApiMetadata.java @@ -0,0 +1,80 @@ +/* + * Copyright 2014-2024 Andrew Gaul + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gaul.s3proxy.nio2blob; + +import java.net.URI; +import java.util.Properties; +import java.util.Set; + +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.reflect.Reflection2; +import org.jclouds.rest.internal.BaseHttpApiMetadata; + +public final class Nio2BlobApiMetadata extends BaseHttpApiMetadata { + public Nio2BlobApiMetadata() { + this(builder()); + } + + protected Nio2BlobApiMetadata(Builder builder) { + super(builder); + } + + private static Builder builder() { + return new Builder(); + } + + @Override + public Builder toBuilder() { + return builder().fromApiMetadata(this); + } + + public static Properties defaultProperties() { + return BaseHttpApiMetadata.defaultProperties(); + } + + // Fake API client + private interface Nio2BlobClient { + } + + public static final class Builder + extends BaseHttpApiMetadata.Builder { + protected Builder() { + super(Nio2BlobClient.class); + id("transient-nio2") + .name("NIO.2 Blobstore") + .identityName("Account Name") + .credentialName("Access Key") + .defaultEndpoint("http://localhost/") + .documentation(URI.create( + "http://www.jclouds.org/documentation/userguide" + + "/blobstore-guide")) + .defaultProperties(Nio2BlobApiMetadata.defaultProperties()) + .view(Reflection2.typeToken(BlobStoreContext.class)) + .defaultModules(Set.of(Nio2BlobStoreContextModule.class)); + } + + @Override + public Nio2BlobApiMetadata build() { + return new Nio2BlobApiMetadata(this); + } + + @Override + protected Builder self() { + return this; + } + } +} diff --git a/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobProviderMetadata.java b/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobProviderMetadata.java new file mode 100644 index 0000000..c2bd2ed --- /dev/null +++ b/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobProviderMetadata.java @@ -0,0 +1,75 @@ +/* + * Copyright 2014-2024 Andrew Gaul + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gaul.s3proxy.nio2blob; + +import java.util.Properties; + +import com.google.auto.service.AutoService; + +import org.jclouds.providers.ProviderMetadata; +import org.jclouds.providers.internal.BaseProviderMetadata; + +/** + * Implementation of org.jclouds.types.ProviderMetadata for NIO.2 filesystems. + */ +@AutoService(ProviderMetadata.class) +public final class Nio2BlobProviderMetadata extends BaseProviderMetadata { + public Nio2BlobProviderMetadata() { + super(builder()); + } + + public Nio2BlobProviderMetadata(Builder builder) { + super(builder); + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public Builder toBuilder() { + return builder().fromProviderMetadata(this); + } + + public static Properties defaultProperties() { + Properties properties = new Properties(); + // TODO: filesystem basedir + return properties; + } + public static final class Builder extends BaseProviderMetadata.Builder { + protected Builder() { + id("transient-nio2") + .name("NIO.2 filesystem blobstore") + .apiMetadata(new Nio2BlobApiMetadata()) + .endpoint("https://127.0.0.1") // TODO: + .defaultProperties( + Nio2BlobProviderMetadata.defaultProperties()); + } + + @Override + public Nio2BlobProviderMetadata build() { + return new Nio2BlobProviderMetadata(this); + } + + @Override + public Builder fromProviderMetadata( + ProviderMetadata in) { + super.fromProviderMetadata(in); + return this; + } + } +} diff --git a/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobStore.java b/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobStore.java new file mode 100644 index 0000000..513fd8c --- /dev/null +++ b/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobStore.java @@ -0,0 +1,958 @@ +/* + * Copyright 2014-2024 Andrew Gaul + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gaul.s3proxy.nio2blob; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.BasicFileAttributes; +import java.nio.file.attribute.UserDefinedFileAttributeView; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; +import com.google.common.hash.HashingInputStream; +import com.google.common.io.BaseEncoding; +import com.google.common.io.ByteSource; +import com.google.common.io.ByteStreams; +import com.google.common.jimfs.Configuration; +import com.google.common.jimfs.Jimfs; +import com.google.common.net.HttpHeaders; +import com.google.common.primitives.Longs; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.KeyNotFoundException; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.domain.BlobAccess; +import org.jclouds.blobstore.domain.BlobMetadata; +import org.jclouds.blobstore.domain.ContainerAccess; +import org.jclouds.blobstore.domain.MultipartPart; +import org.jclouds.blobstore.domain.MultipartUpload; +import org.jclouds.blobstore.domain.PageSet; +import org.jclouds.blobstore.domain.StorageMetadata; +import org.jclouds.blobstore.domain.StorageType; +import org.jclouds.blobstore.domain.Tier; +import org.jclouds.blobstore.domain.internal.BlobBuilderImpl; +import org.jclouds.blobstore.domain.internal.PageSetImpl; +import org.jclouds.blobstore.domain.internal.StorageMetadataImpl; +import org.jclouds.blobstore.internal.BaseBlobStore; +import org.jclouds.blobstore.options.CopyOptions; +import org.jclouds.blobstore.options.CreateContainerOptions; +import org.jclouds.blobstore.options.GetOptions; +import org.jclouds.blobstore.options.ListContainerOptions; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.blobstore.util.BlobStoreUtils; +import org.jclouds.blobstore.util.BlobUtils; +import org.jclouds.collect.Memoized; +import org.jclouds.domain.Credentials; +import org.jclouds.domain.Location; +import org.jclouds.http.HttpCommand; +import org.jclouds.http.HttpRequest; +import org.jclouds.http.HttpResponse; +import org.jclouds.http.HttpResponseException; +import org.jclouds.io.Payload; +import org.jclouds.io.PayloadSlicer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Singleton +public final class Nio2BlobStore extends BaseBlobStore { + private static final Logger logger = LoggerFactory.getLogger( + Nio2BlobStore.class); + private static final String XATTR_CACHE_CONTROL = "user.cache-control"; + private static final String XATTR_CONTENT_DISPOSITION = + "user.content-disposition"; + private static final String XATTR_CONTENT_ENCODING = + "user.content-encoding"; + private static final String XATTR_CONTENT_LANGUAGE = + "user.content-language"; + private static final String XATTR_CONTENT_MD5 = "user.content-md5"; + private static final String XATTR_CONTENT_TYPE = "user.content-type"; + private static final String XATTR_EXPIRES = "user.expires"; + private static final String XATTR_STORAGE_TIER = "user.storage-tier"; + private static final String XATTR_USER_METADATA_PREFIX = + "user.user-metadata."; + private static final String MULTIPART_PREFIX = ".mpus-"; + private static final byte[] DIRECTORY_MD5 = + Hashing.md5().hashBytes(new byte[0]).asBytes(); + + private final Supplier> locations; + private final FileSystem fs; + + @Inject + Nio2BlobStore(BlobStoreContext context, BlobUtils blobUtils, + Supplier defaultLocation, + @Memoized Supplier> locations, + PayloadSlicer slicer, + @org.jclouds.location.Provider Supplier creds) { + super(context, blobUtils, defaultLocation, locations, slicer); + this.locations = requireNonNull(locations, "locations"); + // TODO: close this + this.fs = Jimfs.newFileSystem(Configuration.unix().toBuilder() + .setAttributeViews("basic", "user") + .setWorkingDirectory("/") + .build()); + } + + @Override + public Set listAssignableLocations() { + return locations.get(); + } + + @Override + public PageSet list() { + var set = ImmutableSet.builder(); + try (var stream = Files.newDirectoryStream(fs.getPath(""))) { + for (var path : stream) { + var attr = Files.readAttributes(path, + BasicFileAttributes.class); + var lastModifiedTime = new Date( + attr.lastModifiedTime().toMillis()); + var creationTime = new Date(attr.creationTime().toMillis()); + set.add(new StorageMetadataImpl(StorageType.CONTAINER, + /*id=*/ null, path.getFileName().toString(), + /*location=*/ null, /*uri=*/ null, + /*eTag=*/ null, creationTime, lastModifiedTime, + Map.of(), /*size=*/ null, Tier.STANDARD)); + } + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + return new PageSetImpl(set.build(), null); + } + + @Override + public PageSet list(String container, + ListContainerOptions options) { + var delimiter = options.getDelimiter(); + if (delimiter != null && !delimiter.equals("/")) { + throw new IllegalArgumentException("Delimiters other than / not supported"); + } + + var prefix = options.getPrefix(); + var dirPrefix = fs.getPath(container); + if (prefix != null) { + int idx = prefix.lastIndexOf('/'); + if (idx != -1) { + dirPrefix = dirPrefix.resolve(prefix.substring(0, idx)); + } + } else { + prefix = ""; + } + prefix = "/" + container + "/" + prefix; + var set = ImmutableSet.builder(); + try { + listHelper(set, /*count=*/ 0, options.getMaxResults(), + container, dirPrefix, prefix, delimiter, + options.getMarker()); + } catch (IOException ioe) { + logger.error("unexpected exception", ioe); + throw new RuntimeException(ioe); + } + return new PageSetImpl(set.build(), null); + } + + // TODO: marker + private static int listHelper(ImmutableSet.Builder builder, + int count, Integer maxResults, String container, Path parent, String prefix, + String delimiter, String marker) throws IOException { + logger.debug("recursing at: {} with prefix: {}", parent, prefix); + if (!Files.isDirectory(parent)) { // TODO: TOCTOU + return count; + } + try (var stream = Files.newDirectoryStream(parent)) { + for (var path : stream) { + if (maxResults != null && count == maxResults) { + // TODO: this is wrong -- return all results, sort, and limit in caller to produce marker + return count; + } + + logger.debug("examining: {}", path); + if (!path.toString().startsWith(prefix.substring(1))) { + continue; + } else if (Files.isDirectory(path)) { + if (!"/".equals(delimiter)) { + count += listHelper(builder, count, maxResults, container, path, prefix, delimiter, marker); + } + + // Add a prefix if the directory blob exists or if the delimiter causes us not to recuse. + var view = Files.getFileAttributeView(path, UserDefinedFileAttributeView.class); + if (view != null && Set.copyOf(view.list()).contains(XATTR_CONTENT_MD5) || "/".equals(delimiter)) { + var name = path.toString().substring((container + "/").length()); + builder.add(new StorageMetadataImpl( + StorageType.RELATIVE_PATH, + /*id=*/ null, name + "/", + /*location=*/ null, /*uri=*/ null, + /*eTag=*/ null, /*creationTime=*/ null, + /*lastModifiedTime=*/ null, + Map.of(), /*size=*/ null, Tier.STANDARD)); + count++; + } + } else { + var name = path.toString().substring((container + "/").length()); + logger.debug("adding: {}", name); + var attr = Files.readAttributes(path, BasicFileAttributes.class); + var lastModifiedTime = new Date(attr.lastModifiedTime().toMillis()); + var creationTime = new Date(attr.creationTime().toMillis()); + builder.add(new StorageMetadataImpl(StorageType.BLOB, + /*id=*/ null, name, + /*location=*/ null, /*uri=*/ null, + /*eTag=*/ null, creationTime, lastModifiedTime, + // TODO: get Tier + Map.of(), attr.size(), Tier.STANDARD)); + count++; + } + } + } catch (NoSuchFileException nsfe) { + // ignore + } + + return count; + } + + @Override + public boolean containerExists(String container) { + return Files.exists(fs.getPath(container)); + } + + @Override + public boolean createContainerInLocation(Location location, + String container) { + return createContainerInLocation(location, container, + new CreateContainerOptions()); + } + + @Override + public boolean createContainerInLocation(Location location, + String container, CreateContainerOptions options) { + try { + Files.createDirectory(fs.getPath(container)); + } catch (FileAlreadyExistsException faee) { + return false; + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + return true; + } + + @Override + public void deleteContainer(String container) { + try { + Files.deleteIfExists(fs.getPath(container)); + } catch (DirectoryNotEmptyException dnee) { + // TODO: what to do? + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public boolean blobExists(String container, String key) { + return blobMetadata(container, key) != null; + } + + @Override + public Blob getBlob(String container, String key, GetOptions options) { + var path = fs.getPath(container, key); + logger.debug("Getting blob at: " + path); + + try { + var isDirectory = Files.isDirectory(path); + var attr = Files.readAttributes(path, BasicFileAttributes.class); + var view = Files.getFileAttributeView(path, UserDefinedFileAttributeView.class); + var attributes = Set.copyOf(view.list()); + var cacheControl = readStringAttributeIfPresent(view, attributes, XATTR_CACHE_CONTROL); + var contentDisposition = readStringAttributeIfPresent(view, attributes, XATTR_CONTENT_DISPOSITION); + var contentEncoding = readStringAttributeIfPresent(view, attributes, XATTR_CONTENT_ENCODING); + var contentLanguage = readStringAttributeIfPresent(view, attributes, XATTR_CONTENT_LANGUAGE); + var contentType = isDirectory ? "application/x-directory" : + readStringAttributeIfPresent(view, attributes, XATTR_CONTENT_TYPE); + Date expires = null; + HashCode hashCode = null; + String eTag = null; + var tier = Tier.STANDARD; + var userMetadata = ImmutableMap.builder(); + // TODO: times + //var lastModifiedTime = new Date(attr.lastModifiedTime().toMillis()); + //var creationTime = new Date(attr.creationTime().toMillis()); + + if (isDirectory) { + if (!attributes.contains(XATTR_CONTENT_MD5)) { + // Lacks directory marker -- implicit directory. + return null; + } + } else if (attributes.contains(XATTR_CONTENT_MD5)) { + var buf = ByteBuffer.allocate(view.size(XATTR_CONTENT_MD5)); + view.read(XATTR_CONTENT_MD5, buf); + var etagBytes = buf.array(); + if (etagBytes.length == 16) { + // regular object + hashCode = HashCode.fromBytes(buf.array()); + eTag = "\"" + hashCode + "\""; + } else { + // multi-part object + eTag = new String(etagBytes, StandardCharsets.US_ASCII); + } + } + if (attributes.contains(XATTR_EXPIRES)) { + ByteBuffer buf = ByteBuffer.allocate(view.size(XATTR_EXPIRES)); + view.read(XATTR_EXPIRES, buf); + buf.flip(); + expires = new Date(buf.asLongBuffer().get()); + } + var tierString = readStringAttributeIfPresent(view, attributes, XATTR_STORAGE_TIER); + if (tierString != null) { + tier = Tier.valueOf(tierString); + } + for (String attribute : attributes) { + if (!attribute.startsWith(XATTR_USER_METADATA_PREFIX)) { + continue; + } + var value = readStringAttributeIfPresent(view, attributes, attribute); + userMetadata.put(attribute.substring(XATTR_USER_METADATA_PREFIX.length()), value); + } + + // Handle range. + String contentRange = null; + InputStream inputStream; + long size; + if (isDirectory) { + inputStream = ByteSource.empty().openStream(); + size = 0; + } else { + inputStream = Files.newInputStream(path); // TODO: leaky on exception + size = attr.size(); + if (options.getRanges().size() > 0) { + var range = options.getRanges().get(0); + // HTTP uses a closed interval while Java array indexing uses a + // half-open interval. + long offset = 0; + long last = size; + if (range.startsWith("-")) { + offset = last - Long.parseLong(range.substring(1)) + 1; + if (offset < 0) { + offset = 0; + } + } else if (range.endsWith("-")) { + offset = Long.parseLong(range.substring(0, range.length() - 1)); + } else if (range.contains("-")) { + String[] firstLast = range.split("\\-"); + offset = Long.parseLong(firstLast[0]); + last = Long.parseLong(firstLast[1]); + } else { + throw new HttpResponseException("illegal range: " + range, null, HttpResponse.builder().statusCode(416).build()); + } + + if (offset >= size) { + throw new HttpResponseException("illegal range: " + range, null, HttpResponse.builder().statusCode(416).build()); + } + if (last + 1 > size) { + last = size - 1; + } + ByteStreams.skipFully(inputStream, offset); + size = last - offset + 1; + inputStream = ByteStreams.limit(inputStream, size); + contentRange = "bytes " + offset + "-" + last + "/" + attr.size(); + } + } + + Blob blob = new BlobBuilderImpl() + .type(isDirectory ? StorageType.FOLDER : StorageType.BLOB) + .name(key) + .userMetadata(userMetadata.build()) + .payload(inputStream) + .cacheControl(cacheControl) + .contentDisposition(contentDisposition) + .contentEncoding(contentEncoding) + .contentLanguage(contentLanguage) + .contentLength(size) + .contentMD5(hashCode) + .contentType(contentType) + .eTag(eTag) + .expires(expires) + .tier(tier) + .build(); + blob.getMetadata().setContainer(container); + blob.getMetadata().setSize(size); + if (contentRange != null) { + blob.getAllHeaders().put(HttpHeaders.CONTENT_RANGE, contentRange); + } + return blob; + } catch (NoSuchFileException nsfe) { + return null; + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public String putBlob(String container, Blob blob) { + return putBlob(container, blob, new PutOptions()); + } + + @Override + public String putBlob(String container, Blob blob, PutOptions options) { + var path = fs.getPath(container, blob.getMetadata().getName()); + // TODO: should we use a known suffix to filter these out during list? + var tmpPath = fs.getPath(container, blob.getMetadata().getName() + "-" + UUID.randomUUID()); + logger.debug("Creating blob at: " + path); + + if (blob.getMetadata().getName().endsWith("/")) { + try { + logger.debug("Creating directory blob: {}", path); + Files.createDirectories(path); + } catch (FileAlreadyExistsException faee) { + logger.debug("Parent directories already exist: {}", path.getParent()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + + var view = Files.getFileAttributeView(path, UserDefinedFileAttributeView.class); + try { + writeCommonMetadataAttr(view, blob); + view.write(XATTR_CONTENT_MD5, ByteBuffer.wrap(DIRECTORY_MD5)); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + + return BaseEncoding.base16().lowerCase().encode(DIRECTORY_MD5); + } + + // Create parent directories. + try { + Files.createDirectories(path.getParent()); + } catch (FileAlreadyExistsException faee) { + logger.debug("Parent directories already exist: {}", path.getParent()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + + var metadata = blob.getMetadata().getContentMetadata(); + try (var is = new HashingInputStream(Hashing.md5(), blob.getPayload().openStream()); + var os = Files.newOutputStream(tmpPath)) { + var count = is.transferTo(os); + var hashCode = is.hash(); + + var view = Files.getFileAttributeView(tmpPath, UserDefinedFileAttributeView.class); + if (view != null) { + try { + var eTag = hashCode.asBytes(); + view.write(XATTR_CONTENT_MD5, ByteBuffer.wrap(eTag)); + writeStringAttributeIfPresent(view, XATTR_CACHE_CONTROL, metadata.getCacheControl()); + writeStringAttributeIfPresent(view, XATTR_CONTENT_DISPOSITION, metadata.getContentDisposition()); + writeStringAttributeIfPresent(view, XATTR_CONTENT_ENCODING, metadata.getContentEncoding()); + writeStringAttributeIfPresent(view, XATTR_CONTENT_LANGUAGE, metadata.getContentLanguage()); + writeStringAttributeIfPresent(view, XATTR_CONTENT_TYPE, metadata.getContentType()); + var expires = metadata.getExpires(); + if (expires != null) { + ByteBuffer buf = ByteBuffer.allocate(Longs.BYTES).putLong(expires.getTime()); + buf.flip(); + view.write(XATTR_EXPIRES, buf); + } + writeStringAttributeIfPresent(view, XATTR_STORAGE_TIER, blob.getMetadata().getTier().toString()); + for (var entry : blob.getMetadata().getUserMetadata().entrySet()) { + writeStringAttributeIfPresent(view, XATTR_USER_METADATA_PREFIX + entry.getKey(), entry.getValue()); + } + } catch (IOException e) { + // TODO: + //logger.debug("xattrs not supported on %s", path); + } + } + + Files.move(tmpPath, path, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + + return "\"" + hashCode + "\""; + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public String copyBlob(String fromContainer, String fromName, + String toContainer, String toName, CopyOptions options) { + var blob = getBlob(fromContainer, fromName); + if (blob == null) { + throw new KeyNotFoundException(fromContainer, fromName, "while copying"); + } + + var eTag = blob.getMetadata().getETag(); + if (eTag != null) { + eTag = maybeQuoteETag(eTag); + if (options.ifMatch() != null && !maybeQuoteETag(options.ifMatch()).equals(eTag)) { + throw returnResponseException(412); + } + if (options.ifNoneMatch() != null && maybeQuoteETag(options.ifNoneMatch()).equals(eTag)) { + throw returnResponseException(412); + } + } + + var lastModified = blob.getMetadata().getLastModified(); + if (lastModified != null) { + if (options.ifModifiedSince() != null && lastModified.compareTo(options.ifModifiedSince()) <= 0) { + throw returnResponseException(412); + } + if (options.ifUnmodifiedSince() != null && lastModified.compareTo(options.ifUnmodifiedSince()) >= 0) { + throw returnResponseException(412); + } + } + + try (var is = blob.getPayload().openStream()) { + var metadata = blob.getMetadata().getContentMetadata(); + var builder = blobBuilder(toName).payload(is); + Long contentLength = metadata.getContentLength(); + if (contentLength != null) { + builder.contentLength(contentLength); + } + + var contentMetadata = options.contentMetadata(); + if (contentMetadata != null) { + String cacheControl = contentMetadata.getCacheControl(); + if (cacheControl != null) { + builder.cacheControl(cacheControl); + } + String contentDisposition = contentMetadata.getContentDisposition(); + if (contentDisposition != null) { + builder.contentDisposition(contentDisposition); + } + String contentEncoding = contentMetadata.getContentEncoding(); + if (contentEncoding != null) { + builder.contentEncoding(contentEncoding); + } + String contentLanguage = contentMetadata.getContentLanguage(); + if (contentLanguage != null) { + builder.contentLanguage(contentLanguage); + } + String contentType = contentMetadata.getContentType(); + if (contentType != null) { + builder.contentType(contentType); + } + } else { + builder.cacheControl(metadata.getCacheControl()) + .contentDisposition(metadata.getContentDisposition()) + .contentEncoding(metadata.getContentEncoding()) + .contentLanguage(metadata.getContentLanguage()) + .contentType(metadata.getContentType()); + } + + var userMetadata = options.userMetadata(); + if (userMetadata != null) { + builder.userMetadata(userMetadata); + } else { + builder.userMetadata(blob.getMetadata().getUserMetadata()); + } + return putBlob(toContainer, builder.build()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public void removeBlob(String container, String key) { + try { + var path = fs.getPath(container, key); + Files.delete(path); + removeEmptyParentDirectories(path.getParent()); + } catch (NoSuchFileException nsfe) { + return; + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public BlobMetadata blobMetadata(String container, String key) { + Blob blob = getBlob(container, key); + if (blob == null) { + return null; + } + + try { + blob.getPayload().openStream().close(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + return blob != null ? (BlobMetadata) BlobStoreUtils.copy(blob.getMetadata()) : null; + } + + @Override + protected boolean deleteAndVerifyContainerGone(String container) { + deleteContainer(container); + return !containerExists(container); + } + + // TODO: ContainerAccess + @Override + public ContainerAccess getContainerAccess(String container) { + throw new UnsupportedOperationException("not yet implemented"); + } + + @Override + public void setContainerAccess(String container, ContainerAccess access) { + throw new UnsupportedOperationException("not yet implemented"); + } + + // TODO: BlobAccess + @Override + public BlobAccess getBlobAccess(String container, String key) { + //return BlobAccess.PRIVATE; + throw new UnsupportedOperationException("not yet implemented"); + } + + @Override + public void setBlobAccess(String container, String key, BlobAccess access) { + throw new UnsupportedOperationException("not yet implemented"); + } + + @Override + public MultipartUpload initiateMultipartUpload(String container, + BlobMetadata blobMetadata, PutOptions options) { + var uploadId = UUID.randomUUID().toString(); + // create a stub blob + var blob = blobBuilder(MULTIPART_PREFIX + uploadId + "-" + blobMetadata.getName() + "-stub").payload(ByteSource.empty()).build(); + putBlob(container, blob); + return MultipartUpload.create(container, blobMetadata.getName(), uploadId, + blobMetadata, options); + } + + @Override + public void abortMultipartUpload(MultipartUpload mpu) { + var parts = listMultipartUpload(mpu); + for (var part : parts) { + removeBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber()); + } + removeBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-stub"); + } + + @Override + public String completeMultipartUpload(MultipartUpload mpu, List parts) { + var metas = ImmutableList.builder(); + long contentLength = 0; + var md5Hasher = Hashing.md5().newHasher(); + + for (var part : parts) { + var meta = blobMetadata(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber()); + contentLength += meta.getContentMetadata().getContentLength(); + metas.add(meta); + if (meta.getETag() != null) { + var eTag = meta.getETag(); + if (eTag.startsWith("\"") && eTag.endsWith("\"") && + eTag.length() >= 2) { + eTag = eTag.substring(1, eTag.length() - 1); + } + md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(eTag)); + } + } + var mpuETag = new StringBuilder("\"") + .append(md5Hasher.hash()) + .append("-") + .append(parts.size()) + .append("\"") + .toString(); + var blobBuilder = blobBuilder(mpu.blobName()) + .userMetadata(mpu.blobMetadata().getUserMetadata()) + .payload(new MultiBlobInputStream(this, metas.build())) + .contentLength(contentLength) + .eTag(mpuETag); + var cacheControl = mpu.blobMetadata().getContentMetadata().getCacheControl(); + if (cacheControl != null) { + blobBuilder.cacheControl(cacheControl); + } + var contentDisposition = mpu.blobMetadata().getContentMetadata().getContentDisposition(); + if (contentDisposition != null) { + blobBuilder.contentDisposition(contentDisposition); + } + var contentEncoding = mpu.blobMetadata().getContentMetadata().getContentEncoding(); + if (contentEncoding != null) { + blobBuilder.contentEncoding(contentEncoding); + } + var contentLanguage = mpu.blobMetadata().getContentMetadata().getContentLanguage(); + if (contentLanguage != null) { + blobBuilder.contentLanguage(contentLanguage); + } + // intentionally not copying MD5 + var contentType = mpu.blobMetadata().getContentMetadata().getContentType(); + if (contentType != null) { + blobBuilder.contentType(contentType); + } + var expires = mpu.blobMetadata().getContentMetadata().getExpires(); + if (expires != null) { + blobBuilder.expires(expires); + } + var tier = mpu.blobMetadata().getTier(); + if (tier != null) { + blobBuilder.tier(tier); + } + + putBlob(mpu.containerName(), blobBuilder.build()); + + for (var part : parts) { + removeBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber()); + } + removeBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-stub"); + + // TODO: setBlobAccess(mpu.containerName(), mpu.blobName(), mpu.putOptions().getBlobAccess()); + + return mpuETag; + } + + @Override + public MultipartPart uploadMultipartPart(MultipartUpload mpu, int partNumber, Payload payload) { + var partName = MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + partNumber; + var blob = blobBuilder(partName) + .payload(payload) + .build(); + var partETag = putBlob(mpu.containerName(), blob); + var metadata = blobMetadata(mpu.containerName(), partName); // TODO: racy, how to get this from payload? + var partSize = metadata.getContentMetadata().getContentLength(); + return MultipartPart.create(partNumber, partSize, partETag, metadata.getLastModified()); + } + + @Override + public List listMultipartUpload(MultipartUpload mpu) { + var parts = ImmutableList.builder(); + var options = + new ListContainerOptions().prefix(MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-").recursive(); + while (true) { + var pageSet = list(mpu.containerName(), options); + for (var sm : pageSet) { + if (sm.getName().endsWith("-stub")) { + continue; + } + int partNumber = Integer.parseInt(sm.getName().substring((MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-").length())); + long partSize = sm.getSize(); + parts.add(MultipartPart.create(partNumber, partSize, sm.getETag(), sm.getLastModified())); + } + if (pageSet.isEmpty() || pageSet.getNextMarker() == null) { + break; + } + options.afterMarker(pageSet.getNextMarker()); + } + return parts.build(); + } + + @Override + public List listMultipartUploads(String container) { + var mpus = ImmutableList.builder(); + var options = new ListContainerOptions().prefix(MULTIPART_PREFIX).recursive(); + int uuidLength = UUID.randomUUID().toString().length(); + while (true) { + var pageSet = list(container, options); + for (StorageMetadata sm : pageSet) { + if (!sm.getName().endsWith("-stub")) { + continue; + } + var uploadId = sm.getName().substring(MULTIPART_PREFIX.length(), MULTIPART_PREFIX.length() + uuidLength); + var blobName = sm.getName().substring(MULTIPART_PREFIX.length() + uuidLength + 1); + int index = blobName.lastIndexOf('-'); + blobName = blobName.substring(0, index); + + mpus.add(MultipartUpload.create(container, blobName, uploadId, null, null)); + } + if (pageSet.isEmpty() || pageSet.getNextMarker() == null) { + break; + } + options.afterMarker(pageSet.getNextMarker()); + } + + return mpus.build(); + } + + @Override + public long getMinimumMultipartPartSize() { + return 1; + } + + @Override + public long getMaximumMultipartPartSize() { + return 100 * 1024 * 1024; + } + + @Override + public int getMaximumNumberOfParts() { + return 50 * 1000; + } + + @Override + public InputStream streamBlob(String container, String name) { + throw new UnsupportedOperationException("not yet implemented"); + } + + /** + * Read the String representation of a filesystem attribute, or return null + * if not present. + */ + private static String readStringAttributeIfPresent( + UserDefinedFileAttributeView view, Set attr, String name) + throws IOException { + if (!attr.contains(name)) { + return null; + } + ByteBuffer buf = ByteBuffer.allocate(view.size(name)); + view.read(name, buf); + return new String(buf.array(), StandardCharsets.UTF_8); + } + + /** Write the String representation of a filesystem attribute. */ + private static void writeStringAttributeIfPresent( + UserDefinedFileAttributeView view, String name, String value) + throws IOException { + if (value != null) { + view.write(name, ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8))); + } + } + + private static final class MultiBlobInputStream extends InputStream { + private final BlobStore blobStore; + private final Iterator metas; + private InputStream current; + + MultiBlobInputStream(BlobStore blobStore, List metas) { + this.blobStore = blobStore; + this.metas = metas.iterator(); + } + + @Override + public int read() throws IOException { + while (true) { + if (current == null) { + if (!metas.hasNext()) { + return -1; + } + BlobMetadata meta = metas.next(); + current = blobStore.getBlob(meta.getContainer(), meta.getName()).getPayload().openStream(); + } + int result = current.read(); + if (result == -1) { + current.close(); + current = null; + continue; + } + return result & 0x000000FF; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + while (true) { + if (current == null) { + if (!metas.hasNext()) { + return -1; + } + BlobMetadata meta = metas.next(); + current = blobStore.getBlob(meta.getContainer(), meta.getName()).getPayload().openStream(); + } + int result = current.read(b, off, len); + if (result == -1) { + current.close(); + current = null; + continue; + } + return result; + } + } + + @Override + public void close() throws IOException { + if (current != null) { + current.close(); + current = null; + } + } + } + + private static HttpResponseException returnResponseException(int code) { + var response = HttpResponse.builder().statusCode(code).build(); + return new HttpResponseException(new HttpCommand(HttpRequest.builder() + .method("GET") + .endpoint("http://stub") + .build()), response); + } + + private static String maybeQuoteETag(String eTag) { + if (!eTag.startsWith("\"") && !eTag.endsWith("\"")) { + eTag = "\"" + eTag + "\""; + } + return eTag; + } + + /** + * Nio2BlobStore implicitly creates directories when creating a key /a/b/c. + * When removing /a/b/c, it must clean up /a and /a/b, unless a client explicitly created a subdirectory which has file attributes. + */ + private static void removeEmptyParentDirectories(Path path) throws IOException { + while (true) { + var parent = path.getParent(); + if (parent == null || parent.equals(path.getRoot())) { + break; + } + var view = Files.getFileAttributeView(path, UserDefinedFileAttributeView.class); + if (view != null && Set.copyOf(view.list()).contains(XATTR_CONTENT_MD5)) { + break; + } + try { + logger.debug("deleting: {}", path); + Files.delete(path); + } catch (DirectoryNotEmptyException dnee) { + break; + } + path = path.getParent(); + } + } + + // TODO: call in other places + private static void writeCommonMetadataAttr(UserDefinedFileAttributeView view, Blob blob) throws IOException { + var metadata = blob.getMetadata().getContentMetadata(); + writeStringAttributeIfPresent(view, XATTR_CACHE_CONTROL, metadata.getCacheControl()); + writeStringAttributeIfPresent(view, XATTR_CONTENT_DISPOSITION, metadata.getContentDisposition()); + writeStringAttributeIfPresent(view, XATTR_CONTENT_ENCODING, metadata.getContentEncoding()); + writeStringAttributeIfPresent(view, XATTR_CONTENT_LANGUAGE, metadata.getContentLanguage()); + writeStringAttributeIfPresent(view, XATTR_CONTENT_TYPE, metadata.getContentType()); + var expires = metadata.getExpires(); + if (expires != null) { + var buf = ByteBuffer.allocate(Longs.BYTES).putLong(expires.getTime()); + buf.flip(); + view.write(XATTR_EXPIRES, buf); + } + writeStringAttributeIfPresent(view, XATTR_STORAGE_TIER, blob.getMetadata().getTier().toString()); + for (var entry : blob.getMetadata().getUserMetadata().entrySet()) { + writeStringAttributeIfPresent(view, XATTR_USER_METADATA_PREFIX + entry.getKey(), entry.getValue()); + } + } +} diff --git a/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobStoreContextModule.java b/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobStoreContextModule.java new file mode 100644 index 0000000..b5f70eb --- /dev/null +++ b/src/main/java/org/gaul/s3proxy/nio2blob/Nio2BlobStoreContextModule.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014-2024 Andrew Gaul + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gaul.s3proxy.nio2blob; + +import com.google.inject.AbstractModule; +import com.google.inject.Scopes; + +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.attr.ConsistencyModel; + +public final class Nio2BlobStoreContextModule extends AbstractModule { + @Override + protected void configure() { + bind(ConsistencyModel.class).toInstance(ConsistencyModel.STRICT); + bind(BlobStore.class).to(Nio2BlobStore.class).in(Scopes.SINGLETON); + } +} diff --git a/src/main/resources/checkstyle.xml b/src/main/resources/checkstyle.xml index c1e7314..46674b9 100644 --- a/src/main/resources/checkstyle.xml +++ b/src/main/resources/checkstyle.xml @@ -10,7 +10,6 @@ - diff --git a/src/test/java/org/gaul/s3proxy/AwsSdkTest.java b/src/test/java/org/gaul/s3proxy/AwsSdkTest.java index 129fd93..61a8e89 100644 --- a/src/test/java/org/gaul/s3proxy/AwsSdkTest.java +++ b/src/test/java/org/gaul/s3proxy/AwsSdkTest.java @@ -602,7 +602,10 @@ public final class AwsSdkTest { @Test public void testUpdateBlobXmlAcls() throws Exception { + // TODO: + assumeTrue(!blobStoreType.equals("transient-nio2")); assumeTrue(!Quirks.NO_BLOB_ACCESS_CONTROL.contains(blobStoreType)); + String blobName = "testUpdateBlobXmlAcls-blob"; var metadata = new ObjectMetadata(); metadata.setContentLength(BYTE_SOURCE.size()); @@ -810,6 +813,9 @@ public final class AwsSdkTest { @Test public void testHttpClient() throws Exception { + // TODO: + assumeTrue(!blobStoreType.equals("transient-nio2")); + String blobName = "blob-name"; var metadata = new ObjectMetadata(); metadata.setContentLength(BYTE_SOURCE.size()); @@ -976,6 +982,8 @@ public final class AwsSdkTest { @Test public void testBlobListRecursiveImplicitMarker() throws Exception { + assumeTrue(!blobStoreType.equals("transient-nio2")); // TODO: + ObjectListing listing = client.listObjects(containerName); assertThat(listing.getObjectSummaries()).isEmpty(); @@ -1004,6 +1012,8 @@ public final class AwsSdkTest { @Test public void testBlobListV2() throws Exception { + assumeTrue(!blobStoreType.equals("transient-nio2")); // TODO: + var metadata = new ObjectMetadata(); metadata.setContentLength(BYTE_SOURCE.size()); for (int i = 1; i < 5; ++i) { @@ -1532,6 +1542,8 @@ public final class AwsSdkTest { @Test public void testConditionalGet() throws Exception { assumeTrue(!blobStoreType.equals("b2")); + // TODO: + assumeTrue(!blobStoreType.equals("transient-nio2")); String blobName = "blob-name"; var metadata = new ObjectMetadata(); diff --git a/src/test/resources/s3proxy-transient-nio2.conf b/src/test/resources/s3proxy-transient-nio2.conf new file mode 100644 index 0000000..d527377 --- /dev/null +++ b/src/test/resources/s3proxy-transient-nio2.conf @@ -0,0 +1,16 @@ +s3proxy.endpoint=http://127.0.0.1:0 +s3proxy.secure-endpoint=https://127.0.0.1:0 +#s3proxy.service-path=s3proxy +# authorization must be aws-v2, aws-v4, aws-v2-or-v4, or none +s3proxy.authorization=aws-v2-or-v4 +s3proxy.identity=local-identity +s3proxy.credential=local-credential +s3proxy.keystore-path=keystore.jks +s3proxy.keystore-password=password + +jclouds.provider=transient-nio2 +jclouds.identity=remote-identity +jclouds.credential=remote-credential +# endpoint is optional for some providers +#jclouds.endpoint=http://127.0.0.1:8081 +jclouds.filesystem.basedir=/tmp/blobstore