You’ve learned the theory. Now let’s build something real.
This capstone project ties together everything from the course: IAM analysis, network security, encryption verification, logging checks, and automated reporting. By the end, you’ll have a working Python CLI that scans an AWS account for the most common security misconfigurations and generates a scored report.
Project Overview
We’re building cloud-sec-scan — a command-line tool that:
- Scans five security domains (Security Groups, S3, IAM, EBS, CloudTrail)
- Scores each finding by severity (Critical, High, Medium, Low)
- Calculates an overall account security score
- Generates an HTML report
Architecture
The scanner follows a modular design:
cloud-sec-scan/
├── scanner/
│ ├── __init__.py
│ ├── cli.py # CLI entry point
│ ├── base.py # Base scanner class
│ ├── security_groups.py
│ ├── s3_buckets.py
│ ├── iam_policies.py
│ ├── ebs_encryption.py
│ └── cloudtrail.py
├── scoring/
│ ├── __init__.py
│ └── engine.py # Severity scoring
├── reporting/
│ ├── __init__.py
│ └── html_report.py # HTML report generator
├── requirements.txt
└── README.md

Setting Up the Project
mkdir cloud-sec-scan && cd cloud-sec-scan
python -m venv venv
source venv/bin/activate
pip install boto3 jinja2

# requirements.txt
boto3>=1.28.0
jinja2>=3.1.0

Base Scanner Class
Every scanner module inherits from a common base:
# scanner/base.py
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import List
class Severity(Enum):
    """Finding severity, ordered so that a higher value means more severe."""

    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1
    INFO = 0
