kopia lustrzana https://github.com/gaul/s3proxy
Add Sharded Bucket middleware
Adds the sharded bucket middleware, which allows for splitting objects across multiple backend buckets for a given virtual bucket. The middleware should be configured as: s3proxy.sharded-blobstore.<bucket name>.shards=<number of shards> s3proxy.sharded-blobstore.<bucket name>.prefix=<prefix>. All shards are named <prefix>-<index>, where index is an integer from 0 to <number of shards> - 1. If the <prefix> is not supplied, the <bucket name> is used as the prefix. Listing the virtual bucket and multipart uploads are not supported. When listing all containers, the shards are elided from the result. Fixes #325 Fixes #351pull/366/head
rodzic
4686ede0bd
commit
0d8f9aa96d
|
@ -90,6 +90,7 @@ S3Proxy can modify its behavior based on middlewares:
|
|||
* [eventual consistency modeling](https://github.com/gaul/s3proxy/wiki/Middleware---eventual-consistency)
|
||||
* [large object mocking](https://github.com/gaul/s3proxy/wiki/Middleware-large-object-mocking)
|
||||
* [read-only](https://github.com/gaul/s3proxy/wiki/Middleware-read-only)
|
||||
* [sharded backend containers](https://github.com/gaul/s3proxy/wiki/Middleware-sharded-backend)
|
||||
|
||||
## Limitations
|
||||
|
||||
|
|
|
@ -224,6 +224,16 @@ public final class Main {
|
|||
blobStore = ReadOnlyBlobStore.newReadOnlyBlobStore(blobStore);
|
||||
}
|
||||
|
||||
ImmutableMap<String, Integer> shards =
|
||||
ShardedBlobStore.parseBucketShards(properties);
|
||||
ImmutableMap<String, String> prefixes =
|
||||
ShardedBlobStore.parsePrefixes(properties);
|
||||
if (!shards.isEmpty()) {
|
||||
System.err.println("Using sharded buckets backend");
|
||||
blobStore = ShardedBlobStore.newShardedBlobStore(blobStore,
|
||||
shards, prefixes);
|
||||
}
|
||||
|
||||
return blobStore;
|
||||
}
|
||||
|
||||
|
|
|
@ -84,6 +84,9 @@ public final class S3ProxyConstants {
|
|||
/** Prevent mutations. */
|
||||
public static final String PROPERTY_READ_ONLY_BLOBSTORE =
|
||||
"s3proxy.read-only-blobstore";
|
||||
/** Shard objects across a specified number of buckets. */
|
||||
public static final String PROPERTY_SHARDED_BLOBSTORE =
|
||||
"s3proxy.sharded-blobstore";
|
||||
|
||||
/** Maximum time skew allowed in signed requests. */
|
||||
public static final String PROPERTY_MAXIMUM_TIME_SKEW =
|
||||
|
|
|
@ -0,0 +1,648 @@
|
|||
/*
|
||||
* Copyright 2014-2021 Andrew Gaul <andrew@gaul.org>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.gaul.s3proxy;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.hash.HashCode;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
|
||||
import org.jclouds.blobstore.BlobStore;
|
||||
import org.jclouds.blobstore.ContainerNotFoundException;
|
||||
import org.jclouds.blobstore.domain.Blob;
|
||||
import org.jclouds.blobstore.domain.BlobAccess;
|
||||
import org.jclouds.blobstore.domain.BlobMetadata;
|
||||
import org.jclouds.blobstore.domain.ContainerAccess;
|
||||
import org.jclouds.blobstore.domain.MultipartPart;
|
||||
import org.jclouds.blobstore.domain.MultipartUpload;
|
||||
import org.jclouds.blobstore.domain.MutableStorageMetadata;
|
||||
import org.jclouds.blobstore.domain.PageSet;
|
||||
import org.jclouds.blobstore.domain.StorageMetadata;
|
||||
import org.jclouds.blobstore.domain.internal.MutableStorageMetadataImpl;
|
||||
import org.jclouds.blobstore.domain.internal.PageSetImpl;
|
||||
import org.jclouds.blobstore.options.CopyOptions;
|
||||
import org.jclouds.blobstore.options.CreateContainerOptions;
|
||||
import org.jclouds.blobstore.options.GetOptions;
|
||||
import org.jclouds.blobstore.options.ListContainerOptions;
|
||||
import org.jclouds.blobstore.options.PutOptions;
|
||||
import org.jclouds.blobstore.util.ForwardingBlobStore;
|
||||
import org.jclouds.domain.Location;
|
||||
import org.jclouds.io.Payload;
|
||||
|
||||
/**
|
||||
* This class implements the ability to split objects destined for specified
|
||||
* buckets across multiple backend buckets. The sharding is only applied to
|
||||
* the configured buckets. Each sharded bucket must specify the number of
|
||||
* shards in the form:
|
||||
* s3proxy.sharded-blobstore.<bucket name>.shards=<integer>.
|
||||
* The number of shards is limited to 1000. An optional prefix can be
|
||||
* specified to use for shard names, like so:
|
||||
* s3proxy.sharded-blobstore.<bucket name>.prefix=<string>.
|
||||
* The shards are named as follows: <prefix>-<integer>,
|
||||
* corresponding to the shards from 0 to the specified number. If a
|
||||
* <prefix> is not specified, the name of the bucket is used instead.
|
||||
*
|
||||
* Requests for all other buckets are passed through unchanged. Shards must
|
||||
* be pre-created either out of band or by issuing the CreateBucket API with
|
||||
* the sharded bucket name. The sharded bucket itself will not be
|
||||
* instantiated on the backend.
|
||||
*/
|
||||
final class ShardedBlobStore extends ForwardingBlobStore {
|
||||
public static final Pattern PROPERTIES_PREFIX_RE = Pattern.compile(
|
||||
S3ProxyConstants.PROPERTY_SHARDED_BLOBSTORE +
|
||||
"\\.(?<bucket>.*)\\.prefix$");
|
||||
private static final Pattern PROPERTIES_SHARDS_RE = Pattern.compile(
|
||||
S3ProxyConstants.PROPERTY_SHARDED_BLOBSTORE +
|
||||
"\\.(?<bucket>.*)\\.shards$");
|
||||
private static final Pattern SHARD_RE = Pattern.compile(
|
||||
"(?<prefix>.*)-(?<shard>[0-9]+)$");
|
||||
private static final HashFunction SHARD_HASH = Hashing.murmur3_128();
|
||||
private static final int MAX_SHARD_THREADS = 10;
|
||||
private static final String SUPERBLOCK_VERSION = "1.0";
|
||||
private static final String SUPERBLOCK_BLOB_NAME =
|
||||
".s3proxy-sharded-superblock";
|
||||
private static final int MAX_SHARDS = 1000;
|
||||
private final ImmutableMap<String, ShardedBucket> buckets;
|
||||
private final ImmutableMap<String, String> prefixMap;
|
||||
|
||||
private static final class ShardedBucket {
|
||||
private final String prefix;
|
||||
private final int shards;
|
||||
|
||||
private ShardedBucket(String name, int shards) {
|
||||
this.prefix = Objects.requireNonNull(name);
|
||||
this.shards = shards;
|
||||
}
|
||||
}
|
||||
|
||||
private ShardedBlobStore(BlobStore blobStore,
|
||||
ImmutableMap<String, Integer> shards,
|
||||
ImmutableMap<String, String> prefixes) {
|
||||
super(blobStore);
|
||||
Set<String> missingShards = Sets.difference(
|
||||
prefixes.keySet(), shards.keySet());
|
||||
if (!missingShards.isEmpty()) {
|
||||
String allMissingShards = missingShards.stream().collect(
|
||||
Collectors.joining(", "));
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"Number of shards unset for sharded buckets: %s",
|
||||
allMissingShards));
|
||||
}
|
||||
ImmutableMap.Builder<String, ShardedBucket> bucketsBuilder =
|
||||
new ImmutableMap.Builder<>();
|
||||
for (String bucket : shards.keySet()) {
|
||||
String prefix = prefixes.get(bucket);
|
||||
if (prefix == null) {
|
||||
prefix = bucket;
|
||||
}
|
||||
bucketsBuilder.put(bucket, new ShardedBucket(prefix,
|
||||
shards.get(bucket)));
|
||||
}
|
||||
this.buckets = bucketsBuilder.build();
|
||||
|
||||
ImmutableMap.Builder<String, String> prefixMapBuilder =
|
||||
new ImmutableMap.Builder<>();
|
||||
for (String virtualBucket : buckets.keySet()) {
|
||||
String prefix = buckets.get(virtualBucket).prefix;
|
||||
prefixMapBuilder.put(prefix, virtualBucket);
|
||||
}
|
||||
this.prefixMap = prefixMapBuilder.build();
|
||||
}
|
||||
|
||||
public static ImmutableMap<String, Integer> parseBucketShards(
|
||||
Properties properties) {
|
||||
ImmutableMap.Builder<String, Integer> shardsMap =
|
||||
new ImmutableMap.Builder<>();
|
||||
for (String key : properties.stringPropertyNames()) {
|
||||
Matcher matcher = PROPERTIES_SHARDS_RE.matcher(key);
|
||||
if (!matcher.matches()) {
|
||||
continue;
|
||||
}
|
||||
String bucket = matcher.group("bucket");
|
||||
int shards = Integer.parseInt(properties.getProperty(key));
|
||||
checkArgument(shards > 0 && shards < MAX_SHARDS,
|
||||
"number of shards must be between 1 and 1000 for %s",
|
||||
bucket);
|
||||
shardsMap.put(bucket, shards);
|
||||
}
|
||||
return shardsMap.build();
|
||||
}
|
||||
|
||||
public static ImmutableMap<String, String> parsePrefixes(
|
||||
Properties properties) {
|
||||
ImmutableMap.Builder<String, String> prefixesMap =
|
||||
new ImmutableMap.Builder<>();
|
||||
for (String key : properties.stringPropertyNames()) {
|
||||
Matcher matcher = PROPERTIES_PREFIX_RE.matcher(key);
|
||||
if (!matcher.matches()) {
|
||||
continue;
|
||||
}
|
||||
prefixesMap.put(matcher.group("bucket"),
|
||||
properties.getProperty(key));
|
||||
}
|
||||
return prefixesMap.build();
|
||||
}
|
||||
|
||||
static ShardedBlobStore newShardedBlobStore(
|
||||
BlobStore blobStore,
|
||||
ImmutableMap<String, Integer> shards,
|
||||
ImmutableMap<String, String> prefixes) {
|
||||
return new ShardedBlobStore(blobStore, shards, prefixes);
|
||||
}
|
||||
|
||||
private Map<String, String> createSuperblockMeta(ShardedBucket bucket) {
|
||||
ImmutableMap.Builder<String, String> meta =
|
||||
new ImmutableMap.Builder<>();
|
||||
meta.put("s3proxy-sharded-superblock-version", SUPERBLOCK_VERSION);
|
||||
meta.put("s3proxy-sharded-superblock-prefix", bucket.prefix);
|
||||
meta.put("s3proxy-sharded-superblock-shards",
|
||||
Integer.toString(bucket.shards));
|
||||
return meta.build();
|
||||
}
|
||||
|
||||
private static String getShardContainer(ShardedBucket bucket, int shard) {
|
||||
return String.format("%s-%d", bucket.prefix, shard);
|
||||
}
|
||||
|
||||
private String getShard(String containerName, String blob) {
|
||||
ShardedBucket bucket = buckets.get(containerName);
|
||||
if (bucket == null) {
|
||||
return containerName;
|
||||
}
|
||||
HashCode hash = SHARD_HASH.hashString(blob, StandardCharsets.UTF_8);
|
||||
return ShardedBlobStore.getShardContainer(
|
||||
bucket, Hashing.consistentHash(hash, bucket.shards));
|
||||
}
|
||||
|
||||
private void checkSuperBlock(Blob blob, Map<String, String> expectedMeta,
|
||||
String container) {
|
||||
Map<String, String> currentSuperblockMeta =
|
||||
blob.getMetadata().getUserMetadata();
|
||||
for (String key : expectedMeta.keySet()) {
|
||||
String current = currentSuperblockMeta.get(key);
|
||||
String expected = expectedMeta.get(key);
|
||||
if (!expected.equalsIgnoreCase(current)) {
|
||||
throw new RuntimeException(String.format(
|
||||
"Superblock block for %s does not match: %s, %s",
|
||||
container, expected, current));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean createShards(ShardedBucket bucket, Location location,
|
||||
CreateContainerOptions options) {
|
||||
ImmutableList.Builder<Future<Boolean>> futuresBuilder =
|
||||
new ImmutableList.Builder<>();
|
||||
ExecutorService executor = Executors.newFixedThreadPool(
|
||||
Math.min(bucket.shards, MAX_SHARD_THREADS));
|
||||
BlobStore blobStore = this.delegate();
|
||||
for (int n = 0; n < bucket.shards; ++n) {
|
||||
String shardContainer = ShardedBlobStore.getShardContainer(
|
||||
bucket, n);
|
||||
futuresBuilder.add(executor.submit(
|
||||
() -> blobStore.createContainerInLocation(
|
||||
location, shardContainer, options)));
|
||||
}
|
||||
ImmutableList<Future<Boolean>> futures = futuresBuilder.build();
|
||||
executor.shutdown();
|
||||
boolean ret = true;
|
||||
for (Future<Boolean> future : futures) {
|
||||
try {
|
||||
ret &= future.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException("Failed to create some shards", e);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean createContainerInLocation(Location location,
|
||||
String container) {
|
||||
return createContainerInLocation(
|
||||
location, container, CreateContainerOptions.NONE);
|
||||
}
|
||||
|
||||
@SuppressWarnings("EmptyCatch")
|
||||
@Override
|
||||
public boolean createContainerInLocation(
|
||||
Location location, String container,
|
||||
CreateContainerOptions createContainerOptions) {
|
||||
|
||||
ShardedBucket bucket = this.buckets.get(container);
|
||||
if (bucket == null) {
|
||||
return this.delegate().createContainerInLocation(
|
||||
location, container, createContainerOptions);
|
||||
}
|
||||
|
||||
Map<String, String> superblockMeta = this.createSuperblockMeta(bucket);
|
||||
Blob superblockBlob = null;
|
||||
try {
|
||||
superblockBlob = this.delegate().getBlob(
|
||||
ShardedBlobStore.getShardContainer(bucket, 0),
|
||||
SUPERBLOCK_BLOB_NAME);
|
||||
} catch (ContainerNotFoundException ignored) {
|
||||
}
|
||||
if (superblockBlob != null) {
|
||||
checkSuperBlock(superblockBlob, superblockMeta, container);
|
||||
}
|
||||
|
||||
boolean ret = createShards(bucket, location, createContainerOptions);
|
||||
|
||||
// Upload the superblock
|
||||
if (superblockBlob == null) {
|
||||
superblockBlob = this.delegate().blobBuilder(SUPERBLOCK_BLOB_NAME)
|
||||
.payload("")
|
||||
.userMetadata(superblockMeta)
|
||||
.build();
|
||||
this.delegate().putBlob(ShardedBlobStore.getShardContainer(
|
||||
bucket, 0), superblockBlob);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageSet<? extends StorageMetadata> list() {
|
||||
PageSet<? extends StorageMetadata> upstream = this.delegate().list();
|
||||
ImmutableList.Builder<StorageMetadata> results =
|
||||
new ImmutableList.Builder<>();
|
||||
Set<String> virtualBuckets = new HashSet<>();
|
||||
for (StorageMetadata sm : upstream) {
|
||||
Matcher matcher = SHARD_RE.matcher(sm.getName());
|
||||
if (!matcher.matches()) {
|
||||
results.add(sm);
|
||||
continue;
|
||||
}
|
||||
String prefix = matcher.group("prefix");
|
||||
String virtualBucketName = this.prefixMap.get(prefix);
|
||||
if (virtualBucketName == null) {
|
||||
results.add(sm);
|
||||
continue;
|
||||
}
|
||||
if (!virtualBuckets.contains(prefix)) {
|
||||
virtualBuckets.add(prefix);
|
||||
MutableStorageMetadata virtualBucket =
|
||||
new MutableStorageMetadataImpl();
|
||||
virtualBucket.setCreationDate(sm.getCreationDate());
|
||||
virtualBucket.setETag(sm.getETag());
|
||||
virtualBucket.setId(sm.getProviderId());
|
||||
virtualBucket.setLastModified(sm.getLastModified());
|
||||
virtualBucket.setLocation(sm.getLocation());
|
||||
virtualBucket.setName(virtualBucketName);
|
||||
virtualBucket.setSize(sm.getSize());
|
||||
virtualBucket.setTier(sm.getTier());
|
||||
virtualBucket.setType(sm.getType());
|
||||
virtualBucket.setUri(sm.getUri());
|
||||
// copy the user metadata from the first shard as part
|
||||
// of the response
|
||||
virtualBucket.setUserMetadata(sm.getUserMetadata());
|
||||
results.add(virtualBucket);
|
||||
}
|
||||
}
|
||||
return new PageSetImpl<>(results.build(), upstream.getNextMarker());
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageSet<? extends StorageMetadata> list(String container) {
|
||||
if (!this.buckets.containsKey(container)) {
|
||||
return this.delegate().list(container);
|
||||
}
|
||||
// TODO: implement listing a sharded container
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageSet<? extends StorageMetadata> list(
|
||||
String container,
|
||||
ListContainerOptions options) {
|
||||
if (!this.buckets.containsKey(container)) {
|
||||
return this.delegate().list(container, options);
|
||||
}
|
||||
// TODO: implement listing a sharded container
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containerExists(String container) {
|
||||
if (!this.buckets.containsKey(container)) {
|
||||
return this.delegate().containerExists(container);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerAccess getContainerAccess(String container) {
|
||||
if (!this.buckets.containsKey(container)) {
|
||||
return this.delegate().getContainerAccess(container);
|
||||
}
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setContainerAccess(String container,
|
||||
ContainerAccess containerAccess) {
|
||||
if (!this.buckets.containsKey(container)) {
|
||||
this.delegate().setContainerAccess(container, containerAccess);
|
||||
}
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearContainer(String container) {
|
||||
clearContainer(container, new ListContainerOptions());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearContainer(String container, ListContainerOptions options) {
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteContainer(String container) {
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
private boolean deleteShards(ShardedBucket bucket) {
|
||||
ImmutableList.Builder<Future<Boolean>> futuresBuilder =
|
||||
new ImmutableList.Builder<>();
|
||||
ExecutorService executor = Executors.newFixedThreadPool(
|
||||
Math.min(bucket.shards, MAX_SHARD_THREADS));
|
||||
for (int n = 0; n < bucket.shards; ++n) {
|
||||
String shard = ShardedBlobStore.getShardContainer(bucket, n);
|
||||
futuresBuilder.add(executor.submit(
|
||||
() -> this.delegate().deleteContainerIfEmpty(shard)));
|
||||
}
|
||||
executor.shutdown();
|
||||
ImmutableList<Future<Boolean>> futures = futuresBuilder.build();
|
||||
boolean ret = true;
|
||||
for (Future<Boolean> future : futures) {
|
||||
try {
|
||||
ret &= future.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException("Failed to delete shards", e);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean deleteContainerIfEmpty(String container) {
|
||||
ShardedBucket bucket = this.buckets.get(container);
|
||||
if (bucket == null) {
|
||||
return this.delegate().deleteContainerIfEmpty(container);
|
||||
}
|
||||
|
||||
String zeroShardContainer = ShardedBlobStore.getShardContainer(
|
||||
bucket, 0);
|
||||
PageSet<? extends StorageMetadata> listing = this.delegate().list(
|
||||
zeroShardContainer);
|
||||
if (listing.size() > 1) {
|
||||
return false;
|
||||
}
|
||||
StorageMetadata sm = listing.iterator().next();
|
||||
if (!sm.getName().equals(SUPERBLOCK_BLOB_NAME)) {
|
||||
return false;
|
||||
}
|
||||
// Remove the superblock
|
||||
this.delegate().removeBlob(zeroShardContainer, SUPERBLOCK_BLOB_NAME);
|
||||
return this.deleteShards(bucket);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean directoryExists(String container, String directory) {
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDirectory(String container, String directory) {
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteDirectory(String container, String directory) {
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean blobExists(String container, String name) {
|
||||
return this.delegate().blobExists(this.getShard(container, name), name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String putBlob(String containerName, Blob blob) {
|
||||
return this.delegate().putBlob(this.getShard(containerName,
|
||||
blob.getMetadata().getName()), blob);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String putBlob(final String containerName, Blob blob,
|
||||
final PutOptions putOptions) {
|
||||
return this.delegate().putBlob(
|
||||
this.getShard(containerName, blob.getMetadata().getName()),
|
||||
blob, putOptions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String copyBlob(String fromContainer, String fromName,
|
||||
String toContainer, String toName,
|
||||
CopyOptions options) {
|
||||
String srcShard = this.getShard(fromContainer, fromName);
|
||||
String dstShard = this.getShard(toContainer, toName);
|
||||
return this.delegate().copyBlob(srcShard, fromName,
|
||||
dstShard, toName, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlobMetadata blobMetadata(String container, String name) {
|
||||
return this.delegate().blobMetadata(this.getShard(container, name),
|
||||
name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob getBlob(String containerName, String blobName) {
|
||||
return this.delegate().getBlob(this.getShard(containerName, blobName),
|
||||
blobName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob getBlob(String containerName, String blobName,
|
||||
GetOptions getOptions) {
|
||||
return this.delegate()
|
||||
.getBlob(this.getShard(containerName, blobName), blobName,
|
||||
getOptions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeBlob(String container, String name) {
|
||||
this.delegate().removeBlob(this.getShard(container, name), name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeBlobs(String container, Iterable<String> iterable) {
|
||||
if (!this.buckets.containsKey(container)) {
|
||||
this.delegate().removeBlobs(container, iterable);
|
||||
}
|
||||
|
||||
Map<String, List<String>> shardMap = new HashMap<>();
|
||||
for (String blob : iterable) {
|
||||
List<String> shardBlobs =
|
||||
shardMap.computeIfAbsent(this.getShard(container, blob),
|
||||
k -> new ArrayList<>());
|
||||
shardBlobs.add(blob);
|
||||
}
|
||||
|
||||
for (Map.Entry<String, List<String>> entry : shardMap.entrySet()) {
|
||||
this.delegate().removeBlobs(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlobAccess getBlobAccess(String container, String name) {
|
||||
return this.delegate()
|
||||
.getBlobAccess(this.getShard(container, name), name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBlobAccess(String container, String name,
|
||||
BlobAccess access) {
|
||||
this.delegate()
|
||||
.setBlobAccess(this.getShard(container, name), name, access);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long countBlobs(String container) {
|
||||
if (!this.buckets.containsKey(container)) {
|
||||
return this.delegate().countBlobs(container);
|
||||
}
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long countBlobs(String container, ListContainerOptions options) {
|
||||
if (!this.buckets.containsKey(container)) {
|
||||
return this.delegate().countBlobs(container, options);
|
||||
}
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultipartUpload initiateMultipartUpload(String container,
|
||||
BlobMetadata blobMetadata,
|
||||
PutOptions options) {
|
||||
if (!this.buckets.containsKey(container)) {
|
||||
return this.delegate()
|
||||
.initiateMultipartUpload(container, blobMetadata, options);
|
||||
}
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abortMultipartUpload(MultipartUpload mpu) {
|
||||
if (!this.buckets.containsKey(mpu.containerName())) {
|
||||
this.delegate().abortMultipartUpload(mpu);
|
||||
}
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String completeMultipartUpload(MultipartUpload mpu,
|
||||
List<MultipartPart> parts) {
|
||||
if (!this.buckets.containsKey(mpu.containerName())) {
|
||||
return this.delegate().completeMultipartUpload(mpu, parts);
|
||||
}
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultipartPart uploadMultipartPart(MultipartUpload mpu,
|
||||
int partNumber, Payload payload) {
|
||||
if (!this.buckets.containsKey(mpu.containerName())) {
|
||||
return this.delegate()
|
||||
.uploadMultipartPart(mpu, partNumber, payload);
|
||||
}
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<MultipartPart> listMultipartUpload(MultipartUpload mpu) {
|
||||
if (!this.buckets.containsKey(mpu.containerName())) {
|
||||
return this.delegate().listMultipartUpload(mpu);
|
||||
}
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<MultipartUpload> listMultipartUploads(String container) {
|
||||
if (!this.buckets.containsKey(container)) {
|
||||
return this.delegate().listMultipartUploads(container);
|
||||
}
|
||||
throw new UnsupportedOperationException("sharded bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void downloadBlob(String container, String name, File destination) {
|
||||
this.delegate().downloadBlob(this.getShard(container, name), name,
|
||||
destination);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void downloadBlob(String container, String name, File destination,
|
||||
ExecutorService executor) {
|
||||
this.delegate()
|
||||
.downloadBlob(this.getShard(container, name), name, destination,
|
||||
executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream streamBlob(String container, String name) {
|
||||
return this.delegate().streamBlob(this.getShard(container, name), name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream streamBlob(String container, String name,
|
||||
ExecutorService executor) {
|
||||
return this.delegate()
|
||||
.streamBlob(this.getShard(container, name), name, executor);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,258 @@
|
|||
/*
|
||||
* Copyright 2014-2021 Andrew Gaul <andrew@gaul.org>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.gaul.s3proxy;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.ByteSource;
|
||||
import com.google.inject.Module;
|
||||
|
||||
import org.jclouds.ContextBuilder;
|
||||
import org.jclouds.blobstore.BlobStore;
|
||||
import org.jclouds.blobstore.BlobStoreContext;
|
||||
import org.jclouds.blobstore.domain.Blob;
|
||||
import org.jclouds.blobstore.domain.PageSet;
|
||||
import org.jclouds.blobstore.domain.StorageMetadata;
|
||||
import org.jclouds.blobstore.options.CopyOptions;
|
||||
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public final class ShardedBlobStoreTest {
|
||||
private int shards;
|
||||
private String prefix;
|
||||
private String containerName;
|
||||
private BlobStoreContext context;
|
||||
private BlobStore blobStore;
|
||||
private BlobStore shardedBlobStore;
|
||||
private List<String> createdContainers;
|
||||
private ImmutableMap<String, String> prefixesMap;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
containerName = TestUtils.createRandomContainerName();
|
||||
shards = 10;
|
||||
prefix = TestUtils.createRandomContainerName();
|
||||
context = ContextBuilder
|
||||
.newBuilder("transient")
|
||||
.credentials("identity", "credential")
|
||||
.modules(ImmutableList.<Module>of(new SLF4JLoggingModule()))
|
||||
.build(BlobStoreContext.class);
|
||||
blobStore = context.getBlobStore();
|
||||
ImmutableMap<String, Integer> shardsMap =
|
||||
new ImmutableMap.Builder<String, Integer>()
|
||||
.put(containerName, shards).build();
|
||||
prefixesMap = new ImmutableMap.Builder<String, String>()
|
||||
.put(containerName, prefix).build();
|
||||
shardedBlobStore = ShardedBlobStore.newShardedBlobStore(
|
||||
blobStore, shardsMap, prefixesMap);
|
||||
createdContainers = new ArrayList<>();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (this.context != null) {
|
||||
for (String container : this.createdContainers) {
|
||||
blobStore.deleteContainer(container);
|
||||
}
|
||||
context.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void createContainer(String container) {
|
||||
String prefix = this.prefixesMap.get(container);
|
||||
if (prefix != null) {
|
||||
for (int n = 0; n < this.shards; ++n) {
|
||||
this.createdContainers.add(
|
||||
String.format("%s-%d", this.prefix, n));
|
||||
}
|
||||
} else {
|
||||
this.createdContainers.add(container);
|
||||
}
|
||||
assertThat(shardedBlobStore.createContainerInLocation(
|
||||
null, container)).isTrue();
|
||||
}
|
||||
|
||||
public int countShards() {
|
||||
PageSet<? extends StorageMetadata> listing = blobStore.list();
|
||||
int blobStoreShards = 0;
|
||||
for (StorageMetadata entry: listing) {
|
||||
if (entry.getName().startsWith(prefix)) {
|
||||
blobStoreShards++;
|
||||
}
|
||||
}
|
||||
return blobStoreShards;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateContainer() {
|
||||
this.createContainer(containerName);
|
||||
assertThat(blobStore.containerExists(containerName)).isFalse();
|
||||
assertThat(this.countShards()).isEqualTo(this.shards);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteContainer() {
|
||||
this.createContainer(containerName);
|
||||
assertThat(this.countShards()).isEqualTo(this.shards);
|
||||
assertThat(shardedBlobStore.deleteContainerIfEmpty(containerName))
|
||||
.isTrue();
|
||||
assertThat(this.countShards()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutBlob() throws Exception {
|
||||
String blobName = "foo";
|
||||
String blobName2 = "bar";
|
||||
ByteSource content = TestUtils.randomByteSource().slice(0, 1024);
|
||||
ByteSource content2 = TestUtils.randomByteSource().slice(1024, 1024);
|
||||
Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content)
|
||||
.build();
|
||||
Blob blob2 = shardedBlobStore.blobBuilder(blobName2).payload(content2)
|
||||
.build();
|
||||
|
||||
createContainer(containerName);
|
||||
shardedBlobStore.putBlob(containerName, blob);
|
||||
shardedBlobStore.putBlob(containerName, blob2);
|
||||
|
||||
blob = shardedBlobStore.getBlob(containerName, blobName);
|
||||
try (InputStream actual = blob.getPayload().openStream();
|
||||
InputStream expected = content.openStream()) {
|
||||
assertThat(actual).hasContentEqualTo(expected);
|
||||
}
|
||||
blob2 = shardedBlobStore.getBlob(containerName, blobName2);
|
||||
try (InputStream actual = blob2.getPayload().openStream();
|
||||
InputStream expected = content2.openStream()) {
|
||||
assertThat(actual).hasContentEqualTo(expected);
|
||||
}
|
||||
|
||||
String blobContainer = null;
|
||||
String blob2Container = null;
|
||||
for (int i = 0; i < shards; i++) {
|
||||
String shard = String.format("%s-%d", prefix, i);
|
||||
for (StorageMetadata entry : blobStore.list(shard)) {
|
||||
if (entry.getName().equals(blobName)) {
|
||||
blobContainer = shard;
|
||||
}
|
||||
if (entry.getName().equals(blobName2)) {
|
||||
blob2Container = shard;
|
||||
}
|
||||
}
|
||||
}
|
||||
assertThat(blobContainer).isNotNull();
|
||||
assertThat(blob2Container).isNotNull();
|
||||
assertThat(blobContainer).isNotEqualTo(blob2Container);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteBlob() {
|
||||
String blobName = TestUtils.createRandomBlobName();
|
||||
ByteSource content = TestUtils.randomByteSource().slice(0, 1024);
|
||||
Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content)
|
||||
.build();
|
||||
this.createContainer(containerName);
|
||||
shardedBlobStore.putBlob(containerName, blob);
|
||||
assertThat(shardedBlobStore.blobExists(containerName, blobName))
|
||||
.isTrue();
|
||||
shardedBlobStore.removeBlob(containerName, blobName);
|
||||
assertThat(shardedBlobStore.blobExists(containerName, blobName))
|
||||
.isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutBlobUnsharded() throws Exception {
|
||||
String unshardedContainer = TestUtils.createRandomContainerName();
|
||||
String blobName = TestUtils.createRandomBlobName();
|
||||
ByteSource content = TestUtils.randomByteSource().slice(0, 1024);
|
||||
Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content)
|
||||
.build();
|
||||
this.createContainer(unshardedContainer);
|
||||
shardedBlobStore.putBlob(unshardedContainer, blob);
|
||||
blob = blobStore.getBlob(unshardedContainer, blobName);
|
||||
try (InputStream actual = blob.getPayload().openStream();
|
||||
InputStream expected = content.openStream()) {
|
||||
assertThat(actual).hasContentEqualTo(expected);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopyBlob() throws Exception {
|
||||
String blobName = TestUtils.createRandomBlobName();
|
||||
ByteSource content = TestUtils.randomByteSource().slice(0, 1024);
|
||||
Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content)
|
||||
.build();
|
||||
this.createContainer(containerName);
|
||||
shardedBlobStore.putBlob(containerName, blob);
|
||||
String copyBlobName = TestUtils.createRandomBlobName();
|
||||
shardedBlobStore.copyBlob(
|
||||
containerName, blobName, containerName, copyBlobName,
|
||||
CopyOptions.NONE);
|
||||
blob = shardedBlobStore.getBlob(containerName, copyBlobName);
|
||||
try (InputStream actual = blob.getPayload().openStream();
|
||||
InputStream expected = content.openStream()) {
|
||||
assertThat(actual).hasContentEqualTo(expected);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopyBlobUnshardedToSharded() throws Exception {
|
||||
String blobName = TestUtils.createRandomBlobName();
|
||||
String unshardedContainer = TestUtils.createRandomContainerName();
|
||||
ByteSource content = TestUtils.randomByteSource().slice(0, 1024);
|
||||
Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content)
|
||||
.build();
|
||||
this.createContainer(containerName);
|
||||
this.createContainer(unshardedContainer);
|
||||
shardedBlobStore.putBlob(unshardedContainer, blob);
|
||||
shardedBlobStore.copyBlob(
|
||||
unshardedContainer, blobName, containerName, blobName,
|
||||
CopyOptions.NONE);
|
||||
blob = shardedBlobStore.getBlob(containerName, blobName);
|
||||
try (InputStream actual = blob.getPayload().openStream();
|
||||
InputStream expected = content.openStream()) {
|
||||
assertThat(actual).hasContentEqualTo(expected);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopyBlobShardedToUnsharded() throws Exception {
|
||||
String blobName = TestUtils.createRandomBlobName();
|
||||
String unshardedContainer = TestUtils.createRandomContainerName();
|
||||
ByteSource content = TestUtils.randomByteSource().slice(0, 1024);
|
||||
Blob blob = shardedBlobStore.blobBuilder(blobName).payload(content)
|
||||
.build();
|
||||
this.createContainer(containerName);
|
||||
this.createContainer(unshardedContainer);
|
||||
shardedBlobStore.putBlob(containerName, blob);
|
||||
shardedBlobStore.copyBlob(
|
||||
containerName, blobName, unshardedContainer, blobName,
|
||||
CopyOptions.NONE);
|
||||
blob = shardedBlobStore.getBlob(unshardedContainer, blobName);
|
||||
try (InputStream actual = blob.getPayload().openStream();
|
||||
InputStream expected = content.openStream()) {
|
||||
assertThat(actual).hasContentEqualTo(expected);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -235,4 +235,12 @@ final class TestUtils {
|
|||
|
||||
return info;
|
||||
}
|
||||
|
||||
static String createRandomContainerName() {
|
||||
return "container-" + new Random().nextInt(Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
static String createRandomBlobName() {
|
||||
return "blob-" + new Random().nextInt(Integer.MAX_VALUE);
|
||||
}
|
||||
}
|
||||
|
|
Ładowanie…
Reference in New Issue