Lambda-Based Custom Policies
Add a Custom Configuration Audit Policy
In this example, a Python script is used to check if access logging is enabled for the CloudTrail S3 bucket. If it does not, a policy violation will be generated. The Python script is used to run Lambda for verification and then determines if there is a policy violation.
1. Open your AWS account and navigate to S3. Then select your CloudTrail bucket and confirm logging is disabled.
2. Navigate to the Skyhigh CASB dashboard and to Policy > Policy Templates and select Security Configuration.
3. Find the Custom policy using AWS Lambda template and select Create Policy.
4. Give the new policy a name and description.
- Name: CloudTrail access logging
- Description: Check if access logging is enabled on the CloudTrail bucket.
5. Select Upload a Lambda Script.
NOTE: This is where you could select an existing AWS Lambda script if it has already been uploaded to AWS. |
6. Select accounts and select the AWS account that has the CloudTrail log.
7. Download cloudtrail_bucket_access_logging_enabled.py.zip from Box. This is a Python package with the script we need.
8. Select file and select the cloudtrail_bucket_access_logging_enabled.py.zip.
9. Additional properties will be requested, please enter as follows:
NOTES:
|
- Function Name: shn_Cloudtrail_Bucket_Access_Logging_Enabled
- Function Description: Checks if logging is enabled on the CloudTrail bucket
- Region: us-west-1
- Runtime: Python 2.7
- Handler: cloudtrail_bucket_access_logging_enabled.check_buckets_handler
- Timeout: 200 (default)
- Memory size: 254 (default)
10. Press the Test Script button.
Select the account and run the test.
The test should run successfully and issue a message similar to the one below:
11. (Optional) Select email addresses to receive notifications when this policy triggers.
12. Review and press Save for the new policy.
13. Activate the new policy.
14. Run On-Demand Scan.
15. Review the policy incidents.
Resolve the Incident
Resolve the issue and refresh the incident.
1. Navigate to AWS S3 and find the CloudTrail bucket. Edit the properties.
2. Enable server access logging
3. Refresh the policy incident.
4. Notice the policy incident is marked as resolved.
AWS Lambda
When we uploaded the script, a function was automatically created using AWS Lambda. It can be viewed in AWS.
Notice, the function was automatically prefixed with skyhigh_shn_.
Add Support for NRT Config Audit for Lambda Policies
As of 3.9.2 we also support Lambda Config Audit policies with the NRT configuration audit capabilities. Follow these steps to add this functionality to your tenant.
1. If you have not already done so, complete the near real-time configuration auditing capabilities lab.
2. Edit the CloudTrail access logging policy you created earlier in this lab.
3. Edit the Rules and exceptions part of the rule
4. Press the Select Service button. Then select Amazon S3.
NOTE: By selecting S3, this configures this rule to run in the event S3 activity is detected from CloudTrail. Without this being selected the rule does not know when to run based on real-time activity. |
5. Press next until you get to the Done button. To exit out of the rule, press the Done button.
6. Login to Amazon S3 and create a new S3 bucket with logging disabled.
7. Wait for a policy incident in the Skyhigh CASB dashboard. This should show up within 5 minutes.
Lambda Remediation Response Samples
Name | Description | Link |
---|---|---|
incident_remediation_world_readable_s3 | Remediates the incident by making the bucket ACL private. | incident_remediation_world_readable_s3 |
incident_remediation_redshift_publicly_accessible | Remediates the incident by Disabling PubliclyAccessible. | incident_remediation_redshift_publicly_accessible |
incident_remediation_s3_object_versioning | Remediates the incident by enabling bucket versioning. | incident_remediation_s3_object_versioning |
incident_remediation_unrestricted_access_http_https_ports | Remediates the incident by deleting the security group permission. | incident_remediation_unrestricted_access_http_https_ports |
incident_remediation_unrestricted_access_policies | Remediates the incident for Unrestricted Access policies. | incident_remediation_unrestricted_access_policies |
incident_remediation_unrestricted_access_rds | Remediates the incident by Disabling PubliclyAccessible. | incident_remediation_unrestricted_access_rds |
incident_remediation_unrestricted_access_s3_bucket | Remediates the incident by making bucket ACL private. | incident_remediation_unrestricted_access_s3_bucket |
incident_remediation_unrestricted_icmp_access | Remediates the incident by deleting security group permissions. | incident_remediation_unrestricted_icmp_access |
incident_remediation_unrestricted_inbound_access | Remediates the incident by deleting security group permissions. | incident_remediation_unrestricted_inbound_access |
incident_remediation_unrestricted_outbound_access | Remediates the incident by deleting security group permissions. | incident_remediation_unrestricted_outbound_access |
Lambda Custom Policy Code Samples
Name | Description | Link |
---|---|---|
cloudtrail_integration_with_cloudwatch_enabled | Check to see if Cloudwatch has been integrated with CloudTrail. | cloudtrail_integration_with_cloudwatch_enabled |
cloudtrail_logs_encrypted_at_rest | Checks to see if CloudTrail logs are being encrypted. | cloudtrail_logs_encrypted_at_rest |
cloudtrail_multiregion_logging_enabled | Checks to see if CloudTrail multi-regions loggin has been enabled. | cloudtrail_multiregion_logging_enabled |
database_encryption_for_rds | Checks to see if encryption for RDS has been enabled. | database_encryption_for_rds |
incident_remediation_unrestricted_inbound_access | Remediates the incident by deleting security group permissions. | incident_remediation_unrestricted_inbound_access |
Lambda code samples for custom policy | Checks to see if CloudTrail Integration with CloudWatch is Enabled. | Lambda code samples for custom policy |
provisioning_access_to_resources_using_iam_roles | Checks to see if IAM roles are being used to access resources. | provisioning_access_to_resources_using_iam_roles |
publicly_writable_s3_buckets | Checks to see if there are any publicly writable S3 buckets. | publicly_writable_s3_buckets |
vpc_flowlogs_enabled | Checks to see if VPC flow logs have been enabled. | vpc_flowlogs_enabled |
Code Samples: Lambda-based Remediation
incident_remediation_world_readable_s3
import json
import boto3
def lambda_handler(event, context):
"""
Lambda for World Readable S3 bucket policy incident remediation.It will remediation the incident by making bucket ACL private.
:param event:
:param context:
:return:
"""
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
bucket_name = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"]["region"]
print "Remediation for policy {0}, bucket_name:{1} ,region:{2}".format(policy_name, bucket_name, region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
s3 = boto3.client('s3', region)
s3.put_bucket_acl(Bucket=bucket_name, ACL="private")
print "Successfully Updated S3 bucket {0} ACL to Private".format(bucket_name)
incident_remediation_redshift_publicly_accessible
import json
import boto3
def lambda_handler(event, context):
"""
Lambda for Redshift Cluster Publicly Accessible policy incident remediation. Remediate the incident by Disabling PubliclyAccessible
:param event:
:param context:
:return:
"""
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
cluster_id = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"][
"region"]
print "Remediation for policy {0}, entityId:{1} ,region:{2}".format(policy_name,cluster_id,region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
redshift = boto3.client('redshift', region)
redshift_instance = redshift.describe_clusters(ClusterIdentifier=cluster_id)
for cluster in redshift_instance["Clusters"]:
if cluster["PubliclyAccessible"] == True:
redshift_modification_reponse = redshift.modify_cluster(ClusterIdentifier=cluster_id,
PubliclyAccessible=False)
if redshift_modification_reponse["Cluster"]["PubliclyAccessible"] == False:
print "Redhshift is not Publicly Accessible, clusterId: {0}".format(cluster_id)
else:
print "Redhshift is Publicly Accessible, clusterId: {0}".format(cluster_id)
incident_remediation_s3_object_versioning
import json
import boto3
def lambda_handler(event, context):
"""
Lambda for S3 object versioning enabled policy incident remediation.Remediate the incident by enabling bucket versioning
:param event:
:param context:
:return:
"""
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
bucket_name = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"][
"region"]
print "Remediation for policy {0}, bucketName:{1} ,region:{2}".format(policy_name, bucket_name, region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
s3 = boto3.client('s3', region)
s3.put_bucket_versioning(Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"})
print "Versioning Enabled for bucket {0}".format(bucket_name)
incident_remediation_unrestricted_access_http_https_ports
import json
import boto3
def get_security_group_attached_to_ec2(region,instance_id):
ec2 = boto3.client('ec2', region)
reservations = ec2.describe_instances()['Reservations']
for reservation in reservations:
for instance in reservation['Instances']:
if instance["InstanceId"] == instance_id:
print "Started fetching instance details for instance {0}".format(instance["InstanceId"])
security_groups = []
for security_group in instance["SecurityGroups"]:
security_groups.append(security_group["GroupId"])
print "EC2 instance {0} has security group attached :{1}".format(instance["InstanceId"],
str(security_groups))
break
return security_groups
def lambda_handler(event, context):
"""
Lambda for Unrestricted Access to non HTTP/HTTPS ports policy incident remediation. Remediate the incident by deleting
security group permission where ipv4=0.0.0.0/0 or ipv6=::/0 and FromPort or ToPort NOT Equal to [80,443]
:param event:
:param context:
:return:
"""
ipv4_check = "0.0.0.0/0"
ipv6_check = "::/0"
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
instance_id = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"][
"region"]
print "Remediation for policy {0}, instanceId:{1} ,region:{2}".format(policy_name, instance_id, region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
ec2 = boto3.client('ec2', region)
security_groups=get_security_group_attached_to_ec2(region,instance_id)
response = ec2.describe_security_groups(GroupIds=security_groups)
for security_group in response["SecurityGroups"]:
for security_group_permission in security_group["IpPermissions"]:
if "FromPort" in security_group_permission and "ToPort" in security_group_permission:
if (security_group_permission["FromPort"] not in [80, 443] or security_group_permission['ToPort'] not in [80,
443]) and \
(any(ip["CidrIp"] == ipv4_check for ip in security_group_permission["IpRanges"]) or
any(ip["CidrIpv6"] == ipv6_check for ip in
security_group_permission["Ipv6Ranges"])):
ec2.revoke_security_group_ingress(GroupId=security_group["GroupId"],
IpPermissions=[security_group_permission])
print"Delete Inbound rule {0} from security group Id {1}".
format(security_group_permission,security_group["GroupId"]
incident_remediation_unrestricted_access_policies
import boto3
import json
def get_security_group_attached_to_ec2(region, instance_id):
ec2 = boto3.client('ec2', region)
reservations = ec2.describe_instances()['Reservations']
for reservation in reservations:
for instance in reservation['Instances']:
if instance["InstanceId"] == instance_id:
print "Started fetching instance details for instance {0}".format(instance["InstanceId"])
security_groups = []
for security_group in instance["SecurityGroups"]:
security_groups.append(security_group["GroupId"])
print "EC2 instance {0} has security group attached :{1}".format(instance["InstanceId"],
str(security_groups))
break
return security_groups
def lambda_handler(event, context):
"""
Lambda will remediate the incident for Unrestricted Access policies:
1.Unrestricted CIFS Access
2.Unrestricted DNS Access
3.Unrestricted FTP Access
4.Unrestricted MongoDB Access
5.Unrestricted MSSQL Access
6.Unrestricted MSSQL Database Access (UDP)
7.Unrestricted MySQL Access
8.Unrestricted NetBIOS Access
9.Unrestricted Oracle Database Access
10.Unrestricted PostgreSQL Access
11.Unrestricted Remote Desktop Access
12.Unrestricted RPC Access
13.Unrestricted SMTP Access
14.Unrestricted SSH Access
15.Unrestricted Telnet Access
16.Unrestricted VNC Listener Access
17.Unrestricted VNC Server Access
:param event:
:param context:
:return:
"""
ipv4_check = "0.0.0.0/0"
ipv6_check = "::/0"
aws_policy_and_ports_config = {"Unrestricted CIFS Access": [445], "Unrestricted DNS Access": [53],
"Unrestricted FTP Access": [20, 21], "Unrestricted MongoDB Access": [27017],
"Unrestricted MSSQL Access": [1433],
"Unrestricted MSSQL Database Access (UDP)": [1434],
"Unrestricted MySQL Access": [3306], "Unrestricted NetBIOS Access": [138, 139, 137],
"Unrestricted Oracle Database Access": [1521],
"Unrestricted PostgreSQL Access": [5432],
"Unrestricted Remote Desktop Access": [3389], "Unrestricted RPC Access": [135],
"Unrestricted SMTP Access": [25], "Unrestricted SSH Access": [22],
"Unrestricted Telnet Access": [23], "Unrestricted VNC Listener Access": [5500],
"Unrestricted VNC Server Access": [5900]}
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
instance_id = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"][
"region"]
print "Remediation for policy {0}, instanceId:{1} ,region:{2}".format(policy_name, instance_id, region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
ec2 = boto3.client('ec2', region)
security_groups = get_security_group_attached_to_ec2(region, instance_id)
response = ec2.describe_security_groups(GroupIds=security_groups)
for security_group in response["SecurityGroups"]:
for security_group_permission in security_group["IpPermissions"]:
if "FromPort" in security_group_permission and "ToPort" in security_group_permission:
if (security_group_permission["FromPort"] in aws_policy_and_ports_config[
policy_name] or security_group_permission['ToPort'] in
aws_policy_and_ports_config[policy_name]) and \
(any(ip["CidrIp"] == ipv4_check for ip in security_group_permission["IpRanges"]) or
any(ip["CidrIpv6"] == ipv6_check for ip in
security_group_permission["Ipv6Ranges"])):
ec2.revoke_security_group_ingress(GroupId=security_group["GroupId"],
IpPermissions=[security_group_permission])
print "Deleted Inbound rule {0} from security group Id {1}".format(security_group_permission,
security_group["GroupId"])
incident_remediation_unrestricted_access_rds
import json
import boto3
def lambda_handler(event, context):
"""
Lambda for Unrestricted Access to RDS Instances policy incident remediation. Remediate the incident by Disabling PubliclyAccessible.
:param event:
:param context:
:return:
"""
ipv4_check = "0.0.0.0/0"
ipv6_check = "::/0"
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
is_publicly_accessible = False
instance_id = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"]["region"]
print "Remediation for policy {0}, instanceId:{1} ,region:{2}".format(policy_name, instance_id, region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
rds = boto3.client('rds', region)
security_groups = []
rds_instances = rds.describe_db_instances(DBInstanceIdentifier=instance_id)
for db_instance in rds_instances["DBInstances"]:
if db_instance["PubliclyAccessible"] == True:
for vpc_security_group in db_instance["VpcSecurityGroups"]:
security_group_id = vpc_security_group["VpcSecurityGroupId"]
security_groups.append(security_group_id)
ec2 = boto3.client('ec2', region)
response = ec2.describe_security_groups(GroupIds=security_groups)
for security_group in response["SecurityGroups"]:
for ipPermission in security_group["IpPermissions"]:
if (any(ip["CidrIp"] == ipv4_check for ip in ipPermission["IpRanges"]) or
any(ip["CidrIpv6"] == ipv6_check for ip in
ipPermission["Ipv6Ranges"])):
is_publicly_accessible = True
break
if len(security_groups) > 0 and is_publicly_accessible:
response = rds.modify_db_instance(DBInstanceIdentifier=instance_id, PubliclyAccessible=False)
for db_instance in response["DBInstance"]:
if db_instance["PubliclyAccessible"] == False:
return "DBInstance is not Publicly Accessible"
else:
return "DBInstance is Publicly Accessible"
incident_remediation_unrestricted_access_s3_bucket
import json
import boto3
def lambda_handler(event, context):
"""
Lambda for Unrestricted Access to S3 bucket policy incident remediation. It will remediate the incident by making bucket ACL private.
:param event:
:param context:
:return:
"""
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
bucket_name = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"]["region"]
print "Remediation for policy {0}, bucket_name:{1} ,region:{2}".format(policy_name, bucket_name, region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
s3 = boto3.client('s3', region)
s3.put_bucket_acl(Bucket=bucket_name, ACL="private")
print "Successfully Updated S3 bucket {0} ACL to Private".format(bucket_name)
incident_remediation_unrestricted_icmp_access
import json
import boto3
def get_security_group_attached_to_ec2(region, instance_id):
ec2 = boto3.client('ec2', region)
reservations = ec2.describe_instances()['Reservations']
for reservation in reservations:
for instance in reservation['Instances']:
if instance["InstanceId"] == instance_id:
print "Started fetching instance details for instance {0}".format(instance["InstanceId"])
security_groups = []
for security_group in instance["SecurityGroups"]:
security_groups.append(security_group["GroupId"])
print "EC2 instance {0} has security group attached :{1}".format(instance["InstanceId"],
str(security_groups))
break
return security_groups
def lambda_handler(event, context):
"""
Lambda for Unrestricted ICMP Access policy incident remediation. Remediate the incident by deleting
security group permission where ipv4=0.0.0.0/0 or ipv6=::/0 and IpProtocol=ICMP
:param event:
:param context:
:return:
"""
ipv4_check = "0.0.0.0/0"
ipv6_check = "::/0"
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"]["region"]
instance_id = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
print "Remediation for policy {0}, instanceId:{1} ,region:{2}".format(policy_name, instance_id, region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
ec2 = boto3.client('ec2', region)
security_groups = get_security_group_attached_to_ec2(region, instance_id)
response = ec2.describe_security_groups(GroupIds=security_groups)
for security_group in response["SecurityGroups"]:
for security_group_permission in security_group["IpPermissions"]:
if (security_group_permission["IpProtocol"] == "icmp") and \
(any(ip["CidrIp"] == ipv4_check for ip in security_group_permission["IpRanges"]) or
any(ip["CidrIpv6"] == ipv6_check for ip in
security_group_permission["Ipv6Ranges"])):
ec2.revoke_security_group_ingress(GroupId=security_group["GroupId"],
IpPermissions=[security_group_permission])
print "Deleted Inbound rule {0} from security group Id {1}".format(security_group_permission,
security_group["GroupId"])
incident_remediation_unrestricted_inbound_access
import json
import boto3
def get_security_group_attached_to_ec2(region, instance_id):
ec2 = boto3.client('ec2', region)
reservations = ec2.describe_instances()['Reservations']
for reservation in reservations:
for instance in reservation['Instances']:
if instance["InstanceId"] == instance_id:
print "Started fetching instance details for instance {0}".format(instance["InstanceId"])
security_groups = []
for security_group in instance["SecurityGroups"]:
security_groups.append(security_group["GroupId"])
print "EC2 instance {0} has security group attached :{1}".format(instance["InstanceId"],
str(security_groups))
break
return security_groups
def lambda_handler(event, context):
"""
Lambda for Unrestricted Inbound Access on Uncommon Ports policy incident remediation. Remediate the incident by deleting
security group permission where ipv4=0.0.0.0/0 or ipv6=::/0 and FromPort or ToPort Greater then 1024
:param event:
:param context:
:return:
"""
ipv4_check = "0.0.0.0/0"
ipv6_check = "::/0"
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
instance_id = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"]["region"]
print "Remediation for policy {0}, instanceId:{1} ,region:{2}".format(policy_name, instance_id, region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
ec2 = boto3.client('ec2', region)
security_groups = get_security_group_attached_to_ec2(region, instance_id)
response = ec2.describe_security_groups(GroupIds=security_groups)
for security_group in response["SecurityGroups"]:
for security_group_permission in security_group["IpPermissions"]:
if "FromPort" in security_group_permission and "ToPort" in security_group_permission:
if (security_group_permission["FromPort"] > 1024 or security_group_permission['ToPort'] >
1024) and \
(any(ip["CidrIp"] == ipv4_check for ip in security_group_permission["IpRanges"]) or
any(ip["CidrIpv6"] == ipv6_check for ip in
security_group_permission["Ipv6Ranges"])):
ec2.revoke_security_group_ingress(GroupId=security_group["GroupId"],
IpPermissions=[security_group_permission])
print "Deleted Inbound rule {0} from security group Id {1}".format(security_group_permission,
security_group["GroupId"])
incident_remediation_unrestricted_outbound_access
import boto3
import json
def get_security_group_attached_to_ec2(region, instance_id):
ec2 = boto3.client('ec2', region)
reservations = ec2.describe_instances()['Reservations']
for reservation in reservations:
for instance in reservation['Instances']:
if instance["InstanceId"] == instance_id:
print "Started fetching instance details for instance {0}".format(instance["InstanceId"])
security_groups = []
for security_group in instance["SecurityGroups"]:
security_groups.append(security_group["GroupId"])
print "EC2 instance {0} has security group attached :{1}".format(instance["InstanceId"],
str(security_groups))
break
return security_groups
def lambda_handler(event, context):
"""
Lambda for Unrestricted Outbound Access policy incident remediation. Remediate the incident by deleting
security group permission where ipv4=0.0.0.0/0 or ipv6=::/0
:param event:
:param context:
:return:
"""
ipv4_check = "0.0.0.0/0"
ipv6_check = "::/0"
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
instance_id = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"][
"region"]
print "Remediation for policy {0}, instanceId:{1} ,region:{2}".format(policy_name, instance_id, region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
ec2 = boto3.client('ec2', region)
security_groups = get_security_group_attached_to_ec2(region, instance_id)
response = ec2.describe_security_groups(GroupIds=security_groups)
for security_group in response["SecurityGroups"]:
for security_group_permission in security_group["IpPermissionsEgress"]:
if (any(ip["CidrIp"] == ipv4_check for ip in security_group_permission["IpRanges"]) or
any(ip["CidrIpv6"] == ipv6_check for ip in
security_group_permission["Ipv6Ranges"])):
ec2.revoke_security_group_egress(GroupId=security_group["GroupId"],
IpPermissions=[security_group_permission])
print "Deleted Outbound rule {0} from security group Id {1}".format(security_group_permission,
security_group["GroupId"])
Code Samples: Lambda-based Custom Policies
cloudtrail_integration_with_cloudwatch_enabled
import boto3
def lambda_handler(event, context):
regions = boto3.session.Session().get_available_regions('cloudtrail')
non_compliance=[]
cloudtrail_names=[]
for region in regions:
cloudtrail = boto3.client('cloudtrail', region)
cloudtrail_response=cloudtrail.describe_trails()
for cloudtrail_detail in cloudtrail_response["trailList"]:
if "CloudWatchLogsLogGroupArn" in cloudtrail_detail.keys():
cloudtrail_names.append(cloudtrail_detail["Name"])
for cloudtrail_name in cloudtrail_names:
if cloudtrail_name not in non_compliance:
non_compliance.append(cloudtrail_name)
lambda_response = dict()
lambda_response["status"] = "success"
lambda_response["message"] = "CloudTrail Integration with CloudWatch Enabled"
lambda_response["non_compliance"] = non_compliance
return lambda_response
cloudtrail_logs_encrypted_at_rest
import boto3
def lambda_handler(event, context):
regions = boto3.session.Session().get_available_regions('cloudtrail')
cloudtrail_logs_not_encrypted=[]
non_compliance=[]
for region in regions:
cloudtrail = boto3.client('cloudtrail', region)
cloudtrail_response=cloudtrail.describe_trails()
for cloudtrail_detail in cloudtrail_response["trailList"]:
if "KmsKeyId" not in cloudtrail_detail.keys():
cloudtrail_logs_not_encrypted.append(cloudtrail_detail["Name"])
for cloudtrail_name in cloudtrail_logs_not_encrypted:
if cloudtrail_name not in non_compliance:
non_compliance.append(cloudtrail_name)
lambda_response = dict()
lambda_response["status"] = "success"
lambda_response["message"] = "CloudTrail Logs Encrypted at Rest"
lambda_response["non_compliance"] = non_compliance
return lambda_response
cloudtrail_multiregion_logging_enabled
import boto3
def lambda_handler(event, context):
regions = boto3.session.Session().get_available_regions('cloudtrail')
non_compliance=[]
cloudtrail_names=[]
for region in regions:
cloudtrail = boto3.client('cloudtrail', region)
cloudtrail_response=cloudtrail.describe_trails()
for cloudtrail_detail in cloudtrail_response["trailList"]:
if cloudtrail_detail["IsMultiRegionTrail"] == False:
cloudtrail_names.append(cloudtrail_detail["Name"])
for cloudtrail_name in cloudtrail_names:
if cloudtrail_name not in non_compliance:
non_compliance.append(cloudtrail_name)
lambda_response = dict()
lambda_response["status"] = "success"
lambda_response["message"] = "CloudTrail Multi-region Logging Enabled"
lambda_response["non_compliance"] = non_compliance
return lambda_response
database_encryption_for_rds
import boto3
def lambda_handler(event, context):
regions = boto3.session.Session().get_available_regions('rds')
non_compliance=[]
for region in regions:
rds_client = boto3.client('rds', region)
rds_instances=rds_client.describe_db_instances()
for rds_instance in rds_instances["DBInstances"]:
if rds_instance["StorageEncrypted"] == False:
non_compliance.append(rds_instance["DBInstanceIdentifier"])
lambda_response = dict()
lambda_response["status"] = "success"
lambda_response["message"] = "Database Encryption for RDS"
lambda_response["non_compliance"] = non_compliance
return lambda_response
incident_remediation_unrestricted_inbound_access
import json
import boto3
def get_security_group_attached_to_ec2(region, instance_id):
ec2 = boto3.client('ec2', region)
reservations = ec2.describe_instances()['Reservations']
for reservation in reservations:
for instance in reservation['Instances']:
if instance["InstanceId"] == instance_id:
print "Started fetching instance details for instance {0}".format(instance["InstanceId"])
security_groups = []
for security_group in instance["SecurityGroups"]:
security_groups.append(security_group["GroupId"])
print "EC2 instance {0} has security group attached :{1}".format(instance["InstanceId"],
str(security_groups))
break
return security_groups
def lambda_handler(event, context):
"""
Lambda for Unrestricted Inbound Access on Uncommon Ports policy incident remediation. Remediate the incident by deleting
security group permission where ipv4=0.0.0.0/0 or ipv6=::/0 and FromPort or ToPort Greater then 1024
:param event:
:param context:
:return:
"""
ipv4_check = "0.0.0.0/0"
ipv6_check = "::/0"
try:
incident_message = event["Records"][0]
incident_data = json.loads(incident_message["Sns"]["Message"])
instance_id = incident_data["entity_id"]
policy_name = incident_data["policy_name"]
region = incident_data["more_info"]["incident_detail"]["content"]["item"]["internal_metadata"]["region"]
print "Remediation for policy {0}, instanceId:{1} ,region:{2}".format(policy_name, instance_id, region)
except Exception as e:
print "Exception in sns event {0}".format(e.message)
ec2 = boto3.client('ec2', region)
security_groups = get_security_group_attached_to_ec2(region, instance_id)
response = ec2.describe_security_groups(GroupIds=security_groups)
for security_group in response["SecurityGroups"]:
for security_group_permission in security_group["IpPermissions"]:
if "FromPort" in security_group_permission and "ToPort" in security_group_permission:
if (security_group_permission["FromPort"] > 1024 or security_group_permission['ToPort'] >
1024) and \
(any(ip["CidrIp"] == ipv4_check for ip in security_group_permission["IpRanges"]) or
any(ip["CidrIpv6"] == ipv6_check for ip in
security_group_permission["Ipv6Ranges"])):
ec2.revoke_security_group_ingress(GroupId=security_group["GroupId"],
IpPermissions=[security_group_permission])
print "Deleted Inbound rule {0} from security group Id {1}".format(security_group_permission,
security_group["GroupId"])
Lambda code samples for custom policy
import boto3
def lambda_handler(event, context):
regions = boto3.session.Session().get_available_regions('cloudtrail')
non_compliance=[]
cloudtrail_names=[]
for region in regions:
cloudtrail = boto3.client('cloudtrail', region)
cloudtrail_response=cloudtrail.describe_trails()
for cloudtrail_detail in cloudtrail_response["trailList"]:
if "CloudWatchLogsLogGroupArn" in cloudtrail_detail.keys():
cloudtrail_names.append(cloudtrail_detail["Name"])
for cloudtrail_name in cloudtrail_names:
if cloudtrail_name not in non_compliance:
non_compliance.append(cloudtrail_name)
lambda_response = dict()
lambda_response["status"] = "success"
lambda_response["message"] = "CloudTrail Integration with CloudWatch Enabled"
lambda_response["non_compliance"] = non_compliance
return lambda_response
provisioning_access_to_resources_using_iam_roles
import boto3
def lambda_handler(event, context):
regions = boto3.session.Session().get_available_regions('ec2')
non_compliance=[]
for region in regions:
ec2 = boto3.client('ec2', region)
ec2_response=ec2.describe_instances()
for reservation in ec2_response["Reservations"]:
for instance in reservation["Instances"]:
instance_id=instance["InstanceId"]
if is_iam_profile_associated_to_resource(instance_id):
non_compliance.append(instance_id)
lambda_response = dict()
lambda_response["status"] = "success"
lambda_response["message"] = "Provisioning Access to Resources Using IAM Roles"
lambda_response["non_compliance"] = non_compliance
return lambda_response
def is_iam_profile_associated_to_resource(instance_id):
ec2 = boto3.client('ec2')
response = ec2.describe_iam_instance_profile_associations(Filters=[{"Name":"instance-id","Values":[str(instance_id)]}])
if len(response[u'IamInstanceProfileAssociations']) == 0: return True
publicly_writable_s3_buckets
import boto3
def lambda_handler(event, context):
regions = boto3.session.Session().get_available_regions('s3')
bucket_names=[]
non_compliance = []
for region in regions:
uri="http://acs.amazonaws.com/groups/global/AllUsers"
client=boto3.client('s3', region)
buckets = client.list_buckets()
for bucket_info in buckets["Buckets"]:
bucket_name = bucket_info["Name"]
bucket_acl = client.get_bucket_acl(Bucket=bucket_name)
for grants in bucket_acl["Grants"]:
if "URI" in grants["Grantee"].keys():
if grants["Grantee"]["URI"] == uri:
bucket_permission = grants["Permission"]
if bucket_permission == "WRITE":
non_compliance.append(bucket_name)
lambda_response = dict()
lambda_response["status"] = "success"
lambda_response["message"] = "Publicly writable s3 buckets"
lambda_response["non_compliance"] = non_compliance
return lambda_response
vpc_flowlogs_enabled
import boto3
def lambda_handler(event, context):
regions = boto3.session.Session().get_available_regions('ec2')
non_compliance=[]
for region in regions:
ec2 = boto3.client('ec2', region)
vpc_response=ec2.describe_vpcs()
for vpc_info in vpc_response["Vpcs"]:
vpc_id=vpc_info["VpcId"]
if is_flow_logs_enabled(vpc_id):
non_compliance.append(vpc_id)
lambda_response = dict()
lambda_response["status"] = "success"
lambda_response["message"] = "VPC Flow Logs Enabled"
lambda_response["non_compliance"] = non_compliance
return lambda_response
def is_flow_logs_enabled(vpc_id):
ec2 = boto3.client('ec2')
response = ec2.describe_flow_logs(
Filter=[
{
'Name': 'resource-id',
'Values': [
vpc_id,
]
},
],
)
if len(response[u'FlowLogs']) == 0: return True