SQL-Native Architecture¶
All data processing in Pilz runs through SQL via DuckDB. Every filter — including the final model rules — is expressed as SQL and executed directly in the database.
Data Loading¶
Training and test data is loaded from CSV or Parquet files via DuckDB SQL queries:
# src/pilz/service/darkwing.py:92-104
def _get_pl_train_df(self, full_filters, max_eval_fit):
df = self.get_cached_train_df()
sql_str = SympyToSqlHelper.to_sql_where(full_filters)
sql = f"""
SELECT {", ".join(self.dc.feature_names_sql_save)}
FROM df
WHERE {sql_str}
ORDER BY RANDOM()
LIMIT {max_eval_fit};
"""
return duckdb.sql(sql).pl()
The raw CSV is loaded once with Polars and cached. All subsequent filtering, sampling, and aggregation runs as SQL on top of the cached DataFrame via DuckDB.
Filter Translation¶
Every decision in Pilz is a SymPy boolean expression (using And, Or, Eq, Gt, etc.). These are translated to SQL WHERE clauses recursively:
# src/pilz/model/filter.py:27-73
class SympyToSqlHelper:
@staticmethod
def to_sql_where(expr):
return SympyToSqlHelper._to_sql_impl(expr)
@staticmethod
def _to_sql_impl(expr):
conv = [SympyToSqlHelper._to_sql_impl(arg) for arg in expr.args]
if isinstance(expr, And):
return f"( {' AND '.join(conv)} )"
if isinstance(expr, Or):
return f"( {' OR '.join(conv)} )"
if isinstance(expr, Not):
return f" NOT {conv[0]} "
if isinstance(expr, Eq):
return f" {conv[0]}={conv[1]} "
if isinstance(expr, Ge):
return f" {conv[0]}>={conv[1]} "
if isinstance(expr, Symbol):
name = str(expr)
if name in SympyToSqlHelper.columns:
return f'"{name}"'
return f"'{name}'"
if isinstance(expr, sympy.core.numbers.Number):
return f"{float(expr)}"
Filters are created via SympyContainer, which wraps a feature name, operator, and value:
# src/pilz/model/filter.py:76-144
class SympyContainer(BaseModel):
feat_name: str
operator: Literal["=", "<", "<=", ">", ">=", "!=", "not in", "in", "between"]
value: str | int | float | list[str] | list[int] | list[float]
def get_sympy(self) -> Boolean:
x = self.get_symbol(self.feat_name, self.assumptions)
match self.operator:
case "=":
return Eq(x, self.get_value_symbol(self.feat_name, self.value))
case "in":
return Or(*[Eq(x, value) for value in self.value])
case "between":
return And(Gt(x, self.value[0]), Le(x, self.value[1]))
# ...
class Filter:
def __init__(self, combine: Boolean):
self.combine = combine
def sql(self, do_invert: bool = False) -> str:
combine = Not(self.combine) if do_invert else self.combine
return SympyToSqlHelper.to_sql_where(combine)
Evaluation via SQL¶
During evaluation, the trained model's SQL is executed on the test set:
# src/pilz/service/darkwing.py:125-164
def get_eval_sr(self, pilze, max_parallel_where):
for name, pilz in pilze.pilze.items():
if len(pilz.spore) < max_parallel_where:
case_sql = pilz.get_sql(name)
df = self._get_pl_eval_df(col_sql=case_sql)
else:
# Split large models into batched SQL queries
where_cases = pilz.get_split_sql(max_parallel_where)
df = pl.concat([
self._get_pl_eval_df(col_sql=sub_sql)
for sub_sql in where_cases
], how="horizontal").sum_horizontal().alias(name).to_frame()
return df.mean_horizontal().alias(pilze.target)
def _get_pl_eval_df(self, col_sql):
sql = f"SELECT {col_sql}\nFROM df;"
return duckdb.sql(sql).pl()
Model SQL Output¶
A trained Pilz model can generate SQL directly:
# src/pilz/model/pilz.py:18-32
class Pilz(BaseModel):
spore: list[Spore]
target: str | int
def get_sql(self, res_name: str) -> str:
return "CASE \n" + "\n".join(self.get_where_sql()) + f"END AS {res_name}"
def get_split_sql(self, max_parallel_where):
where_cases = self.get_where_sql()
return [
"CASE \n" + "\n".join(batch) + "\nELSE CAST(0 AS DOUBLE)\n END AS res_{i}"
for i, batch in enumerate(batched(where_cases, max_parallel_where))
]
def get_where_sql(self) -> list[str]:
return [
f" WHEN {' AND '.join(spore.cut)} THEN CAST({spore.score} AS DOUBLE)"
for spore in self.spore
]
Each spore becomes one WHEN clause. For models with many spores, get_split_sql() batches them into parallel SQL queries to stay within database limits.
Cross-Process SQL Generation¶
The Pilze wrapper generates SQL for all target classes:
# src/pilz/model/pilz.py:35-43
class Pilze(BaseModel):
pilze: dict[str, Pilz]
target: str
def get_sql(self) -> dict[str, str]:
return {
f"res_{name}": pilz.get_sql(f"res_{name}")
for name, pilz in self.pilze.items()
}
This produces one CASE WHEN expression per target class. For multi-class problems, the class with the highest score wins.
Summary¶
| Component | SQL Role |
|---|---|
| Data loading | SELECT ... FROM df WHERE ... ORDER BY RANDOM() LIMIT ... |
| Filter translation | SymPy → SQL WHERE clause (handles AND, OR, comparisons) |
| Training queries | Separate target/non-target queries with random sampling |
| Evaluation | Execute model SQL on test set via DuckDB |
| Model output | CASE WHEN cond1 AND cond2 THEN score ELSE 0 END |
Next Steps¶
- Three-Way Splits — How the splits work
- Training Internals — Full algorithm reference
- SQL Rules for Deployment — Deploying models in production