s3-credentials/s3_credentials/cli.py

import boto3
import botocore
import click
import configparser
from csv import DictWriter
import fnmatch
import io
import itertools
import json
import mimetypes
import os
import pathlib
import re
import sys
import textwrap
from . import policies
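# Passed to put_public_access_block() by the --public option: all four flags are
# set to False so that the public-read bucket policy applied by --public can take effect.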
PUBLIC_ACCESS_BLOCK_CONFIGURATION = {
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
def bucket_exists(s3, bucket):
try:
s3.head_bucket(Bucket=bucket)
return True
except botocore.exceptions.ClientError:
return False
def user_exists(iam, username):
try:
iam.get_user(UserName=username)
return True
except iam.exceptions.NoSuchEntityException:
return False
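# Decorator helpers that attach the click options shared by every command.
# The decorators are applied in reverse, mimicking a normal decorator stack,
# so the options keep the order declared here in generated --help output.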
def common_boto3_options(fn):
for decorator in reversed(
(
click.option(
"--access-key",
help="AWS access key ID",
),
click.option(
"--secret-key",
help="AWS secret access key",
),
click.option(
"--session-token",
help="AWS session token",
),
click.option(
"--endpoint-url",
help="Custom endpoint URL",
),
click.option(
"-a",
"--auth",
type=click.File("r"),
help="Path to JSON/INI file containing credentials",
),
)
):
fn = decorator(fn)
return fn
def common_output_options(fn):
for decorator in reversed(
(
click.option("--nl", help="Output newline-delimited JSON", is_flag=True),
click.option("--csv", help="Output CSV", is_flag=True),
click.option("--tsv", help="Output TSV", is_flag=True),
)
):
fn = decorator(fn)
return fn
@click.group()
@click.version_option()
def cli():
"""
A tool for creating credentials for accessing S3 buckets
Documentation: https://s3-credentials.readthedocs.io/
"""
class PolicyParam(click.ParamType):
"Returns string of guaranteed well-formed JSON"
name = "policy"
def convert(self, policy, param, ctx):
if policy.strip().startswith("{"):
# Verify policy string is valid JSON
try:
json.loads(policy)
except ValueError:
self.fail("Invalid JSON string")
return policy
else:
# Assume policy is a file path or '-'
try:
with click.open_file(policy) as f:
contents = f.read()
try:
json.loads(contents)
return contents
except ValueError:
self.fail(
"{} contained invalid JSON".format(
"Input" if policy == "-" else "File"
)
)
except FileNotFoundError:
self.fail("File not found")
class DurationParam(click.ParamType):
name = "duration"
pattern = re.compile(r"^(\d+)(m|h|s)?$")
def convert(self, value, param, ctx):
match = self.pattern.match(value)
if match is None:
self.fail("Duration must be of form 3600s or 15m or 2h")
integer_string, suffix = match.groups()
integer = int(integer_string)
if suffix == "m":
integer *= 60
elif suffix == "h":
integer *= 3600
# Must be between 15 minutes and 12 hours
if not (15 * 60 <= integer <= 12 * 60 * 60):
self.fail("Duration must be between 15 minutes and 12 hours")
return integer
class StatementParam(click.ParamType):
"Ensures statement is valid JSON with required fields"
name = "statement"
def convert(self, statement, param, ctx):
try:
data = json.loads(statement)
except ValueError:
self.fail("Invalid JSON string")
if not isinstance(data, dict):
self.fail("JSON must be an object")
missing_keys = {"Effect", "Action", "Resource"} - data.keys()
if missing_keys:
self.fail(
"Statement JSON missing required keys: {}".format(
", ".join(sorted(missing_keys))
)
)
return data
@cli.command()
@click.argument(
"buckets",
nargs=-1,
required=True,
)
@click.option("--read-only", help="Only allow reading from the bucket", is_flag=True)
@click.option("--write-only", help="Only allow writing to the bucket", is_flag=True)
@click.option(
"--prefix", help="Restrict to keys starting with this prefix", default="*"
)
@click.option(
"extra_statements",
"--statement",
multiple=True,
type=StatementParam(),
help="JSON statement to add to the policy",
)
@click.option(
"--public-bucket",
help="Bucket policy for allowing public access",
is_flag=True,
)
def policy(buckets, read_only, write_only, prefix, extra_statements, public_bucket):
"""
Output generated JSON policy for one or more buckets
Takes the same options as s3-credentials create
To output a read-only JSON policy for a bucket:
s3-credentials policy my-bucket --read-only
"""
"Generate JSON policy for one or more buckets"
if public_bucket:
if len(buckets) != 1:
raise click.ClickException(
"--public-bucket policy can only be generated for a single bucket"
)
click.echo(
json.dumps(policies.bucket_policy_allow_all_get(buckets[0]), indent=4)
)
return
permission = "read-write"
if read_only:
permission = "read-only"
if write_only:
permission = "write-only"
statements = []
if permission == "read-write":
for bucket in buckets:
statements.extend(policies.read_write_statements(bucket, prefix))
elif permission == "read-only":
for bucket in buckets:
statements.extend(policies.read_only_statements(bucket, prefix))
elif permission == "write-only":
for bucket in buckets:
statements.extend(policies.write_only_statements(bucket, prefix))
else:
assert False, "Unknown permission: {}".format(permission)
if extra_statements:
statements.extend(extra_statements)
bucket_access_policy = policies.wrap_policy(statements)
click.echo(json.dumps(bucket_access_policy, indent=4))
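# The create command does the bulk of the work: it optionally creates buckets,
# then either calls sts.assume_role() for time-limited credentials (--duration)
# or creates an IAM user with inline policies and a new access key.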
@cli.command()
@click.argument(
"buckets",
nargs=-1,
required=True,
)
@click.option(
"format_",
"-f",
"--format",
type=click.Choice(["ini", "json"]),
default="json",
help="Output format for credentials",
)
@click.option(
"-d",
"--duration",
type=DurationParam(),
help="How long should these credentials work for? Default is forever, use 3600 for 3600 seconds, 15m for 15 minutes, 1h for 1 hour",
)
@click.option("--username", help="Username to create or existing user to use")
@click.option(
"-c",
"--create-bucket",
help="Create buckets if they do not already exist",
is_flag=True,
)
@click.option(
"--prefix", help="Restrict to keys starting with this prefix", default="*"
)
@click.option(
"--public",
help="Make the created bucket public: anyone will be able to download files if they know their name",
is_flag=True,
)
@click.option(
"--website",
help="Configure bucket to act as a website, using index.html and error.html",
is_flag=True,
)
@click.option("--read-only", help="Only allow reading from the bucket", is_flag=True)
@click.option("--write-only", help="Only allow writing to the bucket", is_flag=True)
@click.option(
"--policy",
type=PolicyParam(),
help="Path to a policy.json file, or literal JSON string - $!BUCKET_NAME!$ will be replaced with the name of the bucket",
)
@click.option(
"extra_statements",
"--statement",
multiple=True,
type=StatementParam(),
help="JSON statement to add to the policy",
)
@click.option("--bucket-region", help="Region in which to create buckets")
@click.option("--silent", help="Don't show performed steps", is_flag=True)
@click.option("--dry-run", help="Show steps without executing them", is_flag=True)
@click.option(
"--user-permissions-boundary",
help=(
"Custom permissions boundary to use for created users, or 'none' to "
"create without. Defaults to limiting to S3 based on "
"--read-only and --write-only options."
),
)
@common_boto3_options
def create(
buckets,
format_,
duration,
username,
create_bucket,
prefix,
public,
website,
read_only,
write_only,
policy,
extra_statements,
bucket_region,
user_permissions_boundary,
silent,
dry_run,
**boto_options,
):
"""
Create and return new AWS credentials for specified S3 buckets - optionally
also creating the bucket if it does not yet exist.
To create a new bucket and output read-write credentials:
s3-credentials create my-new-bucket -c
To create read-only credentials for an existing bucket:
s3-credentials create my-existing-bucket --read-only
To create write-only credentials that are only valid for 15 minutes:
s3-credentials create my-existing-bucket --write-only -d 15m
"""
if read_only and write_only:
raise click.ClickException(
"Cannot use --read-only and --write-only at the same time"
)
extra_statements = list(extra_statements)
def log(message):
if not silent:
click.echo(message, err=True)
permission = "read-write"
if read_only:
permission = "read-only"
if write_only:
permission = "write-only"
if not user_permissions_boundary and (policy or extra_statements):
user_permissions_boundary = "none"
if website:
public = True
s3 = None
iam = None
sts = None
if not dry_run:
s3 = make_client("s3", **boto_options)
iam = make_client("iam", **boto_options)
sts = make_client("sts", **boto_options)
# Verify buckets
for bucket in buckets:
# Create bucket if it doesn't exist
if dry_run or (not bucket_exists(s3, bucket)):
if (not dry_run) and (not create_bucket):
raise click.ClickException(
"Bucket does not exist: {} - try --create-bucket to create it".format(
bucket
)
)
if dry_run or create_bucket:
kwargs = {}
if bucket_region:
kwargs = {
"CreateBucketConfiguration": {
"LocationConstraint": bucket_region
}
}
bucket_policy = {}
if public:
bucket_policy = policies.bucket_policy_allow_all_get(bucket)
if dry_run:
click.echo(
"Would create bucket: '{}'{}".format(
bucket,
(
" with args {}".format(json.dumps(kwargs, indent=4))
if kwargs
else ""
),
)
)
if public:
click.echo(
"... then add this public access block configuration:"
)
click.echo(json.dumps(PUBLIC_ACCESS_BLOCK_CONFIGURATION))
if bucket_policy:
click.echo("... then attach the following bucket policy to it:")
click.echo(json.dumps(bucket_policy, indent=4))
if website:
click.echo(
"... then configure index.html and error.html website settings"
)
else:
s3.create_bucket(Bucket=bucket, **kwargs)
info = "Created bucket: {}".format(bucket)
if bucket_region:
info += " in region: {}".format(bucket_region)
log(info)
if public:
s3.put_public_access_block(
Bucket=bucket,
PublicAccessBlockConfiguration=PUBLIC_ACCESS_BLOCK_CONFIGURATION,
)
log("Set public access block configuration")
if bucket_policy:
s3.put_bucket_policy(
Bucket=bucket, Policy=json.dumps(bucket_policy)
)
log("Attached bucket policy allowing public access")
if website:
s3.put_bucket_website(
Bucket=bucket,
WebsiteConfiguration={
"ErrorDocument": {"Key": "error.html"},
"IndexDocument": {"Suffix": "index.html"},
},
)
log(
"Configured website: IndexDocument=index.html, ErrorDocument=error.html"
)
# At this point the buckets definitely exist - create the inline policy for assume_role()
assume_role_policy = {}
if policy:
assume_role_policy = json.loads(policy.replace("$!BUCKET_NAME!$", bucket))
else:
statements = []
if permission == "read-write":
for bucket in buckets:
statements.extend(policies.read_write_statements(bucket, prefix))
elif permission == "read-only":
for bucket in buckets:
statements.extend(policies.read_only_statements(bucket, prefix))
elif permission == "write-only":
for bucket in buckets:
statements.extend(policies.write_only_statements(bucket, prefix))
else:
assert False, "Unknown permission: {}".format(permission)
statements.extend(extra_statements)
assume_role_policy = policies.wrap_policy(statements)
if duration:
# We're going to use sts.assume_role() rather than creating a user
if dry_run:
click.echo("Would ensure role: 's3-credentials.AmazonS3FullAccess'")
click.echo(
"Would assume role using following policy for {} seconds:".format(
duration
)
)
click.echo(json.dumps(assume_role_policy, indent=4))
else:
s3_role_arn = ensure_s3_role_exists(iam, sts)
log("Assume role against {} for {}s".format(s3_role_arn, duration))
credentials_response = sts.assume_role(
RoleArn=s3_role_arn,
RoleSessionName="s3.{permission}.{buckets}".format(
permission="custom" if (policy or extra_statements) else permission,
buckets=",".join(buckets),
),
Policy=json.dumps(assume_role_policy),
DurationSeconds=duration,
)
if format_ == "ini":
click.echo(
(
"[default]\naws_access_key_id={}\n"
"aws_secret_access_key={}\naws_session_token={}"
).format(
credentials_response["Credentials"]["AccessKeyId"],
credentials_response["Credentials"]["SecretAccessKey"],
credentials_response["Credentials"]["SessionToken"],
)
)
else:
click.echo(
json.dumps(
credentials_response["Credentials"], indent=4, default=str
)
)
return
# No duration, so we create a new user so we can issue non-expiring credentials
if not username:
# Default username is "s3.read-write.bucket1,bucket2"
username = "s3.{permission}.{buckets}".format(
permission="custom" if (policy or extra_statements) else permission,
buckets=",".join(buckets),
)
if dry_run or (not user_exists(iam, username)):
kwargs = {"UserName": username}
if user_permissions_boundary != "none":
# This is a user-account level limitation, it does not grant
# permissions on its own but is a useful extra level of defense
# https://github.com/simonw/s3-credentials/issues/1#issuecomment-958201717
if not user_permissions_boundary:
# Pick one based on --read-only/--write-only
if read_only:
user_permissions_boundary = (
"arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
)
else:
# Need full access in order to be able to write
user_permissions_boundary = (
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
)
kwargs["PermissionsBoundary"] = user_permissions_boundary
info = " user: '{}'".format(username)
if user_permissions_boundary != "none":
info += " with permissions boundary: '{}'".format(user_permissions_boundary)
if dry_run:
click.echo("Would create{}".format(info))
else:
iam.create_user(**kwargs)
log("Created {}".format(info))
# Add inline policies to the user so they can access the buckets
user_policy = {}
for bucket in buckets:
policy_name = "s3.{permission}.{bucket}".format(
permission="custom" if (policy or extra_statements) else permission,
bucket=bucket,
)
if policy:
user_policy = json.loads(policy.replace("$!BUCKET_NAME!$", bucket))
else:
if permission == "read-write":
user_policy = policies.read_write(bucket, prefix, extra_statements)
elif permission == "read-only":
user_policy = policies.read_only(bucket, prefix, extra_statements)
elif permission == "write-only":
user_policy = policies.write_only(bucket, prefix, extra_statements)
else:
assert False, "Unknown permission: {}".format(permission)
if dry_run:
click.echo(
"Would attach policy called '{}' to user '{}', details:\n{}".format(
policy_name,
username,
json.dumps(user_policy, indent=4),
)
)
else:
iam.put_user_policy(
PolicyDocument=json.dumps(user_policy),
PolicyName=policy_name,
UserName=username,
)
log("Attached policy {} to user {}".format(policy_name, username))
# Retrieve and print out the credentials
if dry_run:
click.echo("Would call create access key for user '{}'".format(username))
else:
response = iam.create_access_key(
UserName=username,
)
log("Created access key for user: {}".format(username))
if format_ == "ini":
click.echo(
("[default]\naws_access_key_id={}\n" "aws_secret_access_key={}").format(
response["AccessKey"]["AccessKeyId"],
response["AccessKey"]["SecretAccessKey"],
)
)
elif format_ == "json":
click.echo(json.dumps(response["AccessKey"], indent=4, default=str))
@cli.command()
@common_boto3_options
def whoami(**boto_options):
"Identify currently authenticated user"
sts = make_client("sts", **boto_options)
identity = sts.get_caller_identity()
identity.pop("ResponseMetadata")
click.echo(json.dumps(identity, indent=4, default=str))
@cli.command()
@common_output_options
@common_boto3_options
def list_users(nl, csv, tsv, **boto_options):
"""
List all users for this account
s3-credentials list-users
Add --csv or --tsv for CSV or TSV format:
s3-credentials list-users --csv
"""
iam = make_client("iam", **boto_options)
output(
paginate(iam, "list_users", "Users"),
(
"UserName",
"UserId",
"Arn",
"Path",
"CreateDate",
"PasswordLastUsed",
"PermissionsBoundary",
"Tags",
),
nl,
csv,
tsv,
)
@cli.command()
@click.argument("role_names", nargs=-1)
@click.option("--details", help="Include attached policies (slower)", is_flag=True)
@common_output_options
@common_boto3_options
def list_roles(role_names, details, nl, csv, tsv, **boto_options):
"""
List roles
To list all roles for this AWS account:
s3-credentials list-roles
Add --csv or --tsv for CSV or TSV format:
s3-credentials list-roles --csv
For extra details per role (much slower) add --details
s3-credentials list-roles --details
"""
iam = make_client("iam", **boto_options)
headers = (
"Path",
"RoleName",
"RoleId",
"Arn",
"CreateDate",
"AssumeRolePolicyDocument",
"Description",
"MaxSessionDuration",
"PermissionsBoundary",
"Tags",
"RoleLastUsed",
)
if details:
headers += ("inline_policies", "attached_policies")
def iterate():
for role in paginate(iam, "list_roles", "Roles"):
if role_names and role["RoleName"] not in role_names:
continue
if details:
role_name = role["RoleName"]
role["inline_policies"] = []
# Get inline policy names, then policy for each one
for policy_name in paginate(
iam, "list_role_policies", "PolicyNames", RoleName=role_name
):
role_policy_response = iam.get_role_policy(
RoleName=role_name,
PolicyName=policy_name,
)
role_policy_response.pop("ResponseMetadata", None)
role["inline_policies"].append(role_policy_response)
# Get attached managed policies
role["attached_policies"] = []
for attached in paginate(
iam,
"list_attached_role_policies",
"AttachedPolicies",
RoleName=role_name,
):
policy_arn = attached["PolicyArn"]
attached_policy_response = iam.get_policy(
PolicyArn=policy_arn,
)
policy_details = attached_policy_response["Policy"]
# Also need to fetch the policy JSON
version_id = policy_details["DefaultVersionId"]
policy_version_response = iam.get_policy_version(
PolicyArn=policy_arn,
VersionId=version_id,
)
policy_details["PolicyVersion"] = policy_version_response[
"PolicyVersion"
]
role["attached_policies"].append(policy_details)
yield role
output(iterate(), headers, nl, csv, tsv)
@cli.command()
@click.argument("usernames", nargs=-1)
@common_boto3_options
def list_user_policies(usernames, **boto_options):
"""
List inline policies for specified users
s3-credentials list-user-policies username
Returns policies for all users if no usernames are provided.
"""
iam = make_client("iam", **boto_options)
if not usernames:
usernames = [user["UserName"] for user in paginate(iam, "list_users", "Users")]
for username in usernames:
click.echo("User: {}".format(username))
for policy_name in paginate(
iam, "list_user_policies", "PolicyNames", UserName=username
):
click.echo("PolicyName: {}".format(policy_name))
policy_response = iam.get_user_policy(
UserName=username, PolicyName=policy_name
)
click.echo(
json.dumps(policy_response["PolicyDocument"], indent=4, default=str)
)
@cli.command()
@click.argument("buckets", nargs=-1)
@click.option("--details", help="Include extra bucket details (slower)", is_flag=True)
@common_output_options
@common_boto3_options
def list_buckets(buckets, details, nl, csv, tsv, **boto_options):
"""
List buckets
To list all buckets and their creation time as JSON:
s3-credentials list-buckets
Add --csv or --tsv for CSV or TSV format:
s3-credentials list-buckets --csv
For extra details per bucket (much slower) add --details
s3-credentials list-buckets --details
"""
s3 = make_client("s3", **boto_options)
headers = ["Name", "CreationDate"]
if details:
headers += ["bucket_acl", "public_access_block", "bucket_website"]
def iterator():
for bucket in s3.list_buckets()["Buckets"]:
if buckets and (bucket["Name"] not in buckets):
continue
if details:
bucket_acl = dict(
(key, value)
for key, value in s3.get_bucket_acl(
Bucket=bucket["Name"],
).items()
if key != "ResponseMetadata"
)
region = s3.get_bucket_location(Bucket=bucket["Name"])[
"LocationConstraint"
]
if region is None:
# "Buckets in Region us-east-1 have a LocationConstraint of null"
region = "us-east-1"
try:
pab = s3.get_public_access_block(
Bucket=bucket["Name"],
)["PublicAccessBlockConfiguration"]
except s3.exceptions.ClientError:
pab = None
try:
bucket_website = dict(
(key, value)
for key, value in s3.get_bucket_website(
Bucket=bucket["Name"],
).items()
if key != "ResponseMetadata"
)
bucket_website["url"] = (
"http://{}.s3-website.{}.amazonaws.com/".format(
bucket["Name"], region
)
)
except s3.exceptions.ClientError:
bucket_website = None
bucket["region"] = region
bucket["bucket_acl"] = bucket_acl
bucket["public_access_block"] = pab
bucket["bucket_website"] = bucket_website
yield bucket
output(iterator(), headers, nl, csv, tsv)
@cli.command()
@click.argument("usernames", nargs=-1, required=True)
@common_boto3_options
def delete_user(usernames, **boto_options):
"""
Delete specified users, their access keys and their inline policies
s3-credentials delete-user username1 username2
"""
iam = make_client("iam", **boto_options)
for username in usernames:
click.echo("User: {}".format(username))
# Fetch and delete their policies
policy_names_to_delete = list(
paginate(iam, "list_user_policies", "PolicyNames", UserName=username)
)
for policy_name in policy_names_to_delete:
iam.delete_user_policy(
UserName=username,
PolicyName=policy_name,
)
click.echo(" Deleted policy: {}".format(policy_name))
# Fetch and delete their access keys
access_key_ids_to_delete = [
access_key["AccessKeyId"]
for access_key in paginate(
iam, "list_access_keys", "AccessKeyMetadata", UserName=username
)
]
for access_key_id in access_key_ids_to_delete:
iam.delete_access_key(
UserName=username,
AccessKeyId=access_key_id,
)
click.echo(" Deleted access key: {}".format(access_key_id))
iam.delete_user(UserName=username)
click.echo(" Deleted user")
def make_client(service, access_key, secret_key, session_token, endpoint_url, auth):
if auth:
if access_key or secret_key or session_token:
raise click.ClickException(
"--auth cannot be used with --access-key, --secret-key or --session-token"
)
auth_content = auth.read().strip()
if auth_content.startswith("{"):
# Treat as JSON
decoded = json.loads(auth_content)
access_key = decoded.get("AccessKeyId")
secret_key = decoded.get("SecretAccessKey")
session_token = decoded.get("SessionToken")
else:
# Treat as INI
config = configparser.ConfigParser()
config.read_string(auth_content)
# Use the first section that has an aws_access_key_id
for section in config.sections():
if "aws_access_key_id" in config[section]:
access_key = config[section].get("aws_access_key_id")
secret_key = config[section].get("aws_secret_access_key")
session_token = config[section].get("aws_session_token")
break
kwargs = {}
if access_key:
kwargs["aws_access_key_id"] = access_key
if secret_key:
kwargs["aws_secret_access_key"] = secret_key
if session_token:
kwargs["aws_session_token"] = session_token
if endpoint_url:
kwargs["endpoint_url"] = endpoint_url
return boto3.client(service, **kwargs)
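# sts.assume_role() needs an existing role to assume; this creates a reusable
# one with AmazonS3FullAccess attached, relying on the per-call inline Policy
# passed to assume_role() to narrow access for the temporary credentials.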
def ensure_s3_role_exists(iam, sts):
"Create s3-credentials.AmazonS3FullAccess role if not exists, return ARN"
role_name = "s3-credentials.AmazonS3FullAccess"
account_id = sts.get_caller_identity()["Account"]
try:
role = iam.get_role(RoleName=role_name)
return role["Role"]["Arn"]
except iam.exceptions.NoSuchEntityException:
create_role_response = iam.create_role(
Description=(
"Role used by the s3-credentials tool to create time-limited "
"credentials that are restricted to specific buckets"
),
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::{}:root".format(account_id)
},
"Action": "sts:AssumeRole",
}
],
}
),
MaxSessionDuration=12 * 60 * 60,
)
# Attach AmazonS3FullAccess to it - note that even though we use full access
# on the role itself any time we call sts.assume_role() we attach an additional
# policy to ensure reduced access for the temporary credentials
iam.attach_role_policy(
RoleName="s3-credentials.AmazonS3FullAccess",
PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
)
return create_role_response["Role"]["Arn"]
@cli.command()
@click.argument("bucket")
@click.option("--prefix", help="List keys starting with this prefix")
@common_output_options
@common_boto3_options
def list_bucket(bucket, prefix, nl, csv, tsv, **boto_options):
"""
List contents of bucket
To list the contents of a bucket as JSON:
s3-credentials list-bucket my-bucket
Add --csv or --tsv for CSV or TSV format:
s3-credentials list-bucket my-bucket --csv
"""
s3 = make_client("s3", **boto_options)
kwargs = {"Bucket": bucket}
if prefix:
kwargs["Prefix"] = prefix
try:
output(
paginate(s3, "list_objects_v2", "Contents", **kwargs),
("Key", "LastModified", "ETag", "Size", "StorageClass", "Owner"),
nl,
csv,
tsv,
)
except botocore.exceptions.ClientError as e:
raise click.ClickException(e)
@cli.command()
@click.argument("bucket")
@click.argument("key")
@click.argument(
"path",
type=click.Path(
exists=True, file_okay=True, dir_okay=False, readable=True, allow_dash=True
),
)
@click.option(
"--content-type",
help="Content-Type to use (default is auto-detected based on file extension)",
)
@click.option("silent", "-s", "--silent", is_flag=True, help="Don't show progress bar")
@common_boto3_options
def put_object(bucket, key, path, content_type, silent, **boto_options):
"""
Upload an object to an S3 bucket
To upload a file to /my-key.txt in the my-bucket bucket:
s3-credentials put-object my-bucket my-key.txt /path/to/file.txt
Use - to upload content from standard input:
echo "Hello" | s3-credentials put-object my-bucket hello.txt -
"""
s3 = make_client("s3", **boto_options)
size = None
extra_args = {}
if path == "-":
# boto needs to be able to seek
fp = io.BytesIO(sys.stdin.buffer.read())
if not silent:
size = fp.getbuffer().nbytes
else:
if not content_type:
content_type = mimetypes.guess_type(path)[0]
fp = click.open_file(path, "rb")
if not silent:
size = os.path.getsize(path)
if content_type is not None:
extra_args["ContentType"] = content_type
if not silent:
# Show progress bar
with click.progressbar(length=size, label="Uploading", file=sys.stderr) as bar:
s3.upload_fileobj(
fp, bucket, key, Callback=bar.update, ExtraArgs=extra_args
)
else:
s3.upload_fileobj(fp, bucket, key, ExtraArgs=extra_args)
@cli.command()
@click.argument("bucket")
@click.argument(
"objects",
nargs=-1,
required=True,
)
@click.option(
"--prefix",
help="Prefix to add to the files within the bucket",
)
@click.option("silent", "-s", "--silent", is_flag=True, help="Don't show progress bar")
@click.option("--dry-run", help="Show steps without executing them", is_flag=True)
@common_boto3_options
def put_objects(bucket, objects, prefix, silent, dry_run, **boto_options):
"""
Upload multiple objects to an S3 bucket
Pass one or more files to upload them:
s3-credentials put-objects my-bucket one.txt two.txt
These will be saved to the root of the bucket. To save to a different location
use the --prefix option:
s3-credentials put-objects my-bucket one.txt two.txt --prefix my-folder
This will upload them to my-folder/one.txt and my-folder/two.txt.
If you pass a directory it will be uploaded recursively:
s3-credentials put-objects my-bucket my-folder
This will create keys in my-folder/... in the S3 bucket.
To upload all files in a folder to the root of the bucket instead use this:
s3-credentials put-objects my-bucket my-folder/*
"""
s3 = make_client("s3", **boto_options)
if prefix and not prefix.endswith("/"):
prefix = prefix + "/"
total_size = 0
# Figure out files to upload and their keys
paths = [] # (path, key)
for obj in objects:
path = pathlib.Path(obj)
if path.is_file():
# Just use the filename as the key
paths.append((path, path.name))
elif path.is_dir():
# Key is the relative path within the directory
for p in path.glob("**/*"):
if p.is_file():
paths.append((p, str(p.relative_to(path.parent))))
def upload(path, key, callback=None):
final_key = key
if prefix:
final_key = prefix + key
if dry_run:
click.echo("{} => s3://{}/{}".format(path, bucket, final_key))
else:
s3.upload_file(
Filename=str(path), Bucket=bucket, Key=final_key, Callback=callback
)
if not silent and not dry_run:
total_size = sum(p[0].stat().st_size for p in paths)
with click.progressbar(
length=total_size,
label="Uploading {} ({} file{})".format(
format_bytes(total_size),
len(paths),
"s" if len(paths) != 1 else "",
),
file=sys.stderr,
) as bar:
for path, key in paths:
upload(path, key, bar.update)
else:
for path, key in paths:
upload(path, key)
@cli.command()
@click.argument("bucket")
@click.argument("key")
@click.option(
"output",
"-o",
"--output",
type=click.Path(file_okay=True, dir_okay=False, writable=True, allow_dash=False),
help="Write to this file instead of stdout",
)
@common_boto3_options
def get_object(bucket, key, output, **boto_options):
"""
Download an object from an S3 bucket
To write the contents of the file to standard output:
s3-credentials get-object my-bucket hello.txt
To save to a file:
s3-credentials get-object my-bucket hello.txt -o hello.txt
"""
s3 = make_client("s3", **boto_options)
if not output:
fp = sys.stdout.buffer
else:
fp = click.open_file(output, "wb")
s3.download_fileobj(bucket, key, fp)
@cli.command()
@click.argument("bucket")
@click.argument(
"keys",
nargs=-1,
required=False,
)
@click.option(
"output",
"-o",
"--output",
type=click.Path(file_okay=False, dir_okay=True, writable=True, allow_dash=False),
help="Write to this directory instead of one matching the bucket name",
)
@click.option(
"patterns",
"-p",
"--pattern",
multiple=True,
help="Glob patterns for files to download, e.g. '*/*.js'",
)
@click.option("silent", "-s", "--silent", is_flag=True, help="Don't show progress bar")
@common_boto3_options
def get_objects(bucket, keys, output, patterns, silent, **boto_options):
"""
Download multiple objects from an S3 bucket
To download everything, run:
s3-credentials get-objects my-bucket
Files will be saved to a directory called my-bucket. Use -o dirname to save to a
different directory.
To download specific keys, list them:
s3-credentials get-objects my-bucket one.txt path/two.txt
To download files matching a glob-style pattern, use:
s3-credentials get-objects my-bucket --pattern '*/*.js'
"""
s3 = make_client("s3", **boto_options)
# If user specified keys and no patterns, use the keys they specified
keys_to_download = list(keys)
key_sizes = {}
if keys and not silent:
# Get sizes of those keys for progress bar
for key in keys:
try:
key_sizes[key] = s3.head_object(Bucket=bucket, Key=key)["ContentLength"]
except botocore.exceptions.ClientError:
# Ignore errors - they will be reported later
key_sizes[key] = 0
if (not keys) or patterns:
# Fetch all keys, then filter them if --pattern
all_key_infos = list(paginate(s3, "list_objects_v2", "Contents", Bucket=bucket))
if patterns:
filtered = []
for pattern in patterns:
filtered.extend(
fnmatch.filter((k["Key"] for k in all_key_infos), pattern)
)
keys_to_download.extend(filtered)
else:
keys_to_download.extend(k["Key"] for k in all_key_infos)
if not silent:
key_set = set(keys_to_download)
for key in all_key_infos:
if key["Key"] in key_set:
key_sizes[key["Key"]] = key["Size"]
output_dir = pathlib.Path(output or bucket)
if not output_dir.exists():
output_dir.mkdir(parents=True)
errors = []
def download(key, callback=None):
# Ensure directory for key exists
key_dir = (output_dir / key).parent
if not key_dir.exists():
key_dir.mkdir(parents=True)
try:
s3.download_file(bucket, key, str(output_dir / key), Callback=callback)
except botocore.exceptions.ClientError as e:
errors.append("Not found: {}".format(key))
if not silent:
total_size = sum(key_sizes.values())
with click.progressbar(
length=total_size,
label="Downloading {} ({} file{})".format(
format_bytes(total_size),
len(key_sizes),
"s" if len(key_sizes) != 1 else "",
),
file=sys.stderr,
) as bar:
for key in keys_to_download:
download(key, bar.update)
else:
for key in keys_to_download:
download(key)
if errors:
raise click.ClickException("\n".join(errors))
@cli.command()
@click.argument("bucket")
@click.option(
"allowed_methods",
"-m",
"--allowed-method",
multiple=True,
help="Allowed method e.g. GET",
)
@click.option(
"allowed_headers",
"-h",
"--allowed-header",
multiple=True,
help="Allowed header e.g. Authorization",
)
@click.option(
"allowed_origins",
"-o",
"--allowed-origin",
multiple=True,
help="Allowed origin e.g. https://www.example.com/",
)
@click.option(
"expose_headers",
"-e",
"--expose-header",
multiple=True,
help="Header to expose e.g. ETag",
)
@click.option(
"max_age_seconds",
"--max-age-seconds",
type=int,
help="How long to cache preflight requests",
)
@common_boto3_options
def set_cors_policy(
bucket,
allowed_methods,
allowed_headers,
allowed_origins,
expose_headers,
max_age_seconds,
**boto_options,
):
"""
Set CORS policy for a bucket
To allow GET requests from any origin:
s3-credentials set-cors-policy my-bucket
To allow GET and PUT from a specific origin and expose ETag headers:
\b
s3-credentials set-cors-policy my-bucket \\
--allowed-method GET \\
--allowed-method PUT \\
--allowed-origin https://www.example.com/ \\
--expose-header ETag
"""
s3 = make_client("s3", **boto_options)
if not bucket_exists(s3, bucket):
raise click.ClickException("Bucket {} does not exists".format(bucket))
cors_rule = {
"ID": "set-by-s3-credentials",
"AllowedOrigins": allowed_origins or ["*"],
"AllowedHeaders": allowed_headers,
"AllowedMethods": allowed_methods or ["GET"],
"ExposeHeaders": expose_headers,
}
if max_age_seconds:
cors_rule["MaxAgeSeconds"] = max_age_seconds
try:
s3.put_bucket_cors(Bucket=bucket, CORSConfiguration={"CORSRules": [cors_rule]})
except botocore.exceptions.ClientError as e:
raise click.ClickException(e)
@cli.command()
@click.argument("bucket")
@common_boto3_options
def get_cors_policy(bucket, **boto_options):
"""
Get CORS policy for a bucket
s3-credentials get-cors-policy my-bucket
Returns the CORS policy for this bucket, if set, as JSON
"""
s3 = make_client("s3", **boto_options)
try:
response = s3.get_bucket_cors(Bucket=bucket)
except botocore.exceptions.ClientError as e:
raise click.ClickException(e)
click.echo(json.dumps(response["CORSRules"], indent=4, default=str))
def without_response_metadata(data):
return dict(
(key, value) for key, value in data.items() if key != "ResponseMetadata"
)
@cli.command()
@click.argument("bucket")
@common_boto3_options
def debug_bucket(bucket, **boto_options):
"""
Run a bunch of diagnostics to help debug a bucket
s3-credentials debug-bucket my-bucket
"""
s3 = make_client("s3", **boto_options)
try:
bucket_acl = s3.get_bucket_acl(Bucket=bucket)
click.echo("Bucket ACL:")
click.echo(json.dumps(without_response_metadata(bucket_acl), indent=4))
except Exception as ex:
print(f"Error checking bucket ACL: {ex}")
try:
bucket_policy_status = s3.get_bucket_policy_status(Bucket=bucket)
click.echo("Bucket policy status:")
click.echo(
json.dumps(without_response_metadata(bucket_policy_status), indent=4)
)
except Exception as ex:
print(f"Error checking bucket policy status: {ex}")
try:
bucket_public_access_block = s3.get_public_access_block(Bucket=bucket)
click.echo("Bucket public access block:")
click.echo(
json.dumps(without_response_metadata(bucket_public_access_block), indent=4)
)
except Exception as ex:
print(f"Error checking bucket public access block: {ex}")
@cli.command()
@click.argument("bucket")
@click.argument(
"keys",
nargs=-1,
)
@click.option(
"--prefix",
help="Delete everything with this prefix",
)
@click.option(
"silent", "-s", "--silent", is_flag=True, help="Don't show informational output"
)
@click.option(
"dry_run",
"-d",
"--dry-run",
is_flag=True,
help="Show keys that would be deleted without deleting them",
)
@common_boto3_options
def delete_objects(bucket, keys, prefix, silent, dry_run, **boto_options):
"""
Delete one or more objects from an S3 bucket
Pass one or more keys to delete them:
s3-credentials delete-objects my-bucket one.txt two.txt
To delete all files matching a prefix, pass --prefix:
s3-credentials delete-objects my-bucket --prefix my-folder/
"""
s3 = make_client("s3", **boto_options)
if keys and prefix:
raise click.ClickException("Cannot pass both keys and --prefix")
if not keys and not prefix:
raise click.ClickException("Specify one or more keys or use --prefix")
if prefix:
# List all keys with this prefix
paginator = s3.get_paginator("list_objects_v2")
response_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
keys = []
for response in response_iterator:
keys.extend([obj["Key"] for obj in response.get("Contents", [])])
if not silent:
click.echo(
"Deleting {} object{} from {}".format(
len(keys), "s" if len(keys) != 1 else "", bucket
),
err=True,
)
if dry_run:
click.echo("The following keys would be deleted:")
for key in keys:
click.echo(key)
return
for batch in batches(keys, 1000):
# Remove any rogue \r characters:
batch = [k.strip() for k in batch]
response = s3.delete_objects(
Bucket=bucket, Delete={"Objects": [{"Key": key} for key in batch]}
)
if response.get("Errors"):
click.echo(
"Errors deleting objects: {}".format(response["Errors"]), err=True
)
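# Shared rendering for the list-* commands: newline-delimited JSON (--nl),
# CSV/TSV, or a streamed, indented JSON array by default.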
def output(iterator, headers, nl, csv, tsv):
if nl:
for item in iterator:
click.echo(json.dumps(item, default=str))
elif csv or tsv:
writer = DictWriter(
sys.stdout, headers, dialect="excel-tab" if tsv else "excel"
)
writer.writeheader()
writer.writerows(fix_json(row) for row in iterator)
else:
for line in stream_indented_json(iterator):
click.echo(line)
def stream_indented_json(iterator, indent=2):
# We have to iterate two-at-a-time so we can know if we
# should output a trailing comma or if we have reached
# the last item.
current_iter, next_iter = itertools.tee(iterator, 2)
next(next_iter, None)
first = True
for item, next_item in itertools.zip_longest(current_iter, next_iter):
is_last = next_item is None
data = item
line = "{first}{serialized}{separator}{last}".format(
first="[\n" if first else "",
serialized=textwrap.indent(
json.dumps(data, indent=indent, default=str), " " * indent
),
separator="," if not is_last else "",
last="\n]" if is_last else "",
)
yield line
first = False
if first:
# We didn't output anything, so yield the empty list
yield "[]"
def paginate(service, method, list_key, **kwargs):
paginator = service.get_paginator(method)
for response in paginator.paginate(**kwargs):
yield from response.get(list_key) or []
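# Nested values would otherwise be written as Python reprs in CSV/TSV output,
# so JSON-encode any dict/list/tuple cells first.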
def fix_json(row):
# If a key value is list or dict, json encode it
return dict(
[
(
key,
(
json.dumps(value, indent=2, default=str)
if isinstance(value, (dict, list, tuple))
else value
),
)
for key, value in row.items()
]
)
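# Human-readable sizes for the upload/download progress bar labels.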
def format_bytes(size):
for x in ("bytes", "KB", "MB", "GB", "TB"):
if size < 1024:
return "{:3.1f} {}".format(size, x)
size /= 1024
return size
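# delete_objects() accepts at most 1,000 keys per request, so the delete-objects
# command splits its key list into chunks of that size.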
def batches(all, batch_size):
return [all[i : i + batch_size] for i in range(0, len(all), batch_size)]