diff --git a/README.md b/README.md index cd34bd0..5987c89 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,6 @@ https://github.com/user-attachments/assets/f3e60fff-8680-4dd9-b08e-fa7db655a705 "airflow-mcp-server" ], "env": { - "OPENAPI_SPEC": "", "AIRFLOW_BASE_URL": "http:///api/v1", "AUTH_TOKEN": "" } @@ -30,12 +29,13 @@ https://github.com/user-attachments/assets/f3e60fff-8680-4dd9-b08e-fa7db655a705 } ``` -> You can download the openapi spec from [Airflow REST API](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html) # Scope 2 different streams in which Airflow MCP Server can be used: - Adding Airflow to AI (_complete access to an Airflow deployment_) - This will enable AI to be able to write DAGs and just do things on a schedule on its own. + - Use command `airflow-mcp-server` or `airflow-mcp-server --unsafe`. - Adding AI to Airflow (_read-only access using Airflow Plugin_) - This stream can enable Users to be able to get a better understanding about their deployment. Especially in cases where teams have hundreds, if not thousands of DAGs. + - Use command `airflow-mcp-server --safe`. diff --git a/airflow-mcp-server/README.md b/airflow-mcp-server/README.md index 52f744e..722735c 100644 --- a/airflow-mcp-server/README.md +++ b/airflow-mcp-server/README.md @@ -18,7 +18,6 @@ A [Model Context Protocol](https://modelcontextprotocol.io/) server for controll "airflow-mcp-server" ], "env": { - "OPENAPI_SPEC": "", "AIRFLOW_BASE_URL": "http:///api/v1", "AUTH_TOKEN": "" } @@ -27,13 +26,29 @@ A [Model Context Protocol](https://modelcontextprotocol.io/) server for controll } ``` +### Operation Modes + +The server supports two operation modes: + +- **Safe Mode** (`--safe`): Only allows read-only operations (GET requests). This is useful when you want to prevent any modifications to your Airflow instance. +- **Unsafe Mode** (`--unsafe`): Allows all operations including modifications. This is the default mode. 
+ +To start in safe mode: +```bash +airflow-mcp-server --safe +``` + +To explicitly start in unsafe mode (though this is the default): +```bash +airflow-mcp-server --unsafe +``` ### Considerations The MCP Server expects environment variables to be set: - `AIRFLOW_BASE_URL`: The base URL of the Airflow API - `AUTH_TOKEN`: The token to use for authorization (_This should be base64 encoded username:password_) -- `OPENAPI_SPEC`: The path to the OpenAPI spec file +- `OPENAPI_SPEC`: The path to the OpenAPI spec file (_Optional_) (_defaults to latest stable release_) *Currently, only Basic Auth is supported.* @@ -45,6 +60,7 @@ The default is 100 items, but you can change it using `maximum_page_limit` optio - [x] First API - [x] Parse OpenAPI Spec +- [x] Safe/Unsafe mode implementation - [ ] Parse proper description with list_tools. - [ ] Airflow config fetch (_specifically for page limit_) - [ ] Env variables optional (_env variables might not be ideal for airflow plugins_) diff --git a/airflow-mcp-server/src/airflow_mcp_server/__init__.py b/airflow-mcp-server/src/airflow_mcp_server/__init__.py index 837b616..8decf7f 100644 --- a/airflow-mcp-server/src/airflow_mcp_server/__init__.py +++ b/airflow-mcp-server/src/airflow_mcp_server/__init__.py @@ -1,19 +1,19 @@ +import asyncio import logging -import os import sys -from pathlib import Path import click -from airflow_mcp_server.server import serve +from airflow_mcp_server.server_safe import serve as serve_safe +from airflow_mcp_server.server_unsafe import serve as serve_unsafe @click.command() -@click.option("-v", "--verbose", count=True) -def main(verbose: bool) -> None: +@click.option("-v", "--verbose", count=True, help="Increase verbosity") +@click.option("--safe", "-s", is_flag=True, help="Use only read-only tools") +@click.option("--unsafe", "-u", is_flag=True, help="Use all tools (default)") +def main(verbose: int, safe: bool, unsafe: bool) -> None: """MCP server for Airflow""" - import asyncio - logging_level = 
async def serve() -> None:
    """Start the MCP server in safe mode (read-only operations).

    Registers two handlers on an MCP ``Server`` named
    ``airflow-mcp-server-safe`` and then serves over stdio:

    * ``list_tools`` returns ``get_airflow_tools(mode="safe")``.
    * ``call_tool`` looks the tool up by name, refuses it unless its
      operation is a GET request, and otherwise runs it with the call
      arguments as the request body.

    Raises:
        ValueError: If ``AIRFLOW_BASE_URL`` or ``AUTH_TOKEN`` is absent from
            the environment. The message names only the variables that are
            actually missing.
    """
    required_vars = ["AIRFLOW_BASE_URL", "AUTH_TOKEN"]
    # Report only what is missing, so the user is not told to set a variable
    # that is already present.
    missing_vars = [var for var in required_vars if var not in os.environ]
    if missing_vars:
        raise ValueError(f"Missing required environment variables: {missing_vars}")

    server = Server("airflow-mcp-server-safe")

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        try:
            return await get_airflow_tools(mode="safe")
        except Exception as e:
            logger.error("Failed to list tools: %s", e)
            raise

    @server.call_tool()
    async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
        try:
            tool = await get_tool(name)
            # Gate on the operation's HTTP method, the same criterion the
            # safe-mode tool listing uses, rather than on a "get_" name
            # prefix. A prefix check lets through non-GET operations whose id
            # merely starts with "get_" and rejects GET operations that do not.
            # NOTE(review): relies on get_tool returning an object exposing
            # operation.method, as the cached tools used for listing do.
            if tool.operation.method.lower() != "get":
                raise ValueError("Only GET operations allowed in safe mode")
            async with tool.client:
                result = await tool.run(body=arguments)
                return [TextContent(type="text", text=str(result))]
        except Exception as e:
            logger.error("Tool execution failed: %s", e)
            raise

    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options, raise_exceptions=True)
