Ask AI

You are viewing an unreleased or outdated version of the documentation

Asset Observations#

An asset observation is an event that records metadata about a given asset. Unlike asset materializations, asset observations do not signify that an asset has been mutated.

Relevant APIs#

NameDescription
AssetObservationDagster event indicating that an asset's metadata has been recorded.
AssetKeyA unique identifier for a particular external asset.

Overview#

AssetObservation events are used to record metadata in Dagster about a given asset. Asset observation events can be logged at runtime within ops and assets. An asset must be defined using the @asset decorator or have existing materializations in order for its observations to be displayed.

Logging an AssetObservation from an Op#

To make Dagster aware that we have recorded metadata about an asset, we can log an AssetObservation event from within an op. To do this, we use the method OpExecutionContext.log_event on the context:

from dagster import AssetObservation, op


@op
def observation_op(context: OpExecutionContext):
    df = read_df()
    context.log_event(
        AssetObservation(asset_key="observation_asset", metadata={"num_rows": len(df)})
    )
    return 5

We should now see an observation event in the event log when we execute this asset.

asset-observation

Attaching Metadata to an AssetObservation#

There are a variety of types of metadata that can be associated with an observation event, all through the MetadataValue class. Each observation event optionally takes a dictionary of metadata that is then displayed in the event log and the Asset Details page. Check our API docs for MetadataValue for more details on the types of event metadata available.

from dagster import AssetObservation, MetadataValue, op


@op
def observes_dataset_op(context: OpExecutionContext):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetObservation(
            asset_key="my_dataset",
            metadata={
                "text_metadata": "Text-based metadata for this event",
                "path": MetadataValue.path(remote_storage_path),
                "dashboard_url": MetadataValue.url(
                    "http://mycoolsite.com/url_for_my_data"
                ),
                "size (bytes)": calculate_bytes(df),
            },
        )
    )
    return remote_storage_path

In the Asset Details page, we can see observations in the Asset Activity table.

asset-activity-observation

Specifying a partition for an AssetObservation#

If you are observing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the partition argument on the object.

from dagster import Config, op, OpExecutionContext


class MyOpConfig(Config):
    date: str


@op
def partitioned_dataset_op(context: OpExecutionContext, config: MyOpConfig):
    partition_date = config.date
    df = read_df_for_date(partition_date)
    context.log_event(
        AssetObservation(asset_key="my_partitioned_dataset", partition=partition_date)
    )
    return df

Observable assets#

Assets may have a user-defined observation function that returns metadata collected about the asset. Whenever the observation function is run, an AssetObservation event will be generated for the asset.

One use of observable assets is to track changes to the content of an asset. When an asset is observed to have a newer data version than the data version it had when a downstream asset was materialized, then the downstream asset will be given an "Unsynced" label in the Dagster UI that indicates that upstream data has changed.

AutoMaterializePolicys can be used to automatically materialize downstream assets when this occurs.

The observable_asset decorator provides a convenient way to define assets with observation functions. The below observable asset takes a file hash and returns it as the data version. Every time you run the observation function, a new observation will be generated with this hash set as its data version.

from hashlib import sha256

from dagster import DataVersion, observable_asset


@observable_asset
def foo_asset():
    content = read_some_file()
    hash_sig = sha256()
    hash_sig.update(bytearray(content, "utf8"))
    return DataVersion(hash_sig.hexdigest())

When the file content changes, the hash and therefore the data version will change - this will notify Dagster that downstream assets derived from an older value (i.e. a different data version) of this asset might need to be updated.

Asset observations can be triggered via the "Observe" button in the UI graph explorer view. Note that this button will only be visible if at least one asset in the current graph defines an observation function.

observable-source-asset

Asset observations can also be run as part of an asset job. This allows you to run asset observations on a schedule:

from dagster import (
    DataVersion,
    ScheduleDefinition,
    define_asset_job,
    observable_asset,
)


@observable_asset
def foo_asset():
    content = read_some_file()
    hash_sig = sha256()
    hash_sig.update(bytearray(content, "utf8"))
    return DataVersion(hash_sig.hexdigest())


observation_job = define_asset_job("observation_job", [foo_asset])

# schedule that will run the observation on foo_asset every day
observation_schedule = ScheduleDefinition(
    name="observation_schedule",
    cron_schedule="@daily",
    job=observation_job,
)

NOTE: Currently, asset observations cannot be run as part of a standard asset job that materializes assets. The selection argument to define_asset_job must target only observable assets-- an error will be thrown if a mix of materializable assets and observable assets is selected.