Copy # lambda/incident_response.py
import boto3
import json
import os
from datetime import datetime
iam = boto3.client('iam')
ec2 = boto3.client('ec2')
sns = boto3.client('sns')
s3 = boto3.client('s3')
def handler(event, context):
"""
Automated incident response based on GuardDuty findings
"""
finding = event['detail']
finding_type = finding['type']
severity = finding['severity']
resource = finding['resource']
print(f"Processing finding: {finding_type} (Severity: {severity})")
# Route to appropriate response handler
if 'UnauthorizedAccess:IAMUser' in finding_type:
handle_compromised_iam_user(finding)
elif 'CryptoCurrency:EC2' in finding_type:
handle_crypto_mining(finding)
elif 'Recon:IAMUser/MaliciousIPCaller' in finding_type:
handle_malicious_ip(finding)
elif 'UnauthorizedAccess:EC2' in finding_type:
handle_compromised_instance(finding)
elif 'Exfiltration:S3' in finding_type:
handle_data_exfiltration(finding)
# Always create incident ticket and notify SOC
create_incident_ticket(finding)
notify_soc(finding)
return {'statusCode': 200}
def handle_compromised_iam_user(finding):
"""Response to compromised IAM credentials"""
user_name = finding['resource']['accessKeyDetails']['userName']
access_key_id = finding['resource']['accessKeyDetails']['accessKeyId']
print(f"RESPONSE: Compromised IAM user detected: {user_name}")
# 1. Disable access key immediately
iam.update_access_key(
UserName=user_name,
AccessKeyId=access_key_id,
Status='Inactive'
)
print(f"✓ Disabled access key: {access_key_id}")
# 2. Attach explicit deny policy
deny_policy = {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Deny",
"Action": "*",
"Resource": "*"
}]
}
iam.put_user_policy(
UserName=user_name,
PolicyName='IncidentResponseDenyAll',
PolicyDocument=json.dumps(deny_policy)
)
print(f"✓ Attached deny-all policy to user: {user_name}")
# 3. Force password reset
try:
iam.update_login_profile(
UserName=user_name,
PasswordResetRequired=True
)
print(f"✓ Forced password reset for: {user_name}")
except:
pass # User might not have console access
# 4. Log all actions to forensics bucket
log_incident_action({
'timestamp': datetime.utcnow().isoformat(),
'finding_type': 'CompromisedIAMUser',
'user_name': user_name,
'access_key_id': access_key_id,
'actions_taken': [
'Disabled access key',
'Attached deny-all policy',
'Forced password reset'
]
})
def handle_crypto_mining(finding):
"""Response to cryptocurrency mining activity"""
instance_id = finding['resource']['instanceDetails']['instanceId']
print(f"RESPONSE: Crypto mining detected on instance: {instance_id}")
# 1. Create snapshot for forensics
volumes = ec2.describe_instances(InstanceIds=[instance_id])
for reservation in volumes['Reservations']:
for instance in reservation['Instances']:
for volume in instance.get('BlockDeviceMappings', []):
volume_id = volume['Ebs']['VolumeId']
snapshot = ec2.create_snapshot(
VolumeId=volume_id,
Description=f'Forensic snapshot - crypto mining incident {instance_id}',
TagSpecifications=[{
'ResourceType': 'snapshot',
'Tags': [
{'Key': 'IncidentType', 'Value': 'CryptoMining'},
{'Key': 'InstanceId', 'Value': instance_id},
{'Key': 'Timestamp', 'Value': datetime.utcnow().isoformat()}
]
}]
)
print(f"✓ Created forensic snapshot: {snapshot['SnapshotId']}")
# 2. Isolate instance (remove from security groups, attach to isolated SG)
isolated_sg = get_or_create_isolated_security_group()
ec2.modify_instance_attribute(
InstanceId=instance_id,
Groups=[isolated_sg]
)
print(f"✓ Isolated instance {instance_id} (attached to isolated SG)")
# 3. Stop instance
ec2.stop_instances(InstanceIds=[instance_id])
print(f"✓ Stopped instance: {instance_id}")
# 4. Log incident
log_incident_action({
'timestamp': datetime.utcnow().isoformat(),
'finding_type': 'CryptoMining',
'instance_id': instance_id,
'actions_taken': [
'Created forensic snapshots',
'Isolated instance',
'Stopped instance'
]
})
def handle_data_exfiltration(finding):
"""Response to potential data exfiltration from S3"""
bucket_name = finding['resource']['s3BucketDetails'][0]['name']
print(f"RESPONSE: Data exfiltration detected from bucket: {bucket_name}")
# 1. Block all public access
s3.put_public_access_block(
Bucket=bucket_name,
PublicAccessBlockConfiguration={
'BlockPublicAcls': True,
'IgnorePublicAcls': True,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': True
}
)
print(f"✓ Blocked public access to bucket: {bucket_name}")
# 2. Enable MFA Delete
s3.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={
'MFADelete': 'Enabled',
'Status': 'Enabled'
},
MFA=os.environ['MFA_DEVICE_SERIAL'] # Requires MFA
)
print(f"✓ Enabled MFA Delete for bucket: {bucket_name}")
# 3. Log incident
log_incident_action({
'timestamp': datetime.utcnow().isoformat(),
'finding_type': 'DataExfiltration',
'bucket_name': bucket_name,
'actions_taken': [
'Blocked public access',
'Enabled MFA Delete',
'Created incident ticket'
]
})
def get_or_create_isolated_security_group():
"""Get or create security group for isolating compromised instances"""
sg_name = 'incident-response-isolated'
try:
response = ec2.describe_security_groups(
Filters=[{'Name': 'group-name', 'Values': [sg_name]}]
)
return response['SecurityGroups'][0]['GroupId']
except:
# Create isolated SG (no ingress, no egress)
sg = ec2.create_security_group(
GroupName=sg_name,
Description='Isolated security group for compromised instances',
VpcId=os.environ['DEFAULT_VPC_ID']
)
# Remove default egress rule
ec2.revoke_security_group_egress(
GroupId=sg['GroupId'],
IpPermissions=[{
'IpProtocol': '-1',
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
}]
)
return sg['GroupId']
def create_incident_ticket(finding):
"""Create incident ticket in ticketing system"""
# Integration with ServiceNow, Jira, etc.
pass
def notify_soc(finding):
"""Notify SOC team via PagerDuty/Slack"""
severity = finding['severity']
finding_type = finding['type']
# High/Critical severity: PagerDuty
if severity >= 7:
trigger_pagerduty_incident(finding)
# All findings: Slack notification
send_slack_alert(finding)
def trigger_pagerduty_incident(finding):
"""Trigger PagerDuty incident for high-severity findings"""
# PagerDuty API integration
pass
def send_slack_alert(finding):
"""Send Slack alert to #security-alerts channel"""
# Slack webhook integration
pass
def log_incident_action(action):
"""Log all incident response actions to forensics bucket"""
s3.put_object(
Bucket=os.environ['FORENSICS_BUCKET'],
Key=f"incident-actions/{datetime.utcnow().date()}/{datetime.utcnow().isoformat()}.json",
Body=json.dumps(action, indent=2),
ServerSideEncryption='aws:kms',
SSEKMSKeyId=os.environ['FORENSICS_KMS_KEY']
)