Snowflake Polars (dagster-snowflake-polars)
This library provides an integration between Snowflake and Polars, allowing you to store and load Polars DataFrames as Snowflake tables.
I/O Manager
- dagster_snowflake_polars.snowflake_polars_io_manager
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
 An I/O manager definition that reads inputs from and writes Polars DataFrames to Snowflake. When using the snowflake_polars_io_manager, any inputs and outputs without type annotations will be loaded as Polars DataFrames.
 Returns: IOManagerDefinition
 Examples:
 from dagster_snowflake_polars import snowflake_polars_io_manager
 from dagster import asset, Definitions
 import polars as pl

 @asset(
     key_prefix=["my_schema"], # will be used as the schema in snowflake
 )
 def my_table() -> pl.DataFrame: # the name of the asset will be the table name
     ...

 defs = Definitions(
     assets=[my_table],
     resources={
         "io_manager": snowflake_polars_io_manager.configured({
             "database": "my_database",
             "account": {"env": "SNOWFLAKE_ACCOUNT"}
         })
     }
 )
 You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.
 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": snowflake_polars_io_manager.configured(
         {"database": "my_database", "schema": "my_schema"} # will be used as the schema
     )}
 )
 On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
 @asset(
     key_prefix=["my_schema"], # will be used as the schema in snowflake
 )
 def my_table() -> pl.DataFrame:
     ...

 @asset(
     metadata={"schema": "my_schema"} # will be used as the schema in snowflake
 )
 def my_other_table() -> pl.DataFrame:
     ...
 For ops, the schema can be specified by including a “schema” entry in output metadata.
 @op(
     out={"my_table": Out(metadata={"schema": "my_schema"})}
 )
 def make_my_table() -> pl.DataFrame:
     ...
 If none of these is provided, the schema will default to “public”.
 To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
 @asset(
     ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
 )
 def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
     # my_table will just contain the data from column "a"
     ...
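 The same “columns” metadata also works on an op input. A minimal sketch, assuming the op and its input name are illustrative and that the upstream table is named my_table:
 from dagster import In, op
 import polars as pl

 @op(ins={"my_table": In(metadata={"columns": ["a"]})})
 def process_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
     # only column "a" is selected when the input is loaded from Snowflake
     ...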
- class dagster_snowflake_polars.SnowflakePolarsIOManager
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
 An I/O manager definition that reads inputs from and writes Polars DataFrames to Snowflake. When using the SnowflakePolarsIOManager, any inputs and outputs without type annotations will be loaded as Polars DataFrames.
 Returns: IOManagerDefinition
 Examples:
 from dagster_snowflake_polars import SnowflakePolarsIOManager
 from dagster import asset, Definitions, EnvVar
 import polars as pl

 @asset(
     key_prefix=["my_schema"], # will be used as the schema in snowflake
 )
 def my_table() -> pl.DataFrame: # the name of the asset will be the table name
     ...

 defs = Definitions(
     assets=[my_table],
     resources={
         "io_manager": SnowflakePolarsIOManager(database="MY_DATABASE", account=EnvVar("SNOWFLAKE_ACCOUNT"))
     }
 )
 You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.
 defs = Definitions(
     assets=[my_table],
     resources={
         "io_manager": SnowflakePolarsIOManager(database="my_database", schema="my_schema")
     }
 )
 On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
 @asset(
     key_prefix=["my_schema"], # will be used as the schema in snowflake
 )
 def my_table() -> pl.DataFrame:
     ...

 @asset(
     metadata={"schema": "my_schema"} # will be used as the schema in snowflake
 )
 def my_other_table() -> pl.DataFrame:
     ...
 For ops, the schema can be specified by including a “schema” entry in output metadata.
 @op(
     out={"my_table": Out(metadata={"schema": "my_schema"})}
 )
 def make_my_table() -> pl.DataFrame:
     ...
 If none of these is provided, the schema will default to “public”.
 To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
 @asset(
     ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
 )
 def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
     # my_table will just contain the data from column "a"
     ...
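 Because inputs and outputs without type annotations are loaded as Polars DataFrames, a downstream asset can omit the annotation entirely. A minimal sketch, assuming an upstream asset whose key is simply my_table (the downstream asset name is illustrative):
 from dagster import asset

 @asset
 def my_table_summary(my_table):
     # my_table has no type annotation, so the I/O manager loads it as a Polars DataFrame
     return my_table.describe()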
Type Handler
- class dagster_snowflake_polars.SnowflakePolarsTypeHandler
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
 Plugin for the Snowflake I/O Manager that can store and load Polars DataFrames as Snowflake tables. This handler uses Polars’ native write_database method with ADBC (Arrow Database Connectivity) for efficient data transfer without converting to pandas.
 Examples:
 from typing import Sequence

 from dagster_snowflake import SnowflakeIOManager
 from dagster_snowflake_polars import SnowflakePolarsTypeHandler
 from dagster import asset, Definitions, EnvVar
 import polars as pl

 class MySnowflakeIOManager(SnowflakeIOManager):
     @staticmethod
     def type_handlers() -> Sequence[DbTypeHandler]:
         return [SnowflakePolarsTypeHandler()]

 @asset(
     key_prefix=["my_schema"], # will be used as the schema in snowflake
 )
 def my_table() -> pl.DataFrame: # the name of the asset will be the table name
     ...

 defs = Definitions(
     assets=[my_table],
     resources={
         "io_manager": MySnowflakeIOManager(database="MY_DATABASE", account=EnvVar("SNOWFLAKE_ACCOUNT"), ...)
     }
 )
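 For context, the ADBC path the handler relies on is Polars’ own Arrow-native database I/O rather than a pandas round trip. A rough sketch of what that transfer looks like in plain Polars, outside Dagster; the connection URI format and the adbc-driver-snowflake requirement are assumptions about your environment, not part of this library’s API:
 import polars as pl

 # Illustrative Snowflake URI; assumes the adbc-driver-snowflake package is installed.
 uri = "snowflake://my_user:my_password@my_account/MY_DATABASE/MY_SCHEMA"

 df = pl.DataFrame({"a": [1, 2, 3]})

 # Write the table through ADBC, keeping the data in Arrow form (no pandas conversion).
 df.write_database("my_table", connection=uri, if_table_exists="replace", engine="adbc")

 # Read it back through ADBC as well.
 loaded = pl.read_database_uri("SELECT * FROM my_table", uri, engine="adbc")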