diff --git a/airflow-wingman/pyproject.toml b/airflow-wingman/pyproject.toml
index 37d22a1..5dfb861 100644
--- a/airflow-wingman/pyproject.toml
+++ b/airflow-wingman/pyproject.toml
@@ -10,7 +10,9 @@ authors = [
]
dependencies = [
"apache-airflow>=2.10.0",
- "airflow-mcp-server>=0.2.0"
+ "airflow-mcp-server>=0.2.0",
+ "openai>=1.64.0",
+ "anthropic>=0.46.0"
]
classifiers = [
"Development Status :: 3 - Alpha",
diff --git a/airflow-wingman/src/airflow_wingman/llm_client.py b/airflow-wingman/src/airflow_wingman/llm_client.py
new file mode 100644
index 0000000..dc2543c
--- /dev/null
+++ b/airflow-wingman/src/airflow_wingman/llm_client.py
@@ -0,0 +1,109 @@
+"""
+Client for making API calls to various LLM providers using their official SDKs.
+"""
+
+from collections.abc import AsyncGenerator
+
+from anthropic import NOT_GIVEN, AsyncAnthropic
+from openai import AsyncOpenAI
+
+
+class LLMClient:
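+    """Thin async wrapper around the OpenAI, Anthropic, and OpenRouter SDKs."""
+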
+ def __init__(self, api_key: str):
+ """Initialize the LLM client.
+
+ Args:
+ api_key: API key for the provider
+ """
+ self.api_key = api_key
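+        # The same key is handed to all three provider clients; only the client
+        # matching the provider selected per request is actually used.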
+ self.openai_client = AsyncOpenAI(api_key=api_key)
+ self.anthropic_client = AsyncAnthropic(api_key=api_key)
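+        # OpenRouter exposes an OpenAI-compatible API, so the OpenAI SDK is reused here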
+ self.openrouter_client = AsyncOpenAI(
+ base_url="https://openrouter.ai/api/v1",
+ api_key=api_key,
+ default_headers={
+ "HTTP-Referer": "http://localhost:8080", # Required by OpenRouter
+ "X-Title": "Airflow Wingman", # Required by OpenRouter
+ },
+ )
+
+ async def chat_completion(
+ self, messages: list[dict[str, str]], model: str, provider: str, temperature: float = 0.7, max_tokens: int | None = None, stream: bool = False
+ ) -> AsyncGenerator[str, None] | dict:
+ """Send a chat completion request to the specified provider.
+
+ Args:
+ messages: List of message dictionaries with 'role' and 'content'
+ model: Model identifier
+ provider: Provider identifier (openai, anthropic, openrouter)
+            temperature: Sampling temperature (0-2 for OpenAI-compatible providers, 0-1 for Anthropic)
+ max_tokens: Maximum tokens to generate
+ stream: Whether to stream the response
+
+ Returns:
+ If stream=True, returns an async generator yielding response chunks
+ If stream=False, returns the complete response
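+
+        Example (illustrative sketch; assumes a valid OpenAI API key)::
+
+            client = LLMClient(api_key="sk-...")
+            stream = await client.chat_completion(
+                messages=[{"role": "user", "content": "Hello"}],
+                model="gpt-4o", provider="openai", stream=True)
+            async for chunk in stream:
+                print(chunk, end="")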
+ """
+ try:
+ if provider == "openai":
+ return await self._openai_chat_completion(messages, model, temperature, max_tokens, stream)
+ elif provider == "anthropic":
+ return await self._anthropic_chat_completion(messages, model, temperature, max_tokens, stream)
+ elif provider == "openrouter":
+ return await self._openrouter_chat_completion(messages, model, temperature, max_tokens, stream)
+ else:
+ return {"error": f"Unknown provider: {provider}"}
+ except Exception as e:
+ return {"error": f"API request failed: {str(e)}"}
+
+ async def _openai_chat_completion(self, messages: list[dict[str, str]], model: str, temperature: float, max_tokens: int | None, stream: bool):
+ """Handle OpenAI chat completion requests."""
+ response = await self.openai_client.chat.completions.create(model=model, messages=messages, temperature=temperature, max_tokens=max_tokens, stream=stream)
+
+ if stream:
+
+ async def response_generator():
+ async for chunk in response:
+                    # Guard: some chunks arrive with an empty choices list
+                    if chunk.choices and chunk.choices[0].delta.content:
+                        yield chunk.choices[0].delta.content
+
+ return response_generator()
+ else:
+ return {"content": response.choices[0].message.content}
+
+ async def _anthropic_chat_completion(self, messages: list[dict[str, str]], model: str, temperature: float, max_tokens: int | None, stream: bool):
+ """Handle Anthropic chat completion requests."""
+ # Convert messages to Anthropic format
+ system_message = next((m["content"] for m in messages if m["role"] == "system"), None)
+ conversation = []
+ for m in messages:
+ if m["role"] != "system":
+ conversation.append({"role": "assistant" if m["role"] == "assistant" else "user", "content": m["content"]})
+
+        # Anthropic requires max_tokens and rejects a null system prompt
+        response = await self.anthropic_client.messages.create(model=model, messages=conversation, system=system_message or NOT_GIVEN, temperature=temperature, max_tokens=max_tokens or 4096, stream=stream)
+
+ if stream:
+
+ async def response_generator():
+ async for chunk in response:
+                    # Only content_block_delta events carry text; message_start,
+                    # message_stop and other stream events have no .delta.text
+                    if chunk.type == "content_block_delta" and hasattr(chunk.delta, "text"):
+                        yield chunk.delta.text
+
+ return response_generator()
+ else:
+ return {"content": response.content[0].text}
+
+ async def _openrouter_chat_completion(self, messages: list[dict[str, str]], model: str, temperature: float, max_tokens: int | None, stream: bool):
+ """Handle OpenRouter chat completion requests."""
+ response = await self.openrouter_client.chat.completions.create(model=model, messages=messages, temperature=temperature, max_tokens=max_tokens, stream=stream)
+
+ if stream:
+
+ async def response_generator():
+ async for chunk in response:
+                    # Guard: some chunks arrive with an empty choices list
+                    if chunk.choices and chunk.choices[0].delta.content:
+                        yield chunk.choices[0].delta.content
+
+ return response_generator()
+ else:
+ return {"content": response.choices[0].message.content}
diff --git a/airflow-wingman/src/airflow_wingman/plugin.py b/airflow-wingman/src/airflow_wingman/plugin.py
index 78b01fd..b1e37d6 100644
--- a/airflow-wingman/src/airflow_wingman/plugin.py
+++ b/airflow-wingman/src/airflow_wingman/plugin.py
@@ -1,10 +1,11 @@
+"""Plugin definition for Airflow Wingman."""
+
from airflow.plugins_manager import AirflowPlugin
-from flask_appbuilder import BaseView as AppBuilderBaseView, expose
from flask import Blueprint
-from airflow_wingman.llms_models import MODELS
-
+from airflow_wingman.views import WingmanView
+
+# Create Blueprint
bp = Blueprint(
"wingman",
__name__,
@@ -13,21 +14,6 @@ bp = Blueprint(
static_url_path="/static/wingman",
)
-
-class WingmanView(AppBuilderBaseView):
- route_base = "/wingman"
- default_view = "chat"
-
- @expose("/")
- def chat(self):
- """
- Chat interface for Airflow Wingman.
- """
- return self.render_template(
- "wingman_chat.html", title="Airflow Wingman", models=MODELS
- )
-
-
# Create AppBuilder View
v_appbuilder_view = WingmanView()
v_appbuilder_package = {
@@ -39,6 +25,8 @@ v_appbuilder_package = {
# Create Plugin
class WingmanPlugin(AirflowPlugin):
+ """Airflow plugin for Wingman chat interface."""
+
name = "wingman"
flask_blueprints = [bp]
appbuilder_views = [v_appbuilder_package]
diff --git a/airflow-wingman/src/airflow_wingman/templates/wingman_chat.html b/airflow-wingman/src/airflow_wingman/templates/wingman_chat.html
index 6c6360b..3187fde 100644
--- a/airflow-wingman/src/airflow_wingman/templates/wingman_chat.html
+++ b/airflow-wingman/src/airflow_wingman/templates/wingman_chat.html
@@ -11,6 +11,8 @@
Note: For best results with function/tool calling capabilities, we recommend using models like Claude-3.5 Sonnet or GPT-4o. These models excel at understanding and using complex tools effectively.
+
+
Security: For your security, API keys are required for each session and are never stored. If you refresh the page or close the browser, you'll need to enter your API key again. This ensures your API keys remain secure in shared environments.