
Datasets

Dataset constructors for common table/file formats.

Re-exports the most common dataset classes so callers can import from smallcat.datasets directly.

Available datasets
  • :class:ParquetDataset
  • :class:CSVDataset
  • :class:ExcelDataset
  • :class:DeltaTableDataset
  • :class:BaseDataset (abstract)

CSV

CSV dataset using DuckDB's CSV reader/writer.

This module defines :class:CSVDataset, a concrete dataset for CSV/TSV/DSV files using DuckDB (read_csv_auto / COPY ... WITH (FORMAT CSV)). All paths are treated as relative to the dataset's base URI.

Features
  • Auto-schema inference (delimiter, header, types) with overrides.
  • Large-file handling via DuckDB streaming.
  • Optional Hive-style partitioning on write.
Example

ds = CSVDataset.from_conn_id("local_fs") tbl = ds.load_arrow_table("bronze/raw/users.csv") ds.save_arrow_table("silver/users/", tbl)

Typical options (suggested):
  • Load: header, delimiter, columns, nullstr, types.
  • Save: header, delimiter, partition_by, overwrite.
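
A hedged sketch of passing the option models documented below; note that the load model exposes sep rather than delimiter, and the "local_fs" connection ID is an assumption:

from smallcat.datasets import CSVDataset
from smallcat.datasets.csv_dataset import CSVLoadOptions, CSVSaveOptions

ds = CSVDataset.from_conn_id(
    "local_fs",  # assumed Airflow connection with a local base path
    load_options=CSVLoadOptions(sep="|", header=True, all_varchar=True),
    save_options=CSVSaveOptions(sep=",", header=True, overwrite=True),
)
tbl = ds.load_arrow_table("bronze/raw/users.psv")  # pipe-separated input (illustrative)
ds.save_arrow_table("silver/users.csv", tbl)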

Note

An implementation typically builds SQL like SELECT * FROM read_csv_auto(?, ...options...) for reading, and COPY (SELECT * FROM tmp_input) TO ? WITH (FORMAT CSV, ...) for writing.

smallcat.datasets.csv_dataset.CSVDataset

Bases: BaseDataset[CSVLoadOptions, CSVSaveOptions]

Dataset that reads/writes CSV using DuckDB.

  • Paths are resolved relative to the dataset's connection base (local filesystem, gs://, etc.).
  • Reading uses DuckDBPyConnection.read_csv under the hood and returns a pyarrow.Table.
  • Writing uses Relation.write_csv to materialize a table to CSV.
Notes:
  • Use :class:CSVLoadOptions to override auto-detection (separator, header, per-column types).
  • Use :class:CSVSaveOptions to control delimiter, header, and overwrite behavior.
Source code in src/smallcat/datasets/csv_dataset.py
class CSVDataset(BaseDataset[CSVLoadOptions, CSVSaveOptions]):
    """Dataset that reads/writes CSV using DuckDB.

    - **Paths** are resolved relative to the dataset's connection base
      (local filesystem, `gs://`, etc.).
    - **Reading** uses `DuckDBPyConnection.read_csv` under the hood and returns a
      `pyarrow.Table`.
    - **Writing** uses `Relation.write_csv` to materialize a table to CSV.

    Notes:
    -----
    - Use :class:`CSVLoadOptions` to override auto-detection (separator, header,
      per-column types).
    - Use :class:`CSVSaveOptions` to control delimiter, header, and overwrite behavior.
    """

    def load_arrow_record_batch_reader(
        self,
        path: str,
        where: str | None = None,
    ) -> pa.RecordBatchReader:
        """Stream CSV rows as `RecordBatch`es with an optional filter."""
        full_uri = self._full_uri(path)
        with self._duckdb_conn() as con:
            rel = con.read_csv(full_uri, **self.load_options_dict())
            query = "select * from data"
            if where:
                query += f" where {where}"
            return rel.query("data", query).fetch_record_batch()

    def save_arrow_table(self, path: str, table: pa.Table) -> None:
        """Write a PyArrow Table to CSV.

        Parameters
        ----------
        path
            Destination path (file or pattern) relative to the connection base.
            Compression is inferred from the extension (e.g. '.gz', '.zst').
        table
            The Arrow table to write.

        Raises:
        ------
        duckdb.IOException
            If the destination is not writable.
        """
        full_uri = self._full_uri(path)
        with self._duckdb_conn() as con:
            con.register("tmp_input", table)
            con.sql("SELECT * FROM tmp_input").write_csv(
                full_uri,
                **self.save_options_dict(),
            )

load_arrow_record_batch_reader

load_arrow_record_batch_reader(path: str, where: str | None = None) -> pa.RecordBatchReader

Stream CSV rows as RecordBatches with an optional filter.
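
A minimal streaming sketch; the connection ID, file path, and status column are assumptions. Each iteration yields a pyarrow.RecordBatch:

from smallcat.datasets import CSVDataset

ds = CSVDataset.from_conn_id("local_fs")  # assumed connection
reader = ds.load_arrow_record_batch_reader(
    "bronze/raw/orders.csv",
    where="status = 'paid'",  # optional SQL predicate applied in DuckDB
)
for batch in reader:
    print(batch.num_rows)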

Source code in src/smallcat/datasets/csv_dataset.py
def load_arrow_record_batch_reader(
    self,
    path: str,
    where: str | None = None,
) -> pa.RecordBatchReader:
    """Stream CSV rows as `RecordBatch`es with an optional filter."""
    full_uri = self._full_uri(path)
    with self._duckdb_conn() as con:
        rel = con.read_csv(full_uri, **self.load_options_dict())
        query = "select * from data"
        if where:
            query += f" where {where}"
        return rel.query("data", query).fetch_record_batch()

save_arrow_table

save_arrow_table(path: str, table: pa.Table) -> None

Write a PyArrow Table to CSV.

Parameters:
  • path: Destination path (file or pattern) relative to the connection base. Compression is inferred from the extension (e.g. '.gz', '.zst').
  • table: The Arrow table to write.

Raises:
  • duckdb.IOException: If the destination is not writable.
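
For example, compression can be selected purely via the file extension; the connection ID and table contents here are illustrative:

import pyarrow as pa
from smallcat.datasets import CSVDataset

ds = CSVDataset.from_conn_id("local_fs")  # assumed connection
tbl = pa.table({"id": [1, 2], "name": ["ada", "bob"]})
ds.save_arrow_table("silver/users.csv.gz", tbl)  # gzip inferred from the '.gz' extension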

Source code in src/smallcat/datasets/csv_dataset.py
def save_arrow_table(self, path: str, table: pa.Table) -> None:
    """Write a PyArrow Table to CSV.

    Parameters
    ----------
    path
        Destination path (file or pattern) relative to the connection base.
        Compression is inferred from the extension (e.g. '.gz', '.zst').
    table
        The Arrow table to write.

    Raises:
    ------
    duckdb.IOException
        If the destination is not writable.
    """
    full_uri = self._full_uri(path)
    with self._duckdb_conn() as con:
        con.register("tmp_input", table)
        con.sql("SELECT * FROM tmp_input").write_csv(
            full_uri,
            **self.save_options_dict(),
        )

from_conn_id classmethod

from_conn_id(conn_id: str, *, load_options: L | None = None, save_options: S | None = None) -> BaseDataset[L, S]

Construct an instance by looking up an Airflow connection ID.

Uses airflow.hooks.base.BaseHook (or the SDK alternative) to fetch the connection and then calls the class constructor.

Parameters:
  • conn_id (str, required): Airflow connection ID to resolve.
  • load_options (L | None, default None): Optional load options model.
  • save_options (S | None, default None): Optional save options model.

Returns:
  • BaseDataset[L, S]: A fully constructed BaseDataset subclass instance.
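
Usage sketch, assuming smallcat[airflow] is installed and an Airflow connection named "local_fs" exists:

from smallcat.datasets import CSVDataset
from smallcat.datasets.csv_dataset import CSVLoadOptions

ds = CSVDataset.from_conn_id(
    "local_fs",
    load_options=CSVLoadOptions(header=True),
)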

Source code in src/smallcat/datasets/base_dataset.py
@classmethod
def from_conn_id(
    cls: type["BaseDataset[L, S]"],
    conn_id: str,
    *,
    load_options: L | None = None,
    save_options: S | None = None,
) -> "BaseDataset[L, S]":
    """Construct an instance by looking up an Airflow connection ID.

    Uses `airflow.hooks.base.BaseHook` (or the SDK alternative) to fetch
    the connection and then calls the class constructor.

    Args:
      conn_id: Airflow connection ID to resolve.
      load_options: Optional load options model.
      save_options: Optional save options model.

    Returns:
      A fully constructed `BaseDataset` subclass instance.
    """
    if BaseHook is None:
        raise RuntimeError("Airflow not available. Install smallcat[airflow]")  # noqa: TRY003, EM101

    conn = BaseHook.get_connection(conn_id)
    return cls(conn=conn, load_options=load_options, save_options=save_options)

load_pandas

load_pandas(path: str, where: str | None = None) -> pd.DataFrame

Load data as a pandas DataFrame.

This is a convenience wrapper over load_arrow_table and pushes down filters when provided.

Parameters:
  • path (str, required): Relative dataset path.
  • where (str | None, default None): Optional SQL filter predicate injected into the query.

Returns:
  • pd.DataFrame: A pandas DataFrame.
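
For example (the connection ID, path, and predicate are illustrative; the filter is pushed into the underlying query):

from smallcat.datasets import CSVDataset

ds = CSVDataset.from_conn_id("local_fs")
df = ds.load_pandas("bronze/raw/users.csv", where="signup_date >= '2024-01-01'")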

Source code in src/smallcat/datasets/base_dataset.py
def load_pandas(self, path: str, where: str | None = None) -> pd.DataFrame:
    """Load data as a pandas DataFrame.

    This is a convenience wrapper over `load_arrow_table` and pushes down
    filters when provided.

    Args:
      path: Relative dataset path.
      where: Optional SQL filter predicate injected into the query.

    Returns:
      A pandas `DataFrame`.
    """
    arrow_table = self.load_arrow_table(path=path, where=where)
    return arrow_table.to_pandas()

save_pandas

save_pandas(path: str, df: pd.DataFrame) -> None

Persist a pandas DataFrame.

Converts the DataFrame to a pyarrow.Table and delegates to save_arrow_table.

Parameters:
  • path (str, required): Relative dataset path.
  • df (pd.DataFrame, required): DataFrame to persist.

Returns:
  • None

Source code in src/smallcat/datasets/base_dataset.py
def save_pandas(self, path: str, df: pd.DataFrame) -> None:
    """Persist a pandas DataFrame.

    Converts the DataFrame to a `pyarrow.Table` and delegates to
    `save_arrow_table`.

    Args:
        path: Relative dataset path.
        df: DataFrame to persist.

    Returns:
        None
    """
    arrow_table = pa.Table.from_pandas(df)
    self.save_arrow_table(path, table=arrow_table)

Options

smallcat.datasets.csv_dataset.CSVLoadOptions

Bases: BaseModel

Options that control how CSV files are read.

These mirror DuckDB's read_csv_auto parameters we expose. All fields are optional; unset values defer to DuckDB defaults.

Attributes:
  • columns: Optional mapping of column names to logical types (e.g. {"id": "INTEGER", "amount": "DOUBLE"}) used to override DuckDB's type inference when auto-detect is not good enough.
  • sep: Field separator (e.g. ",", "|", "\t"). If None, DuckDB will try to detect it.
  • header: Whether the first row contains column names. If None, DuckDB will detect.
  • sample_size: Number of rows to sample for schema detection. If None, DuckDB default applies.
  • all_varchar: If True, read all columns as VARCHAR (string). Useful when types are messy.

Source code in src/smallcat/datasets/csv_dataset.py
class CSVLoadOptions(BaseModel):
    r"""Options that control how CSV files are *read*.

    These mirror DuckDB's `read_csv_auto` parameters we expose.
    All fields are optional; unset values defer to DuckDB defaults.

    Attributes:
    ----------
    columns
        Optional mapping of column names to logical types
        (e.g. {"id": "INTEGER", "amount": "DOUBLE"}) used to override
        DuckDB's type inference when auto-detect is not good enough.
    sep
        Field separator (e.g. ",", "|", "\t"). If None, DuckDB will try to detect it.
    header
        Whether the first row contains column names. If None, DuckDB will detect.
    sample_size
        Number of rows to sample for schema detection. If None, DuckDB default applies.
    all_varchar
        If True, read all columns as VARCHAR (string). Useful when types are messy.
    """

    columns: Mapping[str, str] | None = Field(
        None,
        description="Override inferred types per column, e.g. {'id': 'INTEGER'}.",
    )
    sep: str | None = Field(
        None,
        description="Field separator (e.g. ',', '|', '\\t'); auto-detected if None.",
    )
    header: bool | None = Field(
        None,
        description="Whether the first row is a header; auto-detected if None.",
    )
    sample_size: int | None = Field(
        None,
        description="Rows to sample for inference; DuckDB default if None.",
    )
    all_varchar: bool | None = Field(
        None,
        description="If True, read all columns as VARCHAR.",
    )

smallcat.datasets.csv_dataset.CSVSaveOptions

Bases: BaseModel

Options that control how CSV files are written.

Attributes:
  • header: Whether to write a header row with column names.
  • sep: Field separator to use when writing (e.g. ',', '|', '\t').
  • overwrite: If True, allow overwriting existing files at the destination. Compression is inferred from the file extension ('.gz', '.zst', …).

Source code in src/smallcat/datasets/csv_dataset.py
class CSVSaveOptions(BaseModel):
    r"""Options that control how CSV files are *written*.

    Attributes:
    ----------
    header
        Whether to write a header row with column names.
    sep
        Field separator to use when writing (e.g. ',', '|', '\t').
    overwrite
        If True, allow overwriting existing files at the destination.
        Compression is *inferred from the file extension* ('.gz', '.zst', …).
    """

    header: bool | None = Field(
        None,
        description="Write a header row with column names.",
    )
    sep: str | None = Field(
        None,
        description="Field separator to use (e.g. ',', '|', r'\t').",
    )
    # compression is inferred from extension (.gz, .zst, …) don't expose unless you must
    overwrite: bool | None = Field(
        None,
        description="If True, overwrite existing files at the destination.",
    )

Excel

Excel (.xlsx) dataset via DuckDB's excel extension.

This module provides :class:ExcelDataset for reading/writing .xlsx files (legacy .xls is not supported). Paths are relative to the configured base URI; the DuckDB excel extension is installed/loaded at runtime.

Capabilities
  • Read a whole sheet or an A1 range with optional header handling.
  • Coerce empty columns or all columns to VARCHAR for schema stability.
  • Write Arrow tables to a specific sheet (with optional header row).
Example

ds = ExcelDataset.from_conn_id("fs_conn") tbl = ds.load_arrow_table("inputs/budget.xlsx") # first sheet by default ds.save_arrow_table("outputs/budget_out.xlsx", tbl)

Options
  • ExcelLoadOptions: header, sheet, range, all_varchar, empty_as_varchar.
  • ExcelSaveOptions: header, sheet.
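
A hedged example combining these options; the connection ID, sheet names, and range are assumptions:

from smallcat.datasets import ExcelDataset
from smallcat.datasets.excel_dataset import ExcelLoadOptions, ExcelSaveOptions

ds = ExcelDataset.from_conn_id(
    "fs_conn",  # assumed local filesystem connection
    load_options=ExcelLoadOptions(sheet="Budget", range="A1:D100", header=True),
    save_options=ExcelSaveOptions(sheet="Out", header=True),
)
tbl = ds.load_arrow_table("inputs/budget.xlsx")
ds.save_arrow_table("outputs/budget_out.xlsx", tbl)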

smallcat.datasets.excel_dataset.ExcelDataset

Bases: BaseDataset[ExcelLoadOptions, ExcelSaveOptions]

Reads and writes .xlsx files via DuckDB's excel extension.

Notes
  • Legacy .xls format is not supported.
  • Paths are treated as relative to this dataset's base URI (e.g., file:// or gs://); use the connection extras to set the base.
Source code in src/smallcat/datasets/excel_dataset.py
class ExcelDataset(BaseDataset[ExcelLoadOptions, ExcelSaveOptions]):
    """Reads and writes **.xlsx** files via DuckDB's `excel` extension.

    Notes:
      * Legacy **.xls** format is **not** supported.
      * Paths are treated as relative to this dataset's base URI (e.g., `file://` or
        `gs://`); use the connection extras to set the base.
    """

    def load_arrow_record_batch_reader(
        self,
        path: str,
        where: str | None = None,
    ) -> pa.RecordBatchReader:
        """Stream .xlsx rows as record batches with an optional filter."""
        full_uri = self._full_uri(path)
        with self._duckdb_conn() as con:
            con.install_extension("excel")
            con.load_extension("excel")
            lo = self.load_options_dict()
            args_sql = ""
            if header := lo.get("header"):
                args_sql += f", header = {str(header).lower()}"
            if sheet := lo.get("sheet"):
                args_sql += f", sheet = '{sheet}'"
            if _range := lo.get("range"):
                args_sql += f", range = '{_range}'"
            if all_varchar := lo.get("all_varchar"):
                args_sql += f", all_varchar = {str(all_varchar).lower()}"
            if empty_as_varchar := lo.get("empty_as_varchar"):
                args_sql += f", empty_as_varchar = {str(empty_as_varchar).lower()}"
            query = f"select * from read_xlsx(?{args_sql})"  # noqa: S608
            if where:
                query += f" where {where}"
            return con.execute(query, [full_uri]).fetch_record_batch()

    def save_arrow_table(self, path: str, table: pa.Table) -> None:
        """Write a PyArrow table to an .xlsx file.

        Args:
          path: Relative path of the output .xlsx file (joined under the dataset base).
          table: The `pyarrow.Table` to write.

        Notes:
          Uses DuckDB's `COPY ... TO ... WITH (FORMAT xlsx ...)` from the
          `excel` extension. Save-time options are translated into COPY options.
        """
        full_uri = self._full_uri(path)
        with self._duckdb_conn() as con:
            con.install_extension("excel")
            con.load_extension("excel")
            lo = self.save_options_dict()
            args_sql = ""
            if header := lo.get("header"):
                args_sql += f", HEADER {str(header).lower()}"
            if sheet := lo.get("sheet"):
                args_sql += f", SHEET '{sheet}'"
            con.register("tmp_input", table)
            query = f"copy (select * from tmp_input) TO ? WITH (FORMAT xlsx{args_sql})"  # noqa: S608
            con.execute(query, [full_uri]).fetch_arrow_table()

load_arrow_record_batch_reader

load_arrow_record_batch_reader(path: str, where: str | None = None) -> pa.RecordBatchReader

Stream .xlsx rows as record batches with an optional filter.

Source code in src/smallcat/datasets/excel_dataset.py
def load_arrow_record_batch_reader(
    self,
    path: str,
    where: str | None = None,
) -> pa.RecordBatchReader:
    """Stream .xlsx rows as record batches with an optional filter."""
    full_uri = self._full_uri(path)
    with self._duckdb_conn() as con:
        con.install_extension("excel")
        con.load_extension("excel")
        lo = self.load_options_dict()
        args_sql = ""
        if header := lo.get("header"):
            args_sql += f", header = {str(header).lower()}"
        if sheet := lo.get("sheet"):
            args_sql += f", sheet = '{sheet}'"
        if _range := lo.get("range"):
            args_sql += f", range = '{_range}'"
        if all_varchar := lo.get("all_varchar"):
            args_sql += f", all_varchar = {str(all_varchar).lower()}"
        if empty_as_varchar := lo.get("empty_as_varchar"):
            args_sql += f", empty_as_varchar = {str(empty_as_varchar).lower()}"
        query = f"select * from read_xlsx(?{args_sql})"  # noqa: S608
        if where:
            query += f" where {where}"
        return con.execute(query, [full_uri]).fetch_record_batch()

save_arrow_table

save_arrow_table(path: str, table: pa.Table) -> None

Write a PyArrow table to an .xlsx file.

Parameters:
  • path (str, required): Relative path of the output .xlsx file (joined under the dataset base).
  • table (pa.Table, required): The pyarrow.Table to write.
Notes

Uses DuckDB's COPY ... TO ... WITH (FORMAT xlsx ...) from the excel extension. Save-time options are translated into COPY options.
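
As a rough illustration of how save options map onto COPY options (the file name, sheet name, and table contents are assumptions, and the SQL shown is a sketch rather than the exact statement built at runtime):

import pyarrow as pa
from smallcat.datasets import ExcelDataset
from smallcat.datasets.excel_dataset import ExcelSaveOptions

ds = ExcelDataset.from_conn_id(
    "fs_conn",
    save_options=ExcelSaveOptions(header=True, sheet="Out"),
)
tbl = pa.table({"quarter": ["Q1", "Q2"], "amount": [100.0, 250.0]})
# Internally this issues roughly:
#   COPY (SELECT * FROM tmp_input) TO '<base>/outputs/report.xlsx'
#   WITH (FORMAT xlsx, HEADER true, SHEET 'Out')
ds.save_arrow_table("outputs/report.xlsx", tbl)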

Source code in src/smallcat/datasets/excel_dataset.py
def save_arrow_table(self, path: str, table: pa.Table) -> None:
    """Write a PyArrow table to an .xlsx file.

    Args:
      path: Relative path of the output .xlsx file (joined under the dataset base).
      table: The `pyarrow.Table` to write.

    Notes:
      Uses DuckDB's `COPY ... TO ... WITH (FORMAT xlsx ...)` from the
      `excel` extension. Save-time options are translated into COPY options.
    """
    full_uri = self._full_uri(path)
    with self._duckdb_conn() as con:
        con.install_extension("excel")
        con.load_extension("excel")
        lo = self.save_options_dict()
        args_sql = ""
        if header := lo.get("header"):
            args_sql += f", HEADER {str(header).lower()}"
        if sheet := lo.get("sheet"):
            args_sql += f", SHEET '{sheet}'"
        con.register("tmp_input", table)
        query = f"copy (select * from tmp_input) TO ? WITH (FORMAT xlsx{args_sql})"  # noqa: S608
        con.execute(query, [full_uri]).fetch_arrow_table()

from_conn_id classmethod

from_conn_id(conn_id: str, *, load_options: L | None = None, save_options: S | None = None) -> BaseDataset[L, S]

Construct an instance by looking up an Airflow connection ID.

Uses airflow.hooks.base.BaseHook (or the SDK alternative) to fetch the connection and then calls the class constructor.

Parameters:
  • conn_id (str, required): Airflow connection ID to resolve.
  • load_options (L | None, default None): Optional load options model.
  • save_options (S | None, default None): Optional save options model.

Returns:
  • BaseDataset[L, S]: A fully constructed BaseDataset subclass instance.

Source code in src/smallcat/datasets/base_dataset.py
@classmethod
def from_conn_id(
    cls: type["BaseDataset[L, S]"],
    conn_id: str,
    *,
    load_options: L | None = None,
    save_options: S | None = None,
) -> "BaseDataset[L, S]":
    """Construct an instance by looking up an Airflow connection ID.

    Uses `airflow.hooks.base.BaseHook` (or the SDK alternative) to fetch
    the connection and then calls the class constructor.

    Args:
      conn_id: Airflow connection ID to resolve.
      load_options: Optional load options model.
      save_options: Optional save options model.

    Returns:
      A fully constructed `BaseDataset` subclass instance.
    """
    if BaseHook is None:
        raise RuntimeError("Airflow not available. Install smallcat[airflow]")  # noqa: TRY003, EM101

    conn = BaseHook.get_connection(conn_id)
    return cls(conn=conn, load_options=load_options, save_options=save_options)

load_pandas

load_pandas(path: str, where: str | None = None) -> pd.DataFrame

Load data as a pandas DataFrame.

This is a convenience wrapper over load_arrow_table and pushes down filters when provided.

Parameters:
  • path (str, required): Relative dataset path.
  • where (str | None, default None): Optional SQL filter predicate injected into the query.

Returns:
  • pd.DataFrame: A pandas DataFrame.

Source code in src/smallcat/datasets/base_dataset.py
def load_pandas(self, path: str, where: str | None = None) -> pd.DataFrame:
    """Load data as a pandas DataFrame.

    This is a convenience wrapper over `load_arrow_table` and pushes down
    filters when provided.

    Args:
      path: Relative dataset path.
      where: Optional SQL filter predicate injected into the query.

    Returns:
      A pandas `DataFrame`.
    """
    arrow_table = self.load_arrow_table(path=path, where=where)
    return arrow_table.to_pandas()

save_pandas

save_pandas(path: str, df: pd.DataFrame) -> None

Persist a pandas DataFrame.

Converts the DataFrame to a pyarrow.Table and delegates to save_arrow_table.

Parameters:
  • path (str, required): Relative dataset path.
  • df (pd.DataFrame, required): DataFrame to persist.

Returns:
  • None

Source code in src/smallcat/datasets/base_dataset.py
def save_pandas(self, path: str, df: pd.DataFrame) -> None:
    """Persist a pandas DataFrame.

    Converts the DataFrame to a `pyarrow.Table` and delegates to
    `save_arrow_table`.

    Args:
        path: Relative dataset path.
        df: DataFrame to persist.

    Returns:
        None
    """
    arrow_table = pa.Table.from_pandas(df)
    self.save_arrow_table(path, table=arrow_table)

Options

smallcat.datasets.excel_dataset.ExcelLoadOptions

Bases: BaseModel

Options that control how an .xlsx file is read.

Attributes:
  • header (bool | None): If True, treat the first row as column headers.
  • sheet (str | None): Optional worksheet name to read. If omitted, the first sheet is used.
  • range (str | None): Excel A1-style range to read (e.g., "A1:D100"). If omitted, the full sheet is read.
  • all_varchar (bool | None): If True, coerce all columns to VARCHAR (strings).
  • empty_as_varchar (bool | None): If True, treat empty columns as VARCHAR instead of NULL/typed.

Source code in src/smallcat/datasets/excel_dataset.py
class ExcelLoadOptions(BaseModel):
    """Options that control how an .xlsx file is read.

    Attributes:
      header: If True, treat the first row as column headers.
      sheet: Optional worksheet name to read. If omitted, the first sheet is used.
      range: Excel A1-style range to read (e.g., "A1:D100").
             If omitted, the full sheet is read.
      all_varchar: If True, coerce all columns to VARCHAR (strings).
      empty_as_varchar: If True, treat empty columns as VARCHAR instead of NULL/typed.
    """

    header: bool | None = Field(None)
    sheet: str | None = Field(None)
    range: str | None = Field(None)
    all_varchar: bool | None = Field(None)
    empty_as_varchar: bool | None = Field(None)

smallcat.datasets.excel_dataset.ExcelSaveOptions

Bases: BaseModel

Options that control how an Arrow table is written to .xlsx.

Attributes:
  • header (bool | None): If True, include column headers in the output file.
  • sheet (str | None): Optional worksheet name to write into (created if missing).

Source code in src/smallcat/datasets/excel_dataset.py
class ExcelSaveOptions(BaseModel):
    """Options that control how an Arrow table is written to .xlsx.

    Attributes:
      header: If True, include column headers in the output file.
      sheet: Optional worksheet name to write into (created if missing).
    """

    header: bool | None = Field(None)
    sheet: str | None = Field(None)

Parquet

Parquet dataset backed by DuckDB.

This module provides :class:ParquetDataset, a concrete implementation of :class:~smallcat.datasets.base_dataset.BaseDataset that reads/writes Parquet via DuckDB. Paths passed to public methods are relative to the configured base (e.g., file:// or gs://).

Features
  • Read from a single file, directory, or glob pattern.
  • Hive partition discovery and schema union (optional).
  • Write with optional partitioning and overwrite.
Example

ds = ParquetDataset.from_conn_id("gcs_conn") tbl = ds.load_arrow_table("bronze/events/*/.parquet") ds.save_arrow_table("silver/events/", tbl)

smallcat.datasets.parquet_dataset.ParquetDataset

Bases: BaseDataset

Parquet dataset backed by DuckDB's Parquet reader/writer.

Paths passed to public methods are treated as relative to the dataset's configured base (e.g., file:// or gs://). Reads return a PyArrow table.

Notes
  • You can pass a single file, a directory (e.g., /path/**.parquet), or any glob DuckDB understands.
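
A hedged read sketch using a glob plus Hive partition discovery (the connection ID and directory layout are assumptions):

from smallcat.datasets import ParquetDataset
from smallcat.datasets.parquet_dataset import ParquetLoadOptions

ds = ParquetDataset.from_conn_id(
    "gcs_conn",
    load_options=ParquetLoadOptions(hive_partitioning=True, union_by_name=True),
)
tbl = ds.load_arrow_table("bronze/events/**/*.parquet")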
Source code in src/smallcat/datasets/parquet_dataset.py
class ParquetDataset(BaseDataset):
    """Parquet dataset backed by DuckDB's Parquet reader/writer.

    Paths passed to public methods are treated as **relative** to the dataset's
    configured base (e.g., `file://` or `gs://`). Reads return a PyArrow
    table.

    Notes:
      * You can pass a single file, a directory (e.g., `/path/**.parquet`),
        or any glob DuckDB understands.
    """

    def load_arrow_record_batch_reader(
        self,
        path: str,
        where: str | None = None,
    ) -> pa.RecordBatchReader:
        """Stream Parquet rows as record batches with an optional filter."""
        full_uri = self._full_uri(path)
        with self._duckdb_conn() as con:
            rel = con.read_parquet(full_uri, **self.load_options_dict())
            query = "select * from data"
            if where:
                query += f" where {where}"
            return rel.query("data", query).fetch_record_batch()

    def save_arrow_table(self, path: str, table: pa.Table) -> None:
        """Write a PyArrow table to Parquet.

        Args:
          path: Relative output path (file or directory) joined under the
            dataset base URI.
          table: The `pyarrow.Table` to write.

        Notes:
          Uses `Relation.write_parquet` with parameters from
          `save_options_dict()`.
        """
        full_uri = self._full_uri(path)
        with self._duckdb_conn() as con:
            con.register("tmp_input", table)
            con.sql("SELECT * FROM tmp_input").write_parquet(
                full_uri,
                **self.save_options_dict(),
            )

load_arrow_record_batch_reader

load_arrow_record_batch_reader(path: str, where: str | None = None) -> pa.RecordBatchReader

Stream Parquet rows as record batches with an optional filter.

Source code in src/smallcat/datasets/parquet_dataset.py
def load_arrow_record_batch_reader(
    self,
    path: str,
    where: str | None = None,
) -> pa.RecordBatchReader:
    """Stream Parquet rows as record batches with an optional filter."""
    full_uri = self._full_uri(path)
    with self._duckdb_conn() as con:
        rel = con.read_parquet(full_uri, **self.load_options_dict())
        query = "select * from data"
        if where:
            query += f" where {where}"
        return rel.query("data", query).fetch_record_batch()

save_arrow_table

save_arrow_table(path: str, table: pa.Table) -> None

Write a PyArrow table to Parquet.

Parameters:
  • path (str, required): Relative output path (file or directory) joined under the dataset base URI.
  • table (pa.Table, required): The pyarrow.Table to write.
Notes

Uses Relation.write_parquet with parameters from save_options_dict().
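
For instance, a partitioned, overwriting write might look like this (the connection ID, partition column, and table contents are illustrative):

import pyarrow as pa
from smallcat.datasets import ParquetDataset
from smallcat.datasets.parquet_dataset import ParquetSaveOptions

ds = ParquetDataset.from_conn_id(
    "gcs_conn",
    save_options=ParquetSaveOptions(partition_by=["event_date"], overwrite=True),
)
tbl = pa.table({"event_date": ["2024-01-01", "2024-01-02"], "clicks": [10, 20]})
ds.save_arrow_table("silver/events/", tbl)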

Source code in src/smallcat/datasets/parquet_dataset.py
def save_arrow_table(self, path: str, table: pa.Table) -> None:
    """Write a PyArrow table to Parquet.

    Args:
      path: Relative output path (file or directory) joined under the
        dataset base URI.
      table: The `pyarrow.Table` to write.

    Notes:
      Uses `Relation.write_parquet` with parameters from
      `save_options_dict()`.
    """
    full_uri = self._full_uri(path)
    with self._duckdb_conn() as con:
        con.register("tmp_input", table)
        con.sql("SELECT * FROM tmp_input").write_parquet(
            full_uri,
            **self.save_options_dict(),
        )

from_conn_id classmethod

from_conn_id(conn_id: str, *, load_options: L | None = None, save_options: S | None = None) -> BaseDataset[L, S]

Construct an instance by looking up an Airflow connection ID.

Uses airflow.hooks.base.BaseHook (or the SDK alternative) to fetch the connection and then calls the class constructor.

Parameters:
  • conn_id (str, required): Airflow connection ID to resolve.
  • load_options (L | None, default None): Optional load options model.
  • save_options (S | None, default None): Optional save options model.

Returns:
  • BaseDataset[L, S]: A fully constructed BaseDataset subclass instance.

Source code in src/smallcat/datasets/base_dataset.py
@classmethod
def from_conn_id(
    cls: type["BaseDataset[L, S]"],
    conn_id: str,
    *,
    load_options: L | None = None,
    save_options: S | None = None,
) -> "BaseDataset[L, S]":
    """Construct an instance by looking up an Airflow connection ID.

    Uses `airflow.hooks.base.BaseHook` (or the SDK alternative) to fetch
    the connection and then calls the class constructor.

    Args:
      conn_id: Airflow connection ID to resolve.
      load_options: Optional load options model.
      save_options: Optional save options model.

    Returns:
      A fully constructed `BaseDataset` subclass instance.
    """
    if BaseHook is None:
        raise RuntimeError("Airflow not available. Install smallcat[airflow]")  # noqa: TRY003, EM101

    conn = BaseHook.get_connection(conn_id)
    return cls(conn=conn, load_options=load_options, save_options=save_options)

load_pandas

load_pandas(path: str, where: str | None = None) -> pd.DataFrame

Load data as a pandas DataFrame.

This is a convenience wrapper over load_arrow_table and pushes down filters when provided.

Parameters:
  • path (str, required): Relative dataset path.
  • where (str | None, default None): Optional SQL filter predicate injected into the query.

Returns:
  • pd.DataFrame: A pandas DataFrame.

Source code in src/smallcat/datasets/base_dataset.py
def load_pandas(self, path: str, where: str | None = None) -> pd.DataFrame:
    """Load data as a pandas DataFrame.

    This is a convenience wrapper over `load_arrow_table` and pushes down
    filters when provided.

    Args:
      path: Relative dataset path.
      where: Optional SQL filter predicate injected into the query.

    Returns:
      A pandas `DataFrame`.
    """
    arrow_table = self.load_arrow_table(path=path, where=where)
    return arrow_table.to_pandas()

save_pandas

save_pandas(path: str, df: pd.DataFrame) -> None

Persist a pandas DataFrame.

Converts the DataFrame to a pyarrow.Table and delegates to save_arrow_table.

Parameters:
  • path (str, required): Relative dataset path.
  • df (pd.DataFrame, required): DataFrame to persist.

Returns:
  • None

Source code in src/smallcat/datasets/base_dataset.py
def save_pandas(self, path: str, df: pd.DataFrame) -> None:
    """Persist a pandas DataFrame.

    Converts the DataFrame to a `pyarrow.Table` and delegates to
    `save_arrow_table`.

    Args:
        path: Relative dataset path.
        df: DataFrame to persist.

    Returns:
        None
    """
    arrow_table = pa.Table.from_pandas(df)
    self.save_arrow_table(path, table=arrow_table)

Options

smallcat.datasets.parquet_dataset.ParquetLoadOptions

Bases: BaseModel

Options that control how Parquet is read via DuckDB.

Attributes:
  • binary_as_string (bool | None): If True, interpret BINARY columns as strings.
  • file_row_number (bool | None): If True, include a synthetic row-number column per file.
  • hive_partitioning (bool | None): If True, parse Hive-style directory partitions.
  • union_by_name (bool | None): If True, align/union schemas by column name across files.

Source code in src/smallcat/datasets/parquet_dataset.py
class ParquetLoadOptions(BaseModel):
    """Options that control how Parquet is read via DuckDB.

    Attributes:
      binary_as_string: If True, interpret BINARY columns as strings.
      file_row_number: If True, include a synthetic row-number column per file.
      hive_partitioning: If True, parse Hive-style directory partitions.
      union_by_name: If True, align/union schemas by column name across files.
    """

    binary_as_string: bool | None = Field(None)
    file_row_number: bool | None = Field(None)
    hive_partitioning: bool | None = Field(None)
    union_by_name: bool | None = Field(None)

smallcat.datasets.parquet_dataset.ParquetSaveOptions

Bases: BaseModel

Options that control how Parquet is written via DuckDB.

Attributes:
  • overwrite (bool | None): If True, overwrite existing output.
  • partition_by (list[str] | None): Columns to partition by (Hive-style layout).
  • write_partition_columns (bool | None): If True, also materialize partition cols in files.

Source code in src/smallcat/datasets/parquet_dataset.py
class ParquetSaveOptions(BaseModel):
    """Options that control how Parquet is written via DuckDB.

    Attributes:
      overwrite: If True, overwrite existing output.
      partition_by: Columns to partition by (Hive-style layout).
      write_partition_columns: If True, also materialize partition cols in files.
    """

    overwrite: bool | None = Field(None)
    partition_by: list[str] | None = Field(None)
    write_partition_columns: bool | None = Field(None)

Delta Table

Delta Lake dataset using delta-rs (deltalake) with Smallcat.

This module implements :class:DeltaTableDataset, a Delta Lake reader/writer powered by deltalake (delta-rs). It resolves relative paths against the connection base (e.g., gs://bucket/prefix) and returns/accepts Arrow tables.

Storage backends
  • Local filesystem (fs) - no extra options.
  • Google Cloud Storage (google_cloud_platform) - credentials derived from connection extras: keyfile_dict / keyfile / key_path.
  • Databricks - minimal env vars exported (workspace URL and token).
Example

ds = DeltaTableDataset.from_conn_id("gcs_delta") tbl = ds.load_arrow_table("bronze/events_delta") ds.save_arrow_table("silver/events_delta", tbl)

Options
  • DeltaTableLoadOptions: version, without_files, log_buffer_size.
  • DeltaTableSaveOptions: mode, partition_by, schema_mode.
Notes

For Databricks, this module sets: DATABRICKS_WORKSPACE_URL and DATABRICKS_ACCESS_TOKEN before access.
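
A hedged end-to-end sketch; the connection ID, version number, and partition column are assumptions, and SchemaMode is assumed to be importable from the same module (it is referenced by DeltaTableSaveOptions):

from smallcat.datasets import DeltaTableDataset
from smallcat.datasets.delta_table_dataset import (
    DeltaTableLoadOptions,
    DeltaTableSaveOptions,
    SchemaMode,
)

ds = DeltaTableDataset.from_conn_id(
    "gcs_delta",  # assumed google_cloud_platform connection
    load_options=DeltaTableLoadOptions(version=42),  # time travel to a specific table version
    save_options=DeltaTableSaveOptions(
        partition_by=["event_date"],
        schema_mode=SchemaMode.MERGE,  # merge schema differences on write
    ),
)
tbl = ds.load_arrow_table("bronze/events_delta")
ds.save_arrow_table("silver/events_delta", tbl)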

smallcat.datasets.delta_table_dataset.DeltaTableDataset

Bases: BaseDataset[DeltaTableLoadOptions, DeltaTableSaveOptions]

Delta Lake dataset that reads/writes via delta-rs (DeltaTable / write_deltalake).

Paths passed to public methods are treated as relative to the dataset's configured base (e.g., local file:// or gs://). Reads return a PyArrow table.

Notes
  • For Google Cloud Storage, credentials are derived from the connection's extras (e.g., keyfile_dict, keyfile, or key_path).
  • For conn_type == "databricks", environment variables are set to support Databricks-hosted Delta.
Source code in src/smallcat/datasets/delta_table_dataset.py
class DeltaTableDataset(BaseDataset[DeltaTableLoadOptions, DeltaTableSaveOptions]):
    """Delta Lake dataset that reads/writes via delta-rs (DeltaTable / write_deltalake).

    Paths passed to public methods are treated as **relative** to the dataset's
    configured base (e.g., local `file://` or `gs://`). Reads return a
    PyArrow table.

    Notes:
      * For Google Cloud Storage, credentials are derived from the connection's
        extras (e.g., `keyfile_dict`, `keyfile`, or `key_path`).
      * For `conn_type == "databricks"`, environment variables are set to
        support Databricks-hosted Delta.
    """

    def _delta_storage_options(self) -> dict:
        """Build `storage_options` for delta-rs reads/writes.

        The options are derived from the active connection:
          * `fs` (local): returns `{}`.
          * `google_cloud_platform`: uses one of:
              - `extras.keyfile_dict` (dict or JSON string)
              - `extras.keyfile` (raw JSON string)
              - `extras.key_path` (path on worker)

        Returns:
          A mapping suitable for the `storage_options` parameter used by
          `deltalake.DeltaTable` and `deltalake.write_deltalake`. For GCS,
          keys include one of:
            * `google_service_account_key` (serialized JSON)
            * `google_service_account` (path to keyfile)

        Raises:
          ValueError: If the connection type is not supported.
        """
        if self.conn.conn_type not in ["fs", "google_cloud_platform"]:
            msg = f"Storage options not implemented for type {self.conn.conn_type}"
            raise ValueError(
                msg,
            )
        x = self.extras

        # keyfile_dict can be dict or JSON string
        kfd = x.get("keyfile_dict")
        if isinstance(kfd, str):
            try:
                kfd = json.loads(kfd)
            except json.JSONDecodeError:
                # If it's not JSON, ignore and fall through
                kfd = None

        if isinstance(kfd, dict) and kfd:
            # Provide serialized key via 'google_service_account_key'
            return {"google_service_account_key": json.dumps(kfd)}

        if x.get("keyfile"):  # raw JSON string
            return {"google_service_account_key": x["keyfile"]}

        if x.get("key_path"):  # path on worker
            return {"google_service_account": x["key_path"]}

        return {}

    def _set_databricks_acces_variables(self) -> None:
        """Export minimal environment variables for Databricks-hosted Delta.

        Sets:
          * `DATABRICKS_WORKSPACE_URL` from `self.conn.host`
          * `DATABRICKS_ACCESS_TOKEN` from `self.conn.password`

        Notes:
          These variables are used by delta-rs when accessing Databricks.
        """
        if self.conn.host is None or self.conn.password is None:
            msg = "Databricks connection requires both host and password."
            raise ValueError(msg)
        os.environ["DATABRICKS_WORKSPACE_URL"] = self.conn.host
        os.environ["DATABRICKS_ACCESS_TOKEN"] = self.conn.password

    def load_arrow_record_batch_reader(
        self,
        path: str,
        where: str | None = None,
    ) -> pa.RecordBatchReader:
        """Stream Delta Lake rows via DuckDB with an optional filter."""
        full_uri = self._full_uri(path)
        if self.conn.conn_type == "databricks":
            self._set_databricks_acces_variables()
            raise NotImplementedError

        storage_options = self._delta_storage_options()
        dt = DeltaTable(
            full_uri,
            storage_options=storage_options,
            **self.load_options_dict(),
        )
        dataset = dt.to_pyarrow_dataset()
        with self._duckdb_conn() as con:
            con.register("data", dataset)
            query = "select * from data"
            if where:
                query += f" where {where}"
            return con.sql(query).fetch_record_batch()

    def save_arrow_table(self, path: str, table: pa.Table) -> None:
        """Write a PyArrow table to Delta Lake using delta-rs.

        Args:
          path: Relative path to the target Delta table (joined under the
            dataset's base URI).
          table: The `pyarrow.Table` to write.

        Notes:
          * If `conn_type == "databricks"`, this method sets Databricks
            environment variables via `_set_databricks_acces_variables`.
            (The write is otherwise handled by delta-rs for non-Databricks.)
        """
        """True Delta write using delta-rs."""
        table_uri = self._full_uri(path)
        if self.conn.conn_type == "databricks":
            self._set_databricks_acces_variables()
        else:
            storage_options = self._delta_storage_options()
            try:
                engine = (
                    "rust"
                    if self.save_options is not None
                    and self.save_options.schema_mode == SchemaMode.MERGE
                    else "pyarrow"
                )
                write_deltalake(
                    table_or_uri=table_uri,
                    data=table,
                    storage_options=storage_options,
                    engine=engine,
                    **self.save_options_dict(),
                )
            except TypeError:
                # Newer versions use the rust engine and don't take the engine parameter
                write_deltalake(
                    table_or_uri=table_uri,
                    data=table,
                    storage_options=storage_options,
                    **self.save_options_dict(),
                )

_delta_storage_options

_delta_storage_options() -> dict

Build storage_options for delta-rs reads/writes.

The options are derived from the active connection
  • fs (local): returns {}.
  • google_cloud_platform: uses one of:
    • extras.keyfile_dict (dict or JSON string)
    • extras.keyfile (raw JSON string)
    • extras.key_path (path on worker)

Returns:
  • dict: A mapping suitable for the storage_options parameter used by deltalake.DeltaTable and deltalake.write_deltalake. For GCS, keys include one of google_service_account_key (serialized JSON) or google_service_account (path to keyfile).

Raises:
  • ValueError: If the connection type is not supported.
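
An illustrative call; the connection and its extras are assumptions, and the returned keys follow the mapping described above:

from smallcat.datasets import DeltaTableDataset

ds = DeltaTableDataset.from_conn_id("gcs_delta")  # assumed google_cloud_platform connection
opts = ds._delta_storage_options()
# e.g. {"google_service_account_key": "<serialized service-account JSON>"}
# or   {"google_service_account": "/path/to/keyfile.json"} when key_path is set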

Source code in src/smallcat/datasets/delta_table_dataset.py
def _delta_storage_options(self) -> dict:
    """Build `storage_options` for delta-rs reads/writes.

    The options are derived from the active connection:
      * `fs` (local): returns `{}`.
      * `google_cloud_platform`: uses one of:
          - `extras.keyfile_dict` (dict or JSON string)
          - `extras.keyfile` (raw JSON string)
          - `extras.key_path` (path on worker)

    Returns:
      A mapping suitable for the `storage_options` parameter used by
      `deltalake.DeltaTable` and `deltalake.write_deltalake`. For GCS,
      keys include one of:
        * `google_service_account_key` (serialized JSON)
        * `google_service_account` (path to keyfile)

    Raises:
      ValueError: If the connection type is not supported.
    """
    if self.conn.conn_type not in ["fs", "google_cloud_platform"]:
        msg = f"Storage options not implemented for type {self.conn.conn_type}"
        raise ValueError(
            msg,
        )
    x = self.extras

    # keyfile_dict can be dict or JSON string
    kfd = x.get("keyfile_dict")
    if isinstance(kfd, str):
        try:
            kfd = json.loads(kfd)
        except json.JSONDecodeError:
            # If it's not JSON, ignore and fall through
            kfd = None

    if isinstance(kfd, dict) and kfd:
        # Provide serialized key via 'google_service_account_key'
        return {"google_service_account_key": json.dumps(kfd)}

    if x.get("keyfile"):  # raw JSON string
        return {"google_service_account_key": x["keyfile"]}

    if x.get("key_path"):  # path on worker
        return {"google_service_account": x["key_path"]}

    return {}

_set_databricks_acces_variables

_set_databricks_acces_variables() -> None

Export minimal environment variables for Databricks-hosted Delta.

Sets
  • DATABRICKS_WORKSPACE_URL from self.conn.host
  • DATABRICKS_ACCESS_TOKEN from self.conn.password
Notes

These variables are used by delta-rs when accessing Databricks.

Source code in src/smallcat/datasets/delta_table_dataset.py
def _set_databricks_acces_variables(self) -> None:
    """Export minimal environment variables for Databricks-hosted Delta.

    Sets:
      * `DATABRICKS_WORKSPACE_URL` from `self.conn.host`
      * `DATABRICKS_ACCESS_TOKEN` from `self.conn.password`

    Notes:
      These variables are used by delta-rs when accessing Databricks.
    """
    if self.conn.host is None or self.conn.password is None:
        msg = "Databricks connection requires both host and password."
        raise ValueError(msg)
    os.environ["DATABRICKS_WORKSPACE_URL"] = self.conn.host
    os.environ["DATABRICKS_ACCESS_TOKEN"] = self.conn.password

load_arrow_record_batch_reader

load_arrow_record_batch_reader(path: str, where: str | None = None) -> pa.RecordBatchReader

Stream Delta Lake rows via DuckDB with an optional filter.

Source code in src/smallcat/datasets/delta_table_dataset.py
def load_arrow_record_batch_reader(
    self,
    path: str,
    where: str | None = None,
) -> pa.RecordBatchReader:
    """Stream Delta Lake rows via DuckDB with an optional filter."""
    full_uri = self._full_uri(path)
    if self.conn.conn_type == "databricks":
        self._set_databricks_acces_variables()
        raise NotImplementedError

    storage_options = self._delta_storage_options()
    dt = DeltaTable(
        full_uri,
        storage_options=storage_options,
        **self.load_options_dict(),
    )
    dataset = dt.to_pyarrow_dataset()
    with self._duckdb_conn() as con:
        con.register("data", dataset)
        query = "select * from data"
        if where:
            query += f" where {where}"
        return con.sql(query).fetch_record_batch()

save_arrow_table

save_arrow_table(path: str, table: pa.Table) -> None

Write a PyArrow table to Delta Lake using delta-rs.

Parameters:
  • path (str, required): Relative path to the target Delta table (joined under the dataset's base URI).
  • table (pa.Table, required): The pyarrow.Table to write.
Notes
  • If conn_type == "databricks", this method sets Databricks environment variables via _set_databricks_acces_variables. (The write is otherwise handled by delta-rs for non-Databricks.)
Source code in src/smallcat/datasets/delta_table_dataset.py
def save_arrow_table(self, path: str, table: pa.Table) -> None:
    """Write a PyArrow table to Delta Lake using delta-rs.

    Args:
      path: Relative path to the target Delta table (joined under the
        dataset's base URI).
      table: The `pyarrow.Table` to write.

    Notes:
      * If `conn_type == "databricks"`, this method sets Databricks
        environment variables via `_set_databricks_acces_variables`.
        (The write is otherwise handled by delta-rs for non-Databricks.)
    """
    """True Delta write using delta-rs."""
    table_uri = self._full_uri(path)
    if self.conn.conn_type == "databricks":
        self._set_databricks_acces_variables()
    else:
        storage_options = self._delta_storage_options()
        try:
            engine = (
                "rust"
                if self.save_options is not None
                and self.save_options.schema_mode == SchemaMode.MERGE
                else "pyarrow"
            )
            write_deltalake(
                table_or_uri=table_uri,
                data=table,
                storage_options=storage_options,
                engine=engine,
                **self.save_options_dict(),
            )
        except TypeError:
            # Newer versions use the rust engine and don't take the engine parameter
            write_deltalake(
                table_or_uri=table_uri,
                data=table,
                storage_options=storage_options,
                **self.save_options_dict(),
            )

from_conn_id classmethod

from_conn_id(conn_id: str, *, load_options: L | None = None, save_options: S | None = None) -> BaseDataset[L, S]

Construct an instance by looking up an Airflow connection ID.

Uses airflow.hooks.base.BaseHook (or the SDK alternative) to fetch the connection and then calls the class constructor.

Parameters:
  • conn_id (str, required): Airflow connection ID to resolve.
  • load_options (L | None, default None): Optional load options model.
  • save_options (S | None, default None): Optional save options model.

Returns:
  • BaseDataset[L, S]: A fully constructed BaseDataset subclass instance.

Source code in src/smallcat/datasets/base_dataset.py
@classmethod
def from_conn_id(
    cls: type["BaseDataset[L, S]"],
    conn_id: str,
    *,
    load_options: L | None = None,
    save_options: S | None = None,
) -> "BaseDataset[L, S]":
    """Construct an instance by looking up an Airflow connection ID.

    Uses `airflow.hooks.base.BaseHook` (or the SDK alternative) to fetch
    the connection and then calls the class constructor.

    Args:
      conn_id: Airflow connection ID to resolve.
      load_options: Optional load options model.
      save_options: Optional save options model.

    Returns:
      A fully constructed `BaseDataset` subclass instance.
    """
    if BaseHook is None:
        raise RuntimeError("Airflow not available. Install smallcat[airflow]")  # noqa: TRY003, EM101

    conn = BaseHook.get_connection(conn_id)
    return cls(conn=conn, load_options=load_options, save_options=save_options)

load_pandas

load_pandas(path: str, where: str | None = None) -> pd.DataFrame

Load data as a pandas DataFrame.

This is a convenience wrapper over load_arrow_table and pushes down filters when provided.

Parameters:
  • path (str, required): Relative dataset path.
  • where (str | None, default None): Optional SQL filter predicate injected into the query.

Returns:
  • pd.DataFrame: A pandas DataFrame.

Source code in src/smallcat/datasets/base_dataset.py
def load_pandas(self, path: str, where: str | None = None) -> pd.DataFrame:
    """Load data as a pandas DataFrame.

    This is a convenience wrapper over `load_arrow_table` and pushes down
    filters when provided.

    Args:
      path: Relative dataset path.
      where: Optional SQL filter predicate injected into the query.

    Returns:
      A pandas `DataFrame`.
    """
    arrow_table = self.load_arrow_table(path=path, where=where)
    return arrow_table.to_pandas()

save_pandas

save_pandas(path: str, df: pd.DataFrame) -> None

Persist a pandas DataFrame.

Converts the DataFrame to a pyarrow.Table and delegates to save_arrow_table.

Parameters:
  • path (str, required): Relative dataset path.
  • df (pd.DataFrame, required): DataFrame to persist.

Returns:
  • None

Source code in src/smallcat/datasets/base_dataset.py
def save_pandas(self, path: str, df: pd.DataFrame) -> None:
    """Persist a pandas DataFrame.

    Converts the DataFrame to a `pyarrow.Table` and delegates to
    `save_arrow_table`.

    Args:
        path: Relative dataset path.
        df: DataFrame to persist.

    Returns:
        None
    """
    arrow_table = pa.Table.from_pandas(df)
    self.save_arrow_table(path, table=arrow_table)

Options

smallcat.datasets.delta_table_dataset.DeltaTableLoadOptions

Bases: BaseModel

Options controlling how a Delta table is read.

Attributes:
  • version (int | None): Optional table version to read.
  • without_files (bool | None): If True, skip listing data files (metadata-only read).
  • log_buffer_size (int | None): Buffer size for reading Delta logs.

Source code in src/smallcat/datasets/delta_table_dataset.py
class DeltaTableLoadOptions(BaseModel):
    """Options controlling how a Delta table is read.

    Attributes:
      version: Optional table version to read.
      without_files: If True, skip listing data files (metadata-only read).
      log_buffer_size: Buffer size for reading Delta logs.
    """

    version: int | None = Field(None)
    without_files: bool | None = Field(None)
    log_buffer_size: int | None = Field(None)

smallcat.datasets.delta_table_dataset.DeltaTableSaveOptions

Bases: BaseModel

Options controlling how a Delta table is written.

Attributes:
  • mode (WriteMode | None): Write mode to apply if the table exists.
  • partition_by (list[str] | None): Columns to partition by (Hive-style directory layout).
  • schema_mode (SchemaMode | None): Strategy to reconcile schema differences during write.

Source code in src/smallcat/datasets/delta_table_dataset.py
class DeltaTableSaveOptions(BaseModel):
    """Options controlling how a Delta table is written.

    Attributes:
      mode: Write mode to apply if the table exists.
      partition_by: Columns to partition by (Hive-style directory layout).
      schema_mode: Strategy to reconcile schema differences during write.
    """

    mode: WriteMode | None = Field(None, description="Write mode for existing tables.")
    partition_by: list[str] | None = Field(
        None,
        description="Columns to partition by (Hive-style directory layout).",
    )
    schema_mode: SchemaMode | None = Field(
        None,
        description="How to handle schema differences on write.",
    )