
SQL-Native Architecture

All data processing in Pilz runs through SQL via DuckDB. Every filter — including the final model rules — is expressed as SQL and executed directly in the database.

Data Loading

Training and test data are read from CSV or Parquet files and then queried through DuckDB SQL:

# src/pilz/service/darkwing.py:92-104
def _get_pl_train_df(self, full_filters, max_eval_fit):
    df = self.get_cached_train_df()
    sql_str = SympyToSqlHelper.to_sql_where(full_filters)

    sql = f"""
            SELECT {", ".join(self.dc.feature_names_sql_save)}
            FROM df
            WHERE {sql_str}
            ORDER BY RANDOM()
            LIMIT {max_eval_fit};
          """
    return duckdb.sql(sql).pl()

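The raw CSV is loaded once with Polars and cached. All subsequent filtering, sampling, and aggregation run as SQL on top of the cached DataFrame: DuckDB resolves the name df in the FROM clause to the local Polars DataFrame, so no separate database table has to be created.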
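
To make the shape of the sampling query concrete, here is a minimal sketch that assembles and runs a query of the same form. The helpers quote_ident and build_sample_sql are hypothetical names, not part of Pilz, and Python's built-in sqlite3 stands in for DuckDB so the example runs without extra packages.

```python
import sqlite3


def quote_ident(name: str) -> str:
    """Double-quote a SQL identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_sample_sql(columns: list[str], where_sql: str, limit: int) -> str:
    """Assemble a filtered random-sample query against a table named df."""
    cols = ", ".join(quote_ident(c) for c in columns)
    return (
        f"SELECT {cols} FROM df "
        f"WHERE {where_sql} "
        f"ORDER BY RANDOM() LIMIT {int(limit)}"
    )


# sqlite3 is only a stand-in here; Pilz itself runs such queries through DuckDB.
con = sqlite3.connect(":memory:")
con.execute('CREATE TABLE df ("age" REAL, "income" REAL)')
con.executemany(
    "INSERT INTO df VALUES (?, ?)",
    [(a, a * 1000.0) for a in range(10, 60)],
)

sql = build_sample_sql(["age", "income"], '"age" >= 30.0', limit=5)
sample = con.execute(sql).fetchall()
print(sql)
print(sample)  # five randomly chosen rows, all with age >= 30
```

Because of ORDER BY RANDOM(), each run returns a different sample; only the filter and the row limit are fixed.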

Filter Translation

Every decision in Pilz is a SymPy boolean expression (using And, Or, Eq, Gt, etc.). These are translated to SQL WHERE clauses recursively:

# src/pilz/model/filter.py:27-73
class SympyToSqlHelper:
    @staticmethod
    def to_sql_where(expr):
        return SympyToSqlHelper._to_sql_impl(expr)

    @staticmethod
    def _to_sql_impl(expr):
        conv = [SympyToSqlHelper._to_sql_impl(arg) for arg in expr.args]

        if isinstance(expr, And):
            return f"( {' AND '.join(conv)} )"
        if isinstance(expr, Or):
            return f"( {' OR '.join(conv)} )"
        if isinstance(expr, Not):
            return f" NOT {conv[0]} "
        if isinstance(expr, Eq):
            return f" {conv[0]}={conv[1]} "
        if isinstance(expr, Ge):
            return f" {conv[0]}>={conv[1]} "
        if isinstance(expr, Symbol):
            name = str(expr)
            if name in SympyToSqlHelper.columns:
                return f'"{name}"'
            return f"'{name}'"
        if isinstance(expr, sympy.core.numbers.Number):
            return f"{float(expr)}"
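
The recursion is easier to follow on a small input. The following is a simplified, standalone sketch of the same idea, not the Pilz implementation: the function to_sql and the explicit columns argument are assumptions made for illustration, and only the node types shown above are handled.

```python
from sympy import And, Eq, Ge, Not, Or, Symbol
from sympy.core.numbers import Number


def to_sql(expr, columns: set[str]) -> str:
    """Recursively translate a SymPy boolean expression into a SQL condition."""
    # Leaves: known column names become quoted identifiers, other symbols
    # become string literals, and numbers become float literals.
    if isinstance(expr, Symbol):
        name = str(expr)
        return f'"{name}"' if name in columns else f"'{name}'"
    if isinstance(expr, Number):
        return str(float(expr))

    # Inner nodes: translate the children first, then combine them.
    parts = [to_sql(arg, columns) for arg in expr.args]
    if isinstance(expr, And):
        return "(" + " AND ".join(parts) + ")"
    if isinstance(expr, Or):
        return "(" + " OR ".join(parts) + ")"
    if isinstance(expr, Not):
        return f"(NOT {parts[0]})"
    if isinstance(expr, Eq):
        return f"{parts[0]} = {parts[1]}"
    if isinstance(expr, Ge):
        return f"{parts[0]} >= {parts[1]}"
    raise TypeError(f"unsupported expression type: {type(expr).__name__}")


columns = {"color", "age"}
color, age = Symbol("color"), Symbol("age")

eq_sql = to_sql(Eq(color, Symbol("red")), columns)
ge_sql = to_sql(Ge(age, 18), columns)
and_sql = to_sql(And(Eq(color, Symbol("red")), Ge(age, 18)), columns)

print(eq_sql)
print(ge_sql)
print(and_sql)
```

SymPy stores the arguments of And and Or in its own canonical order, so the order of the conditions in the generated SQL may differ from the order in which they were written. This does not change the meaning of the WHERE clause.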

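Filters are created via SympyContainer, which wraps a feature name, an operator, and a value. The in operator expands to an Or of equalities, and between becomes a half-open interval with an exclusive lower bound and an inclusive upper bound: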

# src/pilz/model/filter.py:76-144
class SympyContainer(BaseModel):
    feat_name: str
    operator: Literal["=", "<", "<=", ">", ">=", "!=", "not in", "in", "between"]
    value: str | int | float | list[str] | list[int] | list[float]

    def get_sympy(self) -> Boolean:
        x = self.get_symbol(self.feat_name, self.assumptions)
        match self.operator:
            case "=":
                return Eq(x, self.get_value_symbol(self.feat_name, self.value))
            case "in":
                return Or(*[Eq(x, value) for value in self.value])
            case "between":
                return And(Gt(x, self.value[0]), Le(x, self.value[1]))
            # ...

class Filter:
    def __init__(self, combine: Boolean):
        self.combine = combine

    def sql(self, do_invert: bool = False) -> str:
        combine = Not(self.combine) if do_invert else self.combine
        return SympyToSqlHelper.to_sql_where(combine)
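
The operator mapping can be checked directly in SymPy. This sketch uses a hypothetical build_filter function that mirrors only the three operators shown above; it leaves out the assumptions and value-symbol handling of the real SympyContainer.

```python
from sympy import And, Eq, Gt, Le, Or, Symbol


def build_filter(feat_name: str, operator: str, value):
    """Hypothetical helper mirroring three of the SympyContainer operators."""
    x = Symbol(feat_name)
    if operator == "=":
        return Eq(x, value)
    if operator == "in":
        return Or(*[Eq(x, v) for v in value])
    if operator == "between":
        low, high = value
        # Exclusive lower bound, inclusive upper bound: low < x <= high
        return And(Gt(x, low), Le(x, high))
    raise ValueError(f"operator not covered by this sketch: {operator}")


age = Symbol("age")

between = build_filter("age", "between", [18, 65])
lower_excluded = bool(between.subs(age, 18))  # 18 is not inside (18, 65]
upper_included = bool(between.subs(age, 65))  # 65 is inside (18, 65]

members = build_filter("age", "in", [1, 2, 3])
is_member = bool(members.subs(age, 2))
is_not_member = bool(members.subs(age, 5))

print(lower_excluded, upper_included, is_member, is_not_member)
```

Substituting a concrete value for the symbol evaluates the filter in Python, which is a convenient way to sanity-check a rule before it is translated to SQL.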

Evaluation via SQL

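During evaluation, each trained model's SQL is executed on the test set. Models with fewer than max_parallel_where spores run as a single CASE expression; larger models are split into several queries whose results are summed row by row: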

# src/pilz/service/darkwing.py:125-164
def get_eval_sr(self, pilze, max_parallel_where):
    for name, pilz in pilze.pilze.items():
        if len(pilz.spore) < max_parallel_where:
            case_sql = pilz.get_sql(name)
            df = self._get_pl_eval_df(col_sql=case_sql)
        else:
            # Split large models into batched SQL queries
            where_cases = pilz.get_split_sql(max_parallel_where)
            df = pl.concat([
                self._get_pl_eval_df(col_sql=sub_sql)
                for sub_sql in where_cases
            ], how="horizontal").sum_horizontal().alias(name).to_frame()

    return df.mean_horizontal().alias(pilze.target)

def _get_pl_eval_df(self, col_sql):
    sql = f"SELECT {col_sql}\nFROM df;"
    return duckdb.sql(sql).pl()

Model SQL Output

A trained Pilz model can generate SQL directly:

# src/pilz/model/pilz.py:18-32
class Pilz(BaseModel):
    spore: list[Spore]
    target: str | int

    def get_sql(self, res_name: str) -> str:
        # Put END on its own line, matching the layout used in get_split_sql()
        return "CASE \n" + "\n".join(self.get_where_sql()) + f"\nEND AS {res_name}"

    def get_split_sql(self, max_parallel_where):
        where_cases = self.get_where_sql()
        return [
            # The f-string prefix is required so that each batch gets its own alias res_0, res_1, ...
            "CASE \n" + "\n".join(batch) + f"\nELSE CAST(0 AS DOUBLE)\n END AS res_{i}"
            for i, batch in enumerate(batched(where_cases, max_parallel_where))
        ]

    def get_where_sql(self) -> list[str]:
        return [
            f"  WHEN {' AND '.join(spore.cut)} THEN CAST({spore.score} AS DOUBLE)"
            for spore in self.spore
        ]

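Each spore becomes one WHEN clause. For models with many spores, get_split_sql() splits the clauses into batches of at most max_parallel_where WHEN clauses to stay within database limits. Each batch falls back to ELSE 0, so a row that matches nothing in a batch contributes nothing when the batch results are added up.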
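
The sketch below illustrates why summing the batches gives the same score as a single CASE expression. It rests on an assumption the text above does not state explicitly: the spore conditions are mutually exclusive, so each row matches at most one WHEN clause. The spore data, the helper names, and the use of sqlite3 in place of DuckDB are all illustrative.

```python
import sqlite3
from itertools import islice

# Hypothetical spores as (conditions, score); the conditions do not overlap.
spores = [
    (['"age" < 30', '"income" < 2000'], 0.1),
    (['"age" < 30', '"income" >= 2000'], 0.4),
    (['"age" >= 30', '"income" < 2000'], 0.6),
    (['"age" >= 30', '"income" >= 2000'], 0.9),
]


def when_clauses(spores):
    """One WHEN clause per spore, in the same shape as get_where_sql()."""
    return [
        f"  WHEN {' AND '.join(cut)} THEN CAST({score} AS DOUBLE)"
        for cut, score in spores
    ]


def chunks(items, size):
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


def case_sql(clauses, name):
    """Wrap WHEN clauses in a CASE expression that falls back to 0."""
    return "CASE\n" + "\n".join(clauses) + f"\n  ELSE CAST(0 AS DOUBLE)\nEND AS {name}"


def run_column(con, expr):
    """Evaluate one column expression for every row of df, in insertion order."""
    return [row[0] for row in con.execute(f"SELECT {expr} FROM df ORDER BY rowid")]


con = sqlite3.connect(":memory:")
con.execute('CREATE TABLE df ("age" REAL, "income" REAL)')
con.executemany(
    "INSERT INTO df VALUES (?, ?)",
    [(25, 1000), (25, 3000), (40, 1000), (40, 3000)],
)

clauses = when_clauses(spores)

# All clauses in a single CASE expression
single = run_column(con, case_sql(clauses, "res"))

# Two clauses per batch, one query per batch, then a row-wise sum
per_batch = [
    run_column(con, case_sql(batch, f"res_{i}"))
    for i, batch in enumerate(chunks(clauses, 2))
]
summed = [sum(values) for values in zip(*per_batch)]

print(single)
print(summed)
```

If two spores could match the same row, the two forms would diverge: a single CASE returns the score of the first match, while the batched sum would add the scores of matches that fall into different batches.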

Cross-Process SQL Generation

The Pilze wrapper generates SQL for all target classes:

# src/pilz/model/pilz.py:35-43
class Pilze(BaseModel):
    pilze: dict[str, Pilz]
    target: str

    def get_sql(self) -> dict[str, str]:
        return {
            f"res_{name}": pilz.get_sql(f"res_{name}")
            for name, pilz in self.pilze.items()
        }

This produces one CASE WHEN expression per target class. For multi-class problems, the class with the highest score wins.
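
As an illustration of the final selection step, the sketch below evaluates two per-class expressions and picks the class with the highest score for each row. The class names, the expressions, and the tie-breaking rule (the first class wins) are assumptions, and sqlite3 again stands in for DuckDB.

```python
import sqlite3

# Hypothetical per-class expressions, shaped like the values of Pilze.get_sql()
class_sql = {
    "res_cat": 'CASE WHEN "weight" < 10 THEN 0.8 ELSE 0.2 END AS res_cat',
    "res_dog": 'CASE WHEN "weight" < 10 THEN 0.3 ELSE 0.7 END AS res_dog',
}

con = sqlite3.connect(":memory:")
con.execute('CREATE TABLE df ("weight" REAL)')
con.executemany("INSERT INTO df VALUES (?)", [(4.0,), (25.0,)])

# One result column per class, all computed in a single query
names = list(class_sql)
query = f"SELECT {', '.join(class_sql.values())} FROM df ORDER BY rowid"
rows = con.execute(query).fetchall()

# For each row, take the column with the highest score (first one on ties)
predicted = [
    names[max(range(len(row)), key=row.__getitem__)]
    for row in rows
]
print(predicted)
```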

Summary

| Component | SQL Role |
| --- | --- |
| Data loading | SELECT ... FROM df WHERE ... ORDER BY RANDOM() LIMIT ... |
| Filter translation | SymPy → SQL WHERE clause (handles AND, OR, comparisons) |
| Training queries | Separate target/non-target queries with random sampling |
| Evaluation | Execute model SQL on test set via DuckDB |
| Model output | CASE WHEN cond1 AND cond2 THEN score ELSE 0 END |

Next Steps