3 Commits

5 changed files with 65 additions and 58 deletions

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "airflow-mcp-server" name = "airflow-mcp-server"
version = "0.6.1" version = "0.6.2"
description = "MCP Server for Airflow" description = "MCP Server for Airflow"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"

View File

@@ -1,52 +0,0 @@
import logging
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from airflow_mcp_server.config import AirflowConfig
from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool
# ===========THIS IS FOR DEBUGGING WITH MCP INSPECTOR===================
# import sys
# Configure root logger to stderr
# logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.StreamHandler(sys.stderr)])
# Disable Uvicorn's default handlers
# logging.getLogger("uvicorn.error").handlers = []
# logging.getLogger("uvicorn.access").handlers = []
# ======================================================================
logger = logging.getLogger(__name__)
async def serve(config: AirflowConfig) -> None:
"""Start MCP server.
Args:
config: Configuration object with auth and URL settings
"""
server = Server("airflow-mcp-server")
@server.list_tools()
async def list_tools() -> list[Tool]:
try:
return await get_airflow_tools(config)
except Exception as e:
logger.error("Failed to list tools: %s", e)
raise
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
try:
tool = await get_tool(config, name)
async with tool.client:
result = await tool.run(body=arguments)
return [TextContent(type="text", text=str(result))]
except Exception as e:
logger.error("Tool execution failed: %s", e)
raise
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options, raise_exceptions=True)

View File

@@ -1,9 +1,10 @@
import logging import logging
from typing import Any from typing import Any
import anyio
from mcp.server import Server from mcp.server import Server
from mcp.server.stdio import stdio_server from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool from mcp.types import Prompt, Resource, ResourceTemplate, TextContent, Tool
from airflow_mcp_server.config import AirflowConfig from airflow_mcp_server.config import AirflowConfig
from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool
@@ -27,6 +28,24 @@ async def serve(config: AirflowConfig) -> None:
logger.error("Failed to list tools: %s", e) logger.error("Failed to list tools: %s", e)
raise raise
@server.list_resources()
async def list_resources() -> list[Resource]:
"""List available resources (returns empty list)."""
logger.info("Resources list requested - returning empty list")
return []
@server.list_resource_templates()
async def list_resource_templates() -> list[ResourceTemplate]:
"""List available resource templates (returns empty list)."""
logger.info("Resource templates list requested - returning empty list")
return []
@server.list_prompts()
async def list_prompts() -> list[Prompt]:
"""List available prompts (returns empty list)."""
logger.info("Prompts list requested - returning empty list")
return []
@server.call_tool() @server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
try: try:
@@ -42,4 +61,10 @@ async def serve(config: AirflowConfig) -> None:
options = server.create_initialization_options() options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream): async with stdio_server() as (read_stream, write_stream):
try:
await server.run(read_stream, write_stream, options, raise_exceptions=True) await server.run(read_stream, write_stream, options, raise_exceptions=True)
except anyio.BrokenResourceError:
logger.error("BrokenResourceError: Stream was closed unexpectedly. Exiting gracefully.")
except Exception as e:
logger.error(f"Unexpected error in server.run: {e}")
raise

View File

@@ -1,13 +1,23 @@
import logging import logging
from typing import Any from typing import Any
import anyio
from mcp.server import Server from mcp.server import Server
from mcp.server.stdio import stdio_server from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool from mcp.types import Prompt, Resource, ResourceTemplate, TextContent, Tool
from airflow_mcp_server.config import AirflowConfig from airflow_mcp_server.config import AirflowConfig
from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool from airflow_mcp_server.tools.tool_manager import get_airflow_tools, get_tool
# ===========THIS IS FOR DEBUGGING WITH MCP INSPECTOR===================
# import sys
# Configure root logger to stderr
# logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.StreamHandler(sys.stderr)])
# Disable Uvicorn's default handlers
# logging.getLogger("uvicorn.error").handlers = []
# logging.getLogger("uvicorn.access").handlers = []
# ======================================================================
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -27,6 +37,24 @@ async def serve(config: AirflowConfig) -> None:
logger.error("Failed to list tools: %s", e) logger.error("Failed to list tools: %s", e)
raise raise
@server.list_resources()
async def list_resources() -> list[Resource]:
"""List available resources (returns empty list)."""
logger.info("Resources list requested - returning empty list")
return []
@server.list_resource_templates()
async def list_resource_templates() -> list[ResourceTemplate]:
"""List available resource templates (returns empty list)."""
logger.info("Resource templates list requested - returning empty list")
return []
@server.list_prompts()
async def list_prompts() -> list[Prompt]:
"""List available prompts (returns empty list)."""
logger.info("Prompts list requested - returning empty list")
return []
@server.call_tool() @server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
try: try:
@@ -40,4 +68,10 @@ async def serve(config: AirflowConfig) -> None:
options = server.create_initialization_options() options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream): async with stdio_server() as (read_stream, write_stream):
try:
await server.run(read_stream, write_stream, options, raise_exceptions=True) await server.run(read_stream, write_stream, options, raise_exceptions=True)
except anyio.BrokenResourceError:
logger.error("BrokenResourceError: Stream was closed unexpectedly. Exiting gracefully.")
except Exception as e:
logger.error(f"Unexpected error in server.run: {e}")
raise

2
uv.lock generated
View File

@@ -112,7 +112,7 @@ wheels = [
[[package]] [[package]]
name = "airflow-mcp-server" name = "airflow-mcp-server"
version = "0.6.1" version = "0.6.2"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "aiofiles" }, { name = "aiofiles" },