Architecture

Centralized Logging Pattern: Shipping CloudWatch Logs to OpenSearch

Updated by Zak Kann

Key takeaways

  • CloudWatch Logs costs $0.50/GB for ingestion plus $0.03/GB-month for storage; an EBS-backed OpenSearch domain brings the total monthly bill down roughly 35-50% at scale (over 500GB/month)
  • Kinesis Data Firehose provides reliable, managed streaming from CloudWatch to OpenSearch with automatic batching, buffering, and retry logic
  • OpenSearch enables complex analytics (aggregations, percentile queries, full-text search) that are impossible or expensive in CloudWatch Logs Insights
  • Proper log transformation, index lifecycle management, and shard optimization are critical for OpenSearch performance and cost control
  • Hybrid approach (recent 7 days in CloudWatch, long-term in OpenSearch) balances convenience, cost, and compliance requirements

The CloudWatch Logs Scaling Problem

Your startup just shipped the MVP. You're logging everything to CloudWatch Logs because it's the default, it's simple, and it works.

Month 1:

  • 50GB logs
  • CloudWatch cost: $25 ingestion + $1.50 storage = $26.50
  • No complaints

Month 6:

  • 800GB logs
  • CloudWatch cost: $400 ingestion + $24 storage = $424
  • CFO asks: "Why is our logging budget higher than our database?"

Month 12:

  • 2TB logs
  • CloudWatch cost: $1,000 ingestion + $60 storage = $1,060/month
  • Queries timing out
  • A 90-day retention policy colliding with compliance requirements
  • It's time to centralize

Cost Analysis: CloudWatch vs. OpenSearch

CloudWatch Logs Pricing (us-east-1)

Ingestion: $0.50/GB
Storage: $0.03/GB/month
Insights queries: $0.005 per GB scanned

Monthly example (1TB logs):
- Ingestion: 1,000GB × $0.50 = $500
- Storage (90 days): 1,000GB × 3 × $0.03 = $90
- Queries (100GB scanned/day): 100GB × 30 × $0.005 = $150
Total: $740/month

OpenSearch Pricing (Centralized)

OpenSearch domain (3 × r6g.large.search data nodes):
- Instance cost: 3 × $0.141/hour × 730 hours = $308.79
- EBS storage (1TB): 1,000GB × $0.135/GB = $135
- Data transfer (negligible for same region): ~$5
- (Dedicated master nodes, if enabled as in the Terraform below, bill on top of this)

Kinesis Data Firehose:
- Ingestion: 1,000GB × $0.029 = $29
- Data format conversion: $0

Total: $477.79/month
Savings: $262.21/month (35%)

At 2TB/month, savings grow to roughly $780/month (about 50%)
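
If you want to sanity-check the break-even point for your own volume, the math above reduces to a few lines. A minimal cost-model sketch in TypeScript; the rates mirror the us-east-1 figures quoted here, and the fixed cluster cost assumes the 3 × r6g.large.search domain, so swap in your actual sizing:

// Rough monthly cost model using the rates quoted above (us-east-1)
const RATES = {
  cwIngestPerGB: 0.50,
  cwStorePerGBMonth: 0.03,
  cwQueryPerGBScanned: 0.005,
  firehosePerGB: 0.029,
  ebsPerGBMonth: 0.135,
  clusterPerMonth: 308.79, // 3 × r6g.large.search, on-demand
};

function cloudwatchMonthly(gbPerMonth: number, gbScannedPerDay: number): number {
  const ingestion = gbPerMonth * RATES.cwIngestPerGB;
  const storage = gbPerMonth * 3 * RATES.cwStorePerGBMonth; // 90 days retained
  const queries = gbScannedPerDay * 30 * RATES.cwQueryPerGBScanned;
  return ingestion + storage + queries;
}

function opensearchMonthly(gbPerMonth: number): number {
  const firehose = gbPerMonth * RATES.firehosePerGB;
  const ebs = gbPerMonth * RATES.ebsPerGBMonth; // roughly one month hot on EBS
  return RATES.clusterPerMonth + firehose + ebs;
}

// 1TB/month with 100GB scanned per day in Insights:
console.log(cloudwatchMonthly(1000, 100)); // ≈ 740
console.log(opensearchMonthly(1000));      // ≈ 473 (+ ~$5 data transfer)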

The Real Wins Beyond Cost

  1. Unlimited retention (compliance requirements)
  2. Complex analytics (aggregations, machine learning)
  3. Full-text search across all fields
  4. Kibana dashboards for visualization
  5. Centralized multi-account/region logs
  6. Anomaly detection with built-in ML

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                       Application Layer                         │
│  ECS Tasks, Lambda Functions, EC2 Instances, RDS, etc.         │
└────────────┬────────────────────────────────────┬───────────────┘
             │                                    │
             ▼                                    ▼
      ┌─────────────┐                    ┌─────────────┐
      │ CloudWatch  │                    │ CloudWatch  │
      │ Log Group 1 │                    │ Log Group N │
      └──────┬──────┘                    └──────┬──────┘
             │                                  │
             └──────────────┬───────────────────┘
                            │ Subscription Filter
                            ▼
                   ┌─────────────────┐
                   │ Kinesis Data    │
                   │ Firehose        │
                   │                 │
                   │ - Buffering     │
                   │ - Batching      │
                   │ - Retry logic   │
                   └────────┬────────┘
                            │
                            ▼ (Optional)
                   ┌─────────────────┐
                   │ Lambda          │
                   │ Transformer     │
                   │                 │
                   │ - Parse JSON    │
                   │ - Enrich fields │
                   │ - Filter noise  │
                   └────────┬────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │ OpenSearch      │
                   │ Domain          │
                   │                 │
                   │ - Index data    │
                   │ - Kibana UI     │
                   │ - Alerting      │
                   └─────────────────┘

Implementation: Step-by-Step

Step 1: OpenSearch Domain Setup

Terraform configuration:

resource "aws_opensearch_domain" "logs" {
  domain_name    = "centralized-logs"
  engine_version = "OpenSearch_2.11"
 
  cluster_config {
    instance_type            = "r6g.large.search"
    instance_count           = 3
    dedicated_master_enabled = true
    dedicated_master_type    = "r6g.large.search"
    dedicated_master_count   = 3
    zone_awareness_enabled   = true
 
    zone_awareness_config {
      availability_zone_count = 3
    }
  }
 
  ebs_options {
    ebs_enabled = true
    volume_type = "gp3"
    volume_size = 500  # Per data node (3 × 500GB = 1.5TB raw; size for primaries + replicas + overhead)
    iops        = 3000
    throughput  = 125
  }
 
  # Encryption at rest
  encrypt_at_rest {
    enabled    = true
    kms_key_id = aws_kms_key.opensearch.arn
  }
 
  # Encryption in transit
  node_to_node_encryption {
    enabled = true
  }
 
  domain_endpoint_options {
    enforce_https       = true
    tls_security_policy = "Policy-Min-TLS-1-2-2019-07"
  }
 
  # Advanced security options
  advanced_security_options {
    enabled                        = true
    internal_user_database_enabled = false
    master_user_options {
      master_user_arn = aws_iam_role.opensearch_master.arn
    }
  }
 
  # Fine-grained access control
  access_policies = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          AWS = aws_iam_role.firehose.arn
        }
        Action = [
          "es:ESHttpPost",
          "es:ESHttpPut"
        ]
        Resource = "arn:aws:es:${var.aws_region}:${data.aws_caller_identity.current.account_id}:domain/centralized-logs/*"
      }
    ]
  })
 
  # Automated snapshots
  snapshot_options {
    automated_snapshot_start_hour = 3
  }
 
  # Advanced options
  advanced_options = {
    "rest.action.multi.allow_explicit_index" = "true"
    "override_main_response_version"         = "false"
  }
 
  tags = {
    Name        = "centralized-logs"
    Environment = "production"
  }
}
 
# Standalone domain access policy. Note: this overwrites the inline
# access_policies above, so manage the policy in one place only, and
# scope the principal down from "*" in production
resource "aws_opensearch_domain_policy" "logs_access" {
  domain_name = aws_opensearch_domain.logs.domain_name
 
  access_policies = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          AWS = [
            aws_iam_role.firehose.arn,
            aws_iam_role.opensearch_master.arn
          ]
        }
        Action   = "es:*"
        Resource = "${aws_opensearch_domain.logs.arn}/*"
      }
    ]
  })
}
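
Once the domain is up, it's worth a quick signed smoke test before wiring Firehose in. A minimal sketch with the opensearch-js client and SigV4 signing; the endpoint URL is a placeholder for your domain's actual endpoint:

import { Client } from '@opensearch-project/opensearch';
import { AwsSigv4Signer } from '@opensearch-project/opensearch/aws';
import { defaultProvider } from '@aws-sdk/credential-provider-node';

const client = new Client({
  ...AwsSigv4Signer({
    region: 'us-east-1',
    service: 'es', // managed OpenSearch domains sign requests as 'es'
    getCredentials: () => defaultProvider()(),
  }),
  // Placeholder: use the endpoint output of aws_opensearch_domain.logs
  node: 'https://search-centralized-logs-xxxxxxxx.us-east-1.es.amazonaws.com',
});

async function checkHealth(): Promise<void> {
  const health = await client.cluster.health();
  console.log(health.body.status); // expect 'green' on a healthy 3-AZ cluster
}

checkHealth().catch(console.error);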

Step 2: Kinesis Data Firehose Setup

resource "aws_kinesis_firehose_delivery_stream" "logs_to_opensearch" {
  name        = "cloudwatch-to-opensearch"
  destination = "elasticsearch"
 
  elasticsearch_configuration {
    domain_arn = aws_opensearch_domain.logs.arn
    role_arn   = aws_iam_role.firehose.arn
 
    # Index rotation
    index_name            = "logs"
    index_rotation_period = "OneDay"  # Creates logs-2025-01-15, logs-2025-01-16, etc.
    # type_name is intentionally omitted: Elasticsearch 7+/OpenSearch reject explicit mapping types
 
    # Buffering
    buffering_interval = 60   # Flush every 60 seconds
    buffering_size     = 5    # Or when buffer reaches 5MB
 
    # Retry configuration
    retry_duration = 300  # Retry for 5 minutes
 
    # S3 backup for failed records
    s3_backup_mode = "FailedDocumentsOnly"
 
    s3_configuration {
      role_arn           = aws_iam_role.firehose.arn
      bucket_arn         = aws_s3_bucket.failed_logs.arn
      prefix             = "failed/"
      error_output_prefix = "error/"
      compression_format = "GZIP"
 
      buffering_interval = 300
      buffering_size     = 5
    }
 
    # CloudWatch logging
    cloudwatch_logging_options {
      enabled         = true
      log_group_name  = aws_cloudwatch_log_group.firehose.name
      log_stream_name = "opensearch-delivery"
    }
 
    # Lambda transformation (optional)
    processing_configuration {
      enabled = true
 
      processors {
        type = "Lambda"
 
        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = "${aws_lambda_function.log_transformer.arn}:$LATEST"
        }
 
        parameters {
          parameter_name  = "BufferSizeInMBs"
          parameter_value = "3"
        }
 
        parameters {
          parameter_name  = "BufferIntervalInSeconds"
          parameter_value = "60"
        }
      }
    }
  }
}
 
# IAM role for Firehose
resource "aws_iam_role" "firehose" {
  name = "firehose-opensearch-role"
 
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "firehose.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      }
    ]
  })
}
 
resource "aws_iam_role_policy" "firehose_opensearch" {
  role = aws_iam_role.firehose.id
 
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "es:DescribeElasticsearchDomain",
          "es:DescribeElasticsearchDomains",
          "es:DescribeElasticsearchDomainConfig",
          "es:ESHttpGet",
          "es:ESHttpPost",
          "es:ESHttpPut"
        ]
        Resource = [
          aws_opensearch_domain.logs.arn,
          "${aws_opensearch_domain.logs.arn}/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "s3:AbortMultipartUpload",
          "s3:GetBucketLocation",
          "s3:GetObject",
          "s3:ListBucket",
          "s3:ListBucketMultipartUploads",
          "s3:PutObject"
        ]
        Resource = [
          aws_s3_bucket.failed_logs.arn,
          "${aws_s3_bucket.failed_logs.arn}/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "lambda:InvokeFunction",
          "lambda:GetFunctionConfiguration"
        ]
        Resource = "${aws_lambda_function.log_transformer.arn}:*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:PutLogEvents"
        ]
        Resource = "${aws_cloudwatch_log_group.firehose.arn}:*"
      }
    ]
  })
}

Step 3: CloudWatch Subscription Filters

# Subscription filter for each log group
resource "aws_cloudwatch_log_subscription_filter" "ecs_logs" {
  name            = "ecs-to-firehose"
  log_group_name  = "/aws/ecs/cluster/production"
  filter_pattern  = ""  # Empty = send all logs
  destination_arn = aws_kinesis_firehose_delivery_stream.logs_to_opensearch.arn
  role_arn        = aws_iam_role.cloudwatch_to_firehose.arn
}
 
resource "aws_cloudwatch_log_subscription_filter" "lambda_logs" {
  name            = "lambda-to-firehose"
  log_group_name  = "/aws/lambda/api-handler"
  filter_pattern  = ""
  destination_arn = aws_kinesis_firehose_delivery_stream.logs_to_opensearch.arn
  role_arn        = aws_iam_role.cloudwatch_to_firehose.arn
}
 
# IAM role for CloudWatch to write to Firehose
resource "aws_iam_role" "cloudwatch_to_firehose" {
  name = "cloudwatch-to-firehose-role"
 
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "logs.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      }
    ]
  })
}
 
resource "aws_iam_role_policy" "cloudwatch_to_firehose" {
  role = aws_iam_role.cloudwatch_to_firehose.id
 
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "firehose:PutRecord",
          "firehose:PutRecordBatch"
        ]
        Resource = aws_kinesis_firehose_delivery_stream.logs_to_opensearch.arn
      }
    ]
  })
}
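
With the filters in place, you can smoke-test the path end to end: write one event into a subscribed group and it should appear in OpenSearch after the Firehose buffer flushes (up to ~60 seconds with the settings above). A sketch with the AWS SDK v3; the stream name is made up for the test:

import {
  CloudWatchLogsClient,
  CreateLogStreamCommand,
  PutLogEventsCommand,
} from '@aws-sdk/client-cloudwatch-logs';

const logs = new CloudWatchLogsClient({ region: 'us-east-1' });
const logGroupName = '/aws/ecs/cluster/production'; // subscribed group from above
const logStreamName = `pipeline-smoke-test-${Date.now()}`;

async function sendTestEvent(): Promise<void> {
  await logs.send(new CreateLogStreamCommand({ logGroupName, logStreamName }));
  await logs.send(new PutLogEventsCommand({
    logGroupName,
    logStreamName,
    logEvents: [{
      timestamp: Date.now(),
      message: JSON.stringify({ level: 'INFO', message: 'pipeline smoke test' }),
    }],
  }));
}

sendTestEvent().catch(console.error);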

Step 4: Lambda Log Transformer

Why transform? CloudWatch logs are base64-encoded and gzip-compressed. You need to:

  • Decompress and decode
  • Parse JSON logs
  • Enrich with metadata
  • Filter noise
  • Structure for OpenSearch

The transformer (TypeScript):

import { FirehoseTransformationHandler, FirehoseTransformationEvent } from 'aws-lambda';
import { gunzipSync } from 'zlib';
 
interface CloudWatchLog {
  messageType: string;
  owner: string;
  logGroup: string;
  logStream: string;
  subscriptionFilters: string[];
  logEvents: Array<{
    id: string;
    timestamp: number;
    message: string;
  }>;
}
 
interface TransformedLog {
  '@timestamp': string;
  message: string;
  logGroup: string;
  logStream: string;
  level?: string;
  requestId?: string;
  duration?: number;
  memory?: number;
  service?: string;
  environment?: string;
  [key: string]: any;
}
 
export const handler: FirehoseTransformationHandler = async (
  event: FirehoseTransformationEvent
) => {
  const output = event.records.map((record) => {
    try {
      // 1. Decode base64
      const payload = Buffer.from(record.data, 'base64');
 
      // 2. Decompress gzip
      const decompressed = gunzipSync(payload);
      const cloudwatchLog: CloudWatchLog = JSON.parse(decompressed.toString('utf8'));
 
      // 3. Transform each log event
      const transformedLogs = cloudwatchLog.logEvents.map((logEvent) => {
        const transformed: TransformedLog = {
          '@timestamp': new Date(logEvent.timestamp).toISOString(),
          message: logEvent.message,
          logGroup: cloudwatchLog.logGroup,
          logStream: cloudwatchLog.logStream
        };
 
        // 4. Parse JSON logs
        try {
          const parsedMessage = JSON.parse(logEvent.message);
          Object.assign(transformed, parsedMessage);
        } catch {
          // Not JSON, keep as plain text
        }
 
        // 5. Extract Lambda metadata
        if (cloudwatchLog.logGroup.startsWith('/aws/lambda/')) {
          const lambdaMatch = logEvent.message.match(
            /REPORT RequestId: ([\w-]+)\s+Duration: ([\d.]+) ms.*Memory Size: (\d+) MB/
          );
          if (lambdaMatch) {
            transformed.requestId = lambdaMatch[1];
            transformed.duration = parseFloat(lambdaMatch[2]);
            transformed.memory = parseInt(lambdaMatch[3], 10);
          }
        }
 
        // 6. Enrich with service name and environment
        const serviceMatch = cloudwatchLog.logGroup.match(/\/([\w-]+)$/);
        if (serviceMatch) {
          transformed.service = serviceMatch[1];
        }
 
        transformed.environment = process.env.ENVIRONMENT || 'production';
 
        // 7. Extract log level
        const levelMatch = logEvent.message.match(/\b(DEBUG|INFO|WARN|ERROR|FATAL)\b/i);
        if (levelMatch) {
          transformed.level = levelMatch[1].toUpperCase();
        }
 
        // 8. Filter noise (optional)
        if (shouldFilterLog(transformed)) {
          return null;
        }
 
        return transformed;
      }).filter(Boolean);
 
      // 9. Drop the record entirely if every event was filtered out
      if (transformedLogs.length === 0) {
        return {
          recordId: record.recordId,
          result: 'Dropped' as const,
          data: record.data
        };
      }

      // 10. Convert back to newline-delimited JSON for OpenSearch
      const data = transformedLogs
        .map(log => JSON.stringify(log))
        .join('\n') + '\n';

      return {
        recordId: record.recordId,
        result: 'Ok' as const,
        data: Buffer.from(data).toString('base64')
      };
    } catch (error) {
      console.error('Transformation error:', error);
      return {
        recordId: record.recordId,
        result: 'ProcessingFailed' as const,
        data: record.data
      };
    }
  });
 
  return { records: output };
};
 
function shouldFilterLog(log: TransformedLog): boolean {
  // Filter health check logs
  if (log.message?.includes('GET /health')) {
    return true;
  }
 
  // Filter debug logs in production
  if (log.environment === 'production' && log.level === 'DEBUG') {
    return true;
  }
 
  // Filter AWS internal logs
  if (log.message?.startsWith('START RequestId') ||
      log.message?.startsWith('END RequestId')) {
    return true;
  }
 
  return false;
}
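
Since the handler only touches the event payload, you can exercise it locally by gzipping a fake CloudWatch batch, no AWS account needed. A minimal harness; the './transformer' import path and all field values are made up for the test:

import { gzipSync } from 'zlib';
import { handler } from './transformer'; // wherever the handler above lives

const fakeBatch = {
  messageType: 'DATA_MESSAGE',
  owner: '123456789012',
  logGroup: '/aws/lambda/api-handler',
  logStream: '2025/01/15/[$LATEST]abc',
  subscriptionFilters: ['lambda-to-firehose'],
  logEvents: [
    { id: '1', timestamp: Date.now(), message: '{"level":"ERROR","message":"boom"}' },
    { id: '2', timestamp: Date.now(), message: 'GET /health 200' }, // should be filtered
  ],
};

const event = {
  invocationId: 'test',
  deliveryStreamArn: 'arn:aws:firehose:us-east-1:123456789012:deliverystream/test',
  region: 'us-east-1',
  records: [{
    recordId: 'r1',
    approximateArrivalTimestamp: Date.now(),
    data: gzipSync(Buffer.from(JSON.stringify(fakeBatch))).toString('base64'),
  }],
};

(async () => {
  const result = await (handler as any)(event, {} as any, () => {});
  for (const record of result.records) {
    console.log(record.result);
    console.log(Buffer.from(record.data, 'base64').toString('utf8'));
  }
})();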

Step 5: OpenSearch Index Template

Create the index template via Kibana Dev Tools or the API (the lifecycle policy in Step 6 attaches itself to new logs-* indices via its ism_template block):

PUT _index_template/logs-template
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "refresh_interval": "5s",
      "index.codec": "best_compression",
      "index.lifecycle.name": "logs-lifecycle-policy"
    },
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "message": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "level": {
          "type": "keyword"
        },
        "service": {
          "type": "keyword"
        },
        "environment": {
          "type": "keyword"
        },
        "logGroup": {
          "type": "keyword"
        },
        "logStream": {
          "type": "keyword"
        },
        "requestId": {
          "type": "keyword"
        },
        "duration": {
          "type": "float"
        },
        "memory": {
          "type": "integer"
        },
        "statusCode": {
          "type": "integer"
        },
        "userId": {
          "type": "keyword"
        },
        "traceId": {
          "type": "keyword"
        }
      }
    }
  },
  "priority": 100
}
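
If you'd rather keep the template in version control than paste it into Dev Tools, the same body can be PUT from code. A sketch reusing the SigV4-signed client from the Step 1 smoke test (the declare line stands in for that setup):

import type { Client } from '@opensearch-project/opensearch';
declare const client: Client; // the SigV4-signed client from the Step 1 sketch

async function putLogsTemplate(): Promise<void> {
  await client.transport.request({
    method: 'PUT',
    path: '/_index_template/logs-template',
    body: {
      index_patterns: ['logs-*'],
      template: {
        settings: {
          number_of_shards: 3,
          number_of_replicas: 1,
          refresh_interval: '5s',
          'index.codec': 'best_compression',
        },
        mappings: {
          properties: {
            '@timestamp': { type: 'date' },
            level: { type: 'keyword' },
            service: { type: 'keyword' },
            // ...remaining fields as in the template above
          },
        },
      },
      priority: 100,
    },
  });
}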

Step 6: Index Lifecycle Management (ILM)

Hot-Warm-Cold-Delete lifecycle (the ism_template block is what attaches this policy to newly created logs-* indices):

PUT _plugins/_ism/policies/logs-lifecycle-policy
{
  "policy": {
    "description": "Logs lifecycle: hot (7d) → warm (30d) → cold (90d) → delete",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [
          {
            "rollover": {
              "min_index_age": "1d",
              "min_primary_shard_size": "50gb"
            }
          }
        ],
        "transitions": [
          {
            "state_name": "warm",
            "conditions": {
              "min_index_age": "7d"
            }
          }
        ]
      },
      {
        "name": "warm",
        "actions": [
          {
            "replica_count": {
              "number_of_replicas": 0
            }
          },
          {
            "force_merge": {
              "max_num_segments": 1
            }
          }
        ],
        "transitions": [
          {
            "state_name": "cold",
            "conditions": {
              "min_index_age": "30d"
            }
          }
        ]
      },
      {
        "name": "cold",
        "actions": [
          {
            "cold_migration": {}
          }
        ],
        "transitions": [
          {
            "state_name": "delete",
            "conditions": {
              "min_index_age": "90d"
            }
          }
        ]
      },
      {
        "name": "delete",
        "actions": [
          {
            "delete": {}
          }
        ]
      }
    ]
  }
}

Lifecycle phases:

  • Hot (0-7 days): Active indexing, searchable, 1 replica
  • Warm (7-30 days): Read-only, force-merged, 0 replicas (cost savings)
  • Cold (30-90 days): Infrequent access, migrated to cold storage (requires UltraWarm enabled on the domain)
  • Delete (90+ days): Removed (or archive to S3 Glacier)
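
To confirm the policy is attached and indices are actually moving through these states, the ISM explain API reports each managed index's current state. A quick sketch with the same assumed client as before:

import type { Client } from '@opensearch-project/opensearch';
declare const client: Client; // the SigV4-signed client from the Step 1 sketch

async function explainLifecycle(): Promise<void> {
  // Reports current state, pending action, and retry info per logs-* index
  const res = await client.transport.request({
    method: 'GET',
    path: '/_plugins/_ism/explain/logs-*',
  });
  console.log(JSON.stringify(res.body, null, 2));
}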

Query Patterns and Performance

Example Queries in Kibana

1. Find all errors in the last hour:

GET logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-1h"
            }
          }
        },
        {
          "term": {
            "level": "ERROR"
          }
        }
      ]
    }
  },
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}

2. Calculate P95 API latency by endpoint:

GET logs-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-24h"
            }
          }
        },
        {
          "exists": {
            "field": "duration"
          }
        }
      ]
    }
  },
  "aggs": {
    "by_endpoint": {
      "terms": {
        "field": "endpoint.keyword",
        "size": 20
      },
      "aggs": {
        "latency_percentiles": {
          "percentiles": {
            "field": "duration",
            "percents": [50, 95, 99]
          }
        }
      }
    }
  }
}

3. Detect anomalous error rates:

GET logs-*/_search
{
  "size": 0,
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-7d"
      }
    }
  },
  "aggs": {
    "errors_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "1h"
      },
      "aggs": {
        "error_count": {
          "filter": {
            "term": {
              "level": "ERROR"
            }
          }
        }
      }
    }
  }
}

4. Full-text search across all logs:

GET logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "message": "database connection timeout"
          }
        },
        {
          "range": {
            "@timestamp": {
              "gte": "now-1d"
            }
          }
        }
      ]
    }
  },
  "highlight": {
    "fields": {
      "message": {}
    }
  }
}
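
The same queries run fine from application code, which is useful for alert enrichment and internal tooling. A sketch of the errors-in-the-last-hour query through opensearch-js, with the same assumed client setup as Step 1:

import type { Client } from '@opensearch-project/opensearch';
declare const client: Client; // the SigV4-signed client from the Step 1 sketch

async function recentErrors(): Promise<void> {
  const res = await client.search({
    index: 'logs-*',
    body: {
      query: {
        bool: {
          must: [
            { range: { '@timestamp': { gte: 'now-1h' } } },
            { term: { level: 'ERROR' } },
          ],
        },
      },
      sort: [{ '@timestamp': 'desc' }],
      size: 50,
    },
  });
  for (const hit of res.body.hits.hits) {
    console.log(hit._source['@timestamp'], hit._source.message);
  }
}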

Performance Optimization Tips

1. Use date-based index patterns:

logs-2025-01-15  (good)
logs-all         (bad - single huge index)

2. Limit shard size to 20-50GB:

{
  "rollover": {
    "min_primary_shard_size": "30gb"
  }
}

3. Use filters instead of queries when possible:

// Good: Filter (cached)
{
  "bool": {
    "filter": [
      { "term": { "service": "api" } }
    ]
  }
}
 
// Bad: Query (scored, slower)
{
  "match": {
    "service": "api"
  }
}

4. Limit result size:

{
  "size": 100,  // Don't fetch 10,000 results
  "from": 0
}
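
Related: if you genuinely need to walk a deep result set (exports, backfills), from/size pagination hits the default 10,000-hit window; search_after avoids it. A sketch assuming a timestamp sort:

import type { Client } from '@opensearch-project/opensearch';
declare const client: Client; // the SigV4-signed client from the Step 1 sketch

async function pageAllErrors(): Promise<void> {
  let searchAfter: unknown[] | undefined;
  for (;;) {
    const res = await client.search({
      index: 'logs-*',
      body: {
        size: 1000,
        query: { term: { level: 'ERROR' } },
        // Events sharing a timestamp can repeat across pages;
        // add a unique keyword field to the sort to break ties.
        sort: [{ '@timestamp': 'asc' }],
        ...(searchAfter ? { search_after: searchAfter } : {}),
      },
    });
    const hits = res.body.hits.hits;
    if (hits.length === 0) break;
    // ...process hits here...
    searchAfter = hits[hits.length - 1].sort;
  }
}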

Monitoring the Pipeline

CloudWatch dashboard for pipeline health:

resource "aws_cloudwatch_dashboard" "logging_pipeline" {
  dashboard_name = "centralized-logging-pipeline"
 
  dashboard_body = jsonencode({
    widgets = [
      {
        type = "metric"
        properties = {
          metrics = [
            ["AWS/Firehose", "DeliveryToElasticsearch.Success", { stat = "Sum" }],
            [".", "DeliveryToElasticsearch.DataFreshness", { stat = "Average" }],
            [".", "DeliveryToElasticsearch.Records", { stat = "Sum" }]
          ]
          period = 300
          stat   = "Average"
          region = var.aws_region
          title  = "Firehose Delivery Metrics"
        }
      },
      {
        type = "metric"
        properties = {
          metrics = [
            ["AWS/ES", "ClusterStatus.green", { stat = "Average" }],
            [".", "ClusterStatus.yellow", { stat = "Average" }],
            [".", "ClusterStatus.red", { stat = "Average" }],
            [".", "CPUUtilization", { stat = "Average" }],
            [".", "JVMMemoryPressure", { stat = "Average" }]
          ]
          period = 300
          stat   = "Average"
          region = var.aws_region
          title  = "OpenSearch Cluster Health"
        }
      },
      {
        type = "metric"
        properties = {
          metrics = [
            ["AWS/ES", "SearchableDocuments", { stat = "Average" }],
            [".", "FreeStorageSpace", { stat = "Average" }],
            [".", "IndexingRate", { stat = "Average" }]
          ]
          period = 300
          stat   = "Average"
          region = var.aws_region
          title  = "OpenSearch Indexing"
        }
      }
    ]
  })
}
 
# Alerts
resource "aws_cloudwatch_metric_alarm" "firehose_delivery_failures" {
  alarm_name          = "firehose-delivery-failures"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "DeliveryToElasticsearch.Success"
  namespace           = "AWS/Firehose"
  period              = 300
  statistic           = "Sum"
  threshold           = 0.95  # Alert if success rate below 95%
  alarm_description   = "Firehose failing to deliver to OpenSearch"
  alarm_actions       = [aws_sns_topic.alerts.arn]
 
  dimensions = {
    DeliveryStreamName = aws_kinesis_firehose_delivery_stream.logs_to_opensearch.name
  }
}
 
resource "aws_cloudwatch_metric_alarm" "opensearch_cluster_red" {
  alarm_name          = "opensearch-cluster-red"
  comparison_operator = "LessThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ClusterStatus.green"
  namespace           = "AWS/ES"
  period              = 60
  statistic           = "Minimum"
  threshold           = 1
  alarm_description   = "OpenSearch cluster is not green"
  alarm_actions       = [aws_sns_topic.alerts.arn]
 
  dimensions = {
    DomainName = aws_opensearch_domain.logs.domain_name
  }
}

Multi-Account / Multi-Region Pattern

Central logging account architecture:

# In each application account: subscribe to the central account's
# CloudWatch Logs destination (cross-account subscriptions cannot target
# a Firehose stream directly; they go through a logs destination)
resource "aws_cloudwatch_log_subscription_filter" "cross_account" {
  name            = "central-logging"
  log_group_name  = "/aws/ecs/app"
  filter_pattern  = ""
  destination_arn = "arn:aws:logs:us-east-1:${var.central_logging_account_id}:destination:central-logs"
  # No role_arn needed here: the destination's own role writes to Firehose
}
 
# In central logging account
resource "aws_kinesis_firehose_delivery_stream" "central_logs" {
  name        = "central-logs"
  destination = "elasticsearch"
 
  # ... same configuration as before
}
 
# Role that CloudWatch Logs assumes to write into Firehose
resource "aws_iam_role" "logs_to_firehose" {
  name = "cwl-to-firehose-role"
 
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "logs.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      }
    ]
  })
}
 
resource "aws_iam_role_policy" "logs_to_firehose" {
  role = aws_iam_role.logs_to_firehose.id
 
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "firehose:PutRecord",
          "firehose:PutRecordBatch"
        ]
        Resource = aws_kinesis_firehose_delivery_stream.central_logs.arn
      }
    ]
  })
}
 
# Logs destination that fronts the Firehose stream
resource "aws_cloudwatch_log_destination" "central_logs" {
  name       = "central-logs"
  role_arn   = aws_iam_role.logs_to_firehose.arn
  target_arn = aws_kinesis_firehose_delivery_stream.central_logs.arn
}
 
# Destination policy allowing the application accounts to subscribe
resource "aws_cloudwatch_log_destination_policy" "central_logs" {
  destination_name = aws_cloudwatch_log_destination.central_logs.name
 
  access_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          AWS = [var.app_account_1_id, var.app_account_2_id]
        }
        Action   = "logs:PutSubscriptionFilter"
        Resource = aws_cloudwatch_log_destination.central_logs.arn
      }
    ]
  })
}

Hybrid Strategy: Best of Both Worlds

Pattern: Keep recent logs in CloudWatch, long-term in OpenSearch

# CloudWatch: 7-day retention
resource "aws_cloudwatch_log_group" "app" {
  name              = "/aws/ecs/app"
  retention_in_days = 7  # Short retention in CloudWatch
 
  tags = {
    Application = "api-server"
  }
}
 
# Subscription to OpenSearch for long-term storage
resource "aws_cloudwatch_log_subscription_filter" "to_opensearch" {
  name            = "to-opensearch"
  log_group_name  = aws_cloudwatch_log_group.app.name
  filter_pattern  = ""
  destination_arn = aws_kinesis_firehose_delivery_stream.logs_to_opensearch.arn
  role_arn        = aws_iam_role.cloudwatch_to_firehose.arn
}

Why this works:

  • CloudWatch (7 days): Easy access via AWS Console, fast queries for recent issues
  • OpenSearch (90+ days): Cost-effective long-term storage, complex analytics, compliance

Cost comparison (1TB/month logs):

CloudWatch only (90 days):
- Ingestion: $500
- Storage: 1TB × 3 months × $0.03 = $90
- Total: $590/month

Hybrid (7 days CloudWatch + 90 days OpenSearch):
- CloudWatch ingestion: $500
- CloudWatch storage: 1TB × 0.23 months × $0.03 = $7
- Firehose: $29
- OpenSearch: $443
- Total: $979/month

Wait, that's MORE expensive?

The trick: shift the heavy querying to OpenSearch, since CloudWatch Logs Insights bills per GB scanned and query spend is the easiest line item to cut

CloudWatch only:
- Queries: 100GB/day × 30 × $0.005 = $150/month

Hybrid (query OpenSearch instead):
- Queries: 10GB/day × 30 × $0.005 = $15/month
- Savings: $135/month

Revised hybrid total: $979 - $135 = $844/month (still higher at this volume, but with far better analytics)

Real ROI comes at scale (5TB/month). At that volume the hybrid figure assumes the bulk of logs ship straight to Firehose rather than through CloudWatch ingestion:

  • CloudWatch only: ~$2,950/month
  • Hybrid: ~$1,650/month
  • Savings: ~$1,300/month (44%)

Common Pitfalls

Pitfall 1: Too Many Shards

Problem: Creating an index per hour = 720 indices/month = 2,160 shards

Impact:

  • Cluster overhead (each shard = memory + CPU)
  • Slow searches across many shards

Solution: Daily indices (30 indices/month) with 3 shards each = 90 shards total

Pitfall 2: Not Using ILM

Problem: Indices grow forever, filling disk

Solution: Implement lifecycle policy (shown above)

Pitfall 3: Ingesting Unstructured Logs

Problem: All logs as plain text = no filtering, aggregations, or analytics

Solution: Use Lambda transformer to parse JSON and extract fields

Pitfall 4: Insufficient Instance Sizing

Problem: 1TB/day logs + 1 × r6g.large.search = cluster constantly red

Solution: Size based on ingestion rate:

  • Under 100GB/day: 3 × r6g.large
  • 100-500GB/day: 3 × r6g.xlarge
  • 500GB-1TB/day: 3 × r6g.2xlarge
  • Over 1TB/day: Consider UltraWarm or S3-based solutions
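
That table reduces to a lookup you can embed in a provisioning script. A tiny helper encoding these heuristics (they come from this list, not from AWS guidance, so benchmark with your own workload):

// Heuristic data-node sizing from daily ingest volume (GB/day)
function suggestInstanceType(gbPerDay: number): string {
  if (gbPerDay < 100) return 'r6g.large.search';
  if (gbPerDay < 500) return 'r6g.xlarge.search';
  if (gbPerDay <= 1000) return 'r6g.2xlarge.search';
  return 'consider UltraWarm or an S3-based pipeline';
}

console.log(suggestInstanceType(250)); // 'r6g.xlarge.search' (3 data nodes)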

Conclusion: When to Centralize

Centralize to OpenSearch if:

  • Log volume exceeds 500GB/month (cost savings)
  • You need retention over 90 days (compliance)
  • Complex analytics required (aggregations, ML, anomaly detection)
  • Multi-account/region centralization needed
  • Full-text search across all logs

Stay with CloudWatch if:

  • Log volume under 100GB/month (not worth the complexity)
  • Simple queries sufficient (no aggregations)
  • 7-day retention acceptable
  • Tight AWS ecosystem integration preferred

Hybrid approach if:

  • You want convenience of CloudWatch + power of OpenSearch
  • Recent logs queried frequently, older logs rarely
  • Budget allows ($800-$1,000/month for 1TB logs)

Action Items

  1. Audit current log volume across all CloudWatch log groups
  2. Calculate CloudWatch costs (ingestion + storage + queries)
  3. Estimate OpenSearch costs using AWS pricing calculator
  4. Start with non-production logs to test the pipeline
  5. Implement index lifecycle management before going to production
  6. Set up monitoring and alerts for pipeline health
  7. Train team on Kibana for log analysis and dashboards

If you need help designing a centralized logging architecture for your organization, schedule a consultation. We'll analyze your log volume, design the optimal pipeline, and provide Terraform code for a production-ready implementation.
