Initial empty airflow project

2025-02-09 20:08:12 +00:00
parent fd4585a9c5
commit 48126aeef2
11 changed files with 362 additions and 0 deletions

.astro/config.yaml  Normal file  (+2)

@@ -0,0 +1,2 @@
project:
  name: airflow-mcp-server

.astro/dag_integrity_exceptions.txt  Normal file  (+1)

@@ -0,0 +1 @@
# Add dag files to exempt from parse test below. ex: dags/<test-file>

.astro/test_dag_integrity_default.py  Normal file  (+141)

@@ -0,0 +1,141 @@
"""Test the validity of all DAGs. **USED BY DEV PARSE COMMAND DO NOT EDIT**"""
from contextlib import contextmanager
import logging
import os
import pytest
from airflow.models import DagBag, Variable, Connection
from airflow.hooks.base import BaseHook
from airflow.utils.db import initdb
# init airflow database
initdb()
# The following code patches errors caused by missing OS Variables, Airflow Connections, and Airflow Variables
# =========== MONKEYPATCH BaseHook.get_connection() ===========
def basehook_get_connection_monkeypatch(key: str, *args, **kwargs):
print(
f"Attempted to fetch connection during parse returning an empty Connection object for {key}"
)
return Connection(key)
BaseHook.get_connection = basehook_get_connection_monkeypatch
# # =========== /MONKEYPATCH BASEHOOK.GET_CONNECTION() ===========
# =========== MONKEYPATCH OS.GETENV() ===========
def os_getenv_monkeypatch(key: str, *args, **kwargs):
default = None
if args:
default = args[0] # os.getenv should get at most 1 arg after the key
if kwargs:
default = kwargs.get(
"default", None
) # and sometimes kwarg if people are using the sig
env_value = os.environ.get(key, None)
if env_value:
return env_value # if the env_value is set, return it
if (
key == "JENKINS_HOME" and default is None
): # fix https://github.com/astronomer/astro-cli/issues/601
return None
if default:
return default # otherwise return whatever default has been passed
return f"MOCKED_{key.upper()}_VALUE" # if absolutely nothing has been passed - return the mocked value
os.getenv = os_getenv_monkeypatch
# # =========== /MONKEYPATCH OS.GETENV() ===========
# =========== MONKEYPATCH VARIABLE.GET() ===========
class magic_dict(dict):
def __init__(self, *args, **kwargs):
self.update(*args, **kwargs)
def __getitem__(self, key):
return {}.get(key, "MOCKED_KEY_VALUE")
_no_default = object() # allow falsey defaults
def variable_get_monkeypatch(key: str, default_var=_no_default, deserialize_json=False):
print(
f"Attempted to get Variable value during parse, returning a mocked value for {key}"
)
if default_var is not _no_default:
return default_var
if deserialize_json:
return magic_dict()
return "NON_DEFAULT_MOCKED_VARIABLE_VALUE"
Variable.get = variable_get_monkeypatch
# # =========== /MONKEYPATCH VARIABLE.GET() ===========
@contextmanager
def suppress_logging(namespace):
"""
Suppress logging within a specific namespace to keep tests "clean" during build
"""
logger = logging.getLogger(namespace)
old_value = logger.disabled
logger.disabled = True
try:
yield
finally:
logger.disabled = old_value
def get_import_errors():
"""
Generate a tuple for import errors in the dag bag, and include DAGs without errors.
"""
with suppress_logging("airflow"):
dag_bag = DagBag(include_examples=False)
def strip_path_prefix(path):
return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))
# Initialize an empty list to store the tuples
result = []
# Iterate over the items in import_errors
for k, v in dag_bag.import_errors.items():
result.append((strip_path_prefix(k), v.strip()))
# Check if there are DAGs without errors
for file_path in dag_bag.dags:
# Check if the file_path is not in import_errors, meaning no errors
if file_path not in dag_bag.import_errors:
result.append((strip_path_prefix(file_path), "No import errors"))
return result
@pytest.mark.parametrize(
"rel_path, rv", get_import_errors(), ids=[x[0] for x in get_import_errors()]
)
def test_file_imports(rel_path, rv):
"""Test for import errors on a file"""
if os.path.exists(".astro/dag_integrity_exceptions.txt"):
with open(".astro/dag_integrity_exceptions.txt", "r") as f:
exceptions = f.readlines()
print(f"Exceptions: {exceptions}")
if (rv != "No import errors") and rel_path not in exceptions:
# If rv is not "No import errors," consider it a failed test
raise Exception(f"{rel_path} failed to import with message \n {rv}")
else:
# If rv is "No import errors," consider it a passed test
print(f"{rel_path} passed the import test")

.dockerignore  Normal file  (+8)

@@ -0,0 +1,8 @@
astro
.git
.env
airflow_settings.yaml
logs/
.venv
airflow.db
airflow.cfg

Dockerfile  Normal file  (+1)

@@ -0,0 +1 @@
FROM quay.io/astronomer/astro-runtime:12.6.0

airflow_settings.yaml  Normal file  (+25)

@@ -0,0 +1,25 @@
# This file allows you to configure Airflow Connections, Pools, and Variables in a single place for local development only.
# NOTE: json dicts can be added to the conn_extra field as yaml key value pairs. See the example below.
# For more information, refer to our docs: https://www.astronomer.io/docs/astro/cli/develop-project#configure-airflow_settingsyaml-local-development-only
# For questions, reach out to: https://support.astronomer.io
# For issues, create an issue ticket here: https://github.com/astronomer/astro-cli/issues

airflow:
  connections:
    - conn_id:
      conn_type:
      conn_host:
      conn_schema:
      conn_login:
      conn_password:
      conn_port:
      conn_extra:
        example_extra_field: example-value
  pools:
    - pool_name:
      pool_slot:
      pool_description:
  variables:
    - variable_name:
      variable_value:
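
To make the `conn_extra` note above concrete: the YAML key-value pairs placed under `conn_extra` come back at runtime as a parsed dict on the Connection object. A minimal sketch, assuming a connection with the hypothetical `conn_id` of `my_conn` has been filled in above and loaded into the local environment:

from airflow.hooks.base import BaseHook

# "my_conn" is an illustrative conn_id; substitute whatever you configured above.
conn = BaseHook.get_connection("my_conn")
extras = conn.extra_dejson  # conn_extra key-value pairs, parsed into a dict
print(extras.get("example_extra_field"))  # -> "example-value"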

dags/.airflowignore  Normal file  (+0)

dags/exampledag.py  Normal file  (+100)

@@ -0,0 +1,100 @@
"""
## Astronaut ETL example DAG
This DAG queries the list of astronauts currently in space from the
Open Notify API and prints each astronaut's name and flying craft.
There are two tasks, one to get the data from the API and save the results,
and another to print the results. Both tasks are written in Python using
Airflow's TaskFlow API, which allows you to easily turn Python functions into
Airflow tasks, and automatically infer dependencies and pass data.
The second task uses dynamic task mapping to create a copy of the task for
each Astronaut in the list retrieved from the API. This list will change
depending on how many Astronauts are in space, and the DAG will adjust
accordingly each time it runs.
For more explanation and getting started instructions, see our Write your
first DAG tutorial: https://www.astronomer.io/docs/learn/get-started-with-airflow
![Picture of the ISS](https://www.esa.int/var/esa/storage/images/esa_multimedia/images/2010/02/space_station_over_earth/10293696-3-eng-GB/Space_Station_over_Earth_card_full.jpg)
"""
from airflow import Dataset
from airflow.decorators import dag, task
from pendulum import datetime
import requests
# Define the basic parameters of the DAG, like schedule and start_date
@dag(
start_date=datetime(2024, 1, 1),
schedule="@daily",
catchup=False,
doc_md=__doc__,
default_args={"owner": "Astro", "retries": 3},
tags=["example"],
)
def example_astronauts():
# Define tasks
@task(
# Define a dataset outlet for the task. This can be used to schedule downstream DAGs when this task has run.
outlets=[Dataset("current_astronauts")]
) # Define that this task updates the `current_astronauts` Dataset
def get_astronauts(**context) -> list[dict]:
"""
This task uses the requests library to retrieve a list of Astronauts
currently in space. The results are pushed to XCom with a specific key
so they can be used in a downstream pipeline. The task returns a list
of Astronauts to be used in the next task.
"""
try:
r = requests.get("http://api.open-notify.org/astros.json")
r.raise_for_status()
number_of_people_in_space = r.json()["number"]
list_of_people_in_space = r.json()["people"]
except:
print("API currently not available, using hardcoded data instead.")
number_of_people_in_space = 12
list_of_people_in_space = [
{"craft": "ISS", "name": "Oleg Kononenko"},
{"craft": "ISS", "name": "Nikolai Chub"},
{"craft": "ISS", "name": "Tracy Caldwell Dyson"},
{"craft": "ISS", "name": "Matthew Dominick"},
{"craft": "ISS", "name": "Michael Barratt"},
{"craft": "ISS", "name": "Jeanette Epps"},
{"craft": "ISS", "name": "Alexander Grebenkin"},
{"craft": "ISS", "name": "Butch Wilmore"},
{"craft": "ISS", "name": "Sunita Williams"},
{"craft": "Tiangong", "name": "Li Guangsu"},
{"craft": "Tiangong", "name": "Li Cong"},
{"craft": "Tiangong", "name": "Ye Guangfu"},
]
context["ti"].xcom_push(
key="number_of_people_in_space", value=number_of_people_in_space
)
return list_of_people_in_space
@task
def print_astronaut_craft(greeting: str, person_in_space: dict) -> None:
"""
This task creates a print statement with the name of an
Astronaut in space and the craft they are flying on from
the API request results of the previous task, along with a
greeting which is hard-coded in this example.
"""
craft = person_in_space["craft"]
name = person_in_space["name"]
print(f"{name} is currently in space flying on the {craft}! {greeting}")
# Use dynamic task mapping to run the print_astronaut_craft task for each
# Astronaut in space
print_astronaut_craft.partial(greeting="Hello! :)").expand(
person_in_space=get_astronauts() # Define dependencies using TaskFlow API syntax
)
# Instantiate the DAG
example_astronauts()
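
The `outlets=[Dataset("current_astronauts")]` comment in the DAG above mentions scheduling downstream DAGs; here is what that looks like in practice. A minimal sketch of a dataset-triggered consumer, using the Airflow 2.x Dataset API bundled with this runtime (the DAG and task names are illustrative, not part of this commit):

from airflow import Dataset
from airflow.decorators import dag, task
from pendulum import datetime

@dag(
    start_date=datetime(2024, 1, 1),
    # A list of Datasets instead of a cron/preset string: this DAG runs
    # whenever a task with outlets=[Dataset("current_astronauts")] succeeds.
    schedule=[Dataset("current_astronauts")],
    catchup=False,
    tags=["example"],
)
def downstream_of_astronauts():
    @task
    def react():
        print("current_astronauts was updated upstream")

    react()

downstream_of_astronauts()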

packages.txt  Normal file  (+0)

requirements.txt  Normal file  (+1)

@@ -0,0 +1 @@
# Astro Runtime includes the following pre-installed providers packages: https://www.astronomer.io/docs/astro/runtime-image-architecture#provider-packages

tests/dags/test_dag_example.py  Normal file  (+83)

@@ -0,0 +1,83 @@
"""Example DAGs test. This test ensures that all Dags have tags, retries set to two, and no import errors. This is an example pytest and may not be fit the context of your DAGs. Feel free to add and remove tests."""
import os
import logging
from contextlib import contextmanager
import pytest
from airflow.models import DagBag
@contextmanager
def suppress_logging(namespace):
logger = logging.getLogger(namespace)
old_value = logger.disabled
logger.disabled = True
try:
yield
finally:
logger.disabled = old_value
def get_import_errors():
"""
Generate a tuple for import errors in the dag bag
"""
with suppress_logging("airflow"):
dag_bag = DagBag(include_examples=False)
def strip_path_prefix(path):
return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))
# prepend "(None,None)" to ensure that a test object is always created even if it's a no op.
return [(None, None)] + [
(strip_path_prefix(k), v.strip()) for k, v in dag_bag.import_errors.items()
]
def get_dags():
"""
Generate a tuple of dag_id, <DAG objects> in the DagBag
"""
with suppress_logging("airflow"):
dag_bag = DagBag(include_examples=False)
def strip_path_prefix(path):
return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))
return [(k, v, strip_path_prefix(v.fileloc)) for k, v in dag_bag.dags.items()]
@pytest.mark.parametrize(
"rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()]
)
def test_file_imports(rel_path, rv):
"""Test for import errors on a file"""
if rel_path and rv:
raise Exception(f"{rel_path} failed to import with message \n {rv}")
APPROVED_TAGS = {}
@pytest.mark.parametrize(
"dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()]
)
def test_dag_tags(dag_id, dag, fileloc):
"""
test if a DAG is tagged and if those TAGs are in the approved list
"""
assert dag.tags, f"{dag_id} in {fileloc} has no tags"
if APPROVED_TAGS:
assert not set(dag.tags) - APPROVED_TAGS
@pytest.mark.parametrize(
"dag_id,dag, fileloc", get_dags(), ids=[x[2] for x in get_dags()]
)
def test_dag_retries(dag_id, dag, fileloc):
"""
test if a DAG has retries set
"""
assert (
dag.default_args.get("retries", None) >= 2
), f"{dag_id} in {fileloc} must have task retries >= 2."