Build a Cloud Security Scanner — Hands-On Project

Gorav Singal

April 04, 2026

TL;DR

Put everything together: build a Python CLI that scans your AWS account for the most common security misconfigurations across five domains, scores each finding by severity, and generates an HTML report. This capstone project ties the entire course together.

You’ve learned the theory. Now let’s build something real.

This capstone project ties together everything from the course: IAM analysis, network security, encryption verification, logging checks, and automated reporting. By the end, you’ll have a working Python CLI that scans an AWS account for the most common security misconfigurations and generates a scored report.

Project Overview

We’re building cloud-sec-scan — a command-line tool that:

  1. Scans five security domains (Security Groups, S3, IAM, EBS, CloudTrail)
  2. Scores each finding by severity (Critical, High, Medium, Low)
  3. Calculates an overall account security score
  4. Generates an HTML report

Architecture

The scanner follows a modular design:

cloud-sec-scan/
├── scanner/
│   ├── __init__.py
│   ├── cli.py           # CLI entry point
│   ├── base.py          # Base scanner class
│   ├── security_groups.py
│   ├── s3_buckets.py
│   ├── iam_policies.py
│   ├── ebs_encryption.py
│   └── cloudtrail.py
├── scoring/
│   ├── __init__.py
│   └── engine.py        # Severity scoring
├── reporting/
│   ├── __init__.py
│   └── html_report.py   # HTML report generator
├── requirements.txt
└── README.md

Setting Up the Project

mkdir cloud-sec-scan && cd cloud-sec-scan
python -m venv venv
source venv/bin/activate

pip install boto3 jinja2

# requirements.txt
boto3>=1.28.0
jinja2>=3.1.0

Base Scanner Class

Every scanner module inherits from a common base:

# scanner/base.py
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import List

import boto3


class Severity(Enum):
    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1
    INFO = 0


@dataclass
class Finding:
    title: str
    description: str
    severity: Severity
    resource_id: str
    resource_type: str
    recommendation: str
    region: str = "global"
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())


class BaseScanner:
    """Base class for all security scanners."""

    def __init__(self, session=None):
        # Accept an injected session (handy for tests), else use default credentials
        self.session = session or boto3.Session()
        self.findings: List[Finding] = []

    def scan(self) -> List[Finding]:
        """Run every check for this module and return the collected findings."""
        raise NotImplementedError

    def add_finding(self, **kwargs) -> Finding:
        finding = Finding(**kwargs)
        self.findings.append(finding)
        return finding
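
Because BaseScanner accepts a session argument, a scanner can be exercised without touching AWS. The sketch below illustrates that idea under stated assumptions: FakeSession, FakeEC2, and UnencryptedVolumeScanner are hypothetical names, and Severity, Finding, and BaseScanner are trimmed stand-ins for the real classes so the example runs without boto3.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class Severity(Enum):
    HIGH = 3
    LOW = 1


@dataclass
class Finding:
    title: str
    severity: Severity
    resource_id: str


class BaseScanner:
    """Trimmed stand-in for scanner/base.py so this sketch runs without boto3."""

    def __init__(self, session):
        self.session = session
        self.findings: List[Finding] = []

    def add_finding(self, **kwargs) -> Finding:
        finding = Finding(**kwargs)
        self.findings.append(finding)
        return finding


class FakeEC2:
    """Hypothetical stub returning a canned describe_volumes-style response."""

    def describe_volumes(self):
        return {"Volumes": [{"VolumeId": "vol-1", "Encrypted": False},
                            {"VolumeId": "vol-2", "Encrypted": True}]}


class FakeSession:
    """Hypothetical stub that mimics session.client(service_name)."""

    def client(self, service_name):
        assert service_name == "ec2"
        return FakeEC2()


class UnencryptedVolumeScanner(BaseScanner):
    def scan(self) -> List[Finding]:
        ec2 = self.session.client("ec2")
        for volume in ec2.describe_volumes()["Volumes"]:
            if not volume["Encrypted"]:
                self.add_finding(title="Volume is not encrypted",
                                 severity=Severity.HIGH,
                                 resource_id=volume["VolumeId"])
        return self.findings


findings = UnencryptedVolumeScanner(FakeSession()).scan()
print([f.resource_id for f in findings])
```

Only the unencrypted volume is reported. The same injection point works for any module, since each scanner reaches AWS only through self.session.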

Scanner Module 1: Security Groups

Detects security groups with overly permissive ingress rules.

# scanner/security_groups.py
from .base import BaseScanner, Severity

DANGEROUS_PORTS = {
    22: "SSH",
    3389: "RDP",
    3306: "MySQL",
    5432: "PostgreSQL",
    27017: "MongoDB",
    6379: "Redis",
    9200: "Elasticsearch",
}

class SecurityGroupScanner(BaseScanner):

    def scan(self):
        ec2 = self.session.client('ec2')
        paginator = ec2.get_paginator('describe_security_groups')

        for page in paginator.paginate():
            for sg in page['SecurityGroups']:
                self._check_ingress_rules(sg)

        return self.findings

    def _check_ingress_rules(self, sg):
        sg_id = sg['GroupId']
        sg_name = sg.get('GroupName', 'unknown')

        for rule in sg.get('IpPermissions', []):
            from_port = rule.get('FromPort', 0)
            to_port = rule.get('ToPort', 65535)

            for ip_range in rule.get('IpRanges', []):
                cidr = ip_range.get('CidrIp', '')
                if cidr == '0.0.0.0/0':
                    self._report_open_rule(sg_id, sg_name, from_port, to_port, cidr)

            for ip_range in rule.get('Ipv6Ranges', []):
                cidr = ip_range.get('CidrIpv6', '')
                if cidr == '::/0':
                    self._report_open_rule(sg_id, sg_name, from_port, to_port, cidr)

    def _report_open_rule(self, sg_id, sg_name, from_port, to_port, cidr):
        # A rule that covers every port is reported once, before the per-port checks.
        # (Checked first: a 0-65535 range always contains port 22, which would
        # otherwise make this branch unreachable.)
        if from_port == 0 and to_port == 65535:
            self.add_finding(
                title="Security group allows ALL ports from internet",
                description=f"Security group {sg_id} ({sg_name}) allows all inbound traffic from {cidr}",
                severity=Severity.CRITICAL,
                resource_id=sg_id,
                resource_type="AWS::EC2::SecurityGroup",
                recommendation=f"Remove the {cidr} rule or restrict it to specific ports"
            )
            return

        # Report every dangerous port inside the range, not only the first match
        matched = [(p, s) for p, s in DANGEROUS_PORTS.items() if from_port <= p <= to_port]
        for port, service in matched:
            self.add_finding(
                title=f"Security group allows {service} ({port}) from internet",
                description=f"Security group {sg_id} ({sg_name}) allows inbound traffic on port {port} ({service}) from {cidr}",
                severity=Severity.CRITICAL,
                resource_id=sg_id,
                resource_type="AWS::EC2::SecurityGroup",
                recommendation=f"Restrict port {port} to specific IP ranges or remove the rule"
            )
        if matched:
            return

        # Any other port range open to the internet
        self.add_finding(
            title=f"Security group allows ports {from_port}-{to_port} from internet",
            description=f"Security group {sg_id} ({sg_name}) allows inbound traffic on ports {from_port}-{to_port} from {cidr}",
            severity=Severity.HIGH,
            resource_id=sg_id,
            resource_type="AWS::EC2::SecurityGroup",
            recommendation="Restrict to specific IP ranges"
        )
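
The port check is an inclusive range test, so a single wide rule can expose several services at once. A minimal standalone sketch of that matching logic follows; exposed_services is a hypothetical helper name, and the port table is copied from the module above.

```python
DANGEROUS_PORTS = {
    22: "SSH", 3389: "RDP", 3306: "MySQL", 5432: "PostgreSQL",
    27017: "MongoDB", 6379: "Redis", 9200: "Elasticsearch",
}


def exposed_services(from_port: int, to_port: int) -> list:
    """Return sorted names of dangerous services whose port lies in the inclusive range."""
    return sorted(name for port, name in DANGEROUS_PORTS.items()
                  if from_port <= port <= to_port)


print(exposed_services(22, 22))      # a single-port rule
print(exposed_services(3000, 6000))  # a wide range covering several databases
print(exposed_services(8080, 8080))  # a port that is not in the table
```

The wide range matches MySQL, PostgreSQL, and RDP, while port 8080 matches nothing and would fall through to the generic open-port finding.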

Scanner Module 2: S3 Buckets

# scanner/s3_buckets.py
from .base import BaseScanner, Severity

class S3BucketScanner(BaseScanner):

    def scan(self):
        s3 = self.session.client('s3')
        s3control = self.session.client('s3control')
        account_id = self.session.client('sts').get_caller_identity()['Account']

        # Check account-level public access block
        try:
            account_block = s3control.get_public_access_block(AccountId=account_id)
            config = account_block['PublicAccessBlockConfiguration']
            if not all([config.get('BlockPublicAcls'), config.get('BlockPublicPolicy'),
                       config.get('IgnorePublicAcls'), config.get('RestrictPublicBuckets')]):
                self.add_finding(
                    title="Account-level S3 Block Public Access is not fully enabled",
                    description="One or more Block Public Access settings are disabled at the account level",
                    severity=Severity.HIGH,
                    resource_id=account_id,
                    resource_type="AWS::S3::AccountPublicAccessBlock",
                    recommendation="Enable all four Block Public Access settings at the account level"
                )
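        except s3control.exceptions.ClientError as err:
            # Only a missing configuration is a finding; let other errors
            # (for example AccessDenied) surface instead of misreporting them
            if err.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration':
                raise
            self.add_finding(
                title="Account-level S3 Block Public Access is not configured",
                severity=Severity.HIGH,
                description="No account-level public access block found",
                resource_id=account_id,
                resource_type="AWS::S3::AccountPublicAccessBlock",
                recommendation="Enable Block Public Access at the account level"
            )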

        # Check individual buckets
        buckets = s3.list_buckets().get('Buckets', [])
        for bucket in buckets:
            name = bucket['Name']
            self._check_encryption(s3, name)
            self._check_versioning(s3, name)
            self._check_logging(s3, name)

        return self.findings

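    def _check_encryption(self, s3, bucket_name):
        # S3 now applies SSE-S3 by default, so expect this check to fire rarely
        try:
            s3.get_bucket_encryption(Bucket=bucket_name)
        except s3.exceptions.ClientError as err:
            # Only the "no configuration" error means unencrypted; re-raise anything else
            if err.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError':
                raise
            self.add_finding(
                title=f"S3 bucket '{bucket_name}' has no default encryption",
                description=f"Bucket {bucket_name} does not have server-side encryption enabled",
                severity=Severity.HIGH,
                resource_id=bucket_name,
                resource_type="AWS::S3::Bucket",
                recommendation="Enable default encryption with KMS or AES-256"
            )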

    def _check_versioning(self, s3, bucket_name):
        response = s3.get_bucket_versioning(Bucket=bucket_name)
        if response.get('Status') != 'Enabled':
            self.add_finding(
                title=f"S3 bucket '{bucket_name}' versioning is not enabled",
                description=f"Bucket {bucket_name} does not have versioning enabled",
                severity=Severity.MEDIUM,
                resource_id=bucket_name,
                resource_type="AWS::S3::Bucket",
                recommendation="Enable versioning for data protection and recovery"
            )

    def _check_logging(self, s3, bucket_name):
        response = s3.get_bucket_logging(Bucket=bucket_name)
        if 'LoggingEnabled' not in response:
            self.add_finding(
                title=f"S3 bucket '{bucket_name}' access logging is not enabled",
                description=f"Bucket {bucket_name} does not have access logging enabled",
                severity=Severity.MEDIUM,
                resource_id=bucket_name,
                resource_type="AWS::S3::Bucket",
                recommendation="Enable access logging for audit trail"
            )
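
The account-level check treats Block Public Access as all-or-nothing. If you want the finding to say which of the four settings is off, a small helper can list them. This is a sketch, not part of the scanner above: disabled_settings is a hypothetical name, and the sample dict is invented to mirror the PublicAccessBlockConfiguration shape.

```python
REQUIRED_SETTINGS = (
    "BlockPublicAcls",
    "IgnorePublicAcls",
    "BlockPublicPolicy",
    "RestrictPublicBuckets",
)


def disabled_settings(config: dict) -> list:
    """Return the Block Public Access settings that are missing or set to False."""
    return [name for name in REQUIRED_SETTINGS if not config.get(name, False)]


# Invented sample: one setting explicitly off, one absent from the response
sample = {"BlockPublicAcls": True, "IgnorePublicAcls": True, "BlockPublicPolicy": False}
print(disabled_settings(sample))
```

An empty list means all four settings are on. Anything else can be joined into the finding's description so the reader knows exactly what to enable.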

Scanner Module 3: IAM Policies

# scanner/iam_policies.py
from .base import BaseScanner, Severity

class IAMScanner(BaseScanner):

    def scan(self):
        iam = self.session.client('iam')

        self._check_root_access_keys(iam)
        self._check_mfa(iam)
        self._check_admin_policies(iam)
        self._check_old_access_keys(iam)

        return self.findings

    def _check_root_access_keys(self, iam):
        summary = iam.get_account_summary()['SummaryMap']
        if summary.get('AccountAccessKeysPresent', 0) > 0:
            self.add_finding(
                title="Root account has access keys",
                description="The root account has active access keys, which is a critical security risk",
                severity=Severity.CRITICAL,
                resource_id="root",
                resource_type="AWS::IAM::User",
                recommendation="Delete root access keys and use IAM roles instead"
            )

    def _check_mfa(self, iam):
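        # Paginate: list_users returns at most 100 users per call by default
        users = [u for page in iam.get_paginator('list_users').paginate() for u in page['Users']]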
        for user in users:
            mfa_devices = iam.list_mfa_devices(UserName=user['UserName'])['MFADevices']
            login_profile_exists = True
            try:
                iam.get_login_profile(UserName=user['UserName'])
            except iam.exceptions.NoSuchEntityException:
                login_profile_exists = False

            if login_profile_exists and not mfa_devices:
                self.add_finding(
                    title=f"IAM user '{user['UserName']}' has console access without MFA",
                    description=f"User {user['UserName']} can log into the console but has no MFA device configured",
                    severity=Severity.HIGH,
                    resource_id=user['UserName'],
                    resource_type="AWS::IAM::User",
                    recommendation="Enable MFA for all console users"
                )

    def _check_admin_policies(self, iam):
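        # Paginate so accounts with more than 100 roles are fully covered
        roles = [r for page in iam.get_paginator('list_roles').paginate() for r in page['Roles']]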
        for role in roles:
            policies = iam.list_attached_role_policies(RoleName=role['RoleName'])['AttachedPolicies']
            for policy in policies:
                if policy['PolicyArn'] == 'arn:aws:iam::aws:policy/AdministratorAccess':
                    self.add_finding(
                        title=f"Role '{role['RoleName']}' has AdministratorAccess",
                        description=f"IAM role {role['RoleName']} has the AdministratorAccess managed policy attached",
                        severity=Severity.HIGH,
                        resource_id=role['RoleName'],
                        resource_type="AWS::IAM::Role",
                        recommendation="Replace with least-privilege policy using IAM Access Analyzer"
                    )

    def _check_old_access_keys(self, iam):
        from datetime import datetime, timezone, timedelta
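        # Paginate: list_users returns at most 100 users per call by default
        users = [u for page in iam.get_paginator('list_users').paginate() for u in page['Users']]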
        threshold = datetime.now(timezone.utc) - timedelta(days=90)

        for user in users:
            keys = iam.list_access_keys(UserName=user['UserName'])['AccessKeyMetadata']
            for key in keys:
                if key['Status'] == 'Active' and key['CreateDate'] < threshold:
                    age_days = (datetime.now(timezone.utc) - key['CreateDate']).days
                    self.add_finding(
                        title=f"Access key for '{user['UserName']}' is {age_days} days old",
                        description=f"Access key {key['AccessKeyId']} for user {user['UserName']} was created {age_days} days ago",
                        severity=Severity.MEDIUM,
                        resource_id=key['AccessKeyId'],
                        resource_type="AWS::IAM::AccessKey",
                        recommendation="Rotate access keys every 90 days"
                    )
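
The key-age check is plain datetime arithmetic, and it helps to see it in isolation with a fixed clock. The sketch below assumes timezone-aware datetimes on both sides (mixing naive and aware values raises TypeError); is_stale and key_age_days are hypothetical helper names, and the dates are invented.

```python
from datetime import datetime, timedelta, timezone

MAX_KEY_AGE = timedelta(days=90)


def key_age_days(create_date: datetime, now: datetime) -> int:
    """Whole days elapsed between key creation and 'now'."""
    return (now - create_date).days


def is_stale(create_date: datetime, now: datetime) -> bool:
    """True when the key is strictly older than the 90-day rotation window."""
    return now - create_date > MAX_KEY_AGE


# Invented dates, with a fixed 'now' so the result does not depend on the clock
now = datetime(2026, 4, 4, tzinfo=timezone.utc)
old_key = datetime(2025, 12, 1, tzinfo=timezone.utc)
new_key = datetime(2026, 3, 1, tzinfo=timezone.utc)

print(key_age_days(old_key, now), is_stale(old_key, now))
print(key_age_days(new_key, now), is_stale(new_key, now))
```

Passing now as a parameter, rather than calling datetime.now() inside the helper, keeps the comparison deterministic and easy to test.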

Scanner Module 4: EBS Encryption

# scanner/ebs_encryption.py
from .base import BaseScanner, Severity

class EBSScanner(BaseScanner):

    def scan(self):
        ec2 = self.session.client('ec2')

        # Check default encryption
        try:
            default_enc = ec2.get_ebs_encryption_by_default()
            if not default_enc.get('EbsEncryptionByDefault'):
                self.add_finding(
                    title="EBS default encryption is not enabled",
                    description="New EBS volumes will not be encrypted by default",
                    severity=Severity.HIGH,
                    resource_id="ebs-default-encryption",
                    resource_type="AWS::EC2::Volume",
                    recommendation="Enable EBS encryption by default for the region"
                )
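        except ec2.exceptions.ClientError as err:
            # Don't silently hide failures such as a missing ec2:GetEbsEncryptionByDefault permission
            print(f"  Warning: could not read the EBS default encryption setting: {err}")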

        # Check existing volumes
        paginator = ec2.get_paginator('describe_volumes')
        for page in paginator.paginate():
            for volume in page['Volumes']:
                if not volume.get('Encrypted', False):
                    self.add_finding(
                        title=f"EBS volume {volume['VolumeId']} is not encrypted",
                        description=f"Volume {volume['VolumeId']} (state: {volume['State']}, size: {volume['Size']}GB) is not encrypted",
                        severity=Severity.HIGH,
                        resource_id=volume['VolumeId'],
                        resource_type="AWS::EC2::Volume",
                        recommendation="Snapshot the volume, copy the snapshot with encryption enabled, and restore a new volume from the encrypted copy"
                    )

        return self.findings

Scanner Module 5: CloudTrail Status

# scanner/cloudtrail.py
from .base import BaseScanner, Severity

class CloudTrailScanner(BaseScanner):

    def scan(self):
        ct = self.session.client('cloudtrail')
        trails = ct.describe_trails()['trailList']

        if not trails:
            self.add_finding(
                title="No CloudTrail trails configured",
                description="This account has no CloudTrail trails. API calls are not being logged.",
                severity=Severity.CRITICAL,
                resource_id="cloudtrail",
                resource_type="AWS::CloudTrail::Trail",
                recommendation="Create a multi-region trail with log file validation"
            )
            return self.findings

        for trail in trails:
            name = trail['Name']
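            # Look up by ARN: describe_trails also returns shadow trails replicated
            # from other regions, and those cannot be resolved by name
            status = ct.get_trail_status(Name=trail['TrailARN'])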

            if not status.get('IsLogging'):
                self.add_finding(
                    title=f"CloudTrail '{name}' is not logging",
                    description=f"Trail {name} exists but logging is stopped",
                    severity=Severity.CRITICAL,
                    resource_id=name,
                    resource_type="AWS::CloudTrail::Trail",
                    recommendation="Start logging immediately — this may indicate tampering"
                )

            if not trail.get('IsMultiRegionTrail'):
                self.add_finding(
                    title=f"CloudTrail '{name}' is not multi-region",
                    description=f"Trail {name} only covers one region. Activity in other regions is not logged.",
                    severity=Severity.HIGH,
                    resource_id=name,
                    resource_type="AWS::CloudTrail::Trail",
                    recommendation="Enable multi-region logging"
                )

            if not trail.get('LogFileValidationEnabled'):
                self.add_finding(
                    title=f"CloudTrail '{name}' log file validation is disabled",
                    description=f"Trail {name} has log file validation disabled, so tampered or deleted log files cannot be detected",
                    severity=Severity.MEDIUM,
                    resource_id=name,
                    resource_type="AWS::CloudTrail::Trail",
                    recommendation="Enable log file validation"
                )

        return self.findings

Scoring Engine

# scoring/engine.py
from scanner.base import Severity, Finding
from typing import List

SEVERITY_WEIGHTS = {
    Severity.CRITICAL: 10,
    Severity.HIGH: 5,
    Severity.MEDIUM: 2,
    Severity.LOW: 1,
    Severity.INFO: 0,
}

def calculate_score(findings: List[Finding]) -> dict:
    # Count findings per severity (all zeros when there are no findings)
    counts = {s.name: sum(1 for f in findings if f.severity == s) for s in Severity}

    # Each finding deducts its severity weight from 100; the score floors at 0
    total_deductions = sum(SEVERITY_WEIGHTS[f.severity] for f in findings)
    score = max(0, 100 - total_deductions)

    if not findings:
        grade, risk = "A+", "Low"
    elif score >= 90:
        grade, risk = "A", "Low"
    elif score >= 75:
        grade, risk = "B", "Moderate"
    elif score >= 60:
        grade, risk = "C", "Elevated"
    elif score >= 40:
        grade, risk = "D", "High"
    else:
        grade, risk = "F", "Critical"

    # Always return the same keys; the CLI reads total_findings and counts
    return {
        "score": score,
        "grade": grade,
        "risk_level": risk,
        "total_findings": len(findings),
        "counts": counts,
    }
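
To make the arithmetic concrete, here is the same deduction model applied to severity counts rather than Finding objects. It is a sketch for checking numbers by hand: score_from_counts is a hypothetical helper, and the counts are invented.

```python
WEIGHTS = {"CRITICAL": 10, "HIGH": 5, "MEDIUM": 2, "LOW": 1, "INFO": 0}


def score_from_counts(counts: dict) -> int:
    """Deduct weight * count for each severity from 100, flooring at 0."""
    deductions = sum(WEIGHTS[severity] * n for severity, n in counts.items())
    return max(0, 100 - deductions)


# 2 critical (20) + 7 high (35) + 6 medium (12) = 67 points deducted
print(score_from_counts({"CRITICAL": 2, "HIGH": 7, "MEDIUM": 6}))

# Eleven critical findings deduct 110 points, so the floor applies
print(score_from_counts({"CRITICAL": 11}))
```

One consequence of the fixed 100-point budget is that large accounts with many findings all bottom out at 0, so the per-severity counts remain the more useful signal once the score is very low.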

CLI Entry Point

# scanner/cli.py
import argparse
import json
from datetime import datetime

from scanner.security_groups import SecurityGroupScanner
from scanner.s3_buckets import S3BucketScanner
from scanner.iam_policies import IAMScanner
from scanner.ebs_encryption import EBSScanner
from scanner.cloudtrail import CloudTrailScanner
from scoring.engine import calculate_score

SCANNERS = {
    "security-groups": SecurityGroupScanner,
    "s3": S3BucketScanner,
    "iam": IAMScanner,
    "ebs": EBSScanner,
    "cloudtrail": CloudTrailScanner,
}

def main():
    parser = argparse.ArgumentParser(description="Cloud Security Scanner")
    parser.add_argument("--modules", nargs="+", choices=list(SCANNERS.keys()) + ["all"],
                       default=["all"], help="Modules to run")
    parser.add_argument("--output", choices=["json", "html", "table"],
                       default="table", help="Output format")
    parser.add_argument("--output-file", type=str, help="Output file path")
    args = parser.parse_args()

    modules = list(SCANNERS.keys()) if "all" in args.modules else args.modules
    all_findings = []

    print(f"\n{'='*60}")
    print(f"  Cloud Security Scanner — {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}")
    print(f"{'='*60}\n")

    for module_name in modules:
        scanner_class = SCANNERS[module_name]
        print(f"  Scanning: {module_name}...", end=" ", flush=True)
        scanner = scanner_class()
        findings = scanner.scan()
        all_findings.extend(findings)
        print(f"found {len(findings)} findings")

    # Score
    result = calculate_score(all_findings)

    print(f"\n{'='*60}")
    print(f"  Score: {result['score']}/100 (Grade: {result['grade']})")
    print(f"  Risk Level: {result['risk_level']}")
    print(f"  Total Findings: {result['total_findings']}")
    for severity, count in result['counts'].items():
        if count > 0:
            print(f"    {severity}: {count}")
    print(f"{'='*60}\n")

    if args.output == "json":
        output = {
            "scan_time": datetime.utcnow().isoformat(),
            "score": result,
            "findings": [
                {
                    "title": f.title,
                    "severity": f.severity.name,
                    "resource_id": f.resource_id,
                    "resource_type": f.resource_type,
                    "description": f.description,
                    "recommendation": f.recommendation,
                }
                for f in all_findings
            ]
        }
        if args.output_file:
            with open(args.output_file, 'w') as f:
                json.dump(output, f, indent=2)
            print(f"Report saved to {args.output_file}")
        else:
            print(json.dumps(output, indent=2))

    elif args.output == "table":
        for f in sorted(all_findings, key=lambda x: x.severity.value, reverse=True):
            icon = {"CRITICAL": "🔴", "HIGH": "🟠", "MEDIUM": "🟡", "LOW": "🟢"}.get(f.severity.name, "⚪")
            print(f"  {icon} [{f.severity.name}] {f.title}")
            print(f"     Resource: {f.resource_id}")
            print(f"     Fix: {f.recommendation}\n")

    elif args.output == "html":
        # reporting/html_report.py is not listed in this article, so this branch
        # is left as a stub rather than guessing at its API
        print("  HTML output is not wired up in this listing; use --output json or table.")

if __name__ == "__main__":
    main()
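
The project layout reserves reporting/html_report.py for the HTML report, but that module is not listed here. As a rough idea of what it might do, the sketch below renders findings shaped like the CLI's JSON output into a bare HTML table. It is an assumption-laden stand-in: render_html is a hypothetical function, and it uses only the standard library instead of the Jinja2 templates the requirements suggest, so that it stays self-contained.

```python
from html import escape


def render_html(score: dict, findings: list) -> str:
    """Render a minimal report from dicts shaped like the CLI's JSON output."""
    rows = []
    for f in findings:
        # Escape every value: titles and resource names come from the scanned account
        cells = [escape(str(f[key]))
                 for key in ("severity", "title", "resource_id", "recommendation")]
        rows.append("<tr>" + "".join(f"<td>{c}</td>" for c in cells) + "</tr>")
    return (
        "<!DOCTYPE html>\n"
        '<html><head><meta charset="utf-8"><title>Cloud Security Report</title></head><body>\n'
        f"<h1>Score: {int(score['score'])}/100 (Grade: {escape(str(score['grade']))})</h1>\n"
        "<table>\n"
        "<tr><th>Severity</th><th>Title</th><th>Resource</th><th>Fix</th></tr>\n"
        + "\n".join(rows)
        + "\n</table>\n</body></html>\n"
    )


# Invented sample finding whose title contains markup that must be escaped
report = render_html(
    {"score": 90, "grade": "A"},
    [{"severity": "CRITICAL", "title": "Bucket <b>x</b> is public",
      "resource_id": "x", "recommendation": "Block public access"}],
)
print(report)
```

Whatever templating approach you pick, escaping is the part worth keeping: a report that echoes resource names verbatim would render any markup they contain. Jinja2 can do this for you when autoescaping is enabled.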

Running the Scanner

# Scan everything
python -m scanner.cli

# Scan specific modules
python -m scanner.cli --modules security-groups s3

# JSON output
python -m scanner.cli --output json --output-file report.json

# Example output:
# ============================================================
#   Cloud Security Scanner — 2026-04-04 14:30 UTC
# ============================================================
#
#   Scanning: security-groups... found 3 findings
#   Scanning: s3... found 5 findings
#   Scanning: iam... found 4 findings
#   Scanning: ebs... found 2 findings
#   Scanning: cloudtrail... found 1 findings
#
# ============================================================
#   Score: 33/100 (Grade: F)
#   Risk Level: Critical
#   Total Findings: 15
#     CRITICAL: 2
#     HIGH: 7
#     MEDIUM: 6
# ============================================================

Extending the Scanner

The modular design makes it easy to add new scanners: subclass BaseScanner, implement scan(), and register the class in the SCANNERS dict in cli.py.

# scanner/rds_security.py (adds RDS checks)
from .base import BaseScanner, Severity

class RDSScanner(BaseScanner):
    def scan(self):
        rds = self.session.client('rds')
        instances = rds.describe_db_instances()['DBInstances']
        for db in instances:
            if db.get('PubliclyAccessible'):
                self.add_finding(
                    title=f"RDS instance '{db['DBInstanceIdentifier']}' is publicly accessible",
                    severity=Severity.CRITICAL,
                    # ... (description, resource_id, resource_type, and recommendation are also required)
                )
        return self.findings

Ideas for extension:

  • RDS security — public access, encryption, backup retention
  • Lambda security — runtime versions, VPC config, environment variables
  • VPC security — flow logs, default VPC usage, peering
  • GuardDuty — check if enabled, unresolved findings
  • Secrets Manager — rotation status, age of secrets

Key Takeaways

  1. Modular design — each scanner is independent and testable
  2. Severity scoring — not all findings are equal, weight them appropriately
  3. Actionable output — every finding includes a specific recommendation
  4. Easy to extend — add new scanners by implementing BaseScanner.scan()
  5. Run regularly — integrate into CI/CD or run as a scheduled Lambda
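
For the CI/CD case in the last takeaway, the usual hook is a process exit code: the pipeline fails when any finding meets a chosen severity threshold. The sketch below shows one way to compute it. The exit_code function and the fail_on parameter are hypothetical and are not flags of the CLI above; the ranks mirror the Severity enum values.

```python
SEVERITY_RANK = {"INFO": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}


def exit_code(severities: list, fail_on: str = "HIGH") -> int:
    """Return 1 if any finding is at or above the fail_on severity, else 0."""
    threshold = SEVERITY_RANK[fail_on]
    return 1 if any(SEVERITY_RANK[s] >= threshold for s in severities) else 0


print(exit_code(["MEDIUM", "LOW"]))              # below the default threshold
print(exit_code(["MEDIUM", "CRITICAL"]))         # one finding crosses it
print(exit_code(["MEDIUM"], fail_on="MEDIUM"))   # stricter threshold
```

In a real pipeline the result would be passed to sys.exit() at the end of main(), so a scheduled job or build step can gate on it.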

Course Wrap-Up

Over 15 articles, we’ve covered the full spectrum of cloud security engineering:

  • Foundations (1-3): Security mindset, IAM mastery, secrets management
  • Hardening (4-7): Container security, dependency scanning, code signing, supply chain
  • Detection (8-10): CloudTrail observability, auto-remediation, incident response
  • Testing (11-12): OWASP for cloud, penetration testing basics
  • Governance (13-14): Compliance automation, security pipelines
  • Capstone (15): Building a real security scanner

Security engineering isn’t a destination — it’s a practice. The tools and techniques in this course give you the foundation to build, operate, and defend cloud systems with confidence. Now go build something secure.
