Airflow 3 readiness initial commit

This commit is contained in:
2025-04-23 06:17:27 +00:00
parent 4734005ae4
commit 66cd068b33
10 changed files with 839 additions and 7244 deletions

3
.gitignore vendored
View File

@@ -179,3 +179,6 @@ project_resources/
# Ruff
.ruff_cache/
# Airflow
AIRFLOW_HOME/

View File

@@ -1,13 +1,13 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.11
rev: v0.11.6
hooks:
- id: ruff
args: [--fix]
- id: ruff-format
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v5.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer

View File

@@ -6,7 +6,6 @@
<img width="380" height="200" src="https://glama.ai/mcp/servers/6gjq9w80xr/badge" />
</a>
## Overview
A [Model Context Protocol](https://modelcontextprotocol.io/) server for controlling Airflow via Airflow APIs.
@@ -14,7 +13,6 @@ A [Model Context Protocol](https://modelcontextprotocol.io/) server for controll
https://github.com/user-attachments/assets/f3e60fff-8680-4dd9-b08e-fa7db655a705
## Setup
### Usage with Claude Desktop
@@ -28,17 +26,19 @@ https://github.com/user-attachments/assets/f3e60fff-8680-4dd9-b08e-fa7db655a705
"airflow-mcp-server"
],
"env": {
"AIRFLOW_BASE_URL": "http://<host:port>/api/v1",
// Either use AUTH_TOKEN for basic auth
"AUTH_TOKEN": "<base64_encoded_username_password>",
// Or use COOKIE for cookie-based auth
"COOKIE": "<session_cookie>"
"AIRFLOW_BASE_URL": "http://<host:port>",
"AUTH_TOKEN": "<jwt_access_token>"
}
}
}
}
```
> **Note:**
> - Set `AIRFLOW_BASE_URL` to the root Airflow URL (e.g., `http://localhost:8080`).
> - Do **not** include `/api/v1` in the base URL. The server will automatically fetch the OpenAPI spec from `${AIRFLOW_BASE_URL}/openapi.json`.
> - Only `AUTH_TOKEN` (JWT) is required for authentication. Cookie and basic auth are no longer supported in Airflow 3.0.
### Operation Modes
The server supports two operation modes:
@@ -59,18 +59,12 @@ airflow-mcp-server --unsafe
### Considerations
The MCP Server expects environment variables to be set:
- `AIRFLOW_BASE_URL`: The base URL of the Airflow API
- `AUTH_TOKEN`: The token to use for basic auth (_This should be base64 encoded username:password_) (_Optional if COOKIE is provided_)
- `COOKIE`: The session cookie to use for authentication (_Optional if AUTH_TOKEN is provided_)
- `OPENAPI_SPEC`: The path to the OpenAPI spec file (_Optional_) (_defaults to latest stable release_)
- `AIRFLOW_BASE_URL`: The root URL of the Airflow instance (e.g., `http://localhost:8080`)
- `AUTH_TOKEN`: The JWT access token for authentication
**Authentication**
The server supports two authentication methods:
- **Basic Auth**: Using base64 encoded username:password via `AUTH_TOKEN` environment variable
- **Cookie**: Using session cookie via `COOKIE` environment variable
At least one of these authentication methods must be provided.
- Only JWT authentication is supported in Airflow 3.0. You must provide a valid `AUTH_TOKEN`.
**Page Limit**
@@ -78,10 +72,9 @@ The default is 100 items, but you can change it using `maximum_page_limit` optio
## Tasks
- [x] First API
- [x] Parse OpenAPI Spec
- [ ] Airflow 3 readiness
- [ ] Parse OpenAPI Spec
- [x] Safe/Unsafe mode implementation
- [x] Allow session auth
- [ ] Parse proper description with list_tools.
- [ ] Airflow config fetch (_specifically for page limit_)
- [ ] Env variables optional (_env variables might not be ideal for airflow plugins_)

View File

