import boto3 import botocore import click import json from . import policies def bucket_exists(s3, bucket): try: s3.head_bucket(Bucket=bucket) return True except botocore.exceptions.ClientError: return False def user_exists(iam, username): try: iam.get_user(UserName=username) return True except iam.exceptions.NoSuchEntityException: return False @click.group() @click.version_option() def cli(): "A tool for creating credentials for accessing S3 buckets" @cli.command() @click.argument( "buckets", nargs=-1, required=True, ) @click.option("--username", help="Username to create or existing user to use") @click.option( "-c", "--create-bucket", help="Create buckets if they do not already exist", is_flag=True, ) @click.option("--read-only", help="Only allow reading from the bucket", is_flag=True) @click.option("--write-only", help="Only allow writing to the bucket", is_flag=True) @click.option("--bucket-region", help="Region in which to create buckets") @click.option("--silent", help="Don't show performed steps", is_flag=True) @click.option( "--user-permissions-boundary", help=( "Custom permissions boundary to use for created users, or 'none' to " "create without. Defaults to limiting to S3 based on " "--read-only and --write-only options." ), ) def create( buckets, username, create_bucket, read_only, write_only, bucket_region, user_permissions_boundary, silent, ): "Create and return new AWS credentials for specified S3 buckets" if read_only and write_only: raise click.ClickException( "Cannot use --read-only and --write-only at the same time" ) def log(message): if not silent: click.echo(message, err=True) permission = "read-write" if read_only: permission = "read-only" if write_only: permission = "write-only" s3 = boto3.client("s3") iam = boto3.client("iam") # Verify buckets for bucket in buckets: # Create bucket if it doesn't exist if not bucket_exists(s3, bucket): if not create_bucket: raise click.ClickException( "Bucket does not exist: {} - try --create-bucket to create it".format( bucket ) ) if create_bucket: kwargs = {} if bucket_region: kwargs = { "CreateBucketConfiguration": { "LocationConstraint": bucket_region } } s3.create_bucket(Bucket=bucket, **kwargs) info = "Created bucket: {}".format(bucket) if bucket_region: info += "in region: {}".format(bucket_region) log(info) # Buckets created - now create the user, if needed if not username: # Default username is "s3.read-write.bucket1,bucket2" username = "s3.{permission}.{buckets}".format( permission=permission, buckets=",".join(buckets) ) if not user_exists(iam, username): kwargs = {"UserName": username} if user_permissions_boundary != "none": # This is a user-account level limitation, it does not grant # permissions on its own but is a useful extra level of defense # https://github.com/simonw/s3-credentials/issues/1#issuecomment-958201717 if not user_permissions_boundary: # Pick one based on --read-only/--write-only if read_only: user_permissions_boundary = ( "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess" ) else: # Need full access in order to be able to write user_permissions_boundary = ( "arn:aws:iam::aws:policy/AmazonS3FullAccess" ) kwargs["PermissionsBoundary"] = user_permissions_boundary iam.create_user(**kwargs) info = "Created user: {}".format(username) if user_permissions_boundary != "none": info += " with permissions boundary: {}".format(user_permissions_boundary) log(info) # Add inline policies to the user so they can access the buckets for bucket in buckets: policy_name = "s3.{permission}.{bucket}".format( permission=permission, bucket=bucket, ) policy = {} if permission == "read-write": policy = policies.read_write(bucket) elif permission == "read-only": policy = policies.read_only(bucket) elif permission == "write-only": policy = policies.write_only(bucket) else: assert False, "Unknown permission: {}".format(permission) iam.put_user_policy( PolicyDocument=json.dumps(policy), PolicyName=policy_name, UserName=username, ) log("Attached policy {} to user {}".format(policy_name, username)) # Retrieve and print out the credentials response = iam.create_access_key( UserName=username, ) log("Created access key for user: {}".format(username)) click.echo(json.dumps(response["AccessKey"], indent=4, default=str)) @cli.command() def whoami(): "Identify currently authenticated user" iam = boto3.client("iam") click.echo(json.dumps(iam.get_user()["User"], indent=4, default=str)) @cli.command() @click.option("--array", help="Output a valid JSON array", is_flag=True) @click.option("--nl", help="Output newline-delimited JSON", is_flag=True) def list_users(array, nl): "List all users" iam = boto3.client("iam") paginator = iam.get_paginator("list_users") gathered = [] for response in paginator.paginate(): for user in response["Users"]: if array: gathered.append(user) else: if nl: click.echo(json.dumps(user, default=str)) else: click.echo(json.dumps(user, indent=4, default=str)) if gathered: click.echo(json.dumps(gathered, indent=4, default=str))