@dataclass
class Finding:
    """A single security issue detected by a scanner.

    Captures what was found, where it lives, how severe it is, and the
    concrete remediation step.
    """

    title: str            # short human-readable summary
    description: str      # full explanation of the issue
    severity: Severity
    resource_id: str      # e.g. a security-group ID or bucket name
    resource_type: str    # CloudFormation-style type, e.g. "AWS::S3::Bucket"
    recommendation: str   # concrete remediation step
    region: str = "global"
    # ISO-8601 creation time. datetime.utcnow() is deprecated (Python 3.12+),
    # so use an explicitly timezone-aware UTC timestamp instead.
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
class BaseScanner:
"""Base class for all security scanners"""
def __init__(self, session=None):
self.session = session or __import__('boto3').Session()
self.findings: List[Finding] = []
def scan(self) -> List[Finding]:
raise NotImplementedError
def add_finding(self, **kwargs):
finding = Finding(**kwargs)
self.findings.append(finding)
return findingScanner Module 1: Security Groups
Detects security groups with overly permissive ingress rules.
# scanner/security_groups.py
import boto3
from .base import BaseScanner, Severity
# Services that should never be reachable from the public internet.
# Insertion order matters: the scanner reports the first entry whose
# port falls inside an open ingress range.
DANGEROUS_PORTS = {
    22: "SSH",
    3389: "RDP",
    3306: "MySQL",
    5432: "PostgreSQL",
    27017: "MongoDB",
    6379: "Redis",
    9200: "Elasticsearch",
}
class SecurityGroupScanner(BaseScanner):
def scan(self):
ec2 = self.session.client('ec2')
paginator = ec2.get_paginator('describe_security_groups')
for page in paginator.paginate():
for sg in page['SecurityGroups']:
self._check_ingress_rules(sg)
return self.findings
def _check_ingress_rules(self, sg):
sg_id = sg['GroupId']
sg_name = sg.get('GroupName', 'unknown')
for rule in sg.get('IpPermissions', []):
from_port = rule.get('FromPort', 0)
to_port = rule.get('ToPort', 65535)
for ip_range in rule.get('IpRanges', []):
cidr = ip_range.get('CidrIp', '')
if cidr == '0.0.0.0/0':
self._report_open_rule(sg_id, sg_name, from_port, to_port, cidr)
for ip_range in rule.get('Ipv6Ranges', []):
cidr = ip_range.get('CidrIpv6', '')
if cidr == '::/0':
self._report_open_rule(sg_id, sg_name, from_port, to_port, cidr)
def _report_open_rule(self, sg_id, sg_name, from_port, to_port, cidr):
# Check if it's a dangerous port
for port, service in DANGEROUS_PORTS.items():
if from_port <= port <= to_port:
self.add_finding(
title=f"Security group allows {service} ({port}) from internet",
description=f"Security group {sg_id} ({sg_name}) allows inbound traffic on port {port} ({service}) from {cidr}",
severity=Severity.CRITICAL,
resource_id=sg_id,
resource_type="AWS::EC2::SecurityGroup",
recommendation=f"Restrict port {port} to specific IP ranges or remove the rule"
)
return
# Generic open port
if from_port == 0 and to_port == 65535:
self.add_finding(
title=f"Security group allows ALL ports from internet",
description=f"Security group {sg_id} ({sg_name}) allows all inbound traffic from {cidr}",
severity=Severity.CRITICAL,
resource_id=sg_id,
resource_type="AWS::EC2::SecurityGroup",
recommendation="Remove the 0.0.0.0/0 rule or restrict to specific ports"
)
else:
self.add_finding(
title=f"Security group allows port {from_port}-{to_port} from internet",
description=f"Security group {sg_id} ({sg_name}) allows inbound traffic on ports {from_port}-{to_port} from {cidr}",
severity=Severity.HIGH,
resource_id=sg_id,
resource_type="AWS::EC2::SecurityGroup",
recommendation="Restrict to specific IP ranges"
)Scanner Module 2: S3 Buckets
# scanner/s3_buckets.py
from .base import BaseScanner, Severity
class S3BucketScanner(BaseScanner):
def scan(self):
s3 = self.session.client('s3')
s3control = self.session.client('s3control')
account_id = self.session.client('sts').get_caller_identity()['Account']
# Check account-level public access block
try:
account_block = s3control.get_public_access_block(AccountId=account_id)
config = account_block['PublicAccessBlockConfiguration']
if not all([config.get('BlockPublicAcls'), config.get('BlockPublicPolicy'),
config.get('IgnorePublicAcls'), config.get('RestrictPublicBuckets')]):
self.add_finding(
title="Account-level S3 Block Public Access is not fully enabled",
description="One or more Block Public Access settings are disabled at the account level",
severity=Severity.HIGH,
resource_id=account_id,
resource_type="AWS::S3::AccountPublicAccessBlock",
recommendation="Enable all four Block Public Access settings at the account level"
)
except Exception:
self.add_finding(
title="Account-level S3 Block Public Access is not configured",
severity=Severity.HIGH,
description="No account-level public access block found",
resource_id=account_id,
resource_type="AWS::S3::AccountPublicAccessBlock",
recommendation="Enable Block Public Access at the account level"
)
# Check individual buckets
buckets = s3.list_buckets().get('Buckets', [])
for bucket in buckets:
name = bucket['Name']
self._check_encryption(s3, name)
self._check_versioning(s3, name)
self._check_logging(s3, name)
return self.findings
def _check_encryption(self, s3, bucket_name):
try:
s3.get_bucket_encryption(Bucket=bucket_name)
except s3.exceptions.ClientError:
self.add_finding(
title=f"S3 bucket '{bucket_name}' has no default encryption",
description=f"Bucket {bucket_name} does not have server-side encryption enabled",
severity=Severity.HIGH,
resource_id=bucket_name,
resource_type="AWS::S3::Bucket",
recommendation="Enable default encryption with KMS or AES-256"
)
def _check_versioning(self, s3, bucket_name):
response = s3.get_bucket_versioning(Bucket=bucket_name)
if response.get('Status') != 'Enabled':
self.add_finding(
title=f"S3 bucket '{bucket_name}' versioning is not enabled",
description=f"Bucket {bucket_name} does not have versioning enabled",
severity=Severity.MEDIUM,
resource_id=bucket_name,
resource_type="AWS::S3::Bucket",
recommendation="Enable versioning for data protection and recovery"
)
def _check_logging(self, s3, bucket_name):
response = s3.get_bucket_logging(Bucket=bucket_name)
if 'LoggingEnabled' not in response:
self.add_finding(
title=f"S3 bucket '{bucket_name}' access logging is not enabled",
description=f"Bucket {bucket_name} does not have access logging enabled",
severity=Severity.MEDIUM,
resource_id=bucket_name,
resource_type="AWS::S3::Bucket",
recommendation="Enable access logging for audit trail"
)Scanner Module 3: IAM Policies
# scanner/iam_policies.py
from .base import BaseScanner, Severity
import json
class IAMScanner(BaseScanner):
def scan(self):
iam = self.session.client('iam')
self._check_root_access_keys(iam)
self._check_mfa(iam)
self._check_admin_policies(iam)
self._check_old_access_keys(iam)
return self.findings
def _check_root_access_keys(self, iam):
summary = iam.get_account_summary()['SummaryMap']
if summary.get('AccountAccessKeysPresent', 0) > 0:
self.add_finding(
title="Root account has access keys",
description="The root account has active access keys, which is a critical security risk",
severity=Severity.CRITICAL,
resource_id="root",
resource_type="AWS::IAM::User",
recommendation="Delete root access keys and use IAM roles instead"
)
def _check_mfa(self, iam):
users = iam.list_users()['Users']
for user in users:
mfa_devices = iam.list_mfa_devices(UserName=user['UserName'])['MFADevices']
login_profile_exists = True
try:
iam.get_login_profile(UserName=user['UserName'])
except iam.exceptions.NoSuchEntityException:
login_profile_exists = False
if login_profile_exists and not mfa_devices:
self.add_finding(
title=f"IAM user '{user['UserName']}' has console access without MFA",
description=f"User {user['UserName']} can log into the console but has no MFA device configured",
severity=Severity.HIGH,
resource_id=user['UserName'],
resource_type="AWS::IAM::User",
recommendation="Enable MFA for all console users"
)
def _check_admin_policies(self, iam):
roles = iam.list_roles()['Roles']
for role in roles:
policies = iam.list_attached_role_policies(RoleName=role['RoleName'])['AttachedPolicies']
for policy in policies:
if policy['PolicyArn'] == 'arn:aws:iam::aws:policy/AdministratorAccess':
self.add_finding(
title=f"Role '{role['RoleName']}' has AdministratorAccess",
description=f"IAM role {role['RoleName']} has the AdministratorAccess managed policy attached",
severity=Severity.HIGH,
resource_id=role['RoleName'],
resource_type="AWS::IAM::Role",
recommendation="Replace with least-privilege policy using IAM Access Analyzer"
)
def _check_old_access_keys(self, iam):
from datetime import datetime, timezone, timedelta
users = iam.list_users()['Users']
threshold = datetime.now(timezone.utc) - timedelta(days=90)
for user in users:
keys = iam.list_access_keys(UserName=user['UserName'])['AccessKeyMetadata']
for key in keys:
if key['Status'] == 'Active' and key['CreateDate'] < threshold:
age_days = (datetime.now(timezone.utc) - key['CreateDate']).days
self.add_finding(
title=f"Access key for '{user['UserName']}' is {age_days} days old",
description=f"Access key {key['AccessKeyId']} for user {user['UserName']} was created {age_days} days ago",
severity=Severity.MEDIUM,
resource_id=key['AccessKeyId'],
resource_type="AWS::IAM::AccessKey",
recommendation="Rotate access keys every 90 days"
)Scanner Module 4: EBS Encryption
# scanner/ebs_encryption.py
from .base import BaseScanner, Severity
class EBSScanner(BaseScanner):
def scan(self):
ec2 = self.session.client('ec2')
# Check default encryption
try:
default_enc = ec2.get_ebs_encryption_by_default()
if not default_enc.get('EbsEncryptionByDefault'):
self.add_finding(
title="EBS default encryption is not enabled",
description="New EBS volumes will not be encrypted by default",
severity=Severity.HIGH,
resource_id="ebs-default-encryption",
resource_type="AWS::EC2::Volume",
recommendation="Enable EBS encryption by default for the region"
)
except Exception:
pass
# Check existing volumes
paginator = ec2.get_paginator('describe_volumes')
for page in paginator.paginate():
for volume in page['Volumes']:
if not volume.get('Encrypted', False):
self.add_finding(
title=f"EBS volume {volume['VolumeId']} is not encrypted",
description=f"Volume {volume['VolumeId']} (state: {volume['State']}, size: {volume['Size']}GB) is not encrypted",
severity=Severity.HIGH,
resource_id=volume['VolumeId'],
resource_type="AWS::EC2::Volume",
recommendation="Create an encrypted snapshot and replace the volume"
)
return self.findingsScanner Module 5: CloudTrail Status
# scanner/cloudtrail.py
from .base import BaseScanner, Severity
class CloudTrailScanner(BaseScanner):
def scan(self):
ct = self.session.client('cloudtrail')
trails = ct.describe_trails()['trailList']
if not trails:
self.add_finding(
title="No CloudTrail trails configured",
description="This account has no CloudTrail trails. API calls are not being logged.",
severity=Severity.CRITICAL,
resource_id="cloudtrail",
resource_type="AWS::CloudTrail::Trail",
recommendation="Create a multi-region trail with log file validation"
)
return self.findings
for trail in trails:
name = trail['Name']
status = ct.get_trail_status(Name=name)
if not status.get('IsLogging'):
self.add_finding(
title=f"CloudTrail '{name}' is not logging",
description=f"Trail {name} exists but logging is stopped",
severity=Severity.CRITICAL,
resource_id=name,
resource_type="AWS::CloudTrail::Trail",
recommendation="Start logging immediately — this may indicate tampering"
)
if not trail.get('IsMultiRegionTrail'):
self.add_finding(
title=f"CloudTrail '{name}' is not multi-region",
description=f"Trail {name} only covers one region. Activity in other regions is not logged.",
severity=Severity.HIGH,
resource_id=name,
resource_type="AWS::CloudTrail::Trail",
recommendation="Enable multi-region logging"
)
if not trail.get('LogFileValidationEnabled'):
self.add_finding(
title=f"CloudTrail '{name}' log file validation is disabled",
description=f"Without log validation, tampered logs cannot be detected",
severity=Severity.MEDIUM,
resource_id=name,
resource_type="AWS::CloudTrail::Trail",
recommendation="Enable log file validation"
)
return self.findingsScoring Engine
# scoring/engine.py
from scanner.base import Severity, Finding
from typing import List
# Points deducted from the 100-point baseline for each finding, by severity.
SEVERITY_WEIGHTS = {
Severity.CRITICAL: 10,
Severity.HIGH: 5,
Severity.MEDIUM: 2,
Severity.LOW: 1,
Severity.INFO: 0,
}
def calculate_score(findings: List[Finding]) -> dict:
if not findings:
return {"score": 100, "grade": "A+", "risk_level": "Low"}
total_deductions = sum(SEVERITY_WEIGHTS[f.severity] for f in findings)
max_score = 100
score = max(0, max_score - total_deductions)
counts = {}
for severity in Severity:
counts[severity.name] = len([f for f in findings if f.severity == severity])
if score >= 90:
grade, risk = "A", "Low"
elif score >= 75:
grade, risk = "B", "Moderate"
elif score >= 60:
grade, risk = "C", "Elevated"
elif score >= 40:
grade, risk = "D", "High"
else:
grade, risk = "F", "Critical"
return {
"score": score,
"grade": grade,
"risk_level": risk,
"total_findings": len(findings),
"counts": counts,
}CLI Entry Point
# scanner/cli.py
import argparse
import json
from datetime import datetime
from scanner.security_groups import SecurityGroupScanner
from scanner.s3_buckets import S3BucketScanner
from scanner.iam_policies import IAMScanner
from scanner.ebs_encryption import EBSScanner
from scanner.cloudtrail import CloudTrailScanner
from scoring.engine import calculate_score
# Maps each --modules CLI choice to its scanner class; the special value
# "all" (handled in main) expands to every key in this dict.
SCANNERS = {
"security-groups": SecurityGroupScanner,
"s3": S3BucketScanner,
"iam": IAMScanner,
"ebs": EBSScanner,
"cloudtrail": CloudTrailScanner,
}
def main():
parser = argparse.ArgumentParser(description="Cloud Security Scanner")
parser.add_argument("--modules", nargs="+", choices=list(SCANNERS.keys()) + ["all"],
default=["all"], help="Modules to run")
parser.add_argument("--output", choices=["json", "html", "table"],
default="table", help="Output format")
parser.add_argument("--output-file", type=str, help="Output file path")
args = parser.parse_args()
modules = list(SCANNERS.keys()) if "all" in args.modules else args.modules
all_findings = []
print(f"\n{'='*60}")
print(f" Cloud Security Scanner — {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}")
print(f"{'='*60}\n")
for module_name in modules:
scanner_class = SCANNERS[module_name]
print(f" Scanning: {module_name}...", end=" ", flush=True)
scanner = scanner_class()
findings = scanner.scan()
all_findings.extend(findings)
print(f"found {len(findings)} findings")
# Score
result = calculate_score(all_findings)
print(f"\n{'='*60}")
print(f" Score: {result['score']}/100 (Grade: {result['grade']})")
print(f" Risk Level: {result['risk_level']}")
print(f" Total Findings: {result['total_findings']}")
for severity, count in result['counts'].items():
if count > 0:
print(f" {severity}: {count}")
print(f"{'='*60}\n")
if args.output == "json":
output = {
"scan_time": datetime.utcnow().isoformat(),
"score": result,
"findings": [
{
"title": f.title,
"severity": f.severity.name,
"resource_id": f.resource_id,
"resource_type": f.resource_type,
"description": f.description,
"recommendation": f.recommendation,
}
for f in all_findings
]
}
if args.output_file:
with open(args.output_file, 'w') as f:
json.dump(output, f, indent=2)
print(f"Report saved to {args.output_file}")
else:
print(json.dumps(output, indent=2))
elif args.output == "table":
for f in sorted(all_findings, key=lambda x: x.severity.value, reverse=True):
icon = {"CRITICAL": "🔴", "HIGH": "🟠", "MEDIUM": "🟡", "LOW": "🟢"}.get(f.severity.name, "⚪")
print(f" {icon} [{f.severity.name}] {f.title}")
print(f" Resource: {f.resource_id}")
print(f" Fix: {f.recommendation}\n")
if __name__ == "__main__":
main()Running the Scanner
# Scan everything
python -m scanner.cli
# Scan specific modules
python -m scanner.cli --modules security-groups s3
# JSON output
python -m scanner.cli --output json --output-file report.json
# Example output:
# ============================================================
# Cloud Security Scanner — 2026-04-04 14:30 UTC
# ============================================================
#
# Scanning: security-groups... found 3 findings
# Scanning: s3... found 5 findings
# Scanning: iam... found 4 findings
# Scanning: ebs... found 2 findings
# Scanning: cloudtrail... found 1 findings
#
# ============================================================
# Score: 55/100 (Grade: C)
# Risk Level: Elevated
# Total Findings: 15
# CRITICAL: 2
# HIGH: 7
# MEDIUM: 6
# ============================================================

