--policy option for custom policy documents, closes #14

Refs #11
pull/16/head
Simon Willison 2021-11-03 11:38:51 -07:00
rodzic cad8a87e38
commit 34d0350d28
3 zmienionych plików z 179 dodań i 12 usunięć

Wyświetl plik

@ -53,6 +53,7 @@ The command has several additional options:
- `-c, --create-bucket`: Create the buckts if they do not exist. Without this any missing buckets will be treated as an error.
- `--read-only`: The user should only be allowed to read files from the bucket.-
- `--write-only`: The user should only be allowed to write files to the bucket, but not read them. This is useful for logging use-cases.
- `--policy filepath-or-string`: A custom policy document (as a file path, literal JSON string or `-` for standard input) - see below
- `--bucket-region`: If creating buckets, the region in which they should be created.
- `--silent`: Don't output details of what is happening, just output the JSON for the created access credentials at the end.
- `--user-permissions-boundary`: Custom [permissions boundary](https://docs.aws.amazon.com`/IAM/latest/UserGuide/access_policies_boundaries.html) to use for users created by this tool. This will default to restricting those users to only interacting with S3, taking the `--read-only` option into account. Use `none` to create users without any permissions boundary at all.
@ -65,6 +66,51 @@ Here's the full sequence of events that take place when you run this command:
4. For each specified bucket, add an inline IAM policy to the user that gives them permission to either read-only, write-only or read-write against that bucket.
5. Create a new access key for that user and output the key and its secret to the console.
### Using a custom policy
The policy documents applied by this tool can be seen in [policies.py](https://github.com/simonw/s3-credentials/blob/main/s3_credentials/policies.py). If you want to use a custom policy document you can do so using the `--policy` option.
First, create your policy document as a JSON file that looks something like this:
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject*", "s3:ListBucket"],
"Resource": [
"arn:aws:s3:::$!BUCKET_NAME!$",
"arn:aws:s3:::$!BUCKET_NAME!$/*"
],
}
]
}
```
Note the `$!BUCKET_NAME!$` strings - these will be replaced with the name of the relevant S3 bucket before the policy is applied.
Save that as `custom-policy.json` and apply it using the following command:
% s3-credentials create my-s3-bucket \
--policy custom-policy.json
You can also pass `-` to read from standard input, or you can pass the literal JSON string directly to the `--policy` option:
```
% s3-credentials create my-s3-bucket --policy '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject*", "s3:ListBucket"],
"Resource": [
"arn:aws:s3:::$!BUCKET_NAME!$",
"arn:aws:s3:::$!BUCKET_NAME!$/*"
],
}
]
}'
```
## Other commands
### whoami

Wyświetl plik

@ -27,6 +27,36 @@ def cli():
"A tool for creating credentials for accessing S3 buckets"
class PolicyParam(click.ParamType):
"Returns string of guaranteed well-formed JSON"
name = "policy"
def convert(self, policy, param, ctx):
if policy.strip().startswith("{"):
# Verify policy string is valid JSON
try:
json.loads(policy)
except ValueError:
self.fail("Invalid JSON string")
return policy
else:
# Assume policy is a file path or '-'
try:
with click.open_file(policy) as f:
contents = f.read()
try:
json.loads(contents)
return contents
except ValueError:
self.fail(
"{} contained invalid JSON".format(
"Input" if policy == "-" else "File"
)
)
except FileNotFoundError:
self.fail("File not found")
@cli.command()
@click.argument(
"buckets",
@ -42,6 +72,11 @@ def cli():
)
@click.option("--read-only", help="Only allow reading from the bucket", is_flag=True)
@click.option("--write-only", help="Only allow writing to the bucket", is_flag=True)
@click.option(
"--policy",
type=PolicyParam(),
help="Path to a policy.json file, or literal JSON string - $!BUCKET_NAME!$ will be replaced with the name of the bucket",
)
@click.option("--bucket-region", help="Region in which to create buckets")
@click.option("--silent", help="Don't show performed steps", is_flag=True)
@click.option(
@ -58,6 +93,7 @@ def create(
create_bucket,
read_only,
write_only,
policy,
bucket_region,
user_permissions_boundary,
silent,
@ -138,17 +174,19 @@ def create(
permission=permission,
bucket=bucket,
)
policy = {}
if permission == "read-write":
policy = policies.read_write(bucket)
elif permission == "read-only":
policy = policies.read_only(bucket)
elif permission == "write-only":
policy = policies.write_only(bucket)
if policy:
policy_dict = json.loads(policy.replace("$!BUCKET_NAME!$", bucket))
else:
assert False, "Unknown permission: {}".format(permission)
if permission == "read-write":
policy_dict = policies.read_write(bucket)
elif permission == "read-only":
policy_dict = policies.read_only(bucket)
elif permission == "write-only":
policy_dict = policies.write_only(bucket)
else:
assert False, "Unknown permission: {}".format(permission)
iam.put_user_policy(
PolicyDocument=json.dumps(policy),
PolicyDocument=json.dumps(policy_dict),
PolicyName=policy_name,
UserName=username,
)

Wyświetl plik

@ -1,3 +1,4 @@
import botocore
from click.testing import CliRunner
from s3_credentials.cli import cli
import json
@ -61,7 +62,20 @@ def test_list_buckets(mocker, option, expected):
assert result.output == expected
def test_create(mocker):
CUSTOM_POLICY = '{"custom": "policy", "bucket": "$!BUCKET_NAME!$"}'
DEFAULT_POLICY = '{"Version": "2012-10-17", "Statement": [{"Sid": "ListObjectsInBucket", "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": ["arn:aws:s3:::pytest-bucket-simonw-1"]}, {"Sid": "AllObjectActions", "Effect": "Allow", "Action": "s3:*Object", "Resource": ["arn:aws:s3:::pytest-bucket-simonw-1/*"]}]}'
@pytest.mark.parametrize(
"custom_policy,policy_strategy",
(
(False, None),
(True, "filepath"),
(True, "stdin"),
(True, "string"),
),
)
def test_create(mocker, tmpdir, custom_policy, policy_strategy):
boto3 = mocker.patch("boto3.client")
boto3.return_value = Mock()
boto3.return_value.create_access_key.return_value = {
@ -70,9 +84,25 @@ def test_create(mocker):
"SecretAccessKey": "secret",
}
}
expected_policy = DEFAULT_POLICY
runner = CliRunner()
with runner.isolated_filesystem():
result = runner.invoke(cli, ["create", "pytest-bucket-simonw-1", "-c"])
args = ["create", "pytest-bucket-simonw-1", "-c"]
kwargs = {}
if policy_strategy:
expected_policy = CUSTOM_POLICY.replace(
"$!BUCKET_NAME!$", "pytest-bucket-simonw-1"
)
if policy_strategy == "filepath":
filepath = str(tmpdir / "policy.json")
open(filepath, "w").write(CUSTOM_POLICY)
args.extend(["--policy", filepath])
elif policy_strategy == "stdin":
kwargs["input"] = CUSTOM_POLICY
args.extend(["--policy", "-"])
elif policy_strategy == "string":
args.extend(["--policy", CUSTOM_POLICY])
result = runner.invoke(cli, args, **kwargs)
assert result.exit_code == 0
assert result.output == (
"Attached policy s3.read-write.pytest-bucket-simonw-1 to user s3.read-write.pytest-bucket-simonw-1\n"
@ -84,7 +114,9 @@ def test_create(mocker):
"call('iam')",
"call().head_bucket(Bucket='pytest-bucket-simonw-1')",
"call().get_user(UserName='s3.read-write.pytest-bucket-simonw-1')",
'call().put_user_policy(PolicyDocument=\'{"Version": "2012-10-17", "Statement": [{"Sid": "ListObjectsInBucket", "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": ["arn:aws:s3:::pytest-bucket-simonw-1"]}, {"Sid": "AllObjectActions", "Effect": "Allow", "Action": "s3:*Object", "Resource": ["arn:aws:s3:::pytest-bucket-simonw-1/*"]}]}\', PolicyName=\'s3.read-write.pytest-bucket-simonw-1\', UserName=\'s3.read-write.pytest-bucket-simonw-1\')',
"call().put_user_policy(PolicyDocument='{}', PolicyName='s3.read-write.pytest-bucket-simonw-1', UserName='s3.read-write.pytest-bucket-simonw-1')".format(
expected_policy
),
"call().create_access_key(UserName='s3.read-write.pytest-bucket-simonw-1')",
]
@ -182,3 +214,54 @@ def test_delete_user(mocker):
call().delete_access_key(UserName="user-123", AccessKeyId="two"),
call().delete_user(UserName="user-123"),
]
@pytest.mark.parametrize(
"strategy,expected_error",
(
("stdin", "Input contained invalid JSON"),
("filepath", "File contained invalid JSON"),
("string", "Invalid JSON string"),
),
)
@pytest.mark.parametrize("use_valid_string", (True, False))
def test_verify_create_policy_option(
tmpdir, mocker, strategy, expected_error, use_valid_string
):
# Ensure "bucket does not exist" error to terminate after verification
boto3 = mocker.patch("boto3.client")
boto3.return_value.head_bucket.side_effect = botocore.exceptions.ClientError(
error_response={}, operation_name=""
)
if use_valid_string:
content = '{"policy": "..."}'
else:
content = "{Invalid JSON"
# Only used by strategy==filepath
filepath = str(tmpdir / "policy.json")
open(filepath, "w").write(content)
runner = CliRunner()
args = ["create", "my-bucket", "--policy"]
kwargs = {}
if strategy == "stdin":
args.append("-")
kwargs["input"] = content
elif strategy == "filepath":
args.append(filepath)
elif strategy == "string":
args.append(content)
result = runner.invoke(cli, args, **kwargs)
if use_valid_string:
assert result.exit_code == 1
assert (
result.output
== "Error: Bucket does not exist: my-bucket - try --create-bucket to create it\n"
)
else:
assert result.exit_code
assert (
"Error: Invalid value for '--policy': {}".format(expected_error)
in result.output
)