A non-materializable asset is an asset that is visible in Dagster but updated by an external process. For example, suppose you have a process that loads data from Kafka into Amazon S3 every day. You want the S3 asset to be visible alongside your other data assets, but not triggered by Dagster.
In this case, you can use a non-materializable asset to leverage Dagster's event log and tooling without using the orchestrator. This allows you to maintain data lineage, observability, and data quality without unnecessary migrations.
The following code declares a single non-materializable asset that represents a file in S3 and passes it to a Definitions object:
from dagster import AssetsDefinition, Definitions
defs = Definitions(assets=[AssetsDefinition.single("file_in_s3")])
As with the @asset decorator, this code produces an AssetsDefinition object that contains the definition of a single asset. Unlike an AssetsDefinition object produced by the @asset decorator, however, the resulting object does not include a materialization function for the asset.
Note that the Materialize button is disabled, because Dagster doesn't know how to materialize the asset.
Dependencies are defined by using the deps argument. This enables Dagster to model entire graphs of assets scheduled and orchestrated by other systems.
In the following example, we have two assets: raw_logs and processed_logs. The processed_logs asset is produced by a scheduled computation in another orchestration system. Using non-materializable assets allows you to model both assets in Dagster.
Note that the Materialize button is disabled, because Dagster doesn't know how to materialize the asset.
Dagster-native assets with non-materializable asset dependencies
Fully-managed assets can depend on non-materializable assets. In this example, the aggregated_logs asset depends on processed_logs, which is a non-materializable asset:
from dagster import AssetsDefinition, Definitions, asset

raw_logs = AssetsDefinition.single("raw_logs")
processed_logs = AssetsDefinition.single("processed_logs", deps=[raw_logs])

@asset(deps=[processed_logs])
def aggregated_logs() -> None:
    # Loads "processed_log" into memory and performs some aggregation
    ...

defs = Definitions(assets=[aggregated_logs, raw_logs, processed_logs])
As with materializable assets, it's often useful to track how the contents of a non-materializable asset change over time. This lets you catch data reliability issues, such as quality and freshness problems, and surface metadata in the catalog, such as column schema, row count, and usage statistics.
There are two ways to track metadata for a non-materializable asset:
Observations - If you want to treat the process that updates the asset as a black box, but still monitor changes to the asset itself, then you can execute code within Dagster that observes the asset and reports the observation results in asset observation events.
External materializations - If the asset is updated in a discrete (i.e. non-streaming) manner, and you can instrument the process that updates it, then you can report asset materialization events to Dagster whenever it's updated from within that external process.
In many cases, the process that updates an asset is opaque to Dagster. For example, an asset might be a Kafka topic, a table updated by a streaming framework, or a table kept up-to-date by a distant team within the organization. However, properties of the asset itself can still be observed.
You can use observable assets to monitor these assets using Dagster. An observable asset pairs an asset definition with a function that observes the asset. The observation function is expected to return metadata that describes the asset at the time it's observed. This function can be scheduled, or manually executed, within Dagster, just like an asset materialization function.
Reporting external materializations for non-materializable assets
In large organizations, Dagster often lives side-by-side with other orchestrators. For assets kept up-to-date by those orchestrators, it's often possible to instrument the tasks that update them to report those materializations to Dagster. This allows Dagster to be used as a central catalog and data reliability tool, even when it's not a central orchestrator.
This offers a deeper level of observability than with asset observations, because it provides a complete record of every time the asset was updated, and offers the ability to link back to the run in the other orchestrator that updated the asset.
To record materializations from another system, you can use any of the following approaches:
Whether you're using Dagster OSS or Dagster+, you can use a REST endpoint for reporting asset materializations. The API also has endpoints for reporting asset observations and asset check evaluations.
Refer to the following tabs for examples using curl and Python to communicate with the API.
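As a rough illustration of the Python route, the sketch below builds a POST request that reports a materialization. It assumes the endpoint path is /report_asset_materialization/<asset_key> and that metadata is sent as a JSON body under a "metadata" field; the base URL http://localhost:3000 and the helper name build_materialization_request are hypothetical, and a Dagster+ deployment would likely also need an authentication header that is not shown here. Check the API reference for your version before relying on it.

```python
import json
import urllib.request


def build_materialization_request(
    base_url: str, asset_key: str, metadata: dict
) -> urllib.request.Request:
    """Build (but do not send) a POST request reporting a materialization.

    Hypothetical helper: the endpoint path and payload shape are assumptions.
    """
    url = f"{base_url.rstrip('/')}/report_asset_materialization/{asset_key}"
    body = json.dumps({"metadata": metadata}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_materialization_request(
    "http://localhost:3000",  # hypothetical local Dagster webserver address
    "file_in_s3",
    {"nrows": 10},
)

# To actually send the event to a running instance, uncomment:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Building the request separately from sending it makes the payload easy to inspect or log from inside the external process before anything reaches Dagster.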
You can report events for external assets directly from Dagster's Python API by calling report_runless_asset_event on a DagsterInstance.
For example, this would be useful when writing a Python script to backfill metadata:
from dagster import AssetMaterialization, DagsterInstance

# DagsterInstance.get() loads the instance configured for this environment (via DAGSTER_HOME)
instance = DagsterInstance.get()
instance.report_runless_asset_event(
    AssetMaterialization("asset_one", metadata={"nrows": 10, "source": "From this script."})
)
You can log an AssetMaterialization from an op. In this case, use the log_event method of OpExecutionContext to report an asset materialization of an external asset. For example: