Add AirflowConfig class for centralized configuration management

This commit is contained in:
2025-02-25 08:53:37 +00:00
parent 420b6fc68f
commit ea60acd54a
6 changed files with 91 additions and 100 deletions

View File

@@ -1,9 +1,11 @@
import asyncio import asyncio
import logging import logging
import os
import sys import sys
import click import click
from airflow_mcp_server.config import AirflowConfig
from airflow_mcp_server.server_safe import serve as serve_safe from airflow_mcp_server.server_safe import serve as serve_safe
from airflow_mcp_server.server_unsafe import serve as serve_unsafe from airflow_mcp_server.server_unsafe import serve as serve_unsafe
@@ -12,7 +14,11 @@ from airflow_mcp_server.server_unsafe import serve as serve_unsafe
@click.option("-v", "--verbose", count=True, help="Increase verbosity") @click.option("-v", "--verbose", count=True, help="Increase verbosity")
@click.option("--safe", "-s", is_flag=True, help="Use only read-only tools") @click.option("--safe", "-s", is_flag=True, help="Use only read-only tools")
@click.option("--unsafe", "-u", is_flag=True, help="Use all tools (default)") @click.option("--unsafe", "-u", is_flag=True, help="Use all tools (default)")
def main(verbose: int, safe: bool, unsafe: bool) -> None: @click.option("--base-url", help="Airflow API base URL")
@click.option("--spec-path", help="Path to OpenAPI spec file")
@click.option("--auth-token", help="Authentication token")
@click.option("--cookie", help="Session cookie")
def main(verbose: int, safe: bool, unsafe: bool, base_url: str = None, spec_path: str = None, auth_token: str = None, cookie: str = None) -> None:
"""MCP server for Airflow""" """MCP server for Airflow"""
logging_level = logging.WARN logging_level = logging.WARN
if verbose == 1: if verbose == 1:
@@ -22,16 +28,30 @@ def main(verbose: int, safe: bool, unsafe: bool) -> None:
logging.basicConfig(level=logging_level, stream=sys.stderr) logging.basicConfig(level=logging_level, stream=sys.stderr)
# Read environment variables with proper precedence
# Environment variables take precedence over CLI arguments
config_base_url = os.environ.get("AIRFLOW_BASE_URL") or base_url
config_spec_path = os.environ.get("AIRFLOW_SPEC_PATH") or spec_path
config_auth_token = os.environ.get("AIRFLOW_AUTH_TOKEN") or auth_token
config_cookie = os.environ.get("AIRFLOW_COOKIE") or cookie
# Initialize configuration
try:
config = AirflowConfig(base_url=config_base_url, spec_path=config_spec_path, auth_token=config_auth_token, cookie=config_cookie)
except ValueError as e:
click.echo(f"Configuration error: {e}", err=True)
sys.exit(1)
# Determine server mode with proper precedence # Determine server mode with proper precedence
if safe and unsafe: if safe and unsafe:
# CLI argument validation # CLI argument validation
raise click.UsageError("Options --safe and --unsafe are mutually exclusive") raise click.UsageError("Options --safe and --unsafe are mutually exclusive")
elif safe: elif safe:
# CLI argument for safe mode # CLI argument for safe mode
asyncio.run(serve_safe()) asyncio.run(serve_safe(config))
else: else:
# Default to unsafe mode # Default to unsafe mode
asyncio.run(serve_unsafe()) asyncio.run(serve_unsafe(config))
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1,25 @@
class AirflowConfig:
"""Centralized configuration for Airflow MCP server."""
def __init__(self, base_url: str | None = None, spec_path: str | None = None, auth_token: str | None = None, cookie: str | None = None) -> None:
"""Initialize configuration with provided values.
Args:
base_url: Airflow API base URL
spec_path: Path to OpenAPI spec file
auth_token: Authentication token
cookie: Session cookie
Raises:
ValueError: If required configuration is missing
"""
self.base_url = base_url
if not self.base_url:
raise ValueError("Missing required configuration: base_url")
self.spec_path = spec_path
self.auth_token = auth_token
self.cookie = cookie
if not self.auth_token and not self.cookie:
raise ValueError("Either auth_token or cookie must be provided")

