Logging Systems

Build comprehensive centralized logging systems to aggregate, analyze, and visualize logs from distributed applications and infrastructure.

Logging Architecture Overview

A well-designed logging architecture provides centralized log collection, storage, search, and analysis capabilities.

Modern Logging Stack

ELK Stack Implementation

The ELK Stack (Elasticsearch, Logstash, Kibana) is a powerful open-source logging solution; in practice it is usually paired with a lightweight shipper such as Filebeat, as in the setup below.

ELK Stack Setup

1. Docker Compose Configuration

# docker-compose-elk.yml
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.10.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
      - xpack.security.enabled=true
      - xpack.security.authc.api_key.enabled=true
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - elk
    healthcheck:
      # Security is enabled, so the health check must authenticate
      test: ["CMD-SHELL", "curl -s -u elastic:${ELASTIC_PASSWORD} http://localhost:9200/_cluster/health | grep -q '\"status\":\"green\"\\|\"status\":\"yellow\"'"]
      interval: 30s
      timeout: 10s
      retries: 5

  logstash:
    image: docker.elastic.co/logstash/logstash:8.10.0
    container_name: logstash
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro
    ports:
      - "5044:5044"
      - "5000:5000/tcp"
      - "5000:5000/udp"
      - "9600:9600"
    environment:
      - "LS_JAVA_OPTS=-Xms1g -Xmx1g"
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
    networks:
      - elk
    depends_on:
      elasticsearch:
        condition: service_healthy

  kibana:
    image: docker.elastic.co/kibana/kibana:8.10.0
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      # Note: Kibana 8.x refuses to start with the elastic superuser account;
      # in practice use the kibana_system user or a service account token here.
      - ELASTICSEARCH_USERNAME=elastic
      - ELASTICSEARCH_PASSWORD=${ELASTIC_PASSWORD}
    networks:
      - elk
    depends_on:
      elasticsearch:
        condition: service_healthy

  filebeat:
    image: docker.elastic.co/beats/filebeat:8.10.0
    container_name: filebeat
    user: root
    volumes:
      - ./filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - filebeat_data:/usr/share/filebeat/data
    command: filebeat -e -strict.perms=false
    networks:
      - elk
    depends_on:
      - logstash

volumes:
  elasticsearch_data:
  filebeat_data:

networks:
  elk:
    driver: bridge

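Compose substitutes ELASTIC_PASSWORD from the environment, typically via a .env file next to the Compose file. A minimal sketch of bringing the stack up and verifying Elasticsearch (assumes Docker Compose v2 and the file name above; the password value is a placeholder):

# Set the password Compose will substitute into docker-compose-elk.yml
echo 'ELASTIC_PASSWORD=change-me' > .env

# Start the stack and wait for the Elasticsearch health check to pass
docker compose -f docker-compose-elk.yml up -d
docker compose -f docker-compose-elk.yml ps

# Verify the cluster is reachable (security is enabled, so authenticate)
curl -u elastic:change-me http://localhost:9200/_cluster/health?pretty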
2. Logstash Pipeline Configuration

# logstash/pipeline/logstash.conf
input {
  beats {
    port => 5044
  }

  tcp {
    port => 5000
    codec => json
  }

  http {
    port => 8080
    codec => json
  }
}

filter {
  # Parse JSON logs
  if [message] =~ /^\{.*\}$/ {
    json {
      source => "message"
      remove_field => ["message"]
    }
  }

  # Parse application logs
  if [fields][type] == "application" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:log_level}\] \[%{DATA:thread}\] %{DATA:logger} - %{GREEDYDATA:log_message}"
      }
    }

    date {
      match => ["timestamp", "ISO8601"]
      target => "@timestamp"
    }
  }

  # Parse Nginx access logs
  if [fields][type] == "nginx-access" {
    grok {
      match => {
        "message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:request_method} %{DATA:request_path} HTTP/%{NUMBER:http_version}" %{INT:status} %{INT:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:http_user_agent}"'
      }
    }

    mutate {
      convert => {
        "status" => "integer"
        "body_bytes_sent" => "integer"
      }
    }
  }

  # Parse Docker container logs
  if [container][name] {
    mutate {
      add_field => {
        "container_name" => "%{[container][name]}"
        "container_id" => "%{[container][id]}"
      }
    }
  }

  # Extract error information
  if [log_level] in ["ERROR", "FATAL"] {
    mutate {
      add_tag => ["error"]
    }
  }

  # GeoIP enrichment for access logs
  if [remote_addr] {
    geoip {
      source => "remote_addr"
      target => "geoip"
    }
  }

  # Remove unnecessary fields
  mutate {
    remove_field => ["agent", "ecs", "input"]
  }
}

output {
  # Output to Elasticsearch
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "logs-%{[fields][type]}-%{+YYYY.MM.dd}"

    # Custom template
    template_name => "logs"
    template_overwrite => true
  }

  # Output errors to a separate index as well
  if "error" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "errors-%{+YYYY.MM.dd}"
    }
  }

  # Debug output
  # stdout { codec => rubydebug }
}

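Besides Beats on port 5044, the pipeline also accepts JSON over TCP (5000) and HTTP (8080). A minimal Node.js sketch for pushing a single test event into the HTTP input; note that port 8080 is not published in the Compose file above, so this assumes you add a port mapping or run it from a container on the elk network, and the event fields are illustrative:

// send-test-log.js - push one test event into the Logstash http input
const http = require('http');

const event = {
  message: 'test event from send-test-log.js',
  log_level: 'INFO',
  service: 'myapp'
};

const body = JSON.stringify(event);
const req = http.request(
  {
    host: 'localhost',   // assumes the Logstash http input is reachable here
    port: 8080,          // http input port from the pipeline above
    method: 'POST',
    path: '/',
    headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) }
  },
  (res) => console.log('Logstash responded with HTTP', res.statusCode)
);
req.on('error', (err) => console.error('Failed to reach Logstash:', err.message));
req.end(body);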
3. Filebeat Configuration

# filebeat/filebeat.yml
filebeat.inputs:
  # Application logs
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log
    # Keep custom fields nested under [fields] (the default), so the
    # Logstash conditionals on [fields][type] and the index name match.
    fields:
      type: application
      environment: ${ENVIRONMENT:dev}
    multiline:
      pattern: '^\['
      negate: true
      match: after

  # Container logs
  - type: container
    enabled: true
    paths:
      - '/var/lib/docker/containers/*/*.log'
    processors:
      - add_docker_metadata:
          host: "unix:///var/run/docker.sock"
      - decode_json_fields:
          fields: ["message"]
          target: ""
          overwrite_keys: true

  # Nginx access logs
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    fields:
      type: nginx-access

  # Nginx error logs
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/error.log
    fields:
      type: nginx-error

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

output.logstash:
  hosts: ["logstash:5044"]

logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644

Structured Logging

1. Node.js Structured Logging

// logger.js
const winston = require('winston');
const { ElasticsearchTransport } = require('winston-elasticsearch');

class Logger {
  constructor() {
    this.logger = winston.createLogger({
      level: process.env.LOG_LEVEL || 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.json()
      ),
      defaultMeta: {
        service: process.env.SERVICE_NAME || 'myapp',
        environment: process.env.NODE_ENV || 'development',
        version: process.env.APP_VERSION || '1.0.0',
        hostname: require('os').hostname()
      },
      transports: [
        // Console output
        new winston.transports.Console({
          format: winston.format.combine(
            winston.format.colorize(),
            winston.format.simple()
          )
        }),

        // File output
        new winston.transports.File({
          filename: 'logs/error.log',
          level: 'error',
          maxsize: 5242880, // 5MB
          maxFiles: 5
        }),
        new winston.transports.File({
          filename: 'logs/combined.log',
          maxsize: 5242880,
          maxFiles: 5
        }),

        // Elasticsearch transport
        new ElasticsearchTransport({
          level: 'info',
          clientOpts: {
            node: process.env.ELASTICSEARCH_URL || 'http://localhost:9200',
            auth: {
              username: process.env.ELASTICSEARCH_USER || 'elastic',
              password: process.env.ELASTICSEARCH_PASSWORD
            }
          },
          index: 'logs-application'
        })
      ]
    });
  }

  info(message, meta = {}) {
    this.logger.info(message, { ...meta, correlationId: this.getCorrelationId() });
  }

  error(message, error, meta = {}) {
    this.logger.error(message, {
      ...meta,
      error: {
        message: error.message,
        stack: error.stack,
        code: error.code
      },
      correlationId: this.getCorrelationId()
    });
  }

  warn(message, meta = {}) {
    this.logger.warn(message, { ...meta, correlationId: this.getCorrelationId() });
  }

  debug(message, meta = {}) {
    this.logger.debug(message, { ...meta, correlationId: this.getCorrelationId() });
  }

  // HTTP request logging
  logRequest(req, res, responseTime) {
    this.logger.info('HTTP Request', {
      type: 'http_request',
      method: req.method,
      url: req.url,
      statusCode: res.statusCode,
      responseTime: responseTime,
      userAgent: req.headers['user-agent'],
      ip: req.ip,
      userId: req.user?.id,
      correlationId: req.headers['x-correlation-id']
    });
  }

  // Database query logging
  logQuery(query, duration, results) {
    this.logger.debug('Database Query', {
      type: 'database_query',
      query: query,
      duration: duration,
      resultCount: results.length,
      correlationId: this.getCorrelationId()
    });
  }

  // Business event logging
  logEvent(eventName, eventData) {
    this.logger.info('Business Event', {
      type: 'business_event',
      eventName: eventName,
      eventData: eventData,
      correlationId: this.getCorrelationId()
    });
  }

  getCorrelationId() {
    // Placeholder: executionAsyncId() is only a local async-resource id, not a
    // request-scoped correlation ID. In practice, propagate the ID from the
    // incoming request with AsyncLocalStorage or pass it explicitly via meta.
    return require('async_hooks').executionAsyncId().toString();
  }
}

module.exports = new Logger();

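A short usage sketch of the logger above; the event names, field names, and values are illustrative:

// app-example.js - illustrative usage of the shared logger
const logger = require('./logger');

logger.info('Service started', { port: 3000 });

try {
  throw new Error('Payment gateway timeout');
} catch (err) {
  logger.error('Payment failed', err, { orderId: 'order-42' }); // orderId is illustrative
}

logger.logEvent('user_signup', { plan: 'free' });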
2. Express Middleware

// logging-middleware.js
const logger = require('./logger');
const { v4: uuidv4 } = require('uuid');

function requestLogger(req, res, next) {
  const startTime = Date.now();
  const correlationId = req.headers['x-correlation-id'] || uuidv4();

  // Add correlation ID to request
  req.correlationId = correlationId;
  res.setHeader('X-Correlation-ID', correlationId);

  // Log request
  logger.info('Incoming request', {
    method: req.method,
    url: req.url,
    correlationId: correlationId,
    userAgent: req.headers['user-agent'],
    ip: req.ip
  });

  // Capture response
  res.on('finish', () => {
    const responseTime = Date.now() - startTime;

    logger.logRequest(req, res, responseTime);

    // Log slow requests
    if (responseTime > 1000) {
      logger.warn('Slow request detected', {
        method: req.method,
        url: req.url,
        responseTime: responseTime,
        statusCode: res.statusCode,
        correlationId: correlationId
      });
    }
  });

  next();
}

function errorLogger(err, req, res, next) {
  logger.error('Request error', err, {
    method: req.method,
    url: req.url,
    statusCode: res.statusCode,
    correlationId: req.correlationId
  });

  next(err);
}

module.exports = { requestLogger, errorLogger };

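Wiring the middleware into an Express app: requestLogger is registered before the routes so every request gets a correlation ID, and errorLogger after them so thrown errors are logged. The route and port are illustrative:

// server.js - illustrative Express wiring
const express = require('express');
const { requestLogger, errorLogger } = require('./logging-middleware');

const app = express();
app.use(requestLogger);   // before routes

app.get('/health', (req, res) => res.json({ status: 'ok' }));

app.use(errorLogger);     // after routes
app.listen(3000);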
Log Analysis and Visualization

1. Kibana Dashboard Configuration

The panel layout below is a simplified representation of the dashboard structure (not an importable Kibana saved-object export):

{
  "dashboard": {
    "title": "Application Logs Dashboard",
    "panels": [
      {
        "id": "log-volume",
        "type": "line",
        "title": "Log Volume Over Time",
        "query": {
          "query": "*",
          "timeField": "@timestamp"
        },
        "visualization": {
          "type": "line",
          "yAxisLabel": "Log Count",
          "xAxisLabel": "Time"
        }
      },
      {
        "id": "error-rate",
        "type": "metric",
        "title": "Error Rate",
        "query": {
          "query": "log_level:ERROR OR log_level:FATAL",
          "timeField": "@timestamp"
        },
        "visualization": {
          "type": "gauge",
          "colorScheme": "stoplight"
        }
      },
      {
        "id": "top-errors",
        "type": "table",
        "title": "Top Error Messages",
        "query": {
          "query": "log_level:ERROR",
          "size": 10,
          "aggregations": {
            "top_errors": {
              "terms": {
                "field": "log_message.keyword",
                "size": 10
              }
            }
          }
        }
      },
      {
        "id": "response-time",
        "type": "histogram",
        "title": "Response Time Distribution",
        "query": {
          "query": "type:http_request",
          "timeField": "@timestamp"
        },
        "visualization": {
          "field": "responseTime",
          "interval": 100
        }
      },
      {
        "id": "log-levels",
        "type": "pie",
        "title": "Log Levels Distribution",
        "query": {
          "query": "*",
          "aggregations": {
            "log_levels": {
              "terms": {
                "field": "log_level.keyword"
              }
            }
          }
        }
      },
      {
        "id": "service-errors",
        "type": "bar",
        "title": "Errors by Service",
        "query": {
          "query": "log_level:ERROR",
          "aggregations": {
            "services": {
              "terms": {
                "field": "service.keyword",
                "size": 10
              }
            }
          }
        }
      }
    ],
    "timeRange": {
      "from": "now-24h",
      "to": "now"
    },
    "refreshInterval": "30s"
  }
}

2. Custom Kibana Queries

The examples below use Lucene query syntax (switch the Kibana search bar from KQL to Lucene, since bracketed ranges and /regex/ queries are not supported in KQL).

# Find all errors in the last hour
log_level:ERROR AND @timestamp:[now-1h TO now]

# Find slow requests
type:http_request AND responseTime:>1000

# Find errors for specific user
log_level:ERROR AND userId:"12345"

# Find errors by correlation ID
correlationId:"abc-123-def"

# Find database slow queries
type:database_query AND duration:>1000

# Find 5xx errors
type:http_request AND statusCode:[500 TO 599]

# Find failed login attempts
eventName:"user_login_failed"

# Complex query with boolean operators
(log_level:ERROR OR log_level:WARN) AND service:"payment-service" AND @timestamp:[now-1h TO now]

# Regex search
log_message:/timeout|connection.*failed/

# Range query
responseTime:[1000 TO 5000]

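The same queries can also be issued from code. A minimal sketch using the official @elastic/elasticsearch Node.js client (v8 API), mirroring the "errors in the last hour" example; the index pattern and credentials follow the stack configured earlier:

// query-errors.js - find ERROR-level logs from the last hour
const { Client } = require('@elastic/elasticsearch');

const client = new Client({
  node: process.env.ELASTICSEARCH_URL || 'http://localhost:9200',
  auth: { username: 'elastic', password: process.env.ELASTIC_PASSWORD }
});

async function findRecentErrors() {
  const result = await client.search({
    index: 'logs-*',
    size: 10,
    sort: [{ '@timestamp': 'desc' }],
    query: {
      bool: {
        filter: [
          { term: { log_level: 'ERROR' } },               // keyword field from the index template
          { range: { '@timestamp': { gte: 'now-1h' } } }
        ]
      }
    }
  });
  return result.hits.hits.map((hit) => hit._source);
}

findRecentErrors().then((docs) => console.log(docs)).catch(console.error);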
Log Retention and Lifecycle Management

1. Elasticsearch Index Lifecycle Policy

{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d",
            "max_docs": 100000000
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "3d",
        "actions": {
          "allocate": {
            "number_of_replicas": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          },
          "shrink": {
            "number_of_shards": 1
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "allocate": {
            "number_of_replicas": 0
          },
          "set_priority": {
            "priority": 0
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

2. Apply Lifecycle Policy

#!/bin/bash
# apply-ilm-policy.sh
# Assumes the ILM policy JSON from step 1 is saved as ./ilm-policy.json
# and that ELASTIC_PASSWORD is exported (security is enabled in this stack).

ES_URL="http://localhost:9200"
AUTH="elastic:${ELASTIC_PASSWORD}"

# Create the ILM policy referenced by the template below
curl -X PUT "${ES_URL}/_ilm/policy/logs-policy" \
  -u "${AUTH}" \
  -H 'Content-Type: application/json' \
  -d @ilm-policy.json

# Create index template with ILM policy
curl -X PUT "${ES_URL}/_index_template/logs-template" \
  -u "${AUTH}" \
  -H 'Content-Type: application/json' \
  -d '{
    "index_patterns": ["logs-*"],
    "template": {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
        "index.lifecycle.name": "logs-policy",
        "index.lifecycle.rollover_alias": "logs"
      },
      "mappings": {
        "properties": {
          "@timestamp": { "type": "date" },
          "log_level": { "type": "keyword" },
          "service": { "type": "keyword" },
          "log_message": { "type": "text" },
          "correlationId": { "type": "keyword" },
          "responseTime": { "type": "long" },
          "statusCode": { "type": "integer" }
        }
      }
    }
  }'

# Create initial write index behind the rollover alias
curl -X PUT "${ES_URL}/logs-000001" \
  -u "${AUTH}" \
  -H 'Content-Type: application/json' \
  -d '{
    "aliases": {
      "logs": {
        "is_write_index": true
      }
    }
  }'

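Once indices exist, the ILM explain API shows which phase each one is in, which is a quick way to confirm the policy is attached (same credentials as above):

# Inspect the policy and the lifecycle state of each logs-* index
curl -s -u "elastic:${ELASTIC_PASSWORD}" "http://localhost:9200/_ilm/policy/logs-policy?pretty"
curl -s -u "elastic:${ELASTIC_PASSWORD}" "http://localhost:9200/logs-*/_ilm/explain?pretty"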
Log Alerting

1. ElastAlert Configuration

# elastalert/config.yaml
rules_folder: rules
run_every:
  minutes: 1

buffer_time:
  minutes: 15

es_host: elasticsearch
es_port: 9200
es_username: elastic
es_password: ${ELASTIC_PASSWORD}

writeback_index: elastalert_status
alert_time_limit:
  days: 2

2. Alert Rules

# elastalert/rules/error-spike.yaml
name: Error Spike Alert
type: spike
index: logs-*

spike_height: 2
spike_type: up
timeframe:
  minutes: 15

threshold_cur: 10
threshold_ref: 5

filter:
  - term:
      log_level: "ERROR"

alert:
  - slack:
      slack_webhook_url: "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
      slack_channel: "#alerts"
      slack_username_override: "ElastAlert"
      slack_emoji_override: ":warning:"
      slack_msg_color: "danger"

alert_text: |
  Error spike detected!
  Service: {service}
  Error count increased by {spike}x
  Current: {num_hits} errors
  Reference: {ref_hits} errors

# elastalert/rules/slow-requests.yaml
name: Slow Request Alert
type: frequency
index: logs-*

num_events: 10
timeframe:
  minutes: 5

filter:
  - range:
      responseTime:
        gte: 2000
  - term:
      type: "http_request"

alert:
  - email:
      email: "[email protected]"
  - slack:
      slack_webhook_url: "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

alert_text: |
  Slow requests detected!
  Count: {num_hits} requests over 2 seconds
  Service: {service}
  Average response time: {avg_response_time}ms

Log Security and Compliance

1. PII Masking Filter

# logstash/pipeline/security.conf
filter {
  # Mask credit card numbers
  mutate {
    gsub => [
      "log_message", "\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "****-****-****-****"
    ]
  }

  # Mask email addresses
  mutate {
    gsub => [
      "log_message", "\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", "****@****.***"
    ]
  }

  # Mask SSNs
  mutate {
    gsub => [
      "log_message", "\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b", "***-**-****"
    ]
  }

  # Mask passwords in logs
  if [log_message] =~ /password/ {
    mutate {
      gsub => [
        "log_message", "password['\"]?\s*[:=]\s*['\"]?[^'\"\s]+['\"]?", "password=***"
      ]
    }
  }
}

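Masking can also happen at the source, before logs ever leave the application. A minimal sketch of a custom winston format that redacts sensitive metadata keys (the key list is illustrative); it would be added to the format chain in logger.js:

// redact-format.js - illustrative winston format that masks sensitive metadata keys
const winston = require('winston');

const SENSITIVE_KEYS = ['password', 'token', 'secret', 'creditCard']; // illustrative list

const redact = winston.format((info) => {
  // Only masks top-level keys of the log entry; nested objects need deeper handling
  for (const key of SENSITIVE_KEYS) {
    if (key in info) {
      info[key] = '***';
    }
  }
  return info;
});

module.exports = redact;

// Usage in logger.js:
//   format: winston.format.combine(redact(), winston.format.timestamp(), winston.format.json())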
2. Audit Logging

// audit-logger.js
class AuditLogger {
  constructor(logger) {
    this.logger = logger;
  }

  logAccess(userId, resource, action, result) {
    this.logger.info('Access audit', {
      type: 'audit',
      category: 'access',
      userId: userId,
      resource: resource,
      action: action,
      result: result,
      timestamp: new Date().toISOString(),
      ip: this.getClientIP()
    });
  }

  logDataChange(userId, entity, changeType, oldValue, newValue) {
    this.logger.info('Data change audit', {
      type: 'audit',
      category: 'data_change',
      userId: userId,
      entity: entity,
      changeType: changeType,
      oldValue: this.sanitize(oldValue),
      newValue: this.sanitize(newValue),
      timestamp: new Date().toISOString()
    });
  }

  logAuthentication(userId, method, result, metadata = {}) {
    this.logger.info('Authentication audit', {
      type: 'audit',
      category: 'authentication',
      userId: userId,
      method: method,
      result: result,
      metadata: metadata,
      timestamp: new Date().toISOString(),
      ip: this.getClientIP()
    });
  }

  sanitize(value) {
    // Remove sensitive data before logging
    if (typeof value === 'object') {
      const sanitized = { ...value };
      delete sanitized.password;
      delete sanitized.token;
      delete sanitized.secret;
      return sanitized;
    }
    return value;
  }

  getClientIP() {
    // Implementation to get client IP
    return '0.0.0.0';
  }
}

module.exports = AuditLogger;

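Illustrative usage, wiring the audit logger to the shared structured logger from earlier (user IDs, resources, and reasons are made-up values):

// audit-example.js - illustrative wiring of AuditLogger
const logger = require('./logger');
const AuditLogger = require('./audit-logger');

const audit = new AuditLogger(logger);

audit.logAuthentication('user-123', 'password', 'failure', { reason: 'bad_credentials' });
audit.logAccess('user-123', '/api/orders/42', 'read', 'allowed');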
Alternative Logging Solutions

Loki Stack

# loki-stack.yaml
version: '3.8'

services:
  loki:
    image: grafana/loki:2.9.0
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
    volumes:
      - ./loki-config.yaml:/etc/loki/local-config.yaml
      - loki_data:/loki

  promtail:
    image: grafana/promtail:2.9.0
    volumes:
      - /var/log:/var/log
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - ./promtail-config.yaml:/etc/promtail/config.yml
    command: -config.file=/etc/promtail/config.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_PATHS_PROVISIONING=/etc/grafana/provisioning
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning

volumes:
  loki_data:
  grafana_data:

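The Compose file mounts a promtail-config.yaml that is not shown above. A minimal sketch of what it might contain, scraping the mounted /var/log directory and pushing to the Loki service; the job name, labels, and path glob are assumptions to adapt:

# promtail-config.yaml (minimal sketch)
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: system
    static_configs:
      - targets: [localhost]
        labels:
          job: varlogs
          __path__: /var/log/*.log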
Key Takeaways

Logging Best Practices

  1. Structured Logging: Use JSON format for easy parsing
  2. Correlation IDs: Track requests across services
  3. Log Levels: Use appropriate levels (debug, info, warn, error) consistently
  4. Centralization: Aggregate logs from all sources
  5. Retention: Define clear retention policies

Implementation Strategy

  • Start Simple: Begin with basic logging setup
  • Structured Format: Implement structured logging early
  • Centralize Gradually: Add sources incrementally
  • Monitor Performance: Watch for logging overhead
  • Secure Sensitive Data: Mask PII and sensitive information

Common Patterns

  • Request Tracing: Correlation IDs across services
  • Error Aggregation: Group similar errors
  • Performance Monitoring: Track slow operations
  • Security Auditing: Log authentication and access
  • Business Analytics: Track business events

Next Steps: Ready to automate operations? Continue to Section 6.3: Operations Automation to learn about auto-scaling, self-healing, and ChatOps.


Centralized logging is essential for understanding distributed systems, troubleshooting issues, and maintaining security and compliance in modern applications.