Logging Systems
Build a centralized logging system that aggregates, analyzes, and visualizes logs from distributed applications and infrastructure.
Logging Architecture Overview
A well-designed logging architecture provides centralized log collection, storage, search, and analysis capabilities.
Modern Logging Stack
The stack used throughout this section combines Elasticsearch for storage and search, Logstash for ingestion and parsing, Kibana for visualization, and Filebeat for shipping logs from hosts and containers.
ELK Stack Implementation
The ELK Stack (Elasticsearch, Logstash, Kibana), typically paired with Filebeat for log shipping, is a widely used open-source logging solution.
ELK Stack Setup
1. Docker Compose Configuration
# docker-compose-elk.yml
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.10.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
      - xpack.security.enabled=true
      - xpack.security.authc.api_key.enabled=true
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - elk
    healthcheck:
      # Security is enabled, so the health check must authenticate
      test: ["CMD-SHELL", "curl -s -u elastic:$${ELASTIC_PASSWORD} http://localhost:9200/_cluster/health | grep -q '\"status\":\"green\"\\|\"status\":\"yellow\"'"]
      interval: 30s
      timeout: 10s
      retries: 5

  logstash:
    image: docker.elastic.co/logstash/logstash:8.10.0
    container_name: logstash
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro
    ports:
      - "5044:5044"
      - "5000:5000/tcp"
      - "5000:5000/udp"
      - "8080:8080"   # http input defined in the Logstash pipeline
      - "9600:9600"
    environment:
      - "LS_JAVA_OPTS=-Xms1g -Xmx1g"
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
    networks:
      - elk
    depends_on:
      elasticsearch:
        condition: service_healthy

  kibana:
    image: docker.elastic.co/kibana/kibana:8.10.0
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      # Kibana 8.x refuses to start with the elastic superuser. Use the built-in
      # kibana_system user and set its password beforehand, e.g. with
      # bin/elasticsearch-reset-password -u kibana_system
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=${KIBANA_SYSTEM_PASSWORD}
    networks:
      - elk
    depends_on:
      elasticsearch:
        condition: service_healthy

  filebeat:
    image: docker.elastic.co/beats/filebeat:8.10.0
    container_name: filebeat
    user: root
    volumes:
      - ./filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - filebeat_data:/usr/share/filebeat/data
    command: filebeat -e -strict.perms=false
    networks:
      - elk
    depends_on:
      - logstash

volumes:
  elasticsearch_data:
  filebeat_data:

networks:
  elk:
    driver: bridge
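Once the stack is up, it helps to confirm that the secured Elasticsearch node is reachable before wiring in shippers. The following is a minimal sketch, assuming Node 18+ (for the built-in fetch) and that ELASTIC_PASSWORD is exported with the same value as in your .env file:

// check-elasticsearch.js - quick health probe for the cluster started above
const ELASTIC_URL = process.env.ELASTICSEARCH_URL || 'http://localhost:9200';
const auth = Buffer.from(`elastic:${process.env.ELASTIC_PASSWORD}`).toString('base64');

async function checkClusterHealth() {
  const response = await fetch(`${ELASTIC_URL}/_cluster/health`, {
    headers: { Authorization: `Basic ${auth}` }
  });
  if (!response.ok) {
    throw new Error(`Elasticsearch returned HTTP ${response.status}`);
  }
  const health = await response.json();
  // 'green' or 'yellow' is expected for a single-node cluster
  console.log(`Cluster "${health.cluster_name}" status: ${health.status}`);
}

checkClusterHealth().catch((err) => {
  console.error('Health check failed:', err.message);
  process.exit(1);
});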
2. Logstash Pipeline Configuration
# logstash/pipeline/logstash.conf
input {
  beats {
    port => 5044
  }
  tcp {
    port => 5000
    codec => json
  }
  http {
    port => 8080
    codec => json
  }
}

filter {
  # Parse JSON logs
  if [message] =~ /^\{.*\}$/ {
    json {
      source => "message"
      remove_field => ["message"]
    }
  }

  # Parse application logs
  if [fields][type] == "application" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:log_level}\] \[%{DATA:thread}\] %{DATA:logger} - %{GREEDYDATA:log_message}"
      }
    }
    date {
      match => ["timestamp", "ISO8601"]
      target => "@timestamp"
    }
  }

  # Parse Nginx access logs
  if [fields][type] == "nginx-access" {
    grok {
      match => {
        "message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:request_method} %{DATA:request_path} HTTP/%{NUMBER:http_version}" %{INT:status} %{INT:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:http_user_agent}"'
      }
    }
    mutate {
      convert => {
        "status" => "integer"
        "body_bytes_sent" => "integer"
      }
    }
  }

  # Parse Docker container logs
  if [container][name] {
    mutate {
      add_field => {
        "container_name" => "%{[container][name]}"
        "container_id" => "%{[container][id]}"
      }
    }
  }

  # Extract error information
  if [log_level] in ["ERROR", "FATAL"] {
    mutate {
      add_tag => ["error"]
    }
  }

  # GeoIP enrichment for access logs
  if [remote_addr] {
    geoip {
      source => "remote_addr"
      target => "geoip"
    }
  }

  # Remove unnecessary fields
  mutate {
    remove_field => ["agent", "ecs", "input"]
  }
}

output {
  # Output to Elasticsearch
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "logs-%{[fields][type]}-%{+YYYY.MM.dd}"
    # Custom template
    template_name => "logs"
    template_overwrite => true
  }

  # Output errors to separate index
  if "error" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "errors-%{+YYYY.MM.dd}"
    }
  }

  # Debug output
  # stdout { codec => rubydebug }
}
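The tcp input above accepts JSON events directly, which is handy for smoke-testing the pipeline without Filebeat. Below is a minimal sketch, assuming the compose stack is running locally and that the json codec decodes one document per line (switching the input to codec => json_lines is a common alternative if events are not decoded as expected):

// send-test-log.js - push a single JSON event to the Logstash tcp input on port 5000
const net = require('net');

const event = {
  '@timestamp': new Date().toISOString(),
  log_level: 'ERROR',
  service: 'checkout',
  message: 'Payment provider timeout',
  correlationId: 'test-1234'
};

const socket = net.createConnection({ host: 'localhost', port: 5000 }, () => {
  // Newline-delimited JSON: one event per line
  socket.write(JSON.stringify(event) + '\n');
  socket.end();
});

socket.on('error', (err) => console.error('Could not reach Logstash:', err.message));
socket.on('close', () => console.log('Event sent'));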
3. Filebeat Configuration
# filebeat/filebeat.yml
filebeat.inputs:
  # Application logs
  # (the 'log' input still works in 8.x but is deprecated in favor of 'filestream')
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log
    # Keep custom fields under 'fields.*' so the [fields][type] conditions and
    # index names in the Logstash pipeline match
    fields:
      type: application
      environment: ${ENVIRONMENT:dev}
    # A new entry starts with an ISO8601 timestamp (matching the application
    # grok pattern); anything else is treated as a continuation line
    multiline:
      pattern: '^\d{4}-\d{2}-\d{2}'
      negate: true
      match: after

  # Container logs
  - type: container
    enabled: true
    paths:
      - '/var/lib/docker/containers/*/*.log'
    processors:
      - add_docker_metadata:
          host: "unix:///var/run/docker.sock"
      - decode_json_fields:
          fields: ["message"]
          target: ""
          overwrite_keys: true

  # Nginx access logs
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    fields:
      type: nginx-access

  # Nginx error logs
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/error.log
    fields:
      type: nginx-error

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

output.logstash:
  hosts: ["logstash:5044"]

logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644
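The application input above only pays off if the log files follow the layout the grok pattern in the Logstash pipeline expects: an ISO8601 timestamp, the level and thread in brackets, the logger name, then the message. Here is a minimal sketch of a writer that emits matching lines (the /var/log/app/app.log path and the 'orders-service' logger name are illustrative assumptions):

// app-log-writer.js - emit lines in the format expected by the 'application' grok
// pattern: "<ISO8601 timestamp> [LEVEL] [thread] logger - message"
const fs = require('fs');

function writeAppLog(level, loggerName, message, thread = 'main') {
  const line = `${new Date().toISOString()} [${level}] [${thread}] ${loggerName} - ${message}\n`;
  fs.appendFileSync('/var/log/app/app.log', line);
}

writeAppLog('INFO', 'orders-service', 'Order 42 created');
writeAppLog('ERROR', 'orders-service', 'Inventory lookup failed: timeout after 5000ms');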
Structured Logging
1. Node.js Structured Logging
// logger.js
const os = require('os');
const { AsyncLocalStorage } = require('async_hooks');
const winston = require('winston');
const { ElasticsearchTransport } = require('winston-elasticsearch');

// Holds the per-request correlation ID; populated by the request middleware
const correlationStorage = new AsyncLocalStorage();

class Logger {
  constructor() {
    this.logger = winston.createLogger({
      level: process.env.LOG_LEVEL || 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.json()
      ),
      defaultMeta: {
        service: process.env.SERVICE_NAME || 'myapp',
        environment: process.env.NODE_ENV || 'development',
        version: process.env.APP_VERSION || '1.0.0',
        hostname: os.hostname()
      },
      transports: [
        // Console output
        new winston.transports.Console({
          format: winston.format.combine(
            winston.format.colorize(),
            winston.format.simple()
          )
        }),
        // File output
        new winston.transports.File({
          filename: 'logs/error.log',
          level: 'error',
          maxsize: 5242880, // 5MB
          maxFiles: 5
        }),
        new winston.transports.File({
          filename: 'logs/combined.log',
          maxsize: 5242880,
          maxFiles: 5
        }),
        // Elasticsearch transport
        new ElasticsearchTransport({
          level: 'info',
          clientOpts: {
            node: process.env.ELASTICSEARCH_URL || 'http://localhost:9200',
            auth: {
              username: process.env.ELASTICSEARCH_USER || 'elastic',
              password: process.env.ELASTICSEARCH_PASSWORD
            }
          },
          index: 'logs-application'
        })
      ]
    });
  }

  info(message, meta = {}) {
    this.logger.info(message, { ...meta, correlationId: this.getCorrelationId() });
  }

  error(message, error, meta = {}) {
    this.logger.error(message, {
      ...meta,
      error: {
        message: error.message,
        stack: error.stack,
        code: error.code
      },
      correlationId: this.getCorrelationId()
    });
  }

  warn(message, meta = {}) {
    this.logger.warn(message, { ...meta, correlationId: this.getCorrelationId() });
  }

  debug(message, meta = {}) {
    this.logger.debug(message, { ...meta, correlationId: this.getCorrelationId() });
  }

  // HTTP request logging
  logRequest(req, res, responseTime) {
    this.logger.info('HTTP Request', {
      type: 'http_request',
      method: req.method,
      url: req.url,
      statusCode: res.statusCode,
      responseTime: responseTime,
      userAgent: req.headers['user-agent'],
      ip: req.ip,
      userId: req.user?.id,
      correlationId: req.headers['x-correlation-id']
    });
  }

  // Database query logging
  logQuery(query, duration, results) {
    this.logger.debug('Database Query', {
      type: 'database_query',
      query: query,
      duration: duration,
      resultCount: results.length,
      correlationId: this.getCorrelationId()
    });
  }

  // Business event logging
  logEvent(eventName, eventData) {
    this.logger.info('Business Event', {
      type: 'business_event',
      eventName: eventName,
      eventData: eventData,
      correlationId: this.getCorrelationId()
    });
  }

  getCorrelationId() {
    // Read the correlation ID from the async context set by the request middleware
    // (async_hooks' executionAsyncId() is not a stable per-request identifier)
    return correlationStorage.getStore() || 'no-correlation-id';
  }
}

const logger = new Logger();
logger.correlationStorage = correlationStorage;
module.exports = logger;
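Typical usage of this logger in application code might look like the following sketch (the order-service module and its identifiers are made up for illustration):

// order-service.js - example usage of the shared logger
const logger = require('./logger');

async function createOrder(order) {
  logger.info('Creating order', { orderId: order.id, items: order.items.length });
  try {
    // ... persist the order ...
    logger.logEvent('order_created', { orderId: order.id, total: order.total });
  } catch (err) {
    logger.error('Order creation failed', err, { orderId: order.id });
    throw err;
  }
}

module.exports = { createOrder };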
2. Express Middleware
// logging-middleware.js
const logger = require('./logger');
const { v4: uuidv4 } = require('uuid');

function requestLogger(req, res, next) {
  const startTime = Date.now();
  const correlationId = req.headers['x-correlation-id'] || uuidv4();

  // Add correlation ID to request and response
  req.correlationId = correlationId;
  res.setHeader('X-Correlation-ID', correlationId);

  // Log request
  logger.info('Incoming request', {
    method: req.method,
    url: req.url,
    correlationId: correlationId,
    userAgent: req.headers['user-agent'],
    ip: req.ip
  });

  // Capture response
  res.on('finish', () => {
    const responseTime = Date.now() - startTime;
    logger.logRequest(req, res, responseTime);

    // Log slow requests
    if (responseTime > 1000) {
      logger.warn('Slow request detected', {
        method: req.method,
        url: req.url,
        responseTime: responseTime,
        statusCode: res.statusCode,
        correlationId: correlationId
      });
    }
  });

  // Make the correlation ID available to logger.getCorrelationId() for the
  // rest of this request's async call chain
  logger.correlationStorage.run(correlationId, () => next());
}

function errorLogger(err, req, res, next) {
  logger.error('Request error', err, {
    method: req.method,
    url: req.url,
    statusCode: res.statusCode,
    correlationId: req.correlationId
  });
  next(err);
}

module.exports = { requestLogger, errorLogger };
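Wiring the two middlewares into an Express application is then a matter of registering requestLogger before the routes and errorLogger after them, as in this sketch:

// app.js - hooking the logging middleware into an Express application
const express = require('express');
const { requestLogger, errorLogger } = require('./logging-middleware');
const logger = require('./logger');

const app = express();

app.use(requestLogger);   // assigns the correlation ID and logs each request

app.get('/health', (req, res) => res.json({ status: 'ok' }));

app.use(errorLogger);     // logs unhandled route errors with the correlation ID

app.listen(3000, () => logger.info('Server listening', { port: 3000 }));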
Log Analysis and Visualization
1. Kibana Dashboard Configuration
The JSON below is a simplified outline of the panels and queries to build, not an importable Kibana saved object; recreate the equivalent visualizations in Kibana's dashboard UI or via its saved objects API.
{
  "dashboard": {
    "title": "Application Logs Dashboard",
    "panels": [
      {
        "id": "log-volume",
        "type": "line",
        "title": "Log Volume Over Time",
        "query": {
          "query": "*",
          "timeField": "@timestamp"
        },
        "visualization": {
          "type": "line",
          "yAxisLabel": "Log Count",
          "xAxisLabel": "Time"
        }
      },
      {
        "id": "error-rate",
        "type": "metric",
        "title": "Error Rate",
        "query": {
          "query": "log_level:ERROR OR log_level:FATAL",
          "timeField": "@timestamp"
        },
        "visualization": {
          "type": "gauge",
          "colorScheme": "stoplight"
        }
      },
      {
        "id": "top-errors",
        "type": "table",
        "title": "Top Error Messages",
        "query": {
          "query": "log_level:ERROR",
          "size": 10,
          "aggregations": {
            "top_errors": {
              "terms": {
                "field": "log_message.keyword",
                "size": 10
              }
            }
          }
        }
      },
      {
        "id": "response-time",
        "type": "histogram",
        "title": "Response Time Distribution",
        "query": {
          "query": "type:http_request",
          "timeField": "@timestamp"
        },
        "visualization": {
          "field": "responseTime",
          "interval": 100
        }
      },
      {
        "id": "log-levels",
        "type": "pie",
        "title": "Log Levels Distribution",
        "query": {
          "query": "*",
          "aggregations": {
            "log_levels": {
              "terms": {
                "field": "log_level.keyword"
              }
            }
          }
        }
      },
      {
        "id": "service-errors",
        "type": "bar",
        "title": "Errors by Service",
        "query": {
          "query": "log_level:ERROR",
          "aggregations": {
            "services": {
              "terms": {
                "field": "service.keyword",
                "size": 10
              }
            }
          }
        }
      }
    ],
    "timeRange": {
      "from": "now-24h",
      "to": "now"
    },
    "refreshInterval": "30s"
  }
}
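Beyond dashboards, the same indices can be queried programmatically, for example to feed error counts into alerting or reporting jobs. A minimal sketch using the official @elastic/elasticsearch client follows; the index pattern and field names match the pipeline above, and the 24-hour window is an arbitrary example:

// error-report.js - count the last 24 hours of errors per service from the logs-* indices
const { Client } = require('@elastic/elasticsearch');

const client = new Client({
  node: process.env.ELASTICSEARCH_URL || 'http://localhost:9200',
  auth: { username: 'elastic', password: process.env.ELASTIC_PASSWORD }
});

async function errorsByService() {
  const result = await client.search({
    index: 'logs-*',
    size: 0,
    query: {
      bool: {
        filter: [
          { terms: { 'log_level.keyword': ['ERROR', 'FATAL'] } },
          { range: { '@timestamp': { gte: 'now-24h' } } }
        ]
      }
    },
    aggs: {
      services: { terms: { field: 'service.keyword', size: 10 } }
    }
  });

  for (const bucket of result.aggregations.services.buckets) {
    console.log(`${bucket.key}: ${bucket.doc_count} errors`);
  }
}

errorsByService().catch((err) => console.error('Query failed:', err.message));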