View File

@@ -1,11 +1,11 @@
import logging import logging
import os
from typing import Any from typing import Any
from mcp.server import Server from mcp.server import Server
from mcp.server.stdio import stdio_server from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool from mcp.types import TextContent, Tool
from airflow_mcp_server.config import AirflowConfig
from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool
# ===========THIS IS FOR DEBUGGING WITH MCP INSPECTOR=================== # ===========THIS IS FOR DEBUGGING WITH MCP INSPECTOR===================
@@ -20,35 +20,18 @@ from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def serve() -> None: async def serve(config: AirflowConfig) -> None:
"""Start MCP server. """Start MCP server.
Configuration precedence: Args:
1. Environment variables (highest) config: Configuration object with auth and URL settings
2. Command line arguments (if applicable)
3. Default values (lowest)
For authentication:
1. Cookie authentication (highest)
2. Auth token authentication (secondary)
""" """
# Check for AIRFLOW_BASE_URL which is always required
if "AIRFLOW_BASE_URL" not in os.environ:
raise ValueError("Missing required environment variable: AIRFLOW_BASE_URL")
# Check for either AUTH_TOKEN or COOKIE
has_auth_token = "AUTH_TOKEN" in os.environ
has_cookie = "COOKIE" in os.environ
if not has_auth_token and not has_cookie:
raise ValueError("Either AUTH_TOKEN or COOKIE environment variable must be provided")
server = Server("airflow-mcp-server") server = Server("airflow-mcp-server")
@server.list_tools() @server.list_tools()
async def list_tools() -> list[Tool]: async def list_tools() -> list[Tool]:
try: try:
return await get_airflow_tools() return await get_airflow_tools(config)
except Exception as e: except Exception as e:
logger.error("Failed to list tools: %s", e) logger.error("Failed to list tools: %s", e)
raise raise
@@ -56,7 +39,7 @@ async def serve() -> None:
@server.call_tool() @server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
try: try:
tool = await get_tool(name) tool = await get_tool(config, name)
async with tool.client: async with tool.client:
result = await tool.run(body=arguments) result = await tool.run(body=arguments)
return [TextContent(type="text", text=str(result))] return [TextContent(type="text", text=str(result))]

View File

@@ -1,45 +1,28 @@
import logging import logging
import os
from typing import Any from typing import Any
from mcp.server import Server from mcp.server import Server
from mcp.server.stdio import stdio_server from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool from mcp.types import TextContent, Tool
from airflow_mcp_server.config import AirflowConfig
from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def serve() -> None: async def serve(config: AirflowConfig) -> None:
"""Start MCP server in safe mode (read-only operations). """Start MCP server in safe mode (read-only operations).
Configuration precedence: Args:
1. Environment variables (highest) config: Configuration object with auth and URL settings
2. Command line arguments (if applicable)
3. Default values (lowest)
For authentication:
1. Cookie authentication (highest)
2. Auth token authentication (secondary)
""" """
# Check for AIRFLOW_BASE_URL which is always required
if "AIRFLOW_BASE_URL" not in os.environ:
raise ValueError("Missing required environment variable: AIRFLOW_BASE_URL")
# Check for either AUTH_TOKEN or COOKIE
has_auth_token = "AUTH_TOKEN" in os.environ
has_cookie = "COOKIE" in os.environ
if not has_auth_token and not has_cookie:
raise ValueError("Either AUTH_TOKEN or COOKIE environment variable must be provided")
server = Server("airflow-mcp-server-safe") server = Server("airflow-mcp-server-safe")
@server.list_tools() @server.list_tools()
async def list_tools() -> list[Tool]: async def list_tools() -> list[Tool]:
try: try:
return await get_airflow_tools(mode="safe") return await get_airflow_tools(config, mode="safe")
except Exception as e: except Exception as e:
logger.error("Failed to list tools: %s", e) logger.error("Failed to list tools: %s", e)
raise raise
@@ -49,7 +32,7 @@ async def serve() -> None:
try: try:
if not name.startswith("get_"): if not name.startswith("get_"):
raise ValueError("Only GET operations allowed in safe mode") raise ValueError("Only GET operations allowed in safe mode")
tool = await get_tool(name) tool = await get_tool(config, name)
async with tool.client: async with tool.client:
result = await tool.run(body=arguments) result = await tool.run(body=arguments)
return [TextContent(type="text", text=str(result))] return [TextContent(type="text", text=str(result))]

