Merge pull request #10 from abhishekbhakat/1-add-a-safe-mode-for-read-only-operations
1 add a safe mode for read only operations
This commit is contained in:
@@ -21,7 +21,6 @@ https://github.com/user-attachments/assets/f3e60fff-8680-4dd9-b08e-fa7db655a705
|
|||||||
"airflow-mcp-server"
|
"airflow-mcp-server"
|
||||||
],
|
],
|
||||||
"env": {
|
"env": {
|
||||||
"OPENAPI_SPEC": "<path_to_spec.yaml>",
|
|
||||||
"AIRFLOW_BASE_URL": "http://<host:port>/api/v1",
|
"AIRFLOW_BASE_URL": "http://<host:port>/api/v1",
|
||||||
"AUTH_TOKEN": "<base64_encoded_username_password>"
|
"AUTH_TOKEN": "<base64_encoded_username_password>"
|
||||||
}
|
}
|
||||||
@@ -30,12 +29,13 @@ https://github.com/user-attachments/assets/f3e60fff-8680-4dd9-b08e-fa7db655a705
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
> You can download the openapi spec from [Airflow REST API](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html)
|
|
||||||
|
|
||||||
# Scope
|
# Scope
|
||||||
|
|
||||||
2 different streams in which Airflow MCP Server can be used:
|
2 different streams in which Airflow MCP Server can be used:
|
||||||
- Adding Airflow to AI (_complete access to an Airflow deployment_)
|
- Adding Airflow to AI (_complete access to an Airflow deployment_)
|
||||||
- This will enable AI to be able to write DAGs and just do things in a schedule on its own.
|
- This will enable AI to be able to write DAGs and just do things in a schedule on its own.
|
||||||
|
- Use command `airflow-mcp-server` or `airflow-mcp-server --unsafe`.
|
||||||
- Adding AI to Airflow (_read-only access using Airflow Plugin_)
|
- Adding AI to Airflow (_read-only access using Airflow Plugin_)
|
||||||
- This stream can enable Users to be able to get a better understanding about their deployment. Specially in cases where teams have hundreds, if not thousands of dags.
|
- This stream can enable Users to be able to get a better understanding about their deployment. Specially in cases where teams have hundreds, if not thousands of dags.
|
||||||
|
- Use command `airflow-mcp-server --safe`.
|
||||||
|
|||||||
@@ -18,7 +18,6 @@ A [Model Context Protocol](https://modelcontextprotocol.io/) server for controll
|
|||||||
"airflow-mcp-server"
|
"airflow-mcp-server"
|
||||||
],
|
],
|
||||||
"env": {
|
"env": {
|
||||||
"OPENAPI_SPEC": "<path_to_spec.yaml>",
|
|
||||||
"AIRFLOW_BASE_URL": "http://<host:port>/api/v1",
|
"AIRFLOW_BASE_URL": "http://<host:port>/api/v1",
|
||||||
"AUTH_TOKEN": "<base64_encoded_username_password>"
|
"AUTH_TOKEN": "<base64_encoded_username_password>"
|
||||||
}
|
}
|
||||||
@@ -27,13 +26,29 @@ A [Model Context Protocol](https://modelcontextprotocol.io/) server for controll
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Operation Modes
|
||||||
|
|
||||||
|
The server supports two operation modes:
|
||||||
|
|
||||||
|
- **Safe Mode** (`--safe`): Only allows read-only operations (GET requests). This is useful when you want to prevent any modifications to your Airflow instance.
|
||||||
|
- **Unsafe Mode** (`--unsafe`): Allows all operations including modifications. This is the default mode.
|
||||||
|
|
||||||
|
To start in safe mode:
|
||||||
|
```bash
|
||||||
|
airflow-mcp-server --safe
|
||||||
|
```
|
||||||
|
|
||||||
|
To explicitly start in unsafe mode (though this is default):
|
||||||
|
```bash
|
||||||
|
airflow-mcp-server --unsafe
|
||||||
|
```
|
||||||
|
|
||||||
### Considerations
|
### Considerations
|
||||||
|
|
||||||
The MCP Server expects environment variables to be set:
|
The MCP Server expects environment variables to be set:
|
||||||
- `AIRFLOW_BASE_URL`: The base URL of the Airflow API
|
- `AIRFLOW_BASE_URL`: The base URL of the Airflow API
|
||||||
- `AUTH_TOKEN`: The token to use for authorization (_This should be base64 encoded username:password_)
|
- `AUTH_TOKEN`: The token to use for authorization (_This should be base64 encoded username:password_)
|
||||||
- `OPENAPI_SPEC`: The path to the OpenAPI spec file
|
- `OPENAPI_SPEC`: The path to the OpenAPI spec file (_Optional_) (_defaults to latest stable release_)
|
||||||
|
|
||||||
*Currently, only Basic Auth is supported.*
|
*Currently, only Basic Auth is supported.*
|
||||||
|
|
||||||
@@ -45,6 +60,7 @@ The default is 100 items, but you can change it using `maximum_page_limit` optio
|
|||||||
|
|
||||||
- [x] First API
|
- [x] First API
|
||||||
- [x] Parse OpenAPI Spec
|
- [x] Parse OpenAPI Spec
|
||||||
|
- [x] Safe/Unsafe mode implementation
|
||||||
- [ ] Parse proper description with list_tools.
|
- [ ] Parse proper description with list_tools.
|
||||||
- [ ] Airflow config fetch (_specifically for page limit_)
|
- [ ] Airflow config fetch (_specifically for page limit_)
|
||||||
- [ ] Env variables optional (_env variables might not be ideal for airflow plugins_)
|
- [ ] Env variables optional (_env variables might not be ideal for airflow plugins_)
|
||||||
|
|||||||
@@ -1,19 +1,19 @@
|
|||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import click
|
import click
|
||||||
|
|
||||||
from airflow_mcp_server.server import serve
|
from airflow_mcp_server.server_safe import serve as serve_safe
|
||||||
|
from airflow_mcp_server.server_unsafe import serve as serve_unsafe
|
||||||
|
|
||||||
|
|
||||||
@click.command()
|
@click.command()
|
||||||
@click.option("-v", "--verbose", count=True)
|
@click.option("-v", "--verbose", count=True, help="Increase verbosity")
|
||||||
def main(verbose: bool) -> None:
|
@click.option("--safe", "-s", is_flag=True, help="Use only read-only tools")
|
||||||
|
@click.option("--unsafe", "-u", is_flag=True, help="Use all tools (default)")
|
||||||
|
def main(verbose: int, safe: bool, unsafe: bool) -> None:
|
||||||
"""MCP server for Airflow"""
|
"""MCP server for Airflow"""
|
||||||
import asyncio
|
|
||||||
|
|
||||||
logging_level = logging.WARN
|
logging_level = logging.WARN
|
||||||
if verbose == 1:
|
if verbose == 1:
|
||||||
logging_level = logging.INFO
|
logging_level = logging.INFO
|
||||||
@@ -21,7 +21,14 @@ def main(verbose: bool) -> None:
|
|||||||
logging_level = logging.DEBUG
|
logging_level = logging.DEBUG
|
||||||
|
|
||||||
logging.basicConfig(level=logging_level, stream=sys.stderr)
|
logging.basicConfig(level=logging_level, stream=sys.stderr)
|
||||||
asyncio.run(serve())
|
|
||||||
|
if safe and unsafe:
|
||||||
|
raise click.UsageError("Options --safe and --unsafe are mutually exclusive")
|
||||||
|
|
||||||
|
if safe:
|
||||||
|
asyncio.run(serve_safe())
|
||||||
|
else: # Default to unsafe mode
|
||||||
|
asyncio.run(serve_unsafe())
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
45
airflow-mcp-server/src/airflow_mcp_server/server_safe.py
Normal file
45
airflow-mcp-server/src/airflow_mcp_server/server_safe.py
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from mcp.server import Server
|
||||||
|
from mcp.server.stdio import stdio_server
|
||||||
|
from mcp.types import TextContent, Tool
|
||||||
|
|
||||||
|
from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def serve() -> None:
|
||||||
|
"""Start MCP server in safe mode (read-only operations)."""
|
||||||
|
required_vars = ["AIRFLOW_BASE_URL", "AUTH_TOKEN"]
|
||||||
|
if not all(var in os.environ for var in required_vars):
|
||||||
|
raise ValueError(f"Missing required environment variables: {required_vars}")
|
||||||
|
|
||||||
|
server = Server("airflow-mcp-server-safe")
|
||||||
|
|
||||||
|
@server.list_tools()
|
||||||
|
async def list_tools() -> list[Tool]:
|
||||||
|
try:
|
||||||
|
return await get_airflow_tools(mode="safe")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to list tools: %s", e)
|
||||||
|
raise
|
||||||
|
|
||||||
|
@server.call_tool()
|
||||||
|
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
|
||||||
|
try:
|
||||||
|
if not name.startswith("get_"):
|
||||||
|
raise ValueError("Only GET operations allowed in safe mode")
|
||||||
|
tool = await get_tool(name)
|
||||||
|
async with tool.client:
|
||||||
|
result = await tool.run(body=arguments)
|
||||||
|
return [TextContent(type="text", text=str(result))]
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Tool execution failed: %s", e)
|
||||||
|
raise
|
||||||
|
|
||||||
|
options = server.create_initialization_options()
|
||||||
|
async with stdio_server() as (read_stream, write_stream):
|
||||||
|
await server.run(read_stream, write_stream, options, raise_exceptions=True)
|
||||||
43
airflow-mcp-server/src/airflow_mcp_server/server_unsafe.py
Normal file
43
airflow-mcp-server/src/airflow_mcp_server/server_unsafe.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from mcp.server import Server
|
||||||
|
from mcp.server.stdio import stdio_server
|
||||||
|
from mcp.types import TextContent, Tool
|
||||||
|
|
||||||
|
from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def serve() -> None:
|
||||||
|
"""Start MCP server in unsafe mode (all operations)."""
|
||||||
|
required_vars = ["AIRFLOW_BASE_URL", "AUTH_TOKEN"]
|
||||||
|
if not all(var in os.environ for var in required_vars):
|
||||||
|
raise ValueError(f"Missing required environment variables: {required_vars}")
|
||||||
|
|
||||||
|
server = Server("airflow-mcp-server-unsafe")
|
||||||
|
|
||||||
|
@server.list_tools()
|
||||||
|
async def list_tools() -> list[Tool]:
|
||||||
|
try:
|
||||||
|
return await get_airflow_tools(mode="unsafe")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to list tools: %s", e)
|
||||||
|
raise
|
||||||
|
|
||||||
|
@server.call_tool()
|
||||||
|
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
|
||||||
|
try:
|
||||||
|
tool = await get_tool(name)
|
||||||
|
async with tool.client:
|
||||||
|
result = await tool.run(body=arguments)
|
||||||
|
return [TextContent(type="text", text=str(result))]
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Tool execution failed: %s", e)
|
||||||
|
raise
|
||||||
|
|
||||||
|
options = server.create_initialization_options()
|
||||||
|
async with stdio_server() as (read_stream, write_stream):
|
||||||
|
await server.run(read_stream, write_stream, options, raise_exceptions=True)
|
||||||
@@ -68,8 +68,11 @@ async def _initialize_tools() -> None:
|
|||||||
raise ValueError(f"Failed to initialize tools: {e}") from e
|
raise ValueError(f"Failed to initialize tools: {e}") from e
|
||||||
|
|
||||||
|
|
||||||
async def get_airflow_tools() -> list[Tool]:
|
async def get_airflow_tools(mode: str = "unsafe") -> list[Tool]:
|
||||||
"""Get list of all available Airflow tools.
|
"""Get list of available Airflow tools based on mode.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
mode: "safe" for GET operations only, "unsafe" for all operations (default)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of MCP Tool objects representing available operations
|
List of MCP Tool objects representing available operations
|
||||||
@@ -83,6 +86,9 @@ async def get_airflow_tools() -> list[Tool]:
|
|||||||
tools = []
|
tools = []
|
||||||
for operation_id, tool in _tools_cache.items():
|
for operation_id, tool in _tools_cache.items():
|
||||||
try:
|
try:
|
||||||
|
# Skip non-GET operations in safe mode
|
||||||
|
if mode == "safe" and not tool.operation.method.lower() == "get":
|
||||||
|
continue
|
||||||
schema = tool.operation.input_model.model_json_schema()
|
schema = tool.operation.input_model.model_json_schema()
|
||||||
tools.append(
|
tools.append(
|
||||||
Tool(
|
Tool(
|
||||||
|
|||||||
Reference in New Issue
Block a user