--dry-run option, closes #35

pull/39/head
Simon Willison 2021-11-18 12:24:17 -08:00
rodzic 600a01f2e7
commit 7749ad2e01
3 zmienionych plików z 165 dodań i 57 usunięć

Wyświetl plik

@ -95,6 +95,7 @@ The `create` command has a number of options:
- `--policy filepath-or-string`: A custom policy document (as a file path, literal JSON string or `-` for standard input) - see below
- `--bucket-region`: If creating buckets, the region in which they should be created.
- `--silent`: Don't output details of what is happening, just output the JSON for the created access credentials at the end.
- `--dry-run`: Output details of AWS changes that would have been made without applying them.
- `--user-permissions-boundary`: Custom [permissions boundary](https://docs.aws.amazon.com`/IAM/latest/UserGuide/access_policies_boundaries.html) to use for users created by this tool. This will default to restricting those users to only interacting with S3, taking the `--read-only` option into account. Use `none` to create users without any permissions boundary at all.
### Changes that will be made to your AWS account
@ -115,6 +116,8 @@ For temporary credentials:
2. Check if an AWS role called `s3-credentials.AmazonS3FullAccess` exists. If it does not exist create it, configured to allow the user's AWS account to assume it and with the `arn:aws:iam::aws:policy/AmazonS3FullAccess` policy attached.
3. Use `STS.AssumeRole()` to return temporary credentials that are restricted to just the specified buckets and specified read-only/read-write/write-only policy.
You can run the `create` command with the `--dry-run` option to see a summary of changes that would be applied, including details of generated policy documents, without actually applying those changes.
### Using a custom policy
The policy documents applied by this tool can be seen in [policies.py](https://github.com/simonw/s3-credentials/blob/main/s3_credentials/policies.py). If you want to use a custom policy document you can do so using the `--policy` option.

Wyświetl plik

@ -147,6 +147,7 @@ class DurationParam(click.ParamType):
)
@click.option("--bucket-region", help="Region in which to create buckets")
@click.option("--silent", help="Don't show performed steps", is_flag=True)
@click.option("--dry-run", help="Show steps without executing them", is_flag=True)
@click.option(
"--user-permissions-boundary",
help=(
@ -168,6 +169,7 @@ def create(
bucket_region,
user_permissions_boundary,
silent,
dry_run,
**boto_options
):
"Create and return new AWS credentials for specified S3 buckets"
@ -185,20 +187,27 @@ def create(
permission = "read-only"
if write_only:
permission = "write-only"
s3 = make_client("s3", **boto_options)
iam = make_client("iam", **boto_options)
sts = boto3.client("sts")
s3 = None
iam = None
sts = None
if not dry_run:
s3 = make_client("s3", **boto_options)
iam = make_client("iam", **boto_options)
sts = make_client("sts", **boto_options)
# Verify buckets
for bucket in buckets:
# Create bucket if it doesn't exist
if not bucket_exists(s3, bucket):
if not create_bucket:
if dry_run or (not bucket_exists(s3, bucket)):
if (not dry_run) and (not create_bucket):
raise click.ClickException(
"Bucket does not exist: {} - try --create-bucket to create it".format(
bucket
)
)
if create_bucket:
if dry_run or create_bucket:
kwargs = {}
if bucket_region:
kwargs = {
@ -206,11 +215,23 @@ def create(
"LocationConstraint": bucket_region
}
}
s3.create_bucket(Bucket=bucket, **kwargs)
info = "Created bucket: {}".format(bucket)
if bucket_region:
info += "in region: {}".format(bucket_region)
log(info)
if dry_run:
click.echo(
"Would create bucket: '{}'{}".format(
bucket,
(
" with args {}".format(json.dumps(kwargs, indent=4))
if kwargs
else ""
),
)
)
else:
s3.create_bucket(Bucket=bucket, **kwargs)
info = "Created bucket: {}".format(bucket)
if bucket_region:
info += "in region: {}".format(bucket_region)
log(info)
# At this point the buckets definitely exist - create the inline policy
bucket_access_policy = {}
if policy:
@ -232,31 +253,39 @@ def create(
if duration:
# We're going to use sts.assume_role() rather than creating a user
s3_role_arn = ensure_s3_role_exists(iam, sts)
log("Assume role against {} for {}s".format(s3_role_arn, duration))
credentials_response = sts.assume_role(
RoleArn=s3_role_arn,
RoleSessionName="s3.{permission}.{buckets}".format(
permission="custom" if policy else permission, buckets=",".join(buckets)
),
Policy=json.dumps(bucket_access_policy),
DurationSeconds=duration,
)
if format_ == "ini":
click.echo(
(
"[default]\naws_access_key_id={}\n"
"aws_secret_access_key={}\naws_session_token={}"
).format(
credentials_response["Credentials"]["AccessKeyId"],
credentials_response["Credentials"]["SecretAccessKey"],
credentials_response["Credentials"]["SessionToken"],
)
)
if dry_run:
click.echo("Would ensure role: 's3-credentials.AmazonS3FullAccess'")
click.echo("Would assume role using following policy:")
click.echo(json.dumps(bucket_access_policy, indent=4))
else:
click.echo(
json.dumps(credentials_response["Credentials"], indent=4, default=str)
s3_role_arn = ensure_s3_role_exists(iam, sts)
log("Assume role against {} for {}s".format(s3_role_arn, duration))
credentials_response = sts.assume_role(
RoleArn=s3_role_arn,
RoleSessionName="s3.{permission}.{buckets}".format(
permission="custom" if policy else permission,
buckets=",".join(buckets),
),
Policy=json.dumps(bucket_access_policy),
DurationSeconds=duration,
)
if format_ == "ini":
click.echo(
(
"[default]\naws_access_key_id={}\n"
"aws_secret_access_key={}\naws_session_token={}"
).format(
credentials_response["Credentials"]["AccessKeyId"],
credentials_response["Credentials"]["SecretAccessKey"],
credentials_response["Credentials"]["SessionToken"],
)
)
else:
click.echo(
json.dumps(
credentials_response["Credentials"], indent=4, default=str
)
)
return
# No duration, so wo create a new user so we can issue non-expiring credentials
if not username:
@ -264,7 +293,7 @@ def create(
username = "s3.{permission}.{buckets}".format(
permission="custom" if policy else permission, buckets=",".join(buckets)
)
if not user_exists(iam, username):
if dry_run or (not user_exists(iam, username)):
kwargs = {"UserName": username}
if user_permissions_boundary != "none":
# This is a user-account level limitation, it does not grant
@ -282,11 +311,14 @@ def create(
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
)
kwargs["PermissionsBoundary"] = user_permissions_boundary
iam.create_user(**kwargs)
info = "Created user: {}".format(username)
info = " user: '{}'".format(username)
if user_permissions_boundary != "none":
info += " with permissions boundary: {}".format(user_permissions_boundary)
log(info)
info += " with permissions boundary: '{}'".format(user_permissions_boundary)
if dry_run:
click.echo("Would create{}".format(info))
else:
iam.create_user(**kwargs)
log("Created {}".format(info))
# Add inline policies to the user so they can access the buckets
for bucket in buckets:
@ -305,27 +337,40 @@ def create(
policy_dict = policies.write_only(bucket)
else:
assert False, "Unknown permission: {}".format(permission)
iam.put_user_policy(
PolicyDocument=json.dumps(policy_dict),
PolicyName=policy_name,
UserName=username,
)
log("Attached policy {} to user {}".format(policy_name, username))
if dry_run:
click.echo(
"Would attach policy called '{}' to user '{}', details:\n{}".format(
policy_name,
username,
json.dumps(policy_dict, indent=4),
)
)
else:
iam.put_user_policy(
PolicyDocument=json.dumps(policy_dict),
PolicyName=policy_name,
UserName=username,
)
log("Attached policy {} to user {}".format(policy_name, username))
# Retrieve and print out the credentials
response = iam.create_access_key(
UserName=username,
)
log("Created access key for user: {}".format(username))
if format_ == "ini":
click.echo(
("[default]\naws_access_key_id={}\n" "aws_secret_access_key={}").format(
response["AccessKey"]["AccessKeyId"],
response["AccessKey"]["SecretAccessKey"],
)
if dry_run:
click.echo("Would call create access key for user '{}'".format(username))
else:
response = iam.create_access_key(
UserName=username,
)
elif format_ == "json":
click.echo(json.dumps(response["AccessKey"], indent=4, default=str))
log("Created access key for user: {}".format(username))
if format_ == "ini":
click.echo(
("[default]\naws_access_key_id={}\n" "aws_secret_access_key={}").format(
response["AccessKey"]["AccessKeyId"],
response["AccessKey"]["SecretAccessKey"],
)
)
elif format_ == "json":
click.echo(json.dumps(response["AccessKey"], indent=4, default=str))
@cli.command()

Wyświetl plik

@ -0,0 +1,60 @@
from click.testing import CliRunner
from s3_credentials.cli import cli
import pytest
import re
import textwrap
def assert_match_with_wildcards(pattern, input):
# Pattern language is simple: '*' becomes '*?'
bits = pattern.split("*")
regex = "^{}$".format(".*?".join(re.escape(bit) for bit in bits))
print(regex)
match = re.compile(regex.strip(), re.DOTALL).match(input.strip())
if match is None:
# Build a useful message
message = "Pattern:\n{}\n\nDoes not match input:\n\n{}".format(pattern, input)
bad_bits = [bit for bit in bits if bit not in input]
if bad_bits:
message += "\nThese parts were not found in the input:\n\n"
for bit in bad_bits:
message += textwrap.indent("{}\n\n".format(bit), " ")
assert False, message
@pytest.mark.parametrize(
"options,expected",
(
(
[],
(
"""Would create bucket: 'my-bucket'
Would create user: 's3.read-write.my-bucket' with permissions boundary: 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
Would attach policy called 's3.read-write.my-bucket' to user 's3.read-write.my-bucket', details:*
Would call create access key for user 's3.read-write.my-bucket'"""
),
),
(
["--username", "frank"],
(
"""Would create bucket: 'my-bucket'
Would create user: 'frank' with permissions boundary: 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
Would attach policy called 's3.read-write.my-bucket' to user 'frank', details:*
Would call create access key for user 'frank'"""
),
),
(
["--duration", "20m"],
(
"""Would create bucket: 'my-bucket'
Would ensure role: 's3-credentials.AmazonS3FullAccess'
Would assume role using following policy:*"""
),
),
),
)
def test_dry_run(options, expected):
runner = CliRunner()
result = runner.invoke(cli, ["create", "my-bucket", "--dry-run"] + options)
assert result.exit_code == 0, result.output
assert_match_with_wildcards(expected, result.output)