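XCom (short for cross-communication) is Airflow's built-in mechanism for exchanging small pieces of data between tasks. XComs are key-value pairs stored in Airflow's metadata database: a task can push an XCom (write a value under a key), and another task can pull that value (read it). Used wisely, XComs unlock powerful patterns; abused, they break your DAGs. Let's see how to use them correctly.

A downstream task that receives a dict via XCom can read individual keys from it, here a `source` key:

```python
from airflow.decorators import task

@task
def consume_two(data: dict) -> str:
    return f"Got {data['source']}"
```

XComs also drive dynamic task mapping. `fetch_urls` returns a list, and `download.expand(url=...)` creates one mapped `download` task instance per URL:

```python
from airflow.decorators import task

@task
def fetch_urls() -> list[str]:
    return ["http://a.com", "http://b.com"]

@task
def download(url: str) -> str:
    # download content (placeholder)
    return f"content_of_{url}"

@task
def aggregate(contents) -> int:
    # receives the results of every mapped download instance
    return len(list(contents))

aggregate(download.expand(url=fetch_urls()))
```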
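The fan-out/fan-in shape of the mapping example can be modeled in plain Python. This is a toy sketch, not Airflow's implementation. `run_mapped`, `pull_mapped` and the dict-based store are hypothetical names. The sketch does rely on two real details: each mapped instance is identified by a `map_index`, and TaskFlow stores return values under the key `return_value`.

```python
from typing import Any, Callable

# Toy stand-in for XCom rows: (task_id, map_index, key) -> value.
# Hypothetical model of fan-out/fan-in, not Airflow's actual implementation.
MappedKey = tuple[str, int, str]


def run_mapped(task_id: str, fn: Callable[[Any], Any], inputs: list[Any],
               store: dict[MappedKey, Any]) -> None:
    """Run fn once per input; each run pushes under its own map_index."""
    for map_index, item in enumerate(inputs):
        store[(task_id, map_index, "return_value")] = fn(item)


def pull_mapped(task_id: str, store: dict[MappedKey, Any]) -> list[Any]:
    """Collect every mapped result for task_id, ordered by map_index."""
    hits = [(idx, value) for (tid, idx, key), value in store.items()
            if tid == task_id and key == "return_value"]
    return [value for _, value in sorted(hits, key=lambda pair: pair[0])]


store: dict[MappedKey, Any] = {}
urls = ["http://a.com", "http://b.com"]  # what fetch_urls returns
run_mapped("download", lambda url: f"content_of_{url}", urls, store)
results = pull_mapped("download", store)
print(len(store), results)
```

Each URL produces its own stored value. The "aggregate" side collects all of them, in `map_index` order, as one list.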
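With the TaskFlow API you rarely push or pull by hand. A task's return value is pushed as an XCom automatically, and passing it into another task pulls it implicitly:

```python
from airflow.decorators import task

@task
def extract() -> dict:
    return {"user_id": 123, "name": "Alice"}  # pushed automatically

@task
def process(data: dict):
    print(f"Processing user {data['user_id']}")  # minimal consumer for illustration

process(extract())  # XCom passed implicitly (call this inside a DAG definition)
```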
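TaskFlow's implicit push and pull can be sketched with a dict standing in for the XCom table. It is keyed by DAG, run, task and key, and return values go under `return_value`, the key Airflow uses for them. Everything else is a hypothetical, simplified model: `xcom_table`, `run_task` and the local `xcom_push`/`xcom_pull` helpers. These helpers only resemble the `TaskInstance` methods by name, and they are not Airflow's real storage.

```python
from typing import Any, Callable

# Toy stand-in for the XCom table, keyed by (dag_id, run_id, task_id, key).
# Hypothetical sketch: Airflow keeps the real rows in its metadata database.
XComKey = tuple[str, str, str, str]
xcom_table: dict[XComKey, Any] = {}

RETURN_KEY = "return_value"  # the key TaskFlow uses for return values


def xcom_push(dag_id: str, run_id: str, task_id: str, key: str, value: Any) -> None:
    xcom_table[(dag_id, run_id, task_id, key)] = value


def xcom_pull(dag_id: str, run_id: str, task_id: str, key: str = RETURN_KEY) -> Any:
    # Returns None when nothing was pushed under that key.
    return xcom_table.get((dag_id, run_id, task_id, key))


def run_task(dag_id: str, run_id: str, task_id: str,
             fn: Callable[..., Any], *args: Any) -> Any:
    """Run fn and push its return value, roughly what @task does for you."""
    result = fn(*args)
    xcom_push(dag_id, run_id, task_id, RETURN_KEY, result)
    return result


def extract() -> dict:
    return {"user_id": 123, "name": "Alice"}


def process(data: dict) -> str:
    return f"Processing user {data['user_id']}"


# "extract" pushes; "process" receives the value pulled for it.
run_task("etl", "run_1", "extract", extract)
pulled = xcom_pull("etl", "run_1", "extract")
message = run_task("etl", "run_1", "process", process, pulled)
print(message)  # Processing user 123
```

The run ID is part of the key in this model. That way, two runs of the same DAG never read each other's values.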
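In the dynamic task mapping example, each mapped `download` instance pushes its own XCom value, and `aggregate` receives all of those results together as a list-like sequence.

❌ Passing large data

XComs live in the metadata database, so large return values bloat it:

```python
from airflow.decorators import task

# BAD: will bloat the metadata DB
@task
def bad_task():
    return large_dataframe.to_dict()  # can be MB/GB
```

✅ Better: store the data in S3/GCS and pass only the path as an XCom.

❌ Pulling from a task that hasn't run

```python
from airflow.decorators import task

@task
def step_one():
    return 1

@task
def step_two(x):
    # If step_one failed or was skipped, there is no XCom to pull for x
    return x + 1
```

Under the default trigger rule, `step_two` does not start at all if `step_one` fails or is skipped. If you relax the trigger rule so it runs anyway, there is no XCom value to pull, and `step_two` fails instead of doing useful work.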
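The "store the data, pass the path" advice is a claim-check pattern. Here is a minimal sketch of it, assuming a local temporary directory stands in for an S3/GCS bucket. `STORAGE_ROOT`, `produce` and `consume` are hypothetical names. In a real DAG they would be two `@task` functions, and the returned path string would be the only thing stored as an XCom.

```python
import json
import tempfile
from pathlib import Path

# Local directory standing in for an S3/GCS bucket (assumption for this sketch).
STORAGE_ROOT = Path(tempfile.mkdtemp())


def produce(run_id: str, rows: list[dict]) -> str:
    """Write the heavy payload to storage and return only its path."""
    path = STORAGE_ROOT / f"{run_id}_rows.json"
    path.write_text(json.dumps(rows))
    return str(path)  # a short string is all that would go into XCom


def consume(path: str) -> int:
    """Read the payload back using the path pulled from XCom."""
    rows = json.loads(Path(path).read_text())
    return len(rows)


rows = [{"id": i, "value": i * i} for i in range(10_000)]
ref = produce("run_1", rows)
payload_bytes = len(json.dumps(rows).encode())
print(len(ref), payload_bytes, consume(ref))
```

The reference is a few dozen bytes regardless of how large the payload grows. This keeps the metadata database small.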
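Whichever way the value travels, the task that pulls must run downstream of the task that pushes. TaskFlow sets this dependency for you when you pass one task's output into another; with explicitly defined tasks, you set it yourself: `push >> pull`.

Pattern 1: Passing an ID from a query to a processing task

The query yields a single integer, exactly the kind of small value XComs are built for:

```python
from airflow.decorators import task

@task
def get_latest_record_id() -> int:
    # Imagine a SQL query here
    return 42

@task
def process_record(record_id: int):
    print(f"Processing record {record_id}")

process_record(get_latest_record_id())  # the ID travels via XCom
```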
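To make "Imagine a SQL query here" concrete, here is a sketch using an in-memory SQLite database. The `records` table, its `payload` column and the `conn` parameters are all hypothetical. The point is the design choice: only the integer ID crosses the task boundary, and the consumer re-fetches whatever it needs from the source.

```python
import sqlite3


def get_latest_record_id(conn: sqlite3.Connection) -> int:
    """Return the newest record ID: a tiny value that fits comfortably in an XCom."""
    (latest,) = conn.execute("SELECT MAX(id) FROM records").fetchone()
    if latest is None:
        raise ValueError("records table is empty")
    return int(latest)


def process_record(conn: sqlite3.Connection, record_id: int) -> str:
    """Re-fetch the row by ID instead of receiving the whole row via XCom."""
    (payload,) = conn.execute(
        "SELECT payload FROM records WHERE id = ?", (record_id,)
    ).fetchone()
    return f"Processing record {record_id}: {payload}"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany(
    "INSERT INTO records (id, payload) VALUES (?, ?)",
    [(7, "a"), (42, "b"), (13, "c")],
)
record_id = get_latest_record_id(conn)
print(process_record(conn, record_id))  # Processing record 42: b
```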