A real-time chat moderation system using Rust (WebSocket), Python (FastAPI), Kafka, and PostgreSQL.
This system provides real-time chat moderation with the following components:
- Rust WebSocket Service: Handles WebSocket connections from clients, publishes messages to Kafka, and forwards moderation results back to clients.
- Python FastAPI Service: Consumes chat messages from Kafka, performs moderation (using keyword filtering or LLM), and publishes the results back to Kafka.
- Apache Kafka: Message broker for asynchronous communication between services.
- PostgreSQL: Persistent storage for chat messages and moderation results.
- Client connects to the Rust WebSocket service
- Client sends a chat message
- Rust service:
- Stores the message in PostgreSQL
- Publishes the message to Kafka topic
chat_messages
- Python service:
- Consumes messages from
chat_messagestopic - Performs moderation (keyword filtering or LLM-based)
- Stores moderation results in PostgreSQL
- Publishes moderation results to Kafka topic
moderation_results
- Consumes messages from
- Rust service:
- Consumes moderation results from
moderation_resultstopic - Forwards the results back to the appropriate client
- Consumes moderation results from
/docker: Docker Compose configurations and SQL initialization scripts/rust-service: Rust WebSocket service implementation/python-service: Python FastAPI moderation service implementation/docs: Additional documentation (if any)/tests: End-to-end tests
- Docker and Docker Compose
- Rust (if developing the WebSocket service)
- Python 3.11+ (if developing the moderation service)
The easiest way to run the entire system is using Docker Compose:
# Clone the repository
git clone https://github.com/yourusername/chat-moderation.git
cd chat-moderation
# Start all services
docker compose -f docker/docker-compose.yml up --buildThis will start:
- Zookeeper (port 2181)
- Kafka (port 9092)
- PostgreSQL (port 5432)
- Rust WebSocket Service (port 8080)
- Python FastAPI Service (port 8000)
# Navigate to the Rust service directory
cd rust-service
# Create .env file (if not exists)
echo "DATABASE_URL=postgres://user:password@localhost:5432/chatdb" > .env
echo "RUST_LOG=debug" >> .env
echo "KAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# Build the service
cargo build
# Run the service
cargo runThe Rust service will be available at ws://localhost:8080/ws.
# Navigate to the Python service directory
cd python-service
# Create and activate virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Create .env file (if not exists) - basic version
echo "KAFKA_BOOTSTRAP_SERVERS=localhost:9092" > .env
echo "KAFKA_CONSUMER_TOPIC=chat_messages" >> .env
echo "KAFKA_PRODUCER_TOPIC=moderation_results" >> .env
echo "KAFKA_GROUP_ID=moderation-service" >> .env
echo "DATABASE_URL=postgresql+asyncpg://user:password@localhost:5432/chatdb" >> .env
echo "USE_LLM_MODERATION=false" >> .env
# Start the FastAPI server
uvicorn app.main:app --reloadThe Python service API will be available at http://localhost:8000.
GET /ws: WebSocket endpoint for chat clientsGET /health: Health check endpoint
GET /: API informationGET /health: Health check endpointPOST /moderate: HTTP endpoint for manual moderation (useful for testing){ "user_id": "user123", "message_id": 1, "message": "Hello world" }
# Navigate to the Rust service directory
cd rust-service
# Run all tests
cargo test
# Run specific test
cargo test test_name
# Run tests with output
cargo test -- --nocapture# Navigate to the Python service directory
cd python-service
source venv/bin/activate # On Windows: venv\Scripts\activate
# Run all tests
pytest
# Run with coverage
pytest --cov=app
# Run specific test module
pytest tests/test_module.py
# Run tests with verbose output
pytest -v# Ensure all services are running
docker-compose -f docker/docker-compose.yml up -d
# Run the end-to-end tests
python tests/end_to_end_test.pyThe system implements robust error handling:
- Kafka Connectivity: Both services handle Kafka connection failures gracefully, with automatic reconnection and message buffering.
- Database Connectivity: Connection pooling and retry mechanisms ensure database operations are reliable.
- WebSocket Connections: The Rust service manages client disconnections properly, cleaning up resources.
- Start the Docker infrastructure (Kafka, Zookeeper, Postgres)
- Run both services in development mode
- Make code changes
- Run tests to verify changes
- Restart services to apply changes
- Fork the repository
- Create a feature branch
- Make your changes
- Run tests
- Submit a pull request