A production-grade distributed log processing and search platform built with Java, Apache Kafka, and Elasticsearch. Capable of ingesting 50,000+ log events per minute with sub-300ms search latency.
- High-throughput ingestion β Single and batch log ingestion via REST API, published to Kafka with 6 partitions
- Intelligent parsing β Automatic detection and parsing of Logback, Log4j, JSON, and plaintext log formats
- Real-time processing β Kafka consumer pipeline processes and indexes logs into Elasticsearch within seconds
- Powerful search β Full-text search, filter by service/host/level/time range, trace correlation
- Dead Letter Queue β Failed messages routed to
log-events.DLTfor reliability - Analytics Dashboard β React frontend with live charts, log level distribution, service analytics
- Trace Timeline β Visualize an entire request journey across microservices by trace ID
βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββ βββββββββββββββββββ
β React FrontendβββββΆβIngestion ServiceββββββΆβ Apache Kafka βββββΆβProcessor Serviceβ
β (Port 3000) β β (Port 8081) β β (Port 9092) β β (Port 8082) β
βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββ ββββββββββ¬βββββββββ
β β
β βββββββββββββββββββ βββββββββββΌβββββββββ
ββββββββββββββΆβ Search API ββββββββββββββββββββββββββββββ β Elasticsearch β
β (Port 8083) β β (Port 9200) β
βββββββββββββββββββ ββββββββββββββββββββ
| Module | Description |
|---|---|
common |
Shared models (LogEvent, SearchRequest, SearchResponse), DTOs |
ingestion |
REST API for receiving logs, Kafka producer, log validation |
processor |
Kafka consumer, log parser (Logback/Log4j/JSON/Plaintext), Elasticsearch indexer |
search-api |
Full-text search, aggregations, trace lookup, pagination |
queue |
Kafka topic configuration, Dead Letter Queue setup |
- Java 17+
- Docker & Docker Compose
- Node.js 18+ (for frontend)
docker compose up -d zookeeper kafka elasticsearch kibana kafka-uiWait for all containers to be healthy (~30 seconds), then create Kafka topics:
docker exec kafka kafka-topics --bootstrap-server localhost:9092 \
--create --if-not-exists --topic log-events --partitions 6 --replication-factor 1
docker exec kafka kafka-topics --bootstrap-server localhost:9092 \
--create --if-not-exists --topic log-events.DLT --partitions 3 --replication-factor 1Open IntelliJ IDEA and run each main class:
ingestion β IngestionServiceApplication (port 8081)
processor β ProcessorApplication (port 8082)
search-api β SearchApiApplication (port 8083)
cd frontend
npm install
npm run devOpen http://localhost:3000
POST /api/v1/logs
Content-Type: application/json
{
"serviceName": "payment-service",
"host": "prod-node-01",
"log": "2024-01-15 10:23:45.123 [main] ERROR com.example.PaymentService - Payment failed: timeout traceId=abc123"
}Response:
{
"status": "accepted",
"eventId": "23847a98-6ba7-4f98-abbf-c12b260d8dca",
"ingestedAt": "2026-02-22T13:43:35.001899Z"
}POST /api/v1/logs/batch
Content-Type: application/json
{
"serviceName": "auth-service",
"host": "prod-node-02",
"logs": [
"INFO: User login successful userId=12345 traceId=abc123",
"WARN: Rate limit approaching for IP 192.168.1.1",
"ERROR: JWT token expired traceId=abc123"
]
}GET /api/v1/search?query=payment&serviceName=payment-service&page=0&size=20GET /api/v1/search?levels=ERROR,WARN&sortOrder=descGET /api/v1/search/trace/{traceId}GET /api/v1/search/{id}Search Response:
{
"hits": [...],
"totalHits": 42,
"tookMs": 31,
"page": 0,
"size": 20,
"totalPages": 3,
"levelCounts": { "ERROR": 5, "WARN": 12, "INFO": 25 },
"serviceCounts": { "payment-service": 20, "auth-service": 22 }
}| Technology | Version | Purpose |
|---|---|---|
| Java | 17 | Core language |
| Spring Boot | 3.3.5 | Application framework |
| Apache Kafka | 7.5.0 | Message streaming |
| Spring Kafka | 3.2.4 | Kafka integration |
| Elasticsearch | 8.11.0 | Search & analytics engine |
| Elasticsearch Java Client | 8.11.0 | ES Java API |
| Lombok | Latest | Boilerplate reduction |
| Maven | 3.x | Build tool |
| Technology | Version | Purpose |
|---|---|---|
| React | 18 | UI framework |
| Vite | 5 | Build tool & dev server |
| Tailwind CSS | 3 | Styling |
| Recharts | 2 | Charts & analytics |
| Axios | 1.6 | HTTP client |
| React Router | 6 | Client-side routing |
| Lucide React | Latest | Icons |
| Service | Port | Purpose |
|---|---|---|
| Zookeeper | 2181 | Kafka coordination |
| Kafka | 9092 | Message broker |
| Elasticsearch | 9200 | Search engine |
| Kibana | 5601 | ES management UI |
| Kafka UI | 8090 | Kafka management UI |
| Metric | Target | Notes |
|---|---|---|
| Ingestion throughput | 50,000+ events/min | 6 Kafka partitions |
| Search latency | < 300ms | p99 |
| Batch size | Up to 1,000 logs | Per request |
| Index refresh | 5 seconds | Configurable |
distributed-log-search/
βββ common/ # Shared models and DTOs
β βββ src/main/java/com/logSearch/common/
β βββ model/LogEvent.java
β βββ dto/SearchRequest.java, SearchResponse.java
βββ ingestion/ # Log ingestion service
β βββ src/main/java/com/logSearch/ingestion/
β βββ controller/LogIngestionController.java
β βββ service/LogIngestionService.java
β βββ config/KafkaProducerConfig.java
βββ processor/ # Kafka consumer + ES indexer
β βββ src/main/java/com/logSearch/processor/
β βββ consumer/LogEventConsumer.java
β βββ service/LogParserService.java
β βββ service/ElasticsearchIndexingService.java
βββ search-api/ # Search REST API
β βββ src/main/java/com/logSearch/search/
β βββ controller/SearchController.java
β βββ service/LogSearchService.java
βββ queue/ # Kafka topic configuration
βββ docker-compose.yml
βββ pom.xml
Real-time log statistics, level distribution, and service breakdown charts.
Search across all logs with filters for service, level, host, and time range.
Visualize a complete request journey across microservices using trace IDs.
Send single or batch logs directly from the UI with sample templates.
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
batch-size: 16384
linger-ms: 5
kafka:
topic:
name: log-events
partitions: 6spring:
kafka:
consumer:
group-id: log-processor-group
max-poll-records: 500
elasticsearch:
index:
prefix: applogselasticsearch:
index:
prefix: applogs
server:
port: 8083The processor automatically detects and parses these formats:
# Logback
2024-01-15 10:23:45.123 [main] ERROR com.example.Service - Message traceId=abc123
# Log4j
2024-01-15 10:23:45,123 ERROR [ThreadName] com.example.Service - Message
# JSON
{"level":"ERROR","message":"Something failed","timestamp":"2024-01-15T10:23:45Z","traceId":"abc123"}
# Plaintext
ERROR: Something went wrong with the payment processor
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'feat: add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
This project is licensed under the MIT License.