Practical AWS Lambda Automations for Cost Control

Three production-ready Lambda functions: an orphaned Bedrock Knowledge Base scanner, a new-region billing security sensor, and an AI resource tagging enforcer. Copy, deploy, schedule.

Lambda is the right tool for FinOps automation because it runs on a schedule, costs almost nothing, has native access to every AWS API, and can publish to SNS without any infrastructure beyond an IAM role. The three functions below address the AI-era cost and security problems that rules-based alerting misses: orphaned AI infrastructure billing under the wrong service name, credential compromise detectable in billing data before it appears in logs, and AI resources provisioned through console Quick-create that bypass every tagging guardrail you've built.

Each function is standalone: deploy any or all three. The shared IAM role at the end covers permissions for all three.


Function 1: Orphaned Bedrock Knowledge Base Scanner

When a Bedrock Knowledge Base is deleted from the Bedrock console, the OpenSearch Serverless collection it used as a vector store is not deleted. The collection continues billing at $11.52/day under "Amazon OpenSearch Service" while the Knowledge Base no longer exists. This function finds every OpenSearch Serverless collection that was provisioned by a Knowledge Base and is no longer referenced by any active KB, which is the definition of an orphan.

Detection logic: Get all active Knowledge Bases → for each, retrieve its storage configuration and extract the OpenSearch Serverless collection ARN it references → build the set of actively referenced ARNs → list all OpenSearch Serverless collections whose names match the bedrock-knowledge-base-* convention → any collection whose ARN is not in the active set is orphaned.
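Before the full function, the core comparison can be sketched as a plain set-membership test with no AWS calls. This is a minimal illustration, assuming collection summaries are dicts with name and arn keys; the helper name find_orphaned_collections and the sample ARNs are hypothetical.

```python
BEDROCK_PREFIX = "bedrock-knowledge-base-"  # naming convention described above


def find_orphaned_collections(collections, active_arns):
    """Return collections that match the Bedrock prefix but that no active KB references."""
    return [
        c for c in collections
        if c.get("name", "").startswith(BEDROCK_PREFIX)
        and c.get("arn", "") not in active_arns
    ]


# Hypothetical sample data, for illustration only
collections = [
    {"name": "bedrock-knowledge-base-abc", "arn": "arn:aws:aoss:us-east-1:111122223333:collection/aaa"},
    {"name": "bedrock-knowledge-base-def", "arn": "arn:aws:aoss:us-east-1:111122223333:collection/bbb"},
    {"name": "app-search", "arn": "arn:aws:aoss:us-east-1:111122223333:collection/ccc"},
]
active_arns = {"arn:aws:aoss:us-east-1:111122223333:collection/aaa"}

orphans = find_orphaned_collections(collections, active_arns)
print([c["name"] for c in orphans])
```

The app-search collection is skipped because it does not match the naming convention, even though no Knowledge Base references it.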

import boto3
import json
import os
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ESTIMATED_MONTHLY_COST_USD = 345.60  # 2 OCUs × $0.24/hr × 720 hrs


def lambda_handler(event, context):
    region = os.environ.get('AWS_REGION', 'us-east-1')
    sns_topic_arn = os.environ['SNS_TOPIC_ARN']

    bedrock = boto3.client('bedrock-agent', region_name=region)
    oss = boto3.client('opensearchserverless', region_name=region)
    sns = boto3.client('sns', region_name=region)

    # Step 1: collect ARNs of all OSS collections actively used by a KB
    active_collection_arns = _get_active_kb_collection_arns(bedrock)
    logger.info(f"Active KB collection ARNs: {len(active_collection_arns)}")

    # Step 2: list all OSS collections matching the Bedrock naming convention
    orphaned = []
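    collections = []
    list_kwargs = {}
    while True:  # list_collections is paginated via nextToken
        response = oss.list_collections(**list_kwargs)
        collections.extend(response.get('collectionSummaries', []))
        if not response.get('nextToken'):
            break
        list_kwargs['nextToken'] = response['nextToken']
    for collection in collections: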
        name = collection.get('name', '')
        if not name.startswith('bedrock-knowledge-base-'):
            continue  # not a Bedrock-provisioned collection

        arn = collection.get('arn', '')
        if arn not in active_collection_arns:
            orphaned.append({
                'name': name,
                'id': collection.get('id', ''),
                'arn': arn,
                'status': collection.get('status', 'UNKNOWN'),
                'estimated_monthly_cost_usd': ESTIMATED_MONTHLY_COST_USD,
            })

    logger.info(f"Orphaned collections found: {len(orphaned)}")

    if orphaned:
        _publish_alert(sns, sns_topic_arn, orphaned)

    return {
        'statusCode': 200,
        'orphaned_count': len(orphaned),
        'active_kb_collection_count': len(active_collection_arns),
        'orphaned_collections': orphaned,
    }


def _get_active_kb_collection_arns(bedrock_client):
    """
    Return the set of OpenSearch Serverless collection ARNs
    currently referenced by at least one active Knowledge Base.
    """
    active_arns = set()

    paginator = bedrock_client.get_paginator('list_knowledge_bases')
    for page in paginator.paginate():
        for kb_summary in page.get('knowledgeBaseSummaries', []):
            kb_id = kb_summary['knowledgeBaseId']
            try:
                detail = bedrock_client.get_knowledge_base(knowledgeBaseId=kb_id)
                storage = detail['knowledgeBase'].get('storageConfiguration', {})
                oss_config = storage.get('opensearchServerlessConfiguration', {})
                collection_arn = oss_config.get('collectionArn')
                if collection_arn:
                    active_arns.add(collection_arn)
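            except Exception as e:
                # Abort rather than continue: a KB that cannot be inspected
                # would make its in-use collection look orphaned.
                logger.error(f"Could not retrieve KB detail for {kb_id}: {e}")
                raise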

    return active_arns


def _publish_alert(sns_client, topic_arn, orphaned_collections):
    total_monthly_waste = sum(
        c['estimated_monthly_cost_usd'] for c in orphaned_collections
    )
    lines = [
        f"ORPHANED BEDROCK KB COLLECTIONS: {len(orphaned_collections)} found",
        f"Estimated waste: ${total_monthly_waste:.2f}/month",
        "",
    ]
    for c in orphaned_collections:
        lines += [
            f"Collection: {c['name']}",
            f"Status:     {c['status']}",
            f"ARN:        {c['arn']}",
            f"Cost:       ~${c['estimated_monthly_cost_usd']:.2f}/month",
            "Fix:        AWS Console → OpenSearch Service → Serverless →",
            "            Collections → select → Delete",
            "",
        ]
    lines.append("Billing appears under 'Amazon OpenSearch Service' (not Bedrock).")
    lines.append("Deletion stops billing immediately.")

    sns_client.publish(
        TopicArn=topic_arn,
        # SNS subjects must be ASCII, so use a plain hyphen here
        Subject=f"[COST ALERT] {len(orphaned_collections)} orphaned Bedrock KB collection(s) - "
                f"${total_monthly_waste:.0f}/month",
        Message="\n".join(lines),
    )

Schedule: Daily. A new orphaned collection appears within hours of a developer deleting a Knowledge Base, so daily scanning limits waste to at most one day's floor cost ($11.52) before detection.
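The floor-cost figures used in this section can be reproduced with a little arithmetic. The sketch below uses the rates quoted in this article (2 OCUs at $0.24 per OCU-hour). The function name orphan_waste_usd is hypothetical, and actual OCU pricing and minimums may differ by region and collection type.

```python
OCU_COUNT = 2          # minimum OCUs assumed in this article
OCU_HOURLY_USD = 0.24  # per-OCU hourly rate assumed in this article


def orphan_waste_usd(hours_undetected, ocus=OCU_COUNT, rate=OCU_HOURLY_USD):
    """Estimated spend for an orphaned collection left running for the given hours."""
    return round(ocus * rate * hours_undetected, 2)


daily = orphan_waste_usd(24)     # one day before a daily scan catches it
monthly = orphan_waste_usd(720)  # a full 30-day month with no scan
print(f"daily: ${daily:.2f}, monthly: ${monthly:.2f}")
```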

Environment variables required: SNS_TOPIC_ARN โ€” the ARN of the SNS topic that routes to your team's alert channel.

IAM permissions required: bedrock:ListKnowledgeBases, bedrock:GetKnowledgeBase, aoss:ListCollections, sns:Publish.


Function 2: New-Region Billing Security Sensor

When an attacker compromises AWS credentials, the first observable evidence in billing data is often a new AWS region appearing in the account: GPU instances for cryptomining, EC2 in a region the team has never used, or managed AI services in an unexpected geography. This function queries AWS Cost Explorer for the regions that have billed in the last 48 hours and compares them against the regions that billed in the prior 90 days. Any region not in the historical set triggers an immediate alert. This is the zero-threshold security rule.

This is distinct from volume-based anomaly detection: it does not require a spend threshold, it does not require a learning period, and it fires on the first dollar of a compromised-credential campaign. Legitimate new-region deployments go through change management; they do not appear silently in billing data.
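The zero-threshold rule itself is just a set difference. A minimal sketch, assuming the region sets have already been collected from billing data; the helper name find_new_regions and the sample regions are hypothetical, and the NoRegion exclusion mirrors the function below.

```python
def find_new_regions(recent, historical, suppressed=frozenset()):
    """Zero-threshold rule: any recently billed region with no billing history is new."""
    # 'NoRegion' is the label Cost Explorer uses for global charges
    return set(recent) - set(historical) - set(suppressed) - {"NoRegion"}


# Hypothetical sample data, for illustration only
recent = {"us-east-1", "ap-southeast-2", "eu-west-1", "NoRegion"}
historical = {"us-east-1", "NoRegion"}

new = find_new_regions(recent, historical, suppressed={"eu-west-1"})
print(sorted(new))
```

There is no spend threshold anywhere in the comparison, which is why the rule fires on the first dollar.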

import boto3
import json
import os
import logging
from datetime import datetime, timedelta, timezone

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    region = os.environ.get('AWS_REGION', 'us-east-1')
    sns_topic_arn = os.environ['SNS_TOPIC_ARN']
    # Optional: comma-separated list of regions to ignore (e.g. a planned new deployment)
    suppressed_regions = set(
        r.strip() for r in os.environ.get('SUPPRESSED_REGIONS', '').split(',') if r.strip()
    )

    ce = boto3.client('ce', region_name='us-east-1')  # CE is always us-east-1
    sns = boto3.client('sns', region_name=region)

    today = datetime.now(timezone.utc).date()
    window_end = today.isoformat()
    window_recent_start = (today - timedelta(days=2)).isoformat()
    window_history_start = (today - timedelta(days=92)).isoformat()
    window_history_end = (today - timedelta(days=2)).isoformat()

    # Regions billed in the last 48 hours
    recent_regions = _get_billed_regions(ce, window_recent_start, window_end)

    # Regions billed in the prior 90-day history window
    historical_regions = _get_billed_regions(ce, window_history_start, window_history_end)

    # New = present in recent, absent from history, not suppressed
    new_regions = {
        r for r in recent_regions
        if r not in historical_regions and r not in suppressed_regions
        and r != 'NoRegion'  # Cost Explorer uses 'NoRegion' for global charges
    }

    logger.info(f"Recent regions: {recent_regions}")
    logger.info(f"Historical regions: {historical_regions}")
    logger.info(f"New regions detected: {new_regions}")

    if new_regions:
        _publish_security_alert(sns, sns_topic_arn, new_regions)

    return {
        'statusCode': 200,
        'new_regions_detected': list(new_regions),
        'recent_region_count': len(recent_regions),
        'historical_region_count': len(historical_regions),
    }


def _get_billed_regions(ce_client, start_date, end_date):
    """
    Return the set of AWS regions that appear in billing data
    for the given date range.
    """
    regions = set()
    kwargs = {
        'TimePeriod': {'Start': start_date, 'End': end_date},
        'Granularity': 'DAILY',
        'Metrics': ['BlendedCost'],
        'GroupBy': [{'Type': 'DIMENSION', 'Key': 'REGION'}],
        'Filter': {
            'Dimensions': {
                'Key': 'RECORD_TYPE',
                'Values': ['Usage'],
            }
        },
    }
    try:
        while True:
            response = ce_client.get_cost_and_usage(**kwargs)
            for result in response.get('ResultsByTime', []):
                for group in result.get('Groups', []):
                    region_value = group['Keys'][0]
                    cost = float(
                        group['Metrics']['BlendedCost']['Amount']
                    )
                    if cost > 0:
                        regions.add(region_value)
            next_token = response.get('NextPageToken')
            if not next_token:
                break
            kwargs['NextPageToken'] = next_token
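    except Exception as e:
        # Re-raise: returning a partial or empty set here would either hide
        # new regions or make every billed region look new.
        logger.error(f"Cost Explorer query failed ({start_date} to {end_date}): {e}")
        raise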

    return regions


def _publish_security_alert(sns_client, topic_arn, new_regions):
    lines = [
        "SECURITY BILLING ALERT: NEW REGION(S) DETECTED",
        "",
        "One or more AWS regions appeared in billing data that have no",
        "history in the prior 90 days. This may indicate:",
        "  • Credential compromise (cryptomining, data exfiltration)",
        "  • Unauthorised shadow IT deployment",
        "  • A legitimate new region not yet suppressed in this function",
        "",
        f"New region(s): {', '.join(sorted(new_regions))}",
        "",
        "Immediate actions:",
        "  1. Check AWS Console → EC2 → switch to each flagged region →",
        "     confirm all running instances are expected.",
        "  2. Check AWS Console → Cost Explorer → filter by flagged region →",
        "     identify the service and resource generating spend.",
        "  3. If unauthorised: rotate IAM credentials immediately,",
        "     terminate resources, and open an AWS support case.",
        "  4. If legitimate: add the region to SUPPRESSED_REGIONS env var.",
    ]
    subject = f"[SECURITY] New AWS region(s) in billing: {', '.join(sorted(new_regions))}"
    sns_client.publish(
        TopicArn=topic_arn,
        Subject=subject[:99],  # SNS subjects must be under 100 characters
        Message="\n".join(lines),
    )

Schedule: Every 6 hours. Cost Explorer data can lag by up to 24 hours, so running more often than every 6 hours does not improve latency. For near-real-time detection (under 30 minutes), use the Athena-based query against FOCUS billing exports described in the CUR Deep Dive article instead.

Environment variables required: SNS_TOPIC_ARN; optionally SUPPRESSED_REGIONS (comma-separated list of regions to skip, e.g. us-west-2,eu-west-1 for a planned expansion).

IAM permissions required: ce:GetCostAndUsage, sns:Publish. Note: the Cost Explorer API is served from a single endpoint in us-east-1 regardless of where the Lambda runs. The function handles this by always creating the CE client with region_name='us-east-1'.
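It can help to see exactly which dates the two query windows cover. The sketch below recomputes them for a fixed date, assuming the same 2-day recent window and 90-day history window as the function above. The helper name billing_windows is hypothetical. Cost Explorer treats the Start date as inclusive and the End date as exclusive, so the two windows meet without overlapping.

```python
from datetime import date, timedelta


def billing_windows(today, recent_days=2, history_days=90):
    """Return (start, end) ISO date pairs for the recent and history queries."""
    recent_start = today - timedelta(days=recent_days)
    history_start = recent_start - timedelta(days=history_days)
    return {
        # End dates are exclusive in Cost Explorer's TimePeriod
        "recent": (recent_start.isoformat(), today.isoformat()),
        "history": (history_start.isoformat(), recent_start.isoformat()),
    }


windows = billing_windows(date(2024, 6, 15))
print(windows)
```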


Function 3: AI Resource Tagging Enforcer

Console Quick-create flows for Bedrock Knowledge Bases, SageMaker notebooks, and OpenSearch Serverless collections provision resources without enforcing the tag policies you apply to IaC-managed infrastructure. Untagged AI resources have two failure modes: cost alerts cannot route to an owner, and security tooling cannot match the resource to an authorized project. This function scans Bedrock Knowledge Bases, OpenSearch Serverless collections, and in-service SageMaker endpoints, checks each resource for the required tag keys, and publishes a report listing every resource that is missing one or more of them.
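One wrinkle is that the three services return tags in different shapes: a plain key-to-value map, a list of lowercase key/value pairs, and a list of capitalized Key/Value pairs. A minimal sketch of a single check that tolerates all three, assuming those shapes; the helper names normalize_tags and missing_tags are hypothetical and not part of any AWS SDK.

```python
REQUIRED_TAGS = ["owner", "project", "environment"]


def normalize_tags(raw):
    """Convert a tag dict, or a list of key/value (or Key/Value) items, to a plain dict."""
    if isinstance(raw, dict):
        return dict(raw)
    tags = {}
    for item in raw or []:
        key = item.get("key", item.get("Key"))
        if key is not None:
            tags[key] = item.get("value", item.get("Value", ""))
    return tags


def missing_tags(raw, required=REQUIRED_TAGS):
    """Return the required tag keys that are absent from the resource's tags."""
    present = normalize_tags(raw)
    return [k for k in required if k not in present]


# Hypothetical sample data, for illustration only
print(missing_tags({"owner": "ml-platform"}))
print(missing_tags([{"Key": "owner", "Value": "ml-platform"}]))
print(missing_tags([{"key": "owner", "value": "a"}, {"key": "project", "value": "b"}]))
```

Tag keys are compared case-sensitively here, so Owner would not satisfy a requirement for owner.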

import boto3
import json
import os
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Define the tags every AI resource must have
REQUIRED_TAGS = ['owner', 'project', 'environment']


def lambda_handler(event, context):
    region = os.environ.get('AWS_REGION', 'us-east-1')
    sns_topic_arn = os.environ['SNS_TOPIC_ARN']

    bedrock = boto3.client('bedrock-agent', region_name=region)
    oss = boto3.client('opensearchserverless', region_name=region)
    sm = boto3.client('sagemaker', region_name=region)
    sns = boto3.client('sns', region_name=region)

    violations = []

    # --- Bedrock Knowledge Bases ---
    paginator = bedrock.get_paginator('list_knowledge_bases')
    for page in paginator.paginate():
        for kb in page.get('knowledgeBaseSummaries', []):
            kb_id = kb['knowledgeBaseId']
            kb_name = kb.get('name', kb_id)
            kb_arn = kb_id  # fallback label for logging if the lookup fails
            try:
                # List summaries carry no ARN, so fetch it from the KB detail
                detail = bedrock.get_knowledge_base(knowledgeBaseId=kb_id)
                kb_arn = detail['knowledgeBase']['knowledgeBaseArn']
                # bedrock-agent returns tags as a plain {key: value} dict
                tag_response = bedrock.list_tags_for_resource(resourceArn=kb_arn)
                tags = tag_response.get('tags', {})
                missing = [k for k in REQUIRED_TAGS if k not in tags]
                if missing:
                    violations.append({
                        'service': 'Bedrock Knowledge Base',
                        'resource': kb_name,
                        'arn': kb_arn,
                        'missing_tags': missing,
                    })
            except Exception as e:
                logger.warning(f"Could not get tags for KB {kb_arn}: {e}")

    # --- OpenSearch Serverless Collections ---
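    collections = []
    list_kwargs = {}
    while True:  # list_collections is paginated via nextToken
        oss_response = oss.list_collections(**list_kwargs)
        collections.extend(oss_response.get('collectionSummaries', []))
        if not oss_response.get('nextToken'):
            break
        list_kwargs['nextToken'] = oss_response['nextToken']
    for collection in collections: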
        col_arn = collection.get('arn', '')
        col_name = collection.get('name', '')
        try:
            tag_response = oss.list_tags_for_resource(resourceArn=col_arn)
            tags = {t['key']: t['value'] for t in tag_response.get('tags', [])}
            missing = [k for k in REQUIRED_TAGS if k not in tags]
            if missing:
                violations.append({
                    'service': 'OpenSearch Serverless Collection',
                    'resource': col_name,
                    'arn': col_arn,
                    'missing_tags': missing,
                })
        except Exception as e:
            logger.warning(f"Could not get tags for OSS collection {col_arn}: {e}")

    # --- SageMaker Endpoints ---
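    endpoints = []
    # list_endpoints is paginated; collect every page before checking tags
    for sm_page in sm.get_paginator('list_endpoints').paginate(StatusEquals='InService'):
        endpoints.extend(sm_page.get('Endpoints', []))
    for endpoint in endpoints: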
        ep_arn = endpoint['EndpointArn']
        ep_name = endpoint['EndpointName']
        try:
            tag_response = sm.list_tags(ResourceArn=ep_arn)
            tags = {t['Key']: t['Value'] for t in tag_response.get('Tags', [])}
            missing = [k for k in REQUIRED_TAGS if k not in tags]
            if missing:
                violations.append({
                    'service': 'SageMaker Endpoint',
                    'resource': ep_name,
                    'arn': ep_arn,
                    'missing_tags': missing,
                })
        except Exception as e:
            logger.warning(f"Could not get tags for SageMaker endpoint {ep_arn}: {e}")

    logger.info(f"Tag violations found: {len(violations)}")

    if violations:
        _publish_tag_alert(sns, sns_topic_arn, violations)

    return {
        'statusCode': 200,
        'violation_count': len(violations),
        'violations': violations,
    }


def _publish_tag_alert(sns_client, topic_arn, violations):
    lines = [
        f"AI RESOURCE TAGGING VIOLATIONS: {len(violations)} resource(s)",
        f"Required tags: {', '.join(REQUIRED_TAGS)}",
        "",
        "Untagged AI resources cannot be attributed for cost alerts",
        "or matched to authorized projects in security reviews.",
        "",
    ]
    for v in violations:
        lines += [
            f"Service:      {v['service']}",
            f"Resource:     {v['resource']}",
            f"ARN:          {v['arn']}",
            f"Missing tags: {', '.join(v['missing_tags'])}",
            "",
        ]
    lines += [
        "To fix: add the missing tags via the AWS Console or CLI.",
        "For a Bedrock Knowledge Base, for example:",
        "  aws bedrock-agent tag-resource --resource-arn ARN \\",
        "    --tags owner=team-name,project=project-name,environment=prod",
        "",
        "To prevent recurrence: require IaC (Terraform/CDK) for all",
        "AI resource provisioning; no console Quick-create in prod/staging.",
    ]
    sns_client.publish(
        TopicArn=topic_arn,
        Subject=f"[COMPLIANCE] {len(violations)} AI resource(s) missing required tags",
        Message="\n".join(lines),
    )

Schedule: Daily. Weekly is acceptable for stable environments; daily catches Quick-create resources before they accumulate a week of unattributed spend.

Environment variables required: SNS_TOPIC_ARN. To change the required tag keys, update the REQUIRED_TAGS list at the top of the function; no environment variable is needed.

IAM permissions required: bedrock:ListKnowledgeBases, bedrock:GetKnowledgeBase, bedrock:ListTagsForResource, aoss:ListCollections, aoss:ListTagsForResource, sagemaker:ListEndpoints, sagemaker:ListTags, sns:Publish.


Shared IAM Role (Terraform)

resource "aws_iam_role" "finops_lambda" {
  name = "finops-lambda-${var.environment}"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
      Action    = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_role_policy" "finops_lambda" {
  name = "finops-lambda-policy"
  role = aws_iam_role.finops_lambda.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "BedrockRead"
        Effect = "Allow"
        Action = [
          "bedrock:ListKnowledgeBases",
          "bedrock:GetKnowledgeBase",
          "bedrock:ListTagsForResource"
        ]
        Resource = "*"
      },
      {
        Sid    = "OpenSearchServerlessRead"
        Effect = "Allow"
        Action = [
          "aoss:ListCollections",
          "aoss:ListTagsForResource"
        ]
        Resource = "*"
      },
      {
        Sid    = "SageMakerRead"
        Effect = "Allow"
        Action = [
          "sagemaker:ListEndpoints",
          "sagemaker:ListTags"
        ]
        Resource = "*"
      },
      {
        Sid    = "CostExplorer"
        Effect = "Allow"
        Action = ["ce:GetCostAndUsage"]
        Resource = "*"
      },
      {
        Sid    = "SNSPublish"
        Effect = "Allow"
        Action = ["sns:Publish"]
        Resource = aws_sns_topic.cost_alerts.arn
      },
      {
        Sid    = "CloudWatchLogs"
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}
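A quick way to sanity-check that the shared role covers what a function needs is to compare action lists. The sketch below copies the actions from the policy above into a Python set. The helper name uncovered_actions is hypothetical, and the check is a simple exact-string comparison that does not evaluate wildcards, Resource scoping, or conditions.

```python
# Actions granted by the shared role above, copied by hand
SHARED_ROLE_ACTIONS = {
    "bedrock:ListKnowledgeBases", "bedrock:GetKnowledgeBase", "bedrock:ListTagsForResource",
    "aoss:ListCollections", "aoss:ListTagsForResource",
    "sagemaker:ListEndpoints", "sagemaker:ListTags",
    "ce:GetCostAndUsage", "sns:Publish",
    "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents",
}


def uncovered_actions(needed, granted=SHARED_ROLE_ACTIONS):
    """Return the needed IAM actions that the role does not grant (exact match only)."""
    return sorted(set(needed) - set(granted))


scanner_needs = ["bedrock:ListKnowledgeBases", "bedrock:GetKnowledgeBase",
                 "aoss:ListCollections", "sns:Publish"]
sensor_needs = ["ce:GetCostAndUsage", "sns:Publish"]

print(uncovered_actions(scanner_needs))
print(uncovered_actions(sensor_needs))
```

An empty list means every action the function calls is present in the role. Anything returned would need to be added to the policy before deploying.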

What Each Function Catches

| Function | Detects | Alert latency | Cost to run |
|---|---|---|---|
| KB Collection Scanner | Orphaned OpenSearch Serverless collections billing $11.52/day under the wrong service name | Within 24 hours of orphaning | ~$0.01/month (milliseconds of Bedrock + OSS API calls daily) |
| New-Region Sensor | Credential compromise, cryptomining, shadow IT: the first billing signal, before logs | Within 6 to 30 hours of first charge | About $2.40/month or more (Cost Explorer API requests are billed at $0.01 each; at least 2 requests per run, 4 runs per day) |
| Tagging Enforcer | AI resources without owner/project/environment tags, which blocks alert routing and security attribution | Within 24 hours of provisioning | ~$0.01/month (service list APIs daily) |

Total cost for all three functions running at the schedules above: a few dollars per month, nearly all of it Cost Explorer API requests, which are billed at $0.01 each (two queries per run at four runs per day is roughly 240 requests, or $2.40/month, and more if results paginate). SNS email notifications are free for the first 1,000 per month. The Lambda free tier (1M requests/month) is not approached by three daily-to-6-hourly executions, so Lambda and SNS add nothing measurable.