View File

@@ -1,45 +1,28 @@
import logging import logging
import os
from typing import Any from typing import Any
from mcp.server import Server from mcp.server import Server
from mcp.server.stdio import stdio_server from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool from mcp.types import TextContent, Tool
from airflow_mcp_server.config import AirflowConfig
from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def serve() -> None: async def serve(config: AirflowConfig) -> None:
"""Start MCP server in unsafe mode (all operations). """Start MCP server in unsafe mode (all operations).
Configuration precedence: Args:
1. Environment variables (highest) config: Configuration object with auth and URL settings
2. Command line arguments (if applicable)
3. Default values (lowest)
For authentication:
1. Cookie authentication (highest)
2. Auth token authentication (secondary)
""" """
# Check for AIRFLOW_BASE_URL which is always required
if "AIRFLOW_BASE_URL" not in os.environ:
raise ValueError("Missing required environment variable: AIRFLOW_BASE_URL")
# Check for either AUTH_TOKEN or COOKIE
has_auth_token = "AUTH_TOKEN" in os.environ
has_cookie = "COOKIE" in os.environ
if not has_auth_token and not has_cookie:
raise ValueError("Either AUTH_TOKEN or COOKIE environment variable must be provided")
server = Server("airflow-mcp-server-unsafe") server = Server("airflow-mcp-server-unsafe")
@server.list_tools() @server.list_tools()
async def list_tools() -> list[Tool]: async def list_tools() -> list[Tool]:
try: try:
return await get_airflow_tools(mode="unsafe") return await get_airflow_tools(config, mode="unsafe")
except Exception as e: except Exception as e:
logger.error("Failed to list tools: %s", e) logger.error("Failed to list tools: %s", e)
raise raise
@@ -47,7 +30,7 @@ async def serve() -> None:
@server.call_tool() @server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
try: try:
tool = await get_tool(name) tool = await get_tool(config, name)
async with tool.client: async with tool.client:
result = await tool.run(body=arguments) result = await tool.run(body=arguments)
return [TextContent(type="text", text=str(result))] return [TextContent(type="text", text=str(result))]

View File