Extending the Scanner
The modular design makes it easy to add new scanners:
# scanner/rds_security.py — add RDS checks
# Illustrative skeleton only — the "# ..." below marks the remaining Finding
# fields (description, resource_id, resource_type, recommendation) that a
# real implementation must supply.
class RDSScanner(BaseScanner):
def scan(self):
rds = self.session.client('rds')
# NOTE(review): describe_db_instances is unpaginated here — acceptable for
# a demo, but a production scanner should use a paginator.
instances = rds.describe_db_instances()['DBInstances']
for db in instances:
if db.get('PubliclyAccessible'):
self.add_finding(
title=f"RDS instance '{db['DBInstanceIdentifier']}' is publicly accessible",
severity=Severity.CRITICAL,
# ...
)
return self.findingsIdeas for extension:
- RDS security — public access, encryption, backup retention
- Lambda security — runtime versions, VPC config, environment variables
- VPC security — flow logs, default VPC usage, peering
- GuardDuty — check if enabled, unresolved findings
- Secrets Manager — rotation status, age of secrets
Course Roadmap
Key Takeaways
- Modular design — each scanner is independent and testable
- Severity scoring — not all findings are equal, weight them appropriately
- Actionable output — every finding includes a specific recommendation
- Easy to extend — add new scanners by implementing BaseScanner.scan()
- Run regularly — integrate into CI/CD or run as a scheduled Lambda
Course Wrap-Up
Over 15 articles, we’ve covered the full spectrum of cloud security engineering:
- Foundations (1-3): Security mindset, IAM mastery, secrets management
- Hardening (4-7): Container security, dependency scanning, code signing, supply chain
- Detection (8-10): CloudTrail observability, auto-remediation, incident response
- Testing (11-12): OWASP for cloud, penetration testing basics
- Governance (13-14): Compliance automation, security pipelines
- Capstone (15): Building a real security scanner
Security engineering isn’t a destination — it’s a practice. The tools and techniques in this course give you the foundation to build, operate, and defend cloud systems with confidence. Now go build something secure.