async def serve() -> None:
    """Start the MCP server in unsafe mode (all operations).

    Registers two handlers on an MCP ``Server`` named
    ``airflow-mcp-server-unsafe`` and then serves over stdio:

    * ``list_tools`` returns ``get_airflow_tools(mode="unsafe")``.
    * ``call_tool`` looks the tool up by name and runs it with the call
      arguments as the request body; no restriction is applied here.

    Raises:
        ValueError: If ``AIRFLOW_BASE_URL`` or ``AUTH_TOKEN`` is absent from
            the environment. The message names only the variables that are
            actually missing.
    """
    required_vars = ["AIRFLOW_BASE_URL", "AUTH_TOKEN"]
    # Report only what is missing, so the user is not told to set a variable
    # that is already present.
    missing_vars = [var for var in required_vars if var not in os.environ]
    if missing_vars:
        raise ValueError(f"Missing required environment variables: {missing_vars}")

    server = Server("airflow-mcp-server-unsafe")

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        try:
            return await get_airflow_tools(mode="unsafe")
        except Exception as e:
            logger.error("Failed to list tools: %s", e)
            raise

    @server.call_tool()
    async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
        try:
            tool = await get_tool(name)
            async with tool.client:
                result = await tool.run(body=arguments)
                return [TextContent(type="text", text=str(result))]
        except Exception as e:
            logger.error("Tool execution failed: %s", e)
            raise

    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options, raise_exceptions=True)
a/airflow-mcp-server/src/airflow_mcp_server/tools/tool_manager.py +++ b/airflow-mcp-server/src/airflow_mcp_server/tools/tool_manager.py @@ -68,8 +68,11 @@ async def _initialize_tools() -> None: raise ValueError(f"Failed to initialize tools: {e}") from e -async def get_airflow_tools() -> list[Tool]: - """Get list of all available Airflow tools. +async def get_airflow_tools(mode: str = "unsafe") -> list[Tool]: + """Get list of available Airflow tools based on mode. + + Args: + mode: "safe" for GET operations only, "unsafe" for all operations (default) Returns: List of MCP Tool objects representing available operations @@ -83,6 +86,9 @@ async def get_airflow_tools() -> list[Tool]: tools = [] for operation_id, tool in _tools_cache.items(): try: + # Skip non-GET operations in safe mode + if mode == "safe" and not tool.operation.method.lower() == "get": + continue schema = tool.operation.input_model.model_json_schema() tools.append( Tool(