diff --git a/src/airflow_mcp_server/__init__.py b/src/airflow_mcp_server/__init__.py index 3b90bcb..fe6efa2 100644 --- a/src/airflow_mcp_server/__init__.py +++ b/src/airflow_mcp_server/__init__.py @@ -1,9 +1,11 @@ import asyncio import logging +import os import sys import click +from airflow_mcp_server.config import AirflowConfig from airflow_mcp_server.server_safe import serve as serve_safe from airflow_mcp_server.server_unsafe import serve as serve_unsafe @@ -12,7 +14,11 @@ from airflow_mcp_server.server_unsafe import serve as serve_unsafe @click.option("-v", "--verbose", count=True, help="Increase verbosity") @click.option("--safe", "-s", is_flag=True, help="Use only read-only tools") @click.option("--unsafe", "-u", is_flag=True, help="Use all tools (default)") -def main(verbose: int, safe: bool, unsafe: bool) -> None: +@click.option("--base-url", help="Airflow API base URL") +@click.option("--spec-path", help="Path to OpenAPI spec file") +@click.option("--auth-token", help="Authentication token") +@click.option("--cookie", help="Session cookie") +def main(verbose: int, safe: bool, unsafe: bool, base_url: str = None, spec_path: str = None, auth_token: str = None, cookie: str = None) -> None: """MCP server for Airflow""" logging_level = logging.WARN if verbose == 1: @@ -22,16 +28,30 @@ def main(verbose: int, safe: bool, unsafe: bool) -> None: logging.basicConfig(level=logging_level, stream=sys.stderr) + # Read environment variables with proper precedence + # Environment variables take precedence over CLI arguments + config_base_url = os.environ.get("AIRFLOW_BASE_URL") or base_url + config_spec_path = os.environ.get("AIRFLOW_SPEC_PATH") or spec_path + config_auth_token = os.environ.get("AIRFLOW_AUTH_TOKEN") or auth_token + config_cookie = os.environ.get("AIRFLOW_COOKIE") or cookie + + # Initialize configuration + try: + config = AirflowConfig(base_url=config_base_url, spec_path=config_spec_path, auth_token=config_auth_token, cookie=config_cookie) + except ValueError as e: + click.echo(f"Configuration error: {e}", err=True) + sys.exit(1) + # Determine server mode with proper precedence if safe and unsafe: # CLI argument validation raise click.UsageError("Options --safe and --unsafe are mutually exclusive") elif safe: # CLI argument for safe mode - asyncio.run(serve_safe()) + asyncio.run(serve_safe(config)) else: # Default to unsafe mode - asyncio.run(serve_unsafe()) + asyncio.run(serve_unsafe(config)) if __name__ == "__main__": diff --git a/src/airflow_mcp_server/config.py b/src/airflow_mcp_server/config.py new file mode 100644 index 0000000..f3bb9d2 --- /dev/null +++ b/src/airflow_mcp_server/config.py @@ -0,0 +1,25 @@ +class AirflowConfig: + """Centralized configuration for Airflow MCP server.""" + + def __init__(self, base_url: str | None = None, spec_path: str | None = None, auth_token: str | None = None, cookie: str | None = None) -> None: + """Initialize configuration with provided values. + + Args: + base_url: Airflow API base URL + spec_path: Path to OpenAPI spec file + auth_token: Authentication token + cookie: Session cookie + + Raises: + ValueError: If required configuration is missing + """ + self.base_url = base_url + if not self.base_url: + raise ValueError("Missing required configuration: base_url") + + self.spec_path = spec_path + self.auth_token = auth_token + self.cookie = cookie + + if not self.auth_token and not self.cookie: + raise ValueError("Either auth_token or cookie must be provided") diff --git a/src/airflow_mcp_server/server.py b/src/airflow_mcp_server/server.py index 87f35e2..c9063b4 100644 --- a/src/airflow_mcp_server/server.py +++ b/src/airflow_mcp_server/server.py @@ -1,11 +1,11 @@ import logging -import os from typing import Any from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import TextContent, Tool +from airflow_mcp_server.config import AirflowConfig from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool # ===========THIS IS FOR DEBUGGING WITH MCP INSPECTOR=================== @@ -20,35 +20,18 @@ from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool logger = logging.getLogger(__name__) -async def serve() -> None: +async def serve(config: AirflowConfig) -> None: """Start MCP server. - Configuration precedence: - 1. Environment variables (highest) - 2. Command line arguments (if applicable) - 3. Default values (lowest) - - For authentication: - 1. Cookie authentication (highest) - 2. Auth token authentication (secondary) + Args: + config: Configuration object with auth and URL settings """ - # Check for AIRFLOW_BASE_URL which is always required - if "AIRFLOW_BASE_URL" not in os.environ: - raise ValueError("Missing required environment variable: AIRFLOW_BASE_URL") - - # Check for either AUTH_TOKEN or COOKIE - has_auth_token = "AUTH_TOKEN" in os.environ - has_cookie = "COOKIE" in os.environ - - if not has_auth_token and not has_cookie: - raise ValueError("Either AUTH_TOKEN or COOKIE environment variable must be provided") - server = Server("airflow-mcp-server") @server.list_tools() async def list_tools() -> list[Tool]: try: - return await get_airflow_tools() + return await get_airflow_tools(config) except Exception as e: logger.error("Failed to list tools: %s", e) raise @@ -56,7 +39,7 @@ async def serve() -> None: @server.call_tool() async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: try: - tool = await get_tool(name) + tool = await get_tool(config, name) async with tool.client: result = await tool.run(body=arguments) return [TextContent(type="text", text=str(result))] diff --git a/src/airflow_mcp_server/server_safe.py b/src/airflow_mcp_server/server_safe.py index 70a0ecf..543b3b5 100644 --- a/src/airflow_mcp_server/server_safe.py +++ b/src/airflow_mcp_server/server_safe.py @@ -1,45 +1,28 @@ import logging -import os from typing import Any from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import TextContent, Tool +from airflow_mcp_server.config import AirflowConfig from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool logger = logging.getLogger(__name__) -async def serve() -> None: +async def serve(config: AirflowConfig) -> None: """Start MCP server in safe mode (read-only operations). - Configuration precedence: - 1. Environment variables (highest) - 2. Command line arguments (if applicable) - 3. Default values (lowest) - - For authentication: - 1. Cookie authentication (highest) - 2. Auth token authentication (secondary) + Args: + config: Configuration object with auth and URL settings """ - # Check for AIRFLOW_BASE_URL which is always required - if "AIRFLOW_BASE_URL" not in os.environ: - raise ValueError("Missing required environment variable: AIRFLOW_BASE_URL") - - # Check for either AUTH_TOKEN or COOKIE - has_auth_token = "AUTH_TOKEN" in os.environ - has_cookie = "COOKIE" in os.environ - - if not has_auth_token and not has_cookie: - raise ValueError("Either AUTH_TOKEN or COOKIE environment variable must be provided") - server = Server("airflow-mcp-server-safe") @server.list_tools() async def list_tools() -> list[Tool]: try: - return await get_airflow_tools(mode="safe") + return await get_airflow_tools(config, mode="safe") except Exception as e: logger.error("Failed to list tools: %s", e) raise @@ -49,7 +32,7 @@ async def serve() -> None: try: if not name.startswith("get_"): raise ValueError("Only GET operations allowed in safe mode") - tool = await get_tool(name) + tool = await get_tool(config, name) async with tool.client: result = await tool.run(body=arguments) return [TextContent(type="text", text=str(result))] diff --git a/src/airflow_mcp_server/server_unsafe.py b/src/airflow_mcp_server/server_unsafe.py index a47cfc1..347db5c 100644 --- a/src/airflow_mcp_server/server_unsafe.py +++ b/src/airflow_mcp_server/server_unsafe.py @@ -1,45 +1,28 @@ import logging -import os from typing import Any from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import TextContent, Tool +from airflow_mcp_server.config import AirflowConfig from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool logger = logging.getLogger(__name__) -async def serve() -> None: +async def serve(config: AirflowConfig) -> None: """Start MCP server in unsafe mode (all operations). - Configuration precedence: - 1. Environment variables (highest) - 2. Command line arguments (if applicable) - 3. Default values (lowest) - - For authentication: - 1. Cookie authentication (highest) - 2. Auth token authentication (secondary) + Args: + config: Configuration object with auth and URL settings """ - # Check for AIRFLOW_BASE_URL which is always required - if "AIRFLOW_BASE_URL" not in os.environ: - raise ValueError("Missing required environment variable: AIRFLOW_BASE_URL") - - # Check for either AUTH_TOKEN or COOKIE - has_auth_token = "AUTH_TOKEN" in os.environ - has_cookie = "COOKIE" in os.environ - - if not has_auth_token and not has_cookie: - raise ValueError("Either AUTH_TOKEN or COOKIE environment variable must be provided") - server = Server("airflow-mcp-server-unsafe") @server.list_tools() async def list_tools() -> list[Tool]: try: - return await get_airflow_tools(mode="unsafe") + return await get_airflow_tools(config, mode="unsafe") except Exception as e: logger.error("Failed to list tools: %s", e) raise @@ -47,7 +30,7 @@ async def serve() -> None: @server.call_tool() async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: try: - tool = await get_tool(name) + tool = await get_tool(config, name) async with tool.client: result = await tool.run(body=arguments) return [TextContent(type="text", text=str(result))] diff --git a/src/airflow_mcp_server/tools/tool_manager.py b/src/airflow_mcp_server/tools/tool_manager.py index 8106cd3..3588cc7 100644 --- a/src/airflow_mcp_server/tools/tool_manager.py +++ b/src/airflow_mcp_server/tools/tool_manager.py @@ -1,10 +1,10 @@ import logging -import os from importlib import resources from mcp.types import Tool from airflow_mcp_server.client.airflow_client import AirflowClient +from airflow_mcp_server.config import AirflowConfig from airflow_mcp_server.parser.operation_parser import OperationParser from airflow_mcp_server.tools.airflow_tool import AirflowTool @@ -13,16 +13,19 @@ logger = logging.getLogger(__name__) _tools_cache: dict[str, AirflowTool] = {} -def _initialize_client() -> AirflowClient: - """Initialize Airflow client with environment variables or embedded spec. +def _initialize_client(config: AirflowConfig) -> AirflowClient: + """Initialize Airflow client with configuration. + + Args: + config: Configuration object with auth and URL settings Returns: AirflowClient instance Raises: - ValueError: If required environment variables are missing or default spec is not found + ValueError: If default spec is not found """ - spec_path = os.environ.get("OPENAPI_SPEC") + spec_path = config.spec_path if not spec_path: # Fallback to embedded v1.yaml try: @@ -32,41 +35,33 @@ def _initialize_client() -> AirflowClient: except Exception as e: raise ValueError("Default OpenAPI spec not found in package resources") from e - # Check for base URL - if "AIRFLOW_BASE_URL" not in os.environ: - raise ValueError("Missing required environment variable: AIRFLOW_BASE_URL") - - # Check for either AUTH_TOKEN or COOKIE - has_auth_token = "AUTH_TOKEN" in os.environ - has_cookie = "COOKIE" in os.environ - - if not has_auth_token and not has_cookie: - raise ValueError("Either AUTH_TOKEN or COOKIE environment variable must be provided") - # Initialize client with appropriate authentication method - client_args = {"spec_path": spec_path, "base_url": os.environ["AIRFLOW_BASE_URL"]} + client_args = {"spec_path": spec_path, "base_url": config.base_url} # Apply cookie auth first if available (highest precedence) - if has_cookie: - client_args["cookie"] = os.environ["COOKIE"] + if config.cookie: + client_args["cookie"] = config.cookie # Otherwise use auth token if available - elif has_auth_token: - client_args["auth_token"] = os.environ["AUTH_TOKEN"] + elif config.auth_token: + client_args["auth_token"] = config.auth_token return AirflowClient(**client_args) -async def _initialize_tools() -> None: +async def _initialize_tools(config: AirflowConfig) -> None: """Initialize tools cache with Airflow operations. + Args: + config: Configuration object with auth and URL settings + Raises: ValueError: If initialization fails """ global _tools_cache try: - client = _initialize_client() - spec_path = os.environ.get("OPENAPI_SPEC") + client = _initialize_client(config) + spec_path = config.spec_path if not spec_path: with resources.files("airflow_mcp_server.resources").joinpath("v1.yaml").open("rb") as f: spec_path = f.name @@ -84,10 +79,11 @@ async def _initialize_tools() -> None: raise ValueError(f"Failed to initialize tools: {e}") from e -async def get_airflow_tools(mode: str = "unsafe") -> list[Tool]: +async def get_airflow_tools(config: AirflowConfig, mode: str = "unsafe") -> list[Tool]: """Get list of available Airflow tools based on mode. Args: + config: Configuration object with auth and URL settings mode: "safe" for GET operations only, "unsafe" for all operations (default) Returns: @@ -97,7 +93,7 @@ async def get_airflow_tools(mode: str = "unsafe") -> list[Tool]: ValueError: If initialization fails """ if not _tools_cache: - await _initialize_tools() + await _initialize_tools(config) tools = [] for operation_id, tool in _tools_cache.items(): @@ -120,10 +116,11 @@ async def get_airflow_tools(mode: str = "unsafe") -> list[Tool]: return tools -async def get_tool(name: str) -> AirflowTool: +async def get_tool(config: AirflowConfig, name: str) -> AirflowTool: """Get specific tool by name. Args: + config: Configuration object with auth and URL settings name: Tool/operation name Returns: @@ -134,7 +131,7 @@ async def get_tool(name: str) -> AirflowTool: ValueError: If tool initialization fails """ if not _tools_cache: - await _initialize_tools() + await _initialize_tools(config) if name not in _tools_cache: raise KeyError(f"Tool {name} not found")