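"""
通过 Identity Store API 大规模管理和审计 AWS IAM 身份中心的用户和组操作

Manage and audit AWS IAM Identity Center user and group operations at scale
using the Identity Store APIs. With these APIs, the script below creates users
and groups in bulk from a CSV file and adds each user to their group.
"""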
import boto3
import csv
from botocore.exceptions import ClientError
def create_identity_store_user(identity_store_client, identity_store_id, user_data):
    """
    Create a user in the IAM Identity Store, or return the existing user's ID.

    Args:
        identity_store_client: Client exposing ``create_user`` and
            ``list_users`` (a boto3 ``identitystore`` client).
        identity_store_id: ID of the identity store to create the user in.
        user_data: Mapping with the keys ``userName``, ``firstName``,
            ``lastName``, ``displayName`` and ``emailAddress``.

    Returns:
        The ``UserId`` of the newly created user, or of the already existing
        user when creation reports a conflict. ``None`` when creation fails
        for any other reason, or when the existing user cannot be looked up.

    Raises:
        KeyError: If ``user_data`` is missing one of the required keys.
    """
    user_name = user_data['userName']

    try:
        response = identity_store_client.create_user(
            IdentityStoreId=identity_store_id,
            UserName=user_name,
            Name={
                'FamilyName': user_data['lastName'],
                'GivenName': user_data['firstName'],
            },
            DisplayName=user_data['displayName'],
            Emails=[
                {
                    'Value': user_data['emailAddress'],
                    'Primary': True,
                }
            ],
        )
    except ClientError as e:
        if e.response['Error']['Code'] != 'ConflictException':
            print(f"Error creating user {user_name}: {e}")
            return None
        print(f"User {user_name} already exists")
    else:
        print(f"Created user: {user_name}")
        return response['UserId']

    # Conflict: fall back to looking up the existing user's ID. The lookup is
    # guarded separately so that a failure here is reported instead of
    # escaping from inside the original exception handler.
    # NOTE(review): AWS documents the Filters parameter of ListUsers as
    # deprecated in favor of GetUserId -- confirm and consider migrating.
    try:
        response = identity_store_client.list_users(
            IdentityStoreId=identity_store_id,
            Filters=[{
                'AttributePath': 'UserName',
                'AttributeValue': user_name,
            }],
        )
    except ClientError as e:
        print(f"Error looking up existing user {user_name}: {e}")
        return None

    users = response.get('Users') or []
    if not users:
        # The filtered lookup may return no match; avoid an IndexError.
        print(f"Could not find existing user {user_name}")
        return None
    return users[0]['UserId']
def create_group(identity_store_client, identity_store_id, group_name):
    """
    Create a group in the IAM Identity Store, or return the existing group's ID.

    Args:
        identity_store_client: Client exposing ``create_group`` and
            ``list_groups`` (a boto3 ``identitystore`` client).
        identity_store_id: ID of the identity store to create the group in.
        group_name: Display name of the group; also used to build the
            group's description.

    Returns:
        The ``GroupId`` of the newly created group, or of the already existing
        group when creation reports a conflict. ``None`` when creation fails
        for any other reason, or when the existing group cannot be looked up.
    """
    try:
        response = identity_store_client.create_group(
            IdentityStoreId=identity_store_id,
            DisplayName=group_name,
            Description=f"Group for {group_name}",
        )
    except ClientError as e:
        if e.response['Error']['Code'] != 'ConflictException':
            print(f"Error creating group {group_name}: {e}")
            return None
        print(f"Group {group_name} already exists")
    else:
        print(f"Created group: {group_name}")
        return response['GroupId']

    # Conflict: fall back to looking up the existing group's ID. The lookup is
    # guarded separately so that a failure here is reported instead of
    # escaping from inside the original exception handler.
    # NOTE(review): AWS documents the Filters parameter of ListGroups as
    # deprecated in favor of GetGroupId -- confirm and consider migrating.
    try:
        response = identity_store_client.list_groups(
            IdentityStoreId=identity_store_id,
            Filters=[{
                'AttributePath': 'DisplayName',
                'AttributeValue': group_name,
            }],
        )
    except ClientError as e:
        print(f"Error looking up existing group {group_name}: {e}")
        return None

    groups = response.get('Groups') or []
    if not groups:
        # The filtered lookup may return no match; avoid an IndexError.
        print(f"Could not find existing group {group_name}")
        return None
    return groups[0]['GroupId']
def add_user_to_group(identity_store_client, identity_store_id, user_id, group_id):
    """
    Make a user a member of a group in the IAM Identity Store.

    Calls ``create_group_membership`` on the given client. Any ``ClientError``
    is reported on stdout rather than raised; a ``ConflictException`` is
    reported as the user already being a member. Always returns ``None``.
    """
    membership_request = {
        'IdentityStoreId': identity_store_id,
        'GroupId': group_id,
        'MemberId': {'UserId': user_id},
    }

    try:
        identity_store_client.create_group_membership(**membership_request)
    except ClientError as err:
        already_member = err.response['Error']['Code'] == 'ConflictException'
        if already_member:
            print(f"User {user_id} is already a member of group {group_id}")
        else:
            print(f"Error adding user to group: {err}")
    else:
        print(f"Added user {user_id} to group {group_id}")
def main(csv_path='/Your/File/Path/users.csv'):
    """
    Create Identity Store users and groups in bulk from a CSV file.

    Each CSV row is passed to ``create_identity_store_user``. When the row has
    a non-empty ``withinGroup`` value, that group is created on first use and
    the user is added to it.

    Args:
        csv_path: Path of the CSV file to read. The default is a placeholder
            that should be replaced with the real file location.

    Raises:
        SystemExit: If no IAM Identity Center instance is returned by
            ``list_instances``.
    """
    # Initialize AWS clients
    identity_store_client = boto3.client('identitystore')
    sso_admin_client = boto3.client('sso-admin')

    # Get the Identity Store ID from the first instance returned
    instances = sso_admin_client.list_instances().get('Instances') or []
    if not instances:
        raise SystemExit(
            "No IAM Identity Center instance found for the current credentials/region"
        )
    identity_store_id = instances[0]['IdentityStoreId']

    # Maps group name -> group ID (None when the group could not be created
    # or found), so each group is only created or looked up once per run.
    created_groups = {}

    # The csv module expects files to be opened with newline=''.
    with open(csv_path, 'r', newline='') as file:
        for row in csv.DictReader(file):
            user_id = create_identity_store_user(
                identity_store_client, identity_store_id, row
            )

            # Tolerate a missing column or a blank/whitespace-only cell.
            group_name = (row.get('withinGroup') or '').strip()
            if not user_id or not group_name:
                continue

            # Create group if it hasn't been handled yet
            if group_name not in created_groups:
                created_groups[group_name] = create_group(
                    identity_store_client, identity_store_id, group_name
                )

            # Add user to group (skipped when the group ID is unavailable)
            group_id = created_groups[group_name]
            if group_id:
                add_user_to_group(
                    identity_store_client,
                    identity_store_id,
                    user_id,
                    group_id,
                )
if __name__ == "__main__":
    # Run the provisioning workflow only when executed as a script.
    main()