From 0d8f9aa96db3e8346f776e6bd5414956c8e8214b Mon Sep 17 00:00:00 2001 From: Timur Alperovich Date: Sun, 28 Mar 2021 22:04:22 -0700 Subject: [PATCH] Add Sharded Bucket middleware Adds the sharded bucket middleware, which allows for splitting objects across multiple backend buckets for a given virtual bucket. The middleware should be configured as: s3proxy.sharded-blobstore..shards= s3proxy.sharded-blobstore..prefix=. All shards are named -, where index is an integer from 0 to - 1. If the is not supplied, the is used as the prefix. Listing the virtual bucket and multipart uploads are not supported. When listing all containers, the shards are elided from the result. Fixes #325 Fixes #351 --- README.md | 1 + src/main/java/org/gaul/s3proxy/Main.java | 10 + .../org/gaul/s3proxy/S3ProxyConstants.java | 3 + .../org/gaul/s3proxy/ShardedBlobStore.java | 648 ++++++++++++++++++ .../gaul/s3proxy/ShardedBlobStoreTest.java | 258 +++++++ src/test/java/org/gaul/s3proxy/TestUtils.java | 8 + 6 files changed, 928 insertions(+) create mode 100644 src/main/java/org/gaul/s3proxy/ShardedBlobStore.java create mode 100644 src/test/java/org/gaul/s3proxy/ShardedBlobStoreTest.java diff --git a/README.md b/README.md index 9b02549..a74de1d 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,7 @@ S3Proxy can modify its behavior based on middlewares: * [eventual consistency modeling](https://github.com/gaul/s3proxy/wiki/Middleware---eventual-consistency) * [large object mocking](https://github.com/gaul/s3proxy/wiki/Middleware-large-object-mocking) * [read-only](https://github.com/gaul/s3proxy/wiki/Middleware-read-only) +* [sharded backend containers](https://github.com/gaul/s3proxy/wiki/Middleware-sharded-backend) ## Limitations diff --git a/src/main/java/org/gaul/s3proxy/Main.java b/src/main/java/org/gaul/s3proxy/Main.java index 019487d..8d86f3f 100644 --- a/src/main/java/org/gaul/s3proxy/Main.java +++ b/src/main/java/org/gaul/s3proxy/Main.java @@ -224,6 +224,16 @@ public final class Main { blobStore = ReadOnlyBlobStore.newReadOnlyBlobStore(blobStore); } + ImmutableMap shards = + ShardedBlobStore.parseBucketShards(properties); + ImmutableMap prefixes = + ShardedBlobStore.parsePrefixes(properties); + if (!shards.isEmpty()) { + System.err.println("Using sharded buckets backend"); + blobStore = ShardedBlobStore.newShardedBlobStore(blobStore, + shards, prefixes); + } + return blobStore; } diff --git a/src/main/java/org/gaul/s3proxy/S3ProxyConstants.java b/src/main/java/org/gaul/s3proxy/S3ProxyConstants.java index a07f1c2..3cb294c 100644 --- a/src/main/java/org/gaul/s3proxy/S3ProxyConstants.java +++ b/src/main/java/org/gaul/s3proxy/S3ProxyConstants.java @@ -84,6 +84,9 @@ public final class S3ProxyConstants { /** Prevent mutations. */ public static final String PROPERTY_READ_ONLY_BLOBSTORE = "s3proxy.read-only-blobstore"; + /** Shard objects across a specified number of buckets. */ + public static final String PROPERTY_SHARDED_BLOBSTORE = + "s3proxy.sharded-blobstore"; /** Maximum time skew allowed in signed requests. */ public static final String PROPERTY_MAXIMUM_TIME_SKEW = diff --git a/src/main/java/org/gaul/s3proxy/ShardedBlobStore.java b/src/main/java/org/gaul/s3proxy/ShardedBlobStore.java new file mode 100644 index 0000000..b419d85 --- /dev/null +++ b/src/main/java/org/gaul/s3proxy/ShardedBlobStore.java @@ -0,0 +1,648 @@ +/* + * Copyright 2014-2021 Andrew Gaul + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gaul.s3proxy; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.File; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; + +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.ContainerNotFoundException; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.domain.BlobAccess; +import org.jclouds.blobstore.domain.BlobMetadata; +import org.jclouds.blobstore.domain.ContainerAccess; +import org.jclouds.blobstore.domain.MultipartPart; +import org.jclouds.blobstore.domain.MultipartUpload; +import org.jclouds.blobstore.domain.MutableStorageMetadata; +import org.jclouds.blobstore.domain.PageSet; +import org.jclouds.blobstore.domain.StorageMetadata; +import org.jclouds.blobstore.domain.internal.MutableStorageMetadataImpl; +import org.jclouds.blobstore.domain.internal.PageSetImpl; +import org.jclouds.blobstore.options.CopyOptions; +import org.jclouds.blobstore.options.CreateContainerOptions; +import org.jclouds.blobstore.options.GetOptions; +import org.jclouds.blobstore.options.ListContainerOptions; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.blobstore.util.ForwardingBlobStore; +import org.jclouds.domain.Location; +import org.jclouds.io.Payload; + +/** + * This class implements the ability to split objects destined for specified + * buckets across multiple backend buckets. The sharding is only applied to + * the configured buckets. Each sharded bucket must specify the number of + * shards in the form: + * s3proxy.sharded-blobstore.<bucket name>.shards=<integer>. + * The number of shards is limited to 1000. An optional prefix can be + * specified to use for shard names, like so: + * s3proxy.sharded-blobstore.<bucket name>.prefix=<string>. + * The shards are named as follows: <prefix>-<integer>, + * corresponding to the shards from 0 to the specified number. If a + * <prefix> is not specified, the name of the bucket is used instead. + * + * Requests for all other buckets are passed through unchanged. Shards must + * be pre-created either out of band or by issuing the CreateBucket API with + * the sharded bucket name. The sharded bucket itself will not be + * instantiated on the backend. + */ +final class ShardedBlobStore extends ForwardingBlobStore { + public static final Pattern PROPERTIES_PREFIX_RE = Pattern.compile( + S3ProxyConstants.PROPERTY_SHARDED_BLOBSTORE + + "\\.(?.*)\\.prefix$"); + private static final Pattern PROPERTIES_SHARDS_RE = Pattern.compile( + S3ProxyConstants.PROPERTY_SHARDED_BLOBSTORE + + "\\.(?.*)\\.shards$"); + private static final Pattern SHARD_RE = Pattern.compile( + "(?.*)-(?[0-9]+)$"); + private static final HashFunction SHARD_HASH = Hashing.murmur3_128(); + private static final int MAX_SHARD_THREADS = 10; + private static final String SUPERBLOCK_VERSION = "1.0"; + private static final String SUPERBLOCK_BLOB_NAME = + ".s3proxy-sharded-superblock"; + private static final int MAX_SHARDS = 1000; + private final ImmutableMap buckets; + private final ImmutableMap prefixMap; + + private static final class ShardedBucket { + private final String prefix; + private final int shards; + + private ShardedBucket(String name, int shards) { + this.prefix = Objects.requireNonNull(name); + this.shards = shards; + } + } + + private ShardedBlobStore(BlobStore blobStore, + ImmutableMap shards, + ImmutableMap prefixes) { + super(blobStore); + Set missingShards = Sets.difference( + prefixes.keySet(), shards.keySet()); + if (!missingShards.isEmpty()) { + String allMissingShards = missingShards.stream().collect( + Collectors.joining(", ")); + throw new IllegalArgumentException( + String.format( + "Number of shards unset for sharded buckets: %s", + allMissingShards)); + } + ImmutableMap.Builder bucketsBuilder = + new ImmutableMap.Builder<>(); + for (String bucket : shards.keySet()) { + String prefix = prefixes.get(bucket); + if (prefix == null) { + prefix = bucket; + } + bucketsBuilder.put(bucket, new ShardedBucket(prefix, + shards.get(bucket))); + } + this.buckets = bucketsBuilder.build(); + + ImmutableMap.Builder prefixMapBuilder = + new ImmutableMap.Builder<>(); + for (String virtualBucket : buckets.keySet()) { + String prefix = buckets.get(virtualBucket).prefix; + prefixMapBuilder.put(prefix, virtualBucket); + } + this.prefixMap = prefixMapBuilder.build(); + } + + public static ImmutableMap parseBucketShards( + Properties properties) { + ImmutableMap.Builder shardsMap = + new ImmutableMap.Builder<>(); + for (String key : properties.stringPropertyNames()) { + Matcher matcher = PROPERTIES_SHARDS_RE.matcher(key); + if (!matcher.matches()) { + continue; + } + String bucket = matcher.group("bucket"); + int shards = Integer.parseInt(properties.getProperty(key)); + checkArgument(shards > 0 && shards < MAX_SHARDS, + "number of shards must be between 1 and 1000 for %s", + bucket); + shardsMap.put(bucket, shards); + } + return shardsMap.build(); + } + + public static ImmutableMap parsePrefixes( + Properties properties) { + ImmutableMap.Builder prefixesMap = + new ImmutableMap.Builder<>(); + for (String key : properties.stringPropertyNames()) { + Matcher matcher = PROPERTIES_PREFIX_RE.matcher(key); + if (!matcher.matches()) { + continue; + } + prefixesMap.put(matcher.group("bucket"), + properties.getProperty(key)); + } + return prefixesMap.build(); + } + + static ShardedBlobStore newShardedBlobStore( + BlobStore blobStore, + ImmutableMap shards, + ImmutableMap prefixes) { + return new ShardedBlobStore(blobStore, shards, prefixes); + } + + private Map createSuperblockMeta(ShardedBucket bucket) { + ImmutableMap.Builder meta = + new ImmutableMap.Builder<>(); + meta.put("s3proxy-sharded-superblock-version", SUPERBLOCK_VERSION); + meta.put("s3proxy-sharded-superblock-prefix", bucket.prefix); + meta.put("s3proxy-sharded-superblock-shards", + Integer.toString(bucket.shards)); + return meta.build(); + } + + private static String getShardContainer(ShardedBucket bucket, int shard) { + return String.format("%s-%d", bucket.prefix, shard); + } + + private String getShard(String containerName, String blob) { + ShardedBucket bucket = buckets.get(containerName); + if (bucket == null) { + return containerName; + } + HashCode hash = SHARD_HASH.hashString(blob, StandardCharsets.UTF_8); + return ShardedBlobStore.getShardContainer( + bucket, Hashing.consistentHash(hash, bucket.shards)); + } + + private void checkSuperBlock(Blob blob, Map expectedMeta, + String container) { + Map currentSuperblockMeta = + blob.getMetadata().getUserMetadata(); + for (String key : expectedMeta.keySet()) { + String current = currentSuperblockMeta.get(key); + String expected = expectedMeta.get(key); + if (!expected.equalsIgnoreCase(current)) { + throw new RuntimeException(String.format( + "Superblock block for %s does not match: %s, %s", + container, expected, current)); + } + } + } + + private boolean createShards(ShardedBucket bucket, Location location, + CreateContainerOptions options) { + ImmutableList.Builder> futuresBuilder = + new ImmutableList.Builder<>(); + ExecutorService executor = Executors.newFixedThreadPool( + Math.min(bucket.shards, MAX_SHARD_THREADS)); + BlobStore blobStore = this.delegate(); + for (int n = 0; n < bucket.shards; ++n) { + String shardContainer = ShardedBlobStore.getShardContainer( + bucket, n); + futuresBuilder.add(executor.submit( + () -> blobStore.createContainerInLocation( + location, shardContainer, options))); + } + ImmutableList> futures = futuresBuilder.build(); + executor.shutdown(); + boolean ret = true; + for (Future future : futures) { + try { + ret &= future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed to create some shards", e); + } + } + + return ret; + } + + @Override + public boolean createContainerInLocation(Location location, + String container) { + return createContainerInLocation( + location, container, CreateContainerOptions.NONE); + } + + @SuppressWarnings("EmptyCatch") + @Override + public boolean createContainerInLocation( + Location location, String container, + CreateContainerOptions createContainerOptions) { + + ShardedBucket bucket = this.buckets.get(container); + if (bucket == null) { + return this.delegate().createContainerInLocation( + location, container, createContainerOptions); + } + + Map superblockMeta = this.createSuperblockMeta(bucket); + Blob superblockBlob = null; + try { + superblockBlob = this.delegate().getBlob( + ShardedBlobStore.getShardContainer(bucket, 0), + SUPERBLOCK_BLOB_NAME); + } catch (ContainerNotFoundException ignored) { + } + if (superblockBlob != null) { + checkSuperBlock(superblockBlob, superblockMeta, container); + } + + boolean ret = createShards(bucket, location, createContainerOptions); + + // Upload the superblock + if (superblockBlob == null) { + superblockBlob = this.delegate().blobBuilder(SUPERBLOCK_BLOB_NAME) + .payload("") + .userMetadata(superblockMeta) + .build(); + this.delegate().putBlob(ShardedBlobStore.getShardContainer( + bucket, 0), superblockBlob); + } + + return ret; + } + + @Override + public PageSet list() { + PageSet upstream = this.delegate().list(); + ImmutableList.Builder results = + new ImmutableList.Builder<>(); + Set virtualBuckets = new HashSet<>(); + for (StorageMetadata sm : upstream) { + Matcher matcher = SHARD_RE.matcher(sm.getName()); + if (!matcher.matches()) { + results.add(sm); + continue; + } + String prefix = matcher.group("prefix"); + String virtualBucketName = this.prefixMap.get(prefix); + if (virtualBucketName == null) { + results.add(sm); + continue; + } + if (!virtualBuckets.contains(prefix)) { + virtualBuckets.add(prefix); + MutableStorageMetadata virtualBucket = + new MutableStorageMetadataImpl(); + virtualBucket.setCreationDate(sm.getCreationDate()); + virtualBucket.setETag(sm.getETag()); + virtualBucket.setId(sm.getProviderId()); + virtualBucket.setLastModified(sm.getLastModified()); + virtualBucket.setLocation(sm.getLocation()); + virtualBucket.setName(virtualBucketName); + virtualBucket.setSize(sm.getSize()); + virtualBucket.setTier(sm.getTier()); + virtualBucket.setType(sm.getType()); + virtualBucket.setUri(sm.getUri()); + // copy the user metadata from the first shard as part + // of the response + virtualBucket.setUserMetadata(sm.getUserMetadata()); + results.add(virtualBucket); + } + } + return new PageSetImpl<>(results.build(), upstream.getNextMarker()); + } + + @Override + public PageSet list(String container) { + if (!this.buckets.containsKey(container)) { + return this.delegate().list(container); + } + // TODO: implement listing a sharded container + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public PageSet list( + String container, + ListContainerOptions options) { + if (!this.buckets.containsKey(container)) { + return this.delegate().list(container, options); + } + // TODO: implement listing a sharded container + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public boolean containerExists(String container) { + if (!this.buckets.containsKey(container)) { + return this.delegate().containerExists(container); + } + return true; + } + + @Override + public ContainerAccess getContainerAccess(String container) { + if (!this.buckets.containsKey(container)) { + return this.delegate().getContainerAccess(container); + } + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public void setContainerAccess(String container, + ContainerAccess containerAccess) { + if (!this.buckets.containsKey(container)) { + this.delegate().setContainerAccess(container, containerAccess); + } + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public void clearContainer(String container) { + clearContainer(container, new ListContainerOptions()); + } + + @Override + public void clearContainer(String container, ListContainerOptions options) { + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public void deleteContainer(String container) { + throw new UnsupportedOperationException("sharded bucket"); + } + + private boolean deleteShards(ShardedBucket bucket) { + ImmutableList.Builder> futuresBuilder = + new ImmutableList.Builder<>(); + ExecutorService executor = Executors.newFixedThreadPool( + Math.min(bucket.shards, MAX_SHARD_THREADS)); + for (int n = 0; n < bucket.shards; ++n) { + String shard = ShardedBlobStore.getShardContainer(bucket, n); + futuresBuilder.add(executor.submit( + () -> this.delegate().deleteContainerIfEmpty(shard))); + } + executor.shutdown(); + ImmutableList> futures = futuresBuilder.build(); + boolean ret = true; + for (Future future : futures) { + try { + ret &= future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed to delete shards", e); + } + } + + return ret; + } + + @Override + public boolean deleteContainerIfEmpty(String container) { + ShardedBucket bucket = this.buckets.get(container); + if (bucket == null) { + return this.delegate().deleteContainerIfEmpty(container); + } + + String zeroShardContainer = ShardedBlobStore.getShardContainer( + bucket, 0); + PageSet listing = this.delegate().list( + zeroShardContainer); + if (listing.size() > 1) { + return false; + } + StorageMetadata sm = listing.iterator().next(); + if (!sm.getName().equals(SUPERBLOCK_BLOB_NAME)) { + return false; + } + // Remove the superblock + this.delegate().removeBlob(zeroShardContainer, SUPERBLOCK_BLOB_NAME); + return this.deleteShards(bucket); + } + + @Override + public boolean directoryExists(String container, String directory) { + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public void createDirectory(String container, String directory) { + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public void deleteDirectory(String container, String directory) { + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public boolean blobExists(String container, String name) { + return this.delegate().blobExists(this.getShard(container, name), name); + } + + @Override + public String putBlob(String containerName, Blob blob) { + return this.delegate().putBlob(this.getShard(containerName, + blob.getMetadata().getName()), blob); + } + + @Override + public String putBlob(final String containerName, Blob blob, + final PutOptions putOptions) { + return this.delegate().putBlob( + this.getShard(containerName, blob.getMetadata().getName()), + blob, putOptions); + } + + @Override + public String copyBlob(String fromContainer, String fromName, + String toContainer, String toName, + CopyOptions options) { + String srcShard = this.getShard(fromContainer, fromName); + String dstShard = this.getShard(toContainer, toName); + return this.delegate().copyBlob(srcShard, fromName, + dstShard, toName, options); + } + + @Override + public BlobMetadata blobMetadata(String container, String name) { + return this.delegate().blobMetadata(this.getShard(container, name), + name); + } + + @Override + public Blob getBlob(String containerName, String blobName) { + return this.delegate().getBlob(this.getShard(containerName, blobName), + blobName); + } + + @Override + public Blob getBlob(String containerName, String blobName, + GetOptions getOptions) { + return this.delegate() + .getBlob(this.getShard(containerName, blobName), blobName, + getOptions); + } + + @Override + public void removeBlob(String container, String name) { + this.delegate().removeBlob(this.getShard(container, name), name); + } + + @Override + public void removeBlobs(String container, Iterable iterable) { + if (!this.buckets.containsKey(container)) { + this.delegate().removeBlobs(container, iterable); + } + + Map> shardMap = new HashMap<>(); + for (String blob : iterable) { + List shardBlobs = + shardMap.computeIfAbsent(this.getShard(container, blob), + k -> new ArrayList<>()); + shardBlobs.add(blob); + } + + for (Map.Entry> entry : shardMap.entrySet()) { + this.delegate().removeBlobs(entry.getKey(), entry.getValue()); + } + } + + @Override + public BlobAccess getBlobAccess(String container, String name) { + return this.delegate() + .getBlobAccess(this.getShard(container, name), name); + } + + @Override + public void setBlobAccess(String container, String name, + BlobAccess access) { + this.delegate() + .setBlobAccess(this.getShard(container, name), name, access); + } + + @Override + public long countBlobs(String container) { + if (!this.buckets.containsKey(container)) { + return this.delegate().countBlobs(container); + } + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public long countBlobs(String container, ListContainerOptions options) { + if (!this.buckets.containsKey(container)) { + return this.delegate().countBlobs(container, options); + } + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public MultipartUpload initiateMultipartUpload(String container, + BlobMetadata blobMetadata, + PutOptions options) { + if (!this.buckets.containsKey(container)) { + return this.delegate() + .initiateMultipartUpload(container, blobMetadata, options); + } + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public void abortMultipartUpload(MultipartUpload mpu) { + if (!this.buckets.containsKey(mpu.containerName())) { + this.delegate().abortMultipartUpload(mpu); + } + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public String completeMultipartUpload(MultipartUpload mpu, + List parts) { + if (!this.buckets.containsKey(mpu.containerName())) { + return this.delegate().completeMultipartUpload(mpu, parts); + } + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public MultipartPart uploadMultipartPart(MultipartUpload mpu, + int partNumber, Payload payload) { + if (!this.buckets.containsKey(mpu.containerName())) { + return this.delegate() + .uploadMultipartPart(mpu, partNumber, payload); + } + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public List listMultipartUpload(MultipartUpload mpu) { + if (!this.buckets.containsKey(mpu.containerName())) { + return this.delegate().listMultipartUpload(mpu); + } + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public List listMultipartUploads(String container) { + if (!this.buckets.containsKey(container)) { + return this.delegate().listMultipartUploads(container); + } + throw new UnsupportedOperationException("sharded bucket"); + } + + @Override + public void downloadBlob(String container, String name, File destination) { + this.delegate().downloadBlob(this.getShard(container, name), name, + destination); + } + + @Override + public void downloadBlob(String container, String name, File destination, + ExecutorService executor) { + this.delegate() + .downloadBlob(this.getShard(container, name), name, destination, + executor); + } + + @Override + public InputStream streamBlob(String container, String name) { + return this.delegate().streamBlob(this.getShard(container, name), name); + } + + @Override + public InputStream streamBlob(String container, String name, + ExecutorService executor) { + return this.delegate() + .streamBlob(this.getShard(container, name), name, executor); + } +} diff --git a/src/test/java/org/gaul/s3proxy/ShardedBlobStoreTest.java b/src/test/java/org/gaul/s3proxy/ShardedBlobStoreTest.java new file mode 100644 index 0000000..1087c8e --- /dev/null +++ b/src/test/java/org/gaul/s3proxy/ShardedBlobStoreTest.java @@ -0,0 +1,258 @@ +/* + * Copyright 2014-2021 Andrew Gaul + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gaul.s3proxy; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteSource; +import com.google.inject.Module; + +import org.jclouds.ContextBuilder; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.domain.PageSet; +import org.jclouds.blobstore.domain.StorageMetadata; +import org.jclouds.blobstore.options.CopyOptions; +import org.jclouds.logging.slf4j.config.SLF4JLoggingModule; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public final class ShardedBlobStoreTest { + private int shards; + private String prefix; + private String containerName; + private BlobStoreContext context; + private BlobStore blobStore; + private BlobStore shardedBlobStore; + private List createdContainers; + private ImmutableMap prefixesMap; + + @Before + public void setUp() { + containerName = TestUtils.createRandomContainerName(); + shards = 10; + prefix = TestUtils.createRandomContainerName(); + context = ContextBuilder + .newBuilder("transient") + .credentials("identity", "credential") + .modules(ImmutableList.of(new SLF4JLoggingModule())) + .build(BlobStoreContext.class); + blobStore = context.getBlobStore(); + ImmutableMap shardsMap = + new ImmutableMap.Builder() + .put(containerName, shards).build(); + prefixesMap = new ImmutableMap.Builder() + .put(containerName, prefix).build(); + shardedBlobStore = ShardedBlobStore.newShardedBlobStore( + blobStore, shardsMap, prefixesMap); + createdContainers = new ArrayList<>(); + } + + @After + public void tearDown() { + if (this.context != null) { + for (String container : this.createdContainers) { + blobStore.deleteContainer(container); + } + context.close(); + } + } + + private void createContainer(String container) { + String prefix = this.prefixesMap.get(container); + if (prefix != null) { + for (int n = 0; n < this.shards; ++n) { + this.createdContainers.add( + String.format("%s-%d", this.prefix, n)); + } + } else { + this.createdContainers.add(container); + } + assertThat(shardedBlobStore.createContainerInLocation( + null, container)).isTrue(); + } + + public int countShards() { + PageSet listing = blobStore.list(); + int blobStoreShards = 0; + for (StorageMetadata entry: listing) { + if (entry.getName().startsWith(prefix)) { + blobStoreShards++; + } + } + return blobStoreShards; + } + + @Test + public void testCreateContainer() { + this.createContainer(containerName); + assertThat(blobStore.containerExists(containerName)).isFalse(); + assertThat(this.countShards()).isEqualTo(this.shards); + } + + @Test + public void testDeleteContainer() { + this.createContainer(containerName); + assertThat(this.countShards()).isEqualTo(this.shards); + assertThat(shardedBlobStore.deleteContainerIfEmpty(containerName)) + .isTrue(); + assertThat(this.countShards()).isZero(); + } + + @Test + public void testPutBlob() throws Exception { + String blobName = "foo"; + String blobName2 = "bar"; + ByteSource content = TestUtils.randomByteSource().slice(0, 1024); + ByteSource content2 = TestUtils.randomByteSource().slice(1024, 1024); + Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content) + .build(); + Blob blob2 = shardedBlobStore.blobBuilder(blobName2).payload(content2) + .build(); + + createContainer(containerName); + shardedBlobStore.putBlob(containerName, blob); + shardedBlobStore.putBlob(containerName, blob2); + + blob = shardedBlobStore.getBlob(containerName, blobName); + try (InputStream actual = blob.getPayload().openStream(); + InputStream expected = content.openStream()) { + assertThat(actual).hasContentEqualTo(expected); + } + blob2 = shardedBlobStore.getBlob(containerName, blobName2); + try (InputStream actual = blob2.getPayload().openStream(); + InputStream expected = content2.openStream()) { + assertThat(actual).hasContentEqualTo(expected); + } + + String blobContainer = null; + String blob2Container = null; + for (int i = 0; i < shards; i++) { + String shard = String.format("%s-%d", prefix, i); + for (StorageMetadata entry : blobStore.list(shard)) { + if (entry.getName().equals(blobName)) { + blobContainer = shard; + } + if (entry.getName().equals(blobName2)) { + blob2Container = shard; + } + } + } + assertThat(blobContainer).isNotNull(); + assertThat(blob2Container).isNotNull(); + assertThat(blobContainer).isNotEqualTo(blob2Container); + } + + @Test + public void testDeleteBlob() { + String blobName = TestUtils.createRandomBlobName(); + ByteSource content = TestUtils.randomByteSource().slice(0, 1024); + Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content) + .build(); + this.createContainer(containerName); + shardedBlobStore.putBlob(containerName, blob); + assertThat(shardedBlobStore.blobExists(containerName, blobName)) + .isTrue(); + shardedBlobStore.removeBlob(containerName, blobName); + assertThat(shardedBlobStore.blobExists(containerName, blobName)) + .isFalse(); + } + + @Test + public void testPutBlobUnsharded() throws Exception { + String unshardedContainer = TestUtils.createRandomContainerName(); + String blobName = TestUtils.createRandomBlobName(); + ByteSource content = TestUtils.randomByteSource().slice(0, 1024); + Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content) + .build(); + this.createContainer(unshardedContainer); + shardedBlobStore.putBlob(unshardedContainer, blob); + blob = blobStore.getBlob(unshardedContainer, blobName); + try (InputStream actual = blob.getPayload().openStream(); + InputStream expected = content.openStream()) { + assertThat(actual).hasContentEqualTo(expected); + } + } + + @Test + public void testCopyBlob() throws Exception { + String blobName = TestUtils.createRandomBlobName(); + ByteSource content = TestUtils.randomByteSource().slice(0, 1024); + Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content) + .build(); + this.createContainer(containerName); + shardedBlobStore.putBlob(containerName, blob); + String copyBlobName = TestUtils.createRandomBlobName(); + shardedBlobStore.copyBlob( + containerName, blobName, containerName, copyBlobName, + CopyOptions.NONE); + blob = shardedBlobStore.getBlob(containerName, copyBlobName); + try (InputStream actual = blob.getPayload().openStream(); + InputStream expected = content.openStream()) { + assertThat(actual).hasContentEqualTo(expected); + } + } + + @Test + public void testCopyBlobUnshardedToSharded() throws Exception { + String blobName = TestUtils.createRandomBlobName(); + String unshardedContainer = TestUtils.createRandomContainerName(); + ByteSource content = TestUtils.randomByteSource().slice(0, 1024); + Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content) + .build(); + this.createContainer(containerName); + this.createContainer(unshardedContainer); + shardedBlobStore.putBlob(unshardedContainer, blob); + shardedBlobStore.copyBlob( + unshardedContainer, blobName, containerName, blobName, + CopyOptions.NONE); + blob = shardedBlobStore.getBlob(containerName, blobName); + try (InputStream actual = blob.getPayload().openStream(); + InputStream expected = content.openStream()) { + assertThat(actual).hasContentEqualTo(expected); + } + } + + @Test + public void testCopyBlobShardedToUnsharded() throws Exception { + String blobName = TestUtils.createRandomBlobName(); + String unshardedContainer = TestUtils.createRandomContainerName(); + ByteSource content = TestUtils.randomByteSource().slice(0, 1024); + Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content) + .build(); + this.createContainer(containerName); + this.createContainer(unshardedContainer); + shardedBlobStore.putBlob(containerName, blob); + shardedBlobStore.copyBlob( + containerName, blobName, unshardedContainer, blobName, + CopyOptions.NONE); + blob = shardedBlobStore.getBlob(unshardedContainer, blobName); + try (InputStream actual = blob.getPayload().openStream(); + InputStream expected = content.openStream()) { + assertThat(actual).hasContentEqualTo(expected); + } + } +} diff --git a/src/test/java/org/gaul/s3proxy/TestUtils.java b/src/test/java/org/gaul/s3proxy/TestUtils.java index e494536..e76e9bb 100644 --- a/src/test/java/org/gaul/s3proxy/TestUtils.java +++ b/src/test/java/org/gaul/s3proxy/TestUtils.java @@ -235,4 +235,12 @@ final class TestUtils { return info; } + + static String createRandomContainerName() { + return "container-" + new Random().nextInt(Integer.MAX_VALUE); + } + + static String createRandomBlobName() { + return "blob-" + new Random().nextInt(Integer.MAX_VALUE); + } }