From 7749ad2e014e79c9f9218c7a1e11f44a1ea68278 Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Thu, 18 Nov 2021 12:24:17 -0800 Subject: [PATCH] --dry-run option, closes #35 --- README.md | 3 + s3_credentials/cli.py | 159 +++++++++++++++++++++++++++--------------- tests/test_dry_run.py | 60 ++++++++++++++++ 3 files changed, 165 insertions(+), 57 deletions(-) create mode 100644 tests/test_dry_run.py diff --git a/README.md b/README.md index ee4078d..2899e46 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,7 @@ The `create` command has a number of options: - `--policy filepath-or-string`: A custom policy document (as a file path, literal JSON string or `-` for standard input) - see below - `--bucket-region`: If creating buckets, the region in which they should be created. - `--silent`: Don't output details of what is happening, just output the JSON for the created access credentials at the end. +- `--dry-run`: Output details of AWS changes that would have been made without applying them. - `--user-permissions-boundary`: Custom [permissions boundary](https://docs.aws.amazon.com`/IAM/latest/UserGuide/access_policies_boundaries.html) to use for users created by this tool. This will default to restricting those users to only interacting with S3, taking the `--read-only` option into account. Use `none` to create users without any permissions boundary at all. ### Changes that will be made to your AWS account @@ -115,6 +116,8 @@ For temporary credentials: 2. Check if an AWS role called `s3-credentials.AmazonS3FullAccess` exists. If it does not exist create it, configured to allow the user's AWS account to assume it and with the `arn:aws:iam::aws:policy/AmazonS3FullAccess` policy attached. 3. Use `STS.AssumeRole()` to return temporary credentials that are restricted to just the specified buckets and specified read-only/read-write/write-only policy. +You can run the `create` command with the `--dry-run` option to see a summary of changes that would be applied, including details of generated policy documents, without actually applying those changes. + ### Using a custom policy The policy documents applied by this tool can be seen in [policies.py](https://github.com/simonw/s3-credentials/blob/main/s3_credentials/policies.py). If you want to use a custom policy document you can do so using the `--policy` option. diff --git a/s3_credentials/cli.py b/s3_credentials/cli.py index da6f590..8fa13db 100644 --- a/s3_credentials/cli.py +++ b/s3_credentials/cli.py @@ -147,6 +147,7 @@ class DurationParam(click.ParamType): ) @click.option("--bucket-region", help="Region in which to create buckets") @click.option("--silent", help="Don't show performed steps", is_flag=True) +@click.option("--dry-run", help="Show steps without executing them", is_flag=True) @click.option( "--user-permissions-boundary", help=( @@ -168,6 +169,7 @@ def create( bucket_region, user_permissions_boundary, silent, + dry_run, **boto_options ): "Create and return new AWS credentials for specified S3 buckets" @@ -185,20 +187,27 @@ def create( permission = "read-only" if write_only: permission = "write-only" - s3 = make_client("s3", **boto_options) - iam = make_client("iam", **boto_options) - sts = boto3.client("sts") + + s3 = None + iam = None + sts = None + + if not dry_run: + s3 = make_client("s3", **boto_options) + iam = make_client("iam", **boto_options) + sts = make_client("sts", **boto_options) + # Verify buckets for bucket in buckets: # Create bucket if it doesn't exist - if not bucket_exists(s3, bucket): - if not create_bucket: + if dry_run or (not bucket_exists(s3, bucket)): + if (not dry_run) and (not create_bucket): raise click.ClickException( "Bucket does not exist: {} - try --create-bucket to create it".format( bucket ) ) - if create_bucket: + if dry_run or create_bucket: kwargs = {} if bucket_region: kwargs = { @@ -206,11 +215,23 @@ def create( "LocationConstraint": bucket_region } } - s3.create_bucket(Bucket=bucket, **kwargs) - info = "Created bucket: {}".format(bucket) - if bucket_region: - info += "in region: {}".format(bucket_region) - log(info) + if dry_run: + click.echo( + "Would create bucket: '{}'{}".format( + bucket, + ( + " with args {}".format(json.dumps(kwargs, indent=4)) + if kwargs + else "" + ), + ) + ) + else: + s3.create_bucket(Bucket=bucket, **kwargs) + info = "Created bucket: {}".format(bucket) + if bucket_region: + info += "in region: {}".format(bucket_region) + log(info) # At this point the buckets definitely exist - create the inline policy bucket_access_policy = {} if policy: @@ -232,31 +253,39 @@ def create( if duration: # We're going to use sts.assume_role() rather than creating a user - s3_role_arn = ensure_s3_role_exists(iam, sts) - log("Assume role against {} for {}s".format(s3_role_arn, duration)) - credentials_response = sts.assume_role( - RoleArn=s3_role_arn, - RoleSessionName="s3.{permission}.{buckets}".format( - permission="custom" if policy else permission, buckets=",".join(buckets) - ), - Policy=json.dumps(bucket_access_policy), - DurationSeconds=duration, - ) - if format_ == "ini": - click.echo( - ( - "[default]\naws_access_key_id={}\n" - "aws_secret_access_key={}\naws_session_token={}" - ).format( - credentials_response["Credentials"]["AccessKeyId"], - credentials_response["Credentials"]["SecretAccessKey"], - credentials_response["Credentials"]["SessionToken"], - ) - ) + if dry_run: + click.echo("Would ensure role: 's3-credentials.AmazonS3FullAccess'") + click.echo("Would assume role using following policy:") + click.echo(json.dumps(bucket_access_policy, indent=4)) else: - click.echo( - json.dumps(credentials_response["Credentials"], indent=4, default=str) + s3_role_arn = ensure_s3_role_exists(iam, sts) + log("Assume role against {} for {}s".format(s3_role_arn, duration)) + credentials_response = sts.assume_role( + RoleArn=s3_role_arn, + RoleSessionName="s3.{permission}.{buckets}".format( + permission="custom" if policy else permission, + buckets=",".join(buckets), + ), + Policy=json.dumps(bucket_access_policy), + DurationSeconds=duration, ) + if format_ == "ini": + click.echo( + ( + "[default]\naws_access_key_id={}\n" + "aws_secret_access_key={}\naws_session_token={}" + ).format( + credentials_response["Credentials"]["AccessKeyId"], + credentials_response["Credentials"]["SecretAccessKey"], + credentials_response["Credentials"]["SessionToken"], + ) + ) + else: + click.echo( + json.dumps( + credentials_response["Credentials"], indent=4, default=str + ) + ) return # No duration, so wo create a new user so we can issue non-expiring credentials if not username: @@ -264,7 +293,7 @@ def create( username = "s3.{permission}.{buckets}".format( permission="custom" if policy else permission, buckets=",".join(buckets) ) - if not user_exists(iam, username): + if dry_run or (not user_exists(iam, username)): kwargs = {"UserName": username} if user_permissions_boundary != "none": # This is a user-account level limitation, it does not grant @@ -282,11 +311,14 @@ def create( "arn:aws:iam::aws:policy/AmazonS3FullAccess" ) kwargs["PermissionsBoundary"] = user_permissions_boundary - iam.create_user(**kwargs) - info = "Created user: {}".format(username) + info = " user: '{}'".format(username) if user_permissions_boundary != "none": - info += " with permissions boundary: {}".format(user_permissions_boundary) - log(info) + info += " with permissions boundary: '{}'".format(user_permissions_boundary) + if dry_run: + click.echo("Would create{}".format(info)) + else: + iam.create_user(**kwargs) + log("Created {}".format(info)) # Add inline policies to the user so they can access the buckets for bucket in buckets: @@ -305,27 +337,40 @@ def create( policy_dict = policies.write_only(bucket) else: assert False, "Unknown permission: {}".format(permission) - iam.put_user_policy( - PolicyDocument=json.dumps(policy_dict), - PolicyName=policy_name, - UserName=username, - ) - log("Attached policy {} to user {}".format(policy_name, username)) + + if dry_run: + click.echo( + "Would attach policy called '{}' to user '{}', details:\n{}".format( + policy_name, + username, + json.dumps(policy_dict, indent=4), + ) + ) + else: + iam.put_user_policy( + PolicyDocument=json.dumps(policy_dict), + PolicyName=policy_name, + UserName=username, + ) + log("Attached policy {} to user {}".format(policy_name, username)) # Retrieve and print out the credentials - response = iam.create_access_key( - UserName=username, - ) - log("Created access key for user: {}".format(username)) - if format_ == "ini": - click.echo( - ("[default]\naws_access_key_id={}\n" "aws_secret_access_key={}").format( - response["AccessKey"]["AccessKeyId"], - response["AccessKey"]["SecretAccessKey"], - ) + if dry_run: + click.echo("Would call create access key for user '{}'".format(username)) + else: + response = iam.create_access_key( + UserName=username, ) - elif format_ == "json": - click.echo(json.dumps(response["AccessKey"], indent=4, default=str)) + log("Created access key for user: {}".format(username)) + if format_ == "ini": + click.echo( + ("[default]\naws_access_key_id={}\n" "aws_secret_access_key={}").format( + response["AccessKey"]["AccessKeyId"], + response["AccessKey"]["SecretAccessKey"], + ) + ) + elif format_ == "json": + click.echo(json.dumps(response["AccessKey"], indent=4, default=str)) @cli.command() diff --git a/tests/test_dry_run.py b/tests/test_dry_run.py new file mode 100644 index 0000000..24e6939 --- /dev/null +++ b/tests/test_dry_run.py @@ -0,0 +1,60 @@ +from click.testing import CliRunner +from s3_credentials.cli import cli +import pytest +import re +import textwrap + + +def assert_match_with_wildcards(pattern, input): + # Pattern language is simple: '*' becomes '*?' + bits = pattern.split("*") + regex = "^{}$".format(".*?".join(re.escape(bit) for bit in bits)) + print(regex) + match = re.compile(regex.strip(), re.DOTALL).match(input.strip()) + if match is None: + # Build a useful message + message = "Pattern:\n{}\n\nDoes not match input:\n\n{}".format(pattern, input) + bad_bits = [bit for bit in bits if bit not in input] + if bad_bits: + message += "\nThese parts were not found in the input:\n\n" + for bit in bad_bits: + message += textwrap.indent("{}\n\n".format(bit), " ") + assert False, message + + +@pytest.mark.parametrize( + "options,expected", + ( + ( + [], + ( + """Would create bucket: 'my-bucket' +Would create user: 's3.read-write.my-bucket' with permissions boundary: 'arn:aws:iam::aws:policy/AmazonS3FullAccess' +Would attach policy called 's3.read-write.my-bucket' to user 's3.read-write.my-bucket', details:* +Would call create access key for user 's3.read-write.my-bucket'""" + ), + ), + ( + ["--username", "frank"], + ( + """Would create bucket: 'my-bucket' +Would create user: 'frank' with permissions boundary: 'arn:aws:iam::aws:policy/AmazonS3FullAccess' +Would attach policy called 's3.read-write.my-bucket' to user 'frank', details:* +Would call create access key for user 'frank'""" + ), + ), + ( + ["--duration", "20m"], + ( + """Would create bucket: 'my-bucket' +Would ensure role: 's3-credentials.AmazonS3FullAccess' +Would assume role using following policy:*""" + ), + ), + ), +) +def test_dry_run(options, expected): + runner = CliRunner() + result = runner.invoke(cli, ["create", "my-bucket", "--dry-run"] + options) + assert result.exit_code == 0, result.output + assert_match_with_wildcards(expected, result.output)