@@ -1,10 +1,10 @@
import logging import logging
import os
from importlib import resources from importlib import resources
from mcp.types import Tool from mcp.types import Tool
from airflow_mcp_server.client.airflow_client import AirflowClient from airflow_mcp_server.client.airflow_client import AirflowClient
from airflow_mcp_server.config import AirflowConfig
from airflow_mcp_server.parser.operation_parser import OperationParser from airflow_mcp_server.parser.operation_parser import OperationParser
from airflow_mcp_server.tools.airflow_tool import AirflowTool from airflow_mcp_server.tools.airflow_tool import AirflowTool
@@ -13,16 +13,19 @@ logger = logging.getLogger(__name__)
_tools_cache: dict[str, AirflowTool] = {} _tools_cache: dict[str, AirflowTool] = {}
def _initialize_client() -> AirflowClient: def _initialize_client(config: AirflowConfig) -> AirflowClient:
"""Initialize Airflow client with environment variables or embedded spec. """Initialize Airflow client with configuration.
Args:
config: Configuration object with auth and URL settings
Returns: Returns:
AirflowClient instance AirflowClient instance
Raises: Raises:
ValueError: If required environment variables are missing or default spec is not found ValueError: If default spec is not found
""" """
spec_path = os.environ.get("OPENAPI_SPEC") spec_path = config.spec_path
if not spec_path: if not spec_path:
# Fallback to embedded v1.yaml # Fallback to embedded v1.yaml
try: try:
@@ -32,41 +35,33 @@ def _initialize_client() -> AirflowClient:
except Exception as e: except Exception as e:
raise ValueError("Default OpenAPI spec not found in package resources") from e raise ValueError("Default OpenAPI spec not found in package resources") from e
# Check for base URL
if "AIRFLOW_BASE_URL" not in os.environ:
raise ValueError("Missing required environment variable: AIRFLOW_BASE_URL")
# Check for either AUTH_TOKEN or COOKIE
has_auth_token = "AUTH_TOKEN" in os.environ
has_cookie = "COOKIE" in os.environ
if not has_auth_token and not has_cookie:
raise ValueError("Either AUTH_TOKEN or COOKIE environment variable must be provided")
# Initialize client with appropriate authentication method # Initialize client with appropriate authentication method
client_args = {"spec_path": spec_path, "base_url": os.environ["AIRFLOW_BASE_URL"]} client_args = {"spec_path": spec_path, "base_url": config.base_url}
# Apply cookie auth first if available (highest precedence) # Apply cookie auth first if available (highest precedence)
if has_cookie: if config.cookie:
client_args["cookie"] = os.environ["COOKIE"] client_args["cookie"] = config.cookie
# Otherwise use auth token if available # Otherwise use auth token if available
elif has_auth_token: elif config.auth_token:
client_args["auth_token"] = os.environ["AUTH_TOKEN"] client_args["auth_token"] = config.auth_token
return AirflowClient(**client_args) return AirflowClient(**client_args)
async def _initialize_tools() -> None: async def _initialize_tools(config: AirflowConfig) -> None:
"""Initialize tools cache with Airflow operations. """Initialize tools cache with Airflow operations.
Args:
config: Configuration object with auth and URL settings
Raises: Raises:
ValueError: If initialization fails ValueError: If initialization fails
""" """
global _tools_cache global _tools_cache
try: try:
client = _initialize_client() client = _initialize_client(config)
spec_path = os.environ.get("OPENAPI_SPEC") spec_path = config.spec_path
if not spec_path: if not spec_path:
with resources.files("airflow_mcp_server.resources").joinpath("v1.yaml").open("rb") as f: with resources.files("airflow_mcp_server.resources").joinpath("v1.yaml").open("rb") as f:
spec_path = f.name spec_path = f.name
@@ -84,10 +79,11 @@ async def _initialize_tools() -> None:
raise ValueError(f"Failed to initialize tools: {e}") from e raise ValueError(f"Failed to initialize tools: {e}") from e
async def get_airflow_tools(mode: str = "unsafe") -> list[Tool]: async def get_airflow_tools(config: AirflowConfig, mode: str = "unsafe") -> list[Tool]:
"""Get list of available Airflow tools based on mode. """Get list of available Airflow tools based on mode.
Args: Args:
config: Configuration object with auth and URL settings
mode: "safe" for GET operations only, "unsafe" for all operations (default) mode: "safe" for GET operations only, "unsafe" for all operations (default)
Returns: Returns:
@@ -97,7 +93,7 @@ async def get_airflow_tools(mode: str = "unsafe") -> list[Tool]:
ValueError: If initialization fails ValueError: If initialization fails
""" """
if not _tools_cache: if not _tools_cache:
await _initialize_tools() await _initialize_tools(config)
tools = [] tools = []
for operation_id, tool in _tools_cache.items(): for operation_id, tool in _tools_cache.items():
@@ -120,10 +116,11 @@ async def get_airflow_tools(mode: str = "unsafe") -> list[Tool]:
return tools return tools
async def get_tool(name: str) -> AirflowTool: async def get_tool(config: AirflowConfig, name: str) -> AirflowTool:
"""Get specific tool by name. """Get specific tool by name.
Args: Args:
config: Configuration object with auth and URL settings
name: Tool/operation name name: Tool/operation name
Returns: Returns:
@@ -134,7 +131,7 @@ async def get_tool(name: str) -> AirflowTool:
ValueError: If tool initialization fails ValueError: If tool initialization fails
""" """
if not _tools_cache: if not _tools_cache:
await _initialize_tools() await _initialize_tools(config)
if name not in _tools_cache: if name not in _tools_cache:
raise KeyError(f"Tool {name} not found") raise KeyError(f"Tool {name} not found")