# MCS (MCP-Celery System)
[License: MIT](LICENSE) · [Python 3.12+](https://www.python.org/downloads/) · [Celery](https://docs.celeryproject.org/) · [Docker](https://www.docker.com/)
> 🚀 **An innovative AI Agent autonomous task management system** - Built on MCP and Celery, enabling AI Agents to automatically generate, deploy, test, and invoke distributed tasks.
---
## 📋 Table of Contents
- [Introduction](#-introduction)
- [Core Features](#-core-features)
- [System Architecture](#-system-architecture)
- [Quick Start](#-quick-start)
- [Detailed Usage Guide](#-detailed-usage-guide)
- [Advantages and Characteristics](#-advantages-and-characteristics)
- [Use Cases](#-use-cases)
- [Project Structure](#-project-structure)
- [Configuration Instructions](#-configuration-instructions)
- [API Documentation](#-api-documentation)
- [Best Practices](#-best-practices)
- [Expandable Directions](#-expandable-directions)
- [License](#-license)
---
## 🌟 Introduction
**MCS (MCP-Celery System)** is an innovative distributed task management system that deeply integrates **Model Context Protocol (MCP)** with **Celery**, providing AI Agents with complete task lifecycle management capabilities.
Traditional task systems require manual coding, deployment, and configuration. MCS allows AI Agents to:
- 🤖 **Autonomously generate code** - Automatically generate complete Celery task code based on requirements
- 🚀 **Automatically deploy** - One-click deployment to Docker containerized Worker nodes
- 🧪 **Self-test** - Automatically test task functionality after deployment
- 📞 **Dynamically invoke** - Discover and invoke any registered task through a unified interface
- 📊 **Intelligently manage** - Automatically maintain task metadata and classification information
### Why Choose MCS?
In the era of AI Agents, we need a system that enables Agents to **autonomously expand capabilities**. MCS is such a system:
- ✅ **Zero human intervention** - Agents can autonomously complete the entire process from code generation to deployment
- ✅ **Complete isolation** - Each task runs in an independent Docker container, unaffected by others
- ✅ **Dynamic discovery** - Newly deployed tasks can be immediately discovered and used by other Agents
- ✅ **Elastic scalability** - Supports horizontal scaling to easily handle high concurrency scenarios
- ✅ **Out-of-the-box** - Provides complete templates and toolchains, lowering the entry barrier
---
## ✨ Core Features
### 1. 🔄 Complete Task Lifecycle Management
```
Requirement Description → Code Generation → Automatic Deployment → Containerized Execution → Function Testing → Task Registration → Dynamic Invocation
```
### 2. 🐳 Docker Containerized Deployment
- **Automatic Image Building** - Automatically generates Dockerfile and builds images based on code
- **Isolated Runtime Environment** - Each task runs independently with resource isolation
- **Automatic Restart** - Containers automatically restart on abnormal exit
- **Environment Variable Injection** - Automatically configures environment variables like Redis
### 3. 📦 Intelligent Task Registration and Discovery
- **Redis Metadata Storage** - Unified storage for task information, parameters, and classifications
- **Classification Management** - Supports task classification for easy querying and management
- **Dynamic Discovery** - Newly deployed tasks can be discovered immediately
- **Version Management** - Supports task updates and version control
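To make the storage model concrete, here is a minimal sketch of building such a metadata record before it is written to Redis. The field names and the `build_task_metadata` helper are illustrative assumptions, not the exact layout MCS uses:

```python
import json
from datetime import datetime, timezone

def build_task_metadata(task_name, description, parameters,
                        category="general", queue="celery", return_type="Any"):
    """Build the flat metadata record an Agent would register for a task.

    The layout mirrors a Redis hash: nested structures such as the
    parameter list are JSON-encoded into strings.
    """
    now = datetime.now(timezone.utc).isoformat()
    return {
        "name": task_name,
        "description": description,
        "parameters": json.dumps(parameters),  # Redis hash fields are strings
        "return_type": return_type,
        "category": category,
        "queue": queue,
        "created_at": now,
        "last_updated": now,
    }

meta = build_task_metadata(
    task_name="add_numbers",
    description="Add two numbers",
    parameters=[{"name": "a", "type": "int", "required": True}],
    category="arithmetic",
    queue="math",
)
print(meta["queue"])  # math
```

A record like this is what `register_task_info` persists and what `get_available_tasks` later reads back.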
### 4. 🎯 Dual-Mode Task Execution
- **Synchronous Mode** (`trigger_celery_task`) - Waits for task completion and returns results
- **Asynchronous Mode** (`send_celery_task`) - Immediately returns task ID, suitable for long-running tasks
### 5. 🛠️ Powerful MCP Toolset
| Tool Name | Function Description |
|---------------------------------|------------------------------------------------|
| `generate_startMain_code` | Generates a standard Celery task code template |
| `deploy_task` | One-click deployment of code to Worker nodes |
| `trigger_celery_task` | Synchronously executes tasks and retrieves results |
| `send_celery_task` | Asynchronously sends tasks to the queue |
| `get_celery_result` | Queries task execution results |
| `get_available_tasks` | Retrieves a list of all available tasks |
| `get_task_details` | Retrieves detailed information about a task |
| `register_task_info` | Registers task metadata to Redis |
### 6. 🔧 Flexible Configuration System
- **Environment Variable Priority** - Supports overriding configurations via environment variables
- **Multi-Queue Support** - Supports custom queues for task isolation
- **Unified Redis Configuration** - Centralized management of Redis configurations
---
## 🏗️ System Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│                        AI Agent (Claude)                        │
│             Interacts with the system via MCP tools             │
└────────────────┬────────────────────────────────────────────────┘
                 │
                 ↓
┌─────────────────────────────────────────────────────────────────┐
│                   MCP Server (mcp_server.py)                    │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │  Task Tools  │  │ Query Tools  │  │ Deploy Tools │           │
│  │ • trigger    │  │ • get_tasks  │  │ • deploy     │           │
│  │ • send       │  │ • get_result │  │ • generate   │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└────────────┬────────────────────────────────────┬───────────────┘
             │                                    │
             ↓                                    ↓
┌──────────────────────────────────┐  ┌────────────────────────────────────┐
│ Redis (Message Broker & Storage) │  │ Deploy Worker (Deployment Service) │
│ • Task Queue (Broker)            │  │ • Receives deployment requests     │
│ • Result Backend (Backend)       │  │ • Builds Docker images             │
│ • Task Metadata (Metadata)       │  │ • Starts containers                │
└────────────┬─────────────────────┘  └──────────────────┬─────────────────┘
             │                                           │
             ↓                                           ↓
┌─────────────────────────────────────────────────────────────────┐
│            Celery Worker Cluster (Docker Containers)            │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │   Worker 1   │  │   Worker 2   │  │   Worker N   │           │
│  │ Queue: queue1│  │ Queue: queue2│  │ Queue: queueN│           │
│  │ • Task A     │  │ • Task C     │  │ • Task X     │           │
│  │ • Task B     │  │ • Task D     │  │ • Task Y     │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└─────────────────────────────────────────────────────────────────┘
```
### Workflow
1. **Task Generation Phase**
   ```
   AI Agent → Describe Requirement → generate_startMain_code → Generate Code Template → Save Locally
   ```
2. **Deployment Phase**
   ```
   AI Agent → deploy_task → Read Code Folder → Send to Deploy Worker
     → Deploy Worker Receives → Create Deployment Directory → Write Files
     → Build Docker Image → Start Container → Return Deployment Result
   ```
3. **Testing Phase**
   ```
   AI Agent → trigger_celery_task → Send Test Task → Celery Worker Executes
     → Return Execution Result → AI Agent Validates Functionality
   ```
4. **Registration Phase**
   ```
   AI Agent → register_task_info → Write to Redis → Task Can Be Discovered and Invoked
   ```
5. **Invocation Phase**
   ```
   Any Agent → get_available_tasks → Discover Tasks
     → trigger_celery_task/send_celery_task → Execute Task
     → get_celery_result → Retrieve Result
   ```
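The five phases above can be sketched as a single Agent-side flow. The tool functions below are simplified stand-ins (the real MCP tools are asynchronous and talk to the server), so the sketch runs on its own:

```python
# Simplified stand-ins for the MCP tools, so the flow is runnable on its own.
def generate_startMain_code(task_name, queue, **spec):
    return f"./generated_tasks/{task_name}_{queue}"

def deploy_task(**spec):
    return {"success": True, "worker_status": "RUNNING"}

def trigger_celery_task(task_name, kwargs, queue=None):
    return {"success": True, "result": sum(kwargs.values())}

def register_task_info(**spec):
    return {"success": True}

# 1. Generate code → 2. Deploy → 3. Test → 4. Register → 5. Invoke
folder = generate_startMain_code(task_name="simple_add", queue="test_queue")
deployed = deploy_task(task_name="simple_add", queue="test_queue",
                       code_folder_path=folder)
smoke = trigger_celery_task("simple_add", kwargs={"a": 1, "b": 2},
                            queue="test_queue")
if deployed["success"] and smoke["result"] == 3:
    registered = register_task_info(task_name="simple_add", queue="test_queue")
```

Only after the smoke test passes does the Agent register the task, which is what makes it discoverable to other Agents.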
---
## 🚀 Quick Start
### Prerequisites
- **Python 3.12+**
- **Docker** (for containerized deployment)
- **Redis** (message broker and storage)
### 1. Clone the Project
```bash
git clone https://github.com/wudao1006/MCP-celery-System.git
cd MCP-celery-System
```
### 2. Install Dependencies
```bash
# Install main service dependencies
pip install -r requirement.txt
# Install deployment service dependencies
pip install -r deploy_mcp/deploy_worker_requirements.txt
```
### 3. Configure Redis
⚠️ **Important: Do not modify the example file directly; create your own configuration file.**
**Method A: Using Configuration File**
```bash
# Copy example configuration
cp Redis/redis_config.example.py Redis/redis_config.py
# Edit configuration file
nano Redis/redis_config.py
```
Modify to your actual configuration:
```python
def get_redis_config():
    return {
        "host": "your-redis-host",
        "port": 6379,
        "password": "your-secure-password",
        "db": 0,
        "decode_responses": True,
        "socket_timeout": 30,
        "socket_connect_timeout": 30,
        "retry_on_timeout": True
    }
```
**Method B: Using Environment Variables (Recommended)**
```bash
# Copy environment variable template
cp .env.example .env
# Edit environment variables
nano .env
```
Modify to your actual configuration:
```bash
REDIS_HOST=your-redis-host
REDIS_PORT=6379
REDIS_PASSWORD=your-secure-password
REDIS_BROKER_DB=0
REDIS_BACKEND_DB=1
```
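The override rule behind Method B is simple: any variable set in the environment wins over the file default. A minimal sketch of that resolution logic (not the exact loader the project ships):

```python
import os

# Defaults that would otherwise come from Redis/redis_config.py
FILE_DEFAULTS = {"host": "localhost", "port": 6379, "password": None}

def resolve_redis_config():
    """Environment variables take priority over file-based defaults."""
    return {
        "host": os.environ.get("REDIS_HOST", FILE_DEFAULTS["host"]),
        "port": int(os.environ.get("REDIS_PORT", FILE_DEFAULTS["port"])),
        "password": os.environ.get("REDIS_PASSWORD", FILE_DEFAULTS["password"]),
    }

os.environ["REDIS_HOST"] = "redis.internal"  # simulate an override from .env
cfg = resolve_redis_config()
print(cfg["host"])  # redis.internal
```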
### 4. Start Redis
```bash
# Linux/Mac
redis-server
# Docker (recommended)
docker run -d --name redis -p 6379:6379 redis:latest
# If a password is needed (redis-server flags go after the image name)
docker run -d --name redis -p 6379:6379 \
  redis:latest redis-server --requirepass your-password
```
### 5. Start Deployment Worker
```bash
# Linux/Mac
cd deploy_mcp
chmod +x start_deploy_worker.sh
./start_deploy_worker.sh
# Windows
cd deploy_mcp
start_deploy_worker.bat
```
### 6. Start MCP Server
```bash
python mcp_server.py
```
### 7. Configure Claude Desktop
Add the MCP server in the Claude Desktop configuration file:
**Mac**: `~/Library/Application Support/Claude/claude_desktop_config.json`
**Windows**: `%APPDATA%\Claude\claude_desktop_config.json`
```json
{
  "mcpServers": {
    "mcs": {
      "command": "python",
      "args": ["C:/Users/YourUsername/Desktop/MCS/mcp_server.py"]
    }
  }
}
```
### 8. Test the System
In Claude Desktop:
```
Please help me create a simple addition task:
- Task Name: simple_add
- Function: Accept two numbers and return their sum
- Queue: test_queue
Then deploy and test it.
```
Claude will automatically:
1. Generate code
2. Create Dockerfile
3. Deploy to Worker nodes
4. Test functionality
5. Register the task
---
## 📖 Detailed Usage Guide
### Complete Workflow Example
#### Scenario: Create a MongoDB Operation Task
**1. Describe Requirement**
```
I need a MongoDB operation Worker that supports the following functionalities:
- Insert documents
- Query documents
- Update documents
- Delete documents
MongoDB Configuration:
- host: localhost
- port: 27017
- database: mydb
```
**2. AI Agent Generates Functional Module**
The Agent generates `mongodb_operations.py` based on the requirements:
```python
from pymongo import MongoClient

def get_mongo_client(host='localhost', port=27017, database='mydb'):
    client = MongoClient(host, port)
    return client[database]

def insert_document(collection_name, document):
    db = get_mongo_client()
    result = db[collection_name].insert_one(document)
    return str(result.inserted_id)

def find_documents(collection_name, query):
    db = get_mongo_client()
    documents = list(db[collection_name].find(query))
    return documents

# ... other functions
```
**3. Generate Main Startup File**
The Agent calls `generate_startMain_code`:
```python
await generate_startMain_code(
    task_name="mongodb_operation",
    description="MongoDB database operation task",
    parameters=[
        {"name": "operation", "type": "str", "required": True, "description": "Operation type: insert/find/update/delete"},
        {"name": "collection", "type": "str", "required": True, "description": "Collection name"},
        {"name": "data", "type": "dict", "required": True, "description": "Operation data"}
    ],
    function_body="""
from mongodb_operations import insert_document, find_documents, update_document, delete_document

if operation == 'insert':
    return insert_document(collection, data)
elif operation == 'find':
    return find_documents(collection, data)
elif operation == 'update':
    return update_document(collection, data.get('query'), data.get('update'))
elif operation == 'delete':
    return delete_document(collection, data)
else:
    raise ValueError(f"Unsupported operation: {operation}")
""",
    queue="mongodb",
    return_type="Any",
    additional_files=[
        {"filename": "mongodb_operations.py", "content": "# Previously generated code"}
    ]
)
```
The generated template will be saved to `./generated_tasks/mongodb_operation_mongodb/`.
**4. Agent Completes Code and Creates Dockerfile**
The Agent creates a complete project structure:
```
generated_tasks/mongodb_operation_mongodb/
├── app_mongodb.py           # Main startup file
├── mongodb_operations.py    # Functional module
├── requirements.txt         # Dependency list
└── Dockerfile               # Docker build file
```
Example of `Dockerfile`:
```dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["celery", "-A", "app_mongodb", "worker", "-l", "info", "-Q", "mongodb"]
```
**5. Deploy to Worker Nodes**
The Agent calls `deploy_task`:
```python
result = await deploy_task(
    task_name="mongodb_operation",
    description="MongoDB database operation task",
    parameters=[...],  # Same as above
    queue="mongodb",
    category="database",
    code_folder_path="./generated_tasks/mongodb_operation_mongodb"
)
```
The system will:
- Read all files in the code folder
- Send to Deploy Worker
- Build Docker image: `celery-mongodb-operation:uuid`
- Start container: `mongodb_operation_mongodb_worker`
- Automatically register the task to Redis
**6. Test Functionality**
The Agent calls `trigger_celery_task` for testing:
```python
# Test insert operation
result = await trigger_celery_task(
    task_name="mongodb_operation",
    kwargs={
        "operation": "insert",
        "collection": "users",
        "data": {"name": "Alice", "age": 30}
    },
    queue="mongodb"
)

# Test query operation
result = await trigger_celery_task(
    task_name="mongodb_operation",
    kwargs={
        "operation": "find",
        "collection": "users",
        "data": {"name": "Alice"}
    },
    queue="mongodb"
)
```
**7. Task is Now Available**
Other Agents can now:
```python
# Discover tasks
tasks = await get_available_tasks()
# Find the mongodb_operation task

# Get detailed information
details = await get_task_details("mongodb_operation")

# Use the task
result = await trigger_celery_task(
    task_name="mongodb_operation",
    kwargs={...}
)
```
---
## 💡 Advantages and Characteristics
### 1. 🤖 AI-Native Design
- **Born for AI Agents** - All interfaces are designed to be easily understood and used by Agents
- **Natural Language Driven** - Agents can describe requirements in natural language, and the system automatically implements them
- **Zero Learning Cost** - Agents do not need to understand the complexities of Celery or Docker
### 2. 🔒 Security Isolation
- **Container-Level Isolation** - Each task runs in an independent Docker container
- **Resource Limits** - Configurable limits on CPU, memory, etc.
- **Network Isolation** - Supports custom network configurations
### 3. 🚀 High Performance
- **Asynchronous Execution** - Based on Celery's distributed task queue
- **Concurrency Control** - Supports concurrent execution across multiple Workers
- **Result Caching** - Redis result backend for quick task result queries
### 4. 📊 Observability
- **Complete Logs** - Docker container logs record all execution processes
- **Status Tracking** - Real-time querying of task execution status
- **Metadata Management** - Redis stores all task metadata
### 5. 🔧 Easy to Extend
- **Modular Design** - Components are loosely coupled, making them easy to replace and extend
- **Plugin System** - Supports custom tools and hooks
- **Multi-Language Support** - Theoretically supports tasks written in any language (via Docker)
### 6. 🌐 Distributed Architecture
- **Horizontal Scaling** - Easily add more Worker nodes
- **Load Balancing** - Celery automatically distributes tasks to available Workers
- **Fault Tolerance** - Tasks automatically retry on failure
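Note that retry behavior in Celery is opt-in per task. A hedged sketch of what a fault-tolerant task definition might look like (`flaky_fetch` and `download` are hypothetical; the task options shown are standard Celery ones):

```python
from celery import Celery

celery_app = Celery('mcp_app')  # broker/backend config omitted for brevity

@celery_app.task(
    name='mcp_app.flaky_fetch',        # hypothetical task, for illustration
    queue='default',
    autoretry_for=(ConnectionError,),  # retry only on transient errors
    retry_backoff=True,                # exponential backoff between attempts
    retry_kwargs={'max_retries': 3},   # give up after three retries
)
def flaky_fetch(url):
    return download(url)  # assumed helper; replace with real logic
```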
---
## 🎯 Use Cases
### 1. 🤖 AI Agent Capability Expansion
**Scenario**: AI Agent needs to dynamically acquire new capabilities
```
Agent: "I need to handle PDF files and extract text and images."
MCS:
1. Generate PDF processing code (using PyPDF2, pdf2image)
2. Deploy as pdf_processor task
3. Agent can immediately invoke this task to process PDFs
```
**Advantage**: Agents expand their capabilities at runtime, with no restarts and no manual intervention.
### 2. 🔄 Workflow Automation
**Scenario**: Build complex data processing pipelines
```python
# Agent creates a series of tasks
tasks = [
    "data_fetcher",      # Fetch data from API
    "data_cleaner",      # Clean data
    "data_analyzer",     # Analyze data
    "report_generator"   # Generate report
]

# Agent orchestrates execution
data = await trigger_celery_task("data_fetcher", ...)
cleaned = await trigger_celery_task("data_cleaner", kwargs={"data": data})
analyzed = await trigger_celery_task("data_analyzer", kwargs={"data": cleaned})
report = await trigger_celery_task("report_generator", kwargs={"data": analyzed})
```
### 3. 🧪 Rapid Prototyping
**Scenario**: Quickly validate ideas
```
Developer: "Help me create an image compression service."
MCS:
- Generates code within 5 minutes
- Automatically deploys to a container
- Immediately available API endpoint
```
**Advantage**: Complete the transition from idea to usable service in minutes.
### 4. 🔌 Microservices Architecture
**Scenario**: Build a loosely coupled microservices system
- Each service as an independent Celery task
- Communicate via queues
- Independent deployment and scaling
### 5. 📊 Data Processing Platform
**Scenario**: Large-scale data ETL
```python
# Agent creates data processing tasks
tasks = {
    "csv_processor": "Process CSV files",
    "json_processor": "Process JSON files",
    "excel_processor": "Process Excel files",
    "data_aggregator": "Aggregate multiple data sources"
}

# Process large files in parallel
for file in files:
    await send_celery_task(
        task_name=get_processor_for(file),
        kwargs={"file_path": file},
        queue="data_processing"
    )
```
### 6. 🌐 Multi-Tenant SaaS Platform
**Scenario**: Provide customized services for different tenants
- Each tenant has an independent task queue
- Isolated execution environments
- Flexible resource allocation
---
## 📁 Project Structure
```
MCS/
├── mcp_server.py                      # MCP server main file
├── mcp_app.py                         # Celery application configuration
├── file_Reader.py                     # File reading utility
├── requirement.txt                    # Main dependencies
├── example_input.txt                  # Usage examples
├── .mcp.json                          # MCP configuration file
│
├── Redis/                             # Redis-related modules
│   ├── redis_client.py                # Redis client and task management
│   └── redis_config.py                # Redis configuration
│
├── deploy_mcp/                        # Deployment service module
│   ├── deploy_worker.py               # Deployment Worker implementation
│   ├── mcp_app.py                     # Deployment service Celery configuration
│   ├── deploy_worker_requirements.txt # Deployment service dependencies
│   ├── DEPLOYMENT_README.md           # Deployment service documentation
│   ├── start_deploy_worker.sh         # Startup script for Linux/Mac
│   └── start_deploy_worker.bat        # Startup script for Windows
│
└── generated_tasks/                   # Generated task code (created automatically)
    └── {task_name}_{queue}/
        ├── app_{queue}.py             # Task main file
        ├── Dockerfile                 # Docker build file
        ├── requirements.txt           # Task dependencies
        └── ...                        # Other auxiliary files
```
---
## ⚙️ Configuration Instructions
### Redis Configuration
**Method 1: Configuration File** (`Redis/redis_config.py`)
```python
DEFAULT_REDIS_CONFIG = {
    'host': 'localhost',
    'port': 6379,
    'password': 'your_password',
    'db': 0,
    'decode_responses': True
}
```
**Method 2: Environment Variables** (higher priority)
```bash
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=your_password
export REDIS_BROKER_DB=0
export REDIS_BACKEND_DB=1
```
### Celery Configuration
In `mcp_app.py`:
```python
celery_app.conf.update(
    task_serializer='json',        # Task serialization format
    accept_content=['json'],       # Accepted content types
    result_serializer='json',      # Result serialization format
    timezone='UTC',                # Timezone
    enable_utc=True,               # Enable UTC
    worker_prefetch_multiplier=1,  # Number of tasks prefetched per worker
    task_acks_late=True,           # Acknowledge task after completion
    result_expires=3600,           # Result expiration time (seconds)
)
```
### Docker Configuration
The deployment Worker automatically configures the following environment variables:
```bash
REDIS_HOST
REDIS_PORT
REDIS_PASSWORD
REDIS_BROKER_DB
REDIS_BACKEND_DB
C_FORCE_ROOT=1 # Allow running as root
```
---
## 📚 API Documentation
### MCP Tool List
#### 1. `generate_startMain_code`
Generates a Celery task code template.
**Parameters**:
- `task_name` (str): Task name
- `description` (str): Task description
- `parameters` (List[Dict]): Parameter list
- `function_body` (str): Function implementation code
- `queue` (str): Queue name, default "celery"
- `return_type` (str): Return type, default "Any"
- `tasks` (List[Dict], optional): Additional task list
- `additional_files` (List[Dict], optional): Additional file list
**Returns**: Code generation prompt template (string)
**Example**:
```python
prompt = generate_startMain_code(
    task_name="add_numbers",
    description="Add two numbers",
    parameters=[
        {"name": "a", "type": "int", "required": True, "description": "First number"},
        {"name": "b", "type": "int", "required": True, "description": "Second number"}
    ],
    function_body="return a + b",
    queue="math",
    return_type="int"
)
```
#### 2. `deploy_task`
Deploys a task to Worker nodes.
**Parameters**:
- `task_name` (str): Task name
- `description` (str): Task description
- `parameters` (List[Dict]): Parameter list
- `queue` (str): Queue name
- `category` (str): Task category, default "general"
- `code_folder_path` (str, optional): Code folder path
**Returns**: Deployment result (dictionary)
**Example**:
```python
result = await deploy_task(
    task_name="add_numbers",
    description="Add two numbers",
    parameters=[...],
    queue="math",
    category="arithmetic",
    code_folder_path="./generated_tasks/add_numbers_math"
)

# Example return
{
    "success": True,
    "deployment_id": "abc-123-def",
    "docker_image": "celery-add-numbers:abc-123-def",
    "container_id": "xyz789",
    "worker_status": "RUNNING",
    "message": "Task add_numbers code folder deployed successfully"
}
```
#### 3. `trigger_celery_task`
Synchronously executes a task and waits for the result.
**Parameters**:
- `task_name` (str): Task name
- `args` (list, optional): Positional arguments
- `kwargs` (dict, optional): Keyword arguments
- `queue` (str, optional): Queue name
**Returns**: Execution result (dictionary)
**Example**:
```python
result = await trigger_celery_task(
    task_name="add_numbers",
    kwargs={"a": 10, "b": 20},
    queue="math"
)

# Example return
{
    "success": True,
    "task_id": "task-id-123",
    "task_name": "add_numbers",
    "queue": "math",
    "status": "PENDING",
    "message": "Execution result: 30"
}
```
#### 4. `send_celery_task`
Asynchronously sends a task (does not wait for the result).
**Parameters**: Same as `trigger_celery_task`
**Returns**: Task ID (dictionary)
**Example**:
```python
result = await send_celery_task(
    task_name="long_running_task",
    kwargs={"data": "..."},
    queue="background"
)

# Example return
{
    "success": True,
    "task_id": "task-id-456",
    "task_name": "long_running_task",
    "queue": "background",
    "status": "SENT",
    "message": "Task sent to background queue, Task ID: task-id-456"
}
```
#### 5. `get_celery_result`
Queries the execution result of a task.
**Parameters**:
- `task_id` (str): Task ID
**Returns**: Task status and result (dictionary)
**Example**:
```python
result = await get_celery_result("task-id-456")

# Example return (success)
{
    "success": True,
    "task_id": "task-id-456",
    "status": "SUCCESS",
    "result": {"data": "..."},
    "message": "Task executed successfully"
}

# Example return (pending)
{
    "success": True,
    "task_id": "task-id-456",
    "status": "PENDING",
    "result": None,
    "message": "Task status: PENDING"
}
```
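When combining `send_celery_task` with `get_celery_result`, a small polling helper avoids waiting forever on a stuck task. This is an illustrative sketch; the `fetch` callable stands in for a call to `get_celery_result`:

```python
import time

def wait_for_result(task_id, fetch, timeout=30.0, interval=0.5):
    """Poll fetch(task_id) until the task reaches a terminal state."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch(task_id)
        if status["status"] in ("SUCCESS", "FAILURE"):
            return status
        time.sleep(interval)
    raise TimeoutError(f"Task {task_id} still pending after {timeout}s")

# Simulated backend: pending twice, then done.
responses = iter([
    {"status": "PENDING", "result": None},
    {"status": "PENDING", "result": None},
    {"status": "SUCCESS", "result": 30},
])
final = wait_for_result("task-id-456", lambda _tid: next(responses),
                        interval=0.01)
print(final["result"])  # 30
```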
#### 6. `get_available_tasks`
Retrieves all available tasks.
**Parameters**: None
**Returns**: Task list (dictionary)
**Example**:
```python
result = await get_available_tasks()

# Example return
{
    "success": True,
    "tasks": [
        {
            "name": "add_numbers",
            "description": "Add two numbers",
            "parameters": [...],
            "return_type": "int",
            "category": "arithmetic",
            "queue": "math",
            "created_at": "2025-01-15T10:00:00",
            "last_updated": "2025-01-15T10:00:00"
        },
        ...
    ],
    "total_count": 10,
    "message": "Successfully retrieved 10 available tasks"
}
```
#### 7. `get_task_details`
Retrieves detailed information about a task.
**Parameters**:
- `task_name` (str): Task name
**Returns**: Task details (dictionary)
**Example**:
```python
result = await get_task_details("add_numbers")
```
#### 8. `register_task_info`
Manually registers a task to Redis.
**Parameters**:
- `task_name` (str): Task name
- `description` (str): Task description
- `parameters` (List[Dict]): Parameter list
- `return_type` (str): Return type, default "Any"
- `category` (str): Category, default "general"
- `queue` (str): Queue, default "celery"
**Returns**: Registration result (dictionary)
---
## 🎓 Best Practices
### 1. Task Naming Conventions
- Use lowercase letters and underscores: `process_data`
- Start with a verb: `fetch_users`, `generate_report`
- Clearly define task purpose: `mongodb_insert` instead of `db_op`
### 2. Queue Organization
- Categorize by functionality: `database`, `processing`, `notification`
- Categorize by priority: `high_priority`, `normal`, `low_priority`
- Categorize by resource requirements: `cpu_intensive`, `io_intensive`
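In Celery these conventions are typically pinned down once in a routing table, e.g. the `task_routes` setting. Shown here as a plain dictionary plus a lookup helper so the idea stands alone (the task names are hypothetical):

```python
# A routing table in the style of Celery's task_routes setting.
task_routes = {
    "mcp_app.fetch_users":       {"queue": "database"},
    "mcp_app.generate_report":   {"queue": "cpu_intensive"},
    "mcp_app.send_notification": {"queue": "notification"},
}

def queue_for(task_name, default="celery"):
    """Resolve which queue a task should be sent to."""
    return task_routes.get(task_name, {}).get("queue", default)

print(queue_for("mcp_app.fetch_users"))  # database
```

Centralizing the routing keeps queue names out of call sites, so a task can be moved to a different queue without touching the Agents that invoke it.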
### 3. Error Handling
Add comprehensive error handling in tasks:
```python
@celery_app.task(name='mcp_app.risky_task', queue='default')
def risky_task(data):
    try:
        # Task logic
        result = process(data)
        return {"success": True, "result": result}
    except ValueError as e:
        # Parameter error
        return {"success": False, "error": "invalid_input", "message": str(e)}
    except Exception as e:
        # Other errors
        return {"success": False, "error": "internal_error", "message": str(e)}
```
### 4. Task Timeout Settings
Set timeouts for long-running tasks:
```python
celery_app.conf.update(
    task_soft_time_limit=300,  # 5-minute soft timeout (raises an exception)
    task_time_limit=600,       # 10-minute hard timeout (force termination)
)
```
### 5. Resource Cleanup
Clean up resources at the end of tasks:
```python
import os

@celery_app.task(name='mcp_app.file_processor', queue='processing')
def file_processor(file_path):
    temp_file = None
    try:
        # Process file
        temp_file = create_temp_file()
        result = process_file(file_path, temp_file)
        return result
    finally:
        # Ensure temporary files are cleaned up
        if temp_file and os.path.exists(temp_file):
            os.remove(temp_file)
```
### 6. Task Idempotency
Design tasks to be repeatable:
```python
@celery_app.task(name='mcp_app.update_user', queue='database')
def update_user(user_id, data):
    # Use upsert instead of insert
    db.users.update_one(
        {"_id": user_id},
        {"$set": data},
        upsert=True  # Create if not exists
    )
```
### 7. Monitoring and Logging
Add detailed logging:
```python
import logging

logger = logging.getLogger(__name__)

@celery_app.task(name='mcp_app.important_task', queue='critical')
def important_task(data):
    logger.info(f"Starting task, data size: {len(data)}")
    try:
        result = process(data)
        logger.info(f"Task completed, result: {result}")
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}", exc_info=True)
        raise
```
---
## 🔮 Expandable Directions
- [ ] **Web Management Interface** - Visual management of tasks and monitoring execution status
- [ ] **Task Version Management** - Support for multi-version coexistence and gray release of tasks
- [ ] **Performance Monitoring** - Integration of Prometheus + Grafana for monitoring
- [ ] **Task Dependency Management** - Support for dependencies between tasks
- [ ] **Automatic Retry Strategies** - Intelligent task retry and degradation mechanisms
- [ ] **Kubernetes Support** - Container orchestration based on K8s
- [ ] **Multi-Language Support** - Support for tasks written in Node.js, Go, Java, etc.
- [ ] **Task Marketplace** - Community-shared task template marketplace
- [ ] **A/B Testing** - Support for A/B testing of tasks
- [ ] **Intelligent Scheduling** - Smart task scheduling based on load
- [ ] **Federated Learning Support** - Support for distributed machine learning tasks
- [ ] **Edge Computing** - Support for deploying tasks on edge nodes
- [ ] **Multi-Cloud Deployment** - Support for cloud platforms like AWS, Azure, GCP
- [ ] **Automatic Scaling** - Automatically scale Workers based on load
- [ ] **AI-Assisted Optimization** - AI automatically optimizes task code and configurations
---
## 📄 License
This project is licensed under the **MIT License** - see the [LICENSE](LICENSE) file for details.
---
## 🙏 Acknowledgments
- **Anthropic** - Claude AI and MCP protocol
- **Celery** - Powerful distributed task queue
- **Redis** - High-performance in-memory database
- **Docker** - Containerization technology
---
## 🌟 Star History
If this project has been helpful to you, please give us a Star ⭐!
[Star History Chart](https://star-history.com/#yourusername/MCS&Date)
---
<div align="center">
**[⬆ Back to Top](#mcs-mcp-celery-system)**
Made with ❤️ by the MCS Team
</div>