@@ -1,6 +1,6 @@
[project]
name = "airflow-mcp-server"
version = "0.5.0"
version = "0.6.0"
description = "MCP Server for Airflow"
readme = "README.md"
requires-python = ">=3.11"
@@ -12,7 +12,7 @@ dependencies = [
"aiohttp>=3.11.11",
"aioresponses>=0.7.7",
"importlib-resources>=6.5.0",
"mcp>=1.2.0",
"mcp>=1.6.0",
"openapi-core>=0.19.4",
"pydantic>=2.10.5",
"pyyaml>=6.0.0",
@@ -58,7 +58,6 @@ exclude = [
[tool.hatch.build.targets.wheel]
packages = ["src/airflow_mcp_server"]
package-data = {"airflow_mcp_server"= ["*.yaml"]}
[tool.hatch.build.targets.wheel.sources]
"src/airflow_mcp_server" = "airflow_mcp_server"

View File

@@ -15,10 +15,8 @@ from airflow_mcp_server.server_unsafe import serve as serve_unsafe
@click.option("--safe", "-s", is_flag=True, help="Use only read-only tools")
@click.option("--unsafe", "-u", is_flag=True, help="Use all tools (default)")
@click.option("--base-url", help="Airflow API base URL")
@click.option("--spec-path", help="Path to OpenAPI spec file")
@click.option("--auth-token", help="Authentication token")
@click.option("--cookie", help="Session cookie")
def main(verbose: int, safe: bool, unsafe: bool, base_url: str = None, spec_path: str = None, auth_token: str = None, cookie: str = None) -> None:
@click.option("--auth-token", help="Authentication token (JWT)")
def main(verbose: int, safe: bool, unsafe: bool, base_url: str = None, auth_token: str = None) -> None:
"""MCP server for Airflow"""
logging_level = logging.WARN
if verbose == 1:
@@ -29,22 +27,18 @@ def main(verbose: int, safe: bool, unsafe: bool, base_url: str = None, spec_path
logging.basicConfig(level=logging_level, stream=sys.stderr)
# Read environment variables with proper precedence
# Environment variables take precedence over CLI arguments
config_base_url = os.environ.get("AIRFLOW_BASE_URL") or base_url
config_spec_path = os.environ.get("OPENAPI_SPEC") or spec_path
config_auth_token = os.environ.get("AUTH_TOKEN") or auth_token
config_cookie = os.environ.get("COOKIE") or cookie
# Initialize configuration
try:
config = AirflowConfig(base_url=config_base_url, spec_path=config_spec_path, auth_token=config_auth_token, cookie=config_cookie)
config = AirflowConfig(base_url=config_base_url, auth_token=config_auth_token)
except ValueError as e:
click.echo(f"Configuration error: {e}", err=True)
sys.exit(1)
# Determine server mode with proper precedence
if safe and unsafe:
# CLI argument validation
raise click.UsageError("Options --safe and --unsafe are mutually exclusive")
elif safe:
# CLI argument for safe mode

View File

@@ -1,11 +1,7 @@
import logging
import re
from pathlib import Path
from types import SimpleNamespace
from typing import Any, BinaryIO, TextIO
import aiohttp
import yaml
import requests
from jsonschema_path import SchemaPath
from openapi_core import OpenAPI
from openapi_core.validation.request.validators import V31RequestValidator
@@ -33,223 +29,51 @@ class AirflowClient:
def __init__(
self,
spec_path: Path | str | dict | bytes | BinaryIO | TextIO,
base_url: str,
auth_token: str | None = None,
cookie: str | None = None,
auth_token: str,
) -> None:
"""Initialize Airflow client.
Args:
spec_path: OpenAPI spec as file path, dict, bytes, or file object
base_url: Base URL for API
auth_token: Authentication token (optional if cookie is provided)
cookie: Session cookie (optional if auth_token is provided)
auth_token: Authentication token (JWT)
Raises:
ValueError: If spec_path is invalid or spec cannot be loaded or if neither auth_token nor cookie is provided
ValueError: If required configuration is missing or OpenAPI spec cannot be loaded
"""
if not auth_token and not cookie:
raise ValueError("Either auth_token or cookie must be provided")
if not base_url:
raise ValueError("Missing required configuration: base_url")
if not auth_token:
raise ValueError("Missing required configuration: auth_token (JWT)")
self.base_url = base_url
self.auth_token = auth_token
self.headers = {"Authorization": f"Bearer {self.auth_token}"}
# Fetch OpenAPI spec from endpoint
openapi_url = f"{self.base_url.rstrip('/')}/openapi.json"
self.raw_spec = self._fetch_openapi_spec(openapi_url)
# Validate spec has required fields
if not isinstance(self.raw_spec, dict):
raise ValueError("OpenAPI spec must be a dictionary")
required_fields = ["openapi", "info", "paths"]
for field in required_fields:
if field not in self.raw_spec:
raise ValueError(f"OpenAPI spec missing required field: {field}")
validate(self.raw_spec)
self.spec = OpenAPI.from_dict(self.raw_spec)
logger.debug("OpenAPI spec loaded successfully")
if "paths" not in self.raw_spec:
raise ValueError("OpenAPI spec does not contain paths information")
self._paths = self.raw_spec["paths"]
logger.debug("Using raw spec paths")
schema_path = SchemaPath.from_dict(self.raw_spec)
self._validator = V31RequestValidator(schema_path)
def _fetch_openapi_spec(self, url: str) -> dict:
try:
# Load and parse OpenAPI spec
if isinstance(spec_path, dict):
self.raw_spec = spec_path
elif isinstance(spec_path, bytes):
self.raw_spec = yaml.safe_load(spec_path)
elif isinstance(spec_path, str | Path):
with open(spec_path) as f:
self.raw_spec = yaml.safe_load(f)
elif hasattr(spec_path, "read"):
content = spec_path.read()
if isinstance(content, bytes):
self.raw_spec = yaml.safe_load(content)
else:
self.raw_spec = yaml.safe_load(content)
else:
raise ValueError("Invalid spec_path type. Expected Path, str, dict, bytes or file-like object")
# Validate spec has required fields
if not isinstance(self.raw_spec, dict):
raise ValueError("OpenAPI spec must be a dictionary")
required_fields = ["openapi", "info", "paths"]
for field in required_fields:
if field not in self.raw_spec:
raise ValueError(f"OpenAPI spec missing required field: {field}")
# Validate OpenAPI spec format
validate(self.raw_spec)
# Initialize OpenAPI spec
self.spec = OpenAPI.from_dict(self.raw_spec)
logger.debug("OpenAPI spec loaded successfully")
# Debug raw spec
logger.debug("Raw spec keys: %s", self.raw_spec.keys())
# Get paths from raw spec
if "paths" not in self.raw_spec:
raise ValueError("OpenAPI spec does not contain paths information")
self._paths = self.raw_spec["paths"]
logger.debug("Using raw spec paths")
# Initialize request validator with schema path
schema_path = SchemaPath.from_dict(self.raw_spec)
self._validator = V31RequestValidator(schema_path)
# API configuration
self.base_url = base_url.rstrip("/")
self.headers = {"Accept": "application/json"}
# Set authentication header based on precedence (cookie > auth_token)
if cookie:
self.headers["Cookie"] = cookie
elif auth_token:
self.headers["Authorization"] = f"Basic {auth_token}"
except Exception as e:
logger.error("Failed to initialize AirflowClient: %s", e)
raise ValueError(f"Failed to initialize client: {e}")
async def __aenter__(self) -> "AirflowClient":
self._session = aiohttp.ClientSession(headers=self.headers)
return self
async def __aexit__(self, *exc) -> None:
if hasattr(self, "_session"):
await self._session.close()
delattr(self, "_session")
def _get_operation(self, operation_id: str) -> tuple[str, str, SimpleNamespace]:
"""Get operation details from OpenAPI spec.
Args:
operation_id: The operation ID to look up
Returns:
Tuple of (path, method, operation) where operation is a SimpleNamespace object
Raises:
ValueError: If operation not found
"""
try:
# Debug the paths structure
logger.debug("Looking for operation %s in paths", operation_id)
for path, path_item in self._paths.items():
for method, operation_data in path_item.items():
# Skip non-operation fields
if method.startswith("x-") or method == "parameters":
continue
# Debug each operation
logger.debug("Checking %s %s: %s", method, path, operation_data.get("operationId"))
if operation_data.get("operationId") == operation_id:
logger.debug("Found operation %s at %s %s", operation_id, method, path)
# Convert keys to snake_case and create object
converted_data = convert_dict_keys(operation_data)
operation_obj = SimpleNamespace(**converted_data)
return path, method, operation_obj
raise ValueError(f"Operation {operation_id} not found in spec")
except Exception as e:
logger.error("Error getting operation %s: %s", operation_id, e)
raise
def _validate_path_params(self, path: str, params: dict[str, Any] | None) -> None:
if not params:
params = {}
# Extract path parameter names from the path
path_params = set(re.findall(r"{([^}]+)}", path))
# Check for missing required parameters
missing_params = path_params - set(params.keys())
if missing_params:
raise ValueError(f"Missing required path parameters: {missing_params}")
# Check for invalid parameters
invalid_params = set(params.keys()) - path_params
if invalid_params:
raise ValueError(f"Invalid path parameters: {invalid_params}")
async def execute(
self,
operation_id: str,
path_params: dict[str, Any] | None = None,
query_params: dict[str, Any] | None = None,
body: dict[str, Any] | None = None,
) -> Any:
"""Execute an API operation.
Args:
operation_id: Operation ID from OpenAPI spec
path_params: URL path parameters
query_params: URL query parameters
body: Request body data
Returns:
API response data
Raises:
ValueError: If operation not found
RuntimeError: If used outside async context
aiohttp.ClientError: For HTTP/network errors
"""
if not hasattr(self, "_session") or not self._session:
raise RuntimeError("Client not in async context")
try:
# Get operation details
path, method, _ = self._get_operation(operation_id)
# Validate path parameters
self._validate_path_params(path, path_params)
# Format URL
if path_params:
path = path.format(**path_params)
url = f"{self.base_url}{path}"
logger.debug("Executing %s %s", method, url)
logger.debug("Request body: %s", body)
logger.debug("Request query params: %s", query_params)
# Dynamically set headers based on presence of body
request_headers = self.headers.copy()
if body is not None:
request_headers["Content-Type"] = "application/json"
# Make request
async with self._session.request(
method=method,
url=url,
params=query_params,
json=body,
) as response:
response.raise_for_status()
content_type = response.headers.get("Content-Type", "").lower()
# Status codes that typically have no body
no_body_statuses = {204}
if response.status in no_body_statuses:
if content_type and "application/json" in content_type:
logger.warning("Unexpected JSON body with status %s", response.status)
return await response.json() # Parse if present, though rare
logger.debug("Received %s response with no body", response.status)
return response.status
# For statuses expecting a body, check mimetype
if "application/json" in content_type:
logger.debug("Response: %s", await response.text())
return await response.json()
# Unexpected mimetype with body
response_text = await response.text()
logger.error("Unexpected mimetype %s for status %s: %s", content_type, response.status, response_text)
raise ValueError(f"Cannot parse response with mimetype {content_type} as JSON")
except aiohttp.ClientError as e:
logger.error("Error executing operation %s: %s", operation_id, e)
raise
except Exception as e:
logger.error("Error executing operation %s: %s", operation_id, e)
raise ValueError(f"Failed to execute operation: {e}")
response = requests.get(url, headers=self.headers)
response.raise_for_status()
except requests.RequestException as e:
raise ValueError(f"Failed to fetch OpenAPI spec from {url}: {e}")
return response.json()

View File

@@ -1,14 +1,12 @@
class AirflowConfig:
"""Centralized configuration for Airflow MCP server."""
def __init__(self, base_url: str | None = None, spec_path: str | None = None, auth_token: str | None = None, cookie: str | None = None) -> None:
def __init__(self, base_url: str | None = None, auth_token: str | None = None) -> None:
"""Initialize configuration with provided values.
Args:
base_url: Airflow API base URL
spec_path: Path to OpenAPI spec file
auth_token: Authentication token
cookie: Session cookie
auth_token: Authentication token (JWT)
Raises:
ValueError: If required configuration is missing
@@ -17,9 +15,6 @@ class AirflowConfig:
if not self.base_url:
raise ValueError("Missing required configuration: base_url")
self.spec_path = spec_path
self.auth_token = auth_token
self.cookie = cookie
if not self.auth_token and not self.cookie:
raise ValueError("Either auth_token or cookie must be provided")
if not self.auth_token:
raise ValueError("Missing required configuration: auth_token (JWT)")

File diff suppressed because it is too large Load Diff

View File

@@ -1,211 +1,43 @@
import logging
from importlib import resources
from pathlib import Path
from typing import Any
from unittest.mock import patch
import aiohttp
import pytest
import yaml
from aioresponses import aioresponses
from airflow_mcp_server.client.airflow_client import AirflowClient
from openapi_core import OpenAPI
from airflow_mcp_server.client.airflow_client import AirflowClient
logging.basicConfig(level=logging.DEBUG)
def create_valid_spec(paths: dict[str, Any] | None = None) -> dict[str, Any]:
return {"openapi": "3.0.0", "info": {"title": "Airflow API", "version": "1.0.0"}, "paths": paths or {}}
def mock_openapi_response(*args, **kwargs):
    """Stand in for ``requests.get`` and return a canned OpenAPI response.

    Any positional or keyword arguments are accepted and ignored, so the
    function can be used directly as the ``side_effect`` of a patched
    ``requests.get``.

    Returns:
        An object exposing ``status_code``, ``raise_for_status()`` and
        ``json()``; ``json()`` yields a minimal OpenAPI document with an
        empty ``paths`` mapping.
    """

    class MockResponse:
        """Minimal successful HTTP response carrying a tiny OpenAPI spec."""

        def __init__(self):
            self.status_code = 200

        def raise_for_status(self):
            """Do nothing: this canned response always has status 200."""
            # The code under test calls raise_for_status() on the response
            # before json(); without this method the mock would fail with
            # AttributeError instead of exercising the client.
            return None

        def json(self):
            """Return the canned OpenAPI document."""
            return {"openapi": "3.0.0", "info": {"title": "Airflow API", "version": "1.0.0"}, "paths": {}}

    return MockResponse()
@pytest.fixture
def client() -> AirflowClient:
with resources.files("airflow_mcp_server.resources").joinpath("v1.yaml").open("rb") as f:
spec = yaml.safe_load(f)
return AirflowClient(
spec_path=spec,
base_url="http://localhost:8080/api/v1",
auth_token="test-token",
)
def test_init_client_initialization(client: AirflowClient) -> None:
assert isinstance(client.spec, OpenAPI)
assert client.base_url == "http://localhost:8080/api/v1"
assert client.headers["Authorization"] == "Basic test-token"
assert "Cookie" not in client.headers
def test_init_client_with_cookie() -> None:
with resources.files("airflow_mcp_server.resources").joinpath("v1.yaml").open("rb") as f:
spec = yaml.safe_load(f)
client = AirflowClient(
spec_path=spec,
base_url="http://localhost:8080/api/v1",
cookie="session=b18e8c5e-92f5-4d1e-a8f2-7c1b62110cae.vmX5kqDq5TdvT9BzTlypMVclAwM",
)
assert isinstance(client.spec, OpenAPI)
assert client.base_url == "http://localhost:8080/api/v1"
assert "Authorization" not in client.headers
assert client.headers["Cookie"] == "session=b18e8c5e-92f5-4d1e-a8f2-7c1b62110cae.vmX5kqDq5TdvT9BzTlypMVclAwM"
def test_init_client_missing_auth() -> None:
with resources.files("airflow_mcp_server.resources").joinpath("v1.yaml").open("rb") as f:
spec = yaml.safe_load(f)
with pytest.raises(ValueError, match="Either auth_token or cookie must be provided"):
AirflowClient(
spec_path=spec,
def client():
with patch("airflow_mcp_server.client.airflow_client.requests.get", side_effect=mock_openapi_response):
return AirflowClient(
base_url="http://localhost:8080/api/v1",
auth_token="test-token",
)
def test_init_load_spec_from_bytes() -> None:
spec_bytes = yaml.dump(create_valid_spec()).encode()
client = AirflowClient(spec_path=spec_bytes, base_url="http://test", auth_token="test")
assert client.raw_spec is not None
def test_init_client_initialization(client):
    """Check the spec object, base URL and Bearer header of a built client."""
    expected_base_url = "http://localhost:8080/api/v1"
    expected_auth_header = "Bearer test-token"

    assert isinstance(client.spec, OpenAPI)
    assert client.base_url == expected_base_url
    assert client.headers["Authorization"] == expected_auth_header
def test_init_load_spec_from_path(tmp_path: Path) -> None:
spec_file = tmp_path / "test_spec.yaml"
spec_file.write_text(yaml.dump(create_valid_spec()))
client = AirflowClient(spec_path=spec_file, base_url="http://test", auth_token="test")
assert client.raw_spec is not None
def test_init_invalid_spec() -> None:
with pytest.raises(ValueError):
AirflowClient(spec_path={"invalid": "spec"}, base_url="http://test", auth_token="test")
def test_init_missing_paths_in_spec() -> None:
with pytest.raises(ValueError):
AirflowClient(spec_path={"openapi": "3.0.0"}, base_url="http://test", auth_token="test")
def test_ops_get_operation(client: AirflowClient) -> None:
path, method, operation = client._get_operation("get_dags")
assert path == "/dags"
assert method == "get"
assert operation.operation_id == "get_dags"
path, method, operation = client._get_operation("get_dag")
assert path == "/dags/{dag_id}"
assert method == "get"
assert operation.operation_id == "get_dag"
def test_ops_nonexistent_operation(client: AirflowClient) -> None:
with pytest.raises(ValueError, match="Operation nonexistent not found in spec"):
client._get_operation("nonexistent")
def test_ops_case_sensitive_operation(client: AirflowClient) -> None:
with pytest.raises(ValueError):
client._get_operation("GET_DAGS")
@pytest.mark.asyncio
async def test_exec_without_context() -> None:
client = AirflowClient(
spec_path=create_valid_spec(),
base_url="http://test",
auth_token="test",
)
with pytest.raises(RuntimeError, match="Client not in async context"):
await client.execute("get_dags")
@pytest.mark.asyncio
async def test_exec_get_dags(client: AirflowClient) -> None:
expected_response = {
"dags": [
{
"dag_id": "test_dag",
"is_active": True,
"is_paused": False,
}
],
"total_entries": 1,
}
with aioresponses() as mock:
async with client:
mock.get(
"http://localhost:8080/api/v1/dags?limit=100",
status=200,
payload=expected_response,
)
response = await client.execute("get_dags", query_params={"limit": 100})
assert response == expected_response
@pytest.mark.asyncio
async def test_exec_get_dag(client: AirflowClient) -> None:
expected_response = {
"dag_id": "test_dag",
"is_active": True,
"is_paused": False,
}
with aioresponses() as mock:
async with client:
mock.get(
"http://localhost:8080/api/v1/dags/test_dag",
status=200,
payload=expected_response,
)
response = await client.execute(
"get_dag",
path_params={"dag_id": "test_dag"},
)
assert response == expected_response
@pytest.mark.asyncio
async def test_exec_invalid_params(client: AirflowClient) -> None:
with pytest.raises(ValueError):
async with client:
# Test with missing required parameter
await client.execute("get_dag", path_params={})
with pytest.raises(ValueError):
async with client:
# Test with invalid parameter name
await client.execute("get_dag", path_params={"invalid": "value"})
@pytest.mark.asyncio
async def test_exec_timeout(client: AirflowClient) -> None:
with aioresponses() as mock:
mock.get("http://localhost:8080/api/v1/dags", exception=aiohttp.ClientError("Timeout"))
async with client:
with pytest.raises(aiohttp.ClientError):
await client.execute("get_dags")
@pytest.mark.asyncio
async def test_exec_error_response(client: AirflowClient) -> None:
with aioresponses() as mock:
async with client:
mock.get(
"http://localhost:8080/api/v1/dags",
status=403,
body="Forbidden",
)
with pytest.raises(aiohttp.ClientResponseError):
await client.execute("get_dags")
@pytest.mark.asyncio
async def test_exec_session_management(client: AirflowClient) -> None:
async with client:
with aioresponses() as mock:
mock.get(
"http://localhost:8080/api/v1/dags",
status=200,
payload={"dags": []},
)
await client.execute("get_dags")
with pytest.raises(RuntimeError):
await client.execute("get_dags")
def test_init_client_missing_auth():
    """Building a client with no token must raise a ValueError naming ``auth_token``."""
    client_kwargs = {
        "base_url": "http://localhost:8080/api/v1",
        "auth_token": None,
    }
    with pytest.raises(ValueError, match="auth_token"):
        AirflowClient(**client_kwargs)

1380
uv.lock generated

File diff suppressed because it is too large Load Diff