s3-credentials/s3_credentials/cli.py

191 wiersze
6.2 KiB
Python

import boto3
import botocore
import click
import json
from . import policies
def bucket_exists(s3, bucket):
try:
s3.head_bucket(Bucket=bucket)
return True
except botocore.exceptions.ClientError:
return False
def user_exists(iam, username):
try:
iam.get_user(UserName=username)
return True
except iam.exceptions.NoSuchEntityException:
return False
@click.group()
@click.version_option()
def cli():
"A tool for creating credentials for accessing S3 buckets"
@cli.command()
@click.argument(
"buckets",
nargs=-1,
required=True,
)
@click.option("--username", help="Username to create or existing user to use")
@click.option(
"-c",
"--create-bucket",
help="Create buckets if they do not already exist",
is_flag=True,
)
@click.option("--read-only", help="Only allow reading from the bucket", is_flag=True)
@click.option("--write-only", help="Only allow writing to the bucket", is_flag=True)
@click.option("--bucket-region", help="Region in which to create buckets")
@click.option("--silent", help="Don't show performed steps", is_flag=True)
@click.option(
"--user-permissions-boundary",
help=(
"Custom permissions boundary to use for created users, or 'none' to "
"create without. Defaults to limiting to S3 based on "
"--read-only and --write-only options."
),
)
def create(
buckets,
username,
create_bucket,
read_only,
write_only,
bucket_region,
user_permissions_boundary,
silent,
):
"Create and return new AWS credentials for specified S3 buckets"
if read_only and write_only:
raise click.ClickException(
"Cannot use --read-only and --write-only at the same time"
)
def log(message):
if not silent:
click.echo(message, err=True)
permission = "read-write"
if read_only:
permission = "read-only"
if write_only:
permission = "write-only"
s3 = boto3.client("s3")
iam = boto3.client("iam")
# Verify buckets
for bucket in buckets:
# Create bucket if it doesn't exist
if not bucket_exists(s3, bucket):
if not create_bucket:
raise click.ClickException(
"Bucket does not exist: {} - try --create-bucket to create it".format(
bucket
)
)
if create_bucket:
kwargs = {}
if bucket_region:
kwargs = {
"CreateBucketConfiguration": {
"LocationConstraint": bucket_region
}
}
s3.create_bucket(Bucket=bucket, **kwargs)
info = "Created bucket: {}".format(bucket)
if bucket_region:
info += "in region: {}".format(bucket_region)
log(info)
# Buckets created - now create the user, if needed
if not username:
# Default username is "s3.read-write.bucket1,bucket2"
username = "s3.{permission}.{buckets}".format(
permission=permission, buckets=",".join(buckets)
)
if not user_exists(iam, username):
kwargs = {"UserName": username}
if user_permissions_boundary != "none":
# This is a user-account level limitation, it does not grant
# permissions on its own but is a useful extra level of defense
# https://github.com/simonw/s3-credentials/issues/1#issuecomment-958201717
if not user_permissions_boundary:
# Pick one based on --read-only/--write-only
if read_only:
user_permissions_boundary = (
"arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
)
else:
# Need full access in order to be able to write
user_permissions_boundary = (
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
)
kwargs["PermissionsBoundary"] = user_permissions_boundary
iam.create_user(**kwargs)
info = "Created user: {}".format(username)
if user_permissions_boundary != "none":
info += " with permissions boundary: {}".format(user_permissions_boundary)
log(info)
# Add inline policies to the user so they can access the buckets
for bucket in buckets:
policy_name = "s3.{permission}.{bucket}".format(
permission=permission,
bucket=bucket,
)
policy = {}
if permission == "read-write":
policy = policies.read_write(bucket)
elif permission == "read-only":
policy = policies.read_only(bucket)
elif permission == "write-only":
policy = policies.write_only(bucket)
else:
assert False, "Unknown permission: {}".format(permission)
iam.put_user_policy(
PolicyDocument=json.dumps(policy),
PolicyName=policy_name,
UserName=username,
)
log("Attached policy {} to user {}".format(policy_name, username))
# Retrieve and print out the credentials
response = iam.create_access_key(
UserName=username,
)
log("Created access key for user: {}".format(username))
click.echo(json.dumps(response["AccessKey"], indent=4, default=str))
@cli.command()
def whoami():
"Identify currently authenticated user"
iam = boto3.client("iam")
click.echo(json.dumps(iam.get_user()["User"], indent=4, default=str))
@cli.command()
@click.option("--array", help="Output a valid JSON array", is_flag=True)
@click.option("--nl", help="Output newline-delimited JSON", is_flag=True)
def list_users(array, nl):
"List all users"
iam = boto3.client("iam")
paginator = iam.get_paginator("list_users")
gathered = []
for response in paginator.paginate():
for user in response["Users"]:
if array:
gathered.append(user)
else:
if nl:
click.echo(json.dumps(user, default=str))
else:
click.echo(json.dumps(user, indent=4, default=str))
if gathered:
click.echo(json.dumps(gathered, indent=4, default=str))