This short article shows how Python decorators can be used to implement a pipeline-y thing, think "feature selection pipeline," in a way that is easy to extend: both by adding new pipeline steps and by broadening what the individual steps can do.
Starting implementation
Let’s see first what we can do without any syntactic sugar. A “feature selection” is a function we run on our dataset of features, and it tells us, “Hey, features A and B are lousy, drop them now to have better ML models later.” There are various feature selection techniques — so ideally, you employ multiple of them in a sequence:
import pandas as pd

def eliminate_constant_features(df: pd.DataFrame) -> pd.DataFrame:
    """If a feature has a single constant value, drop it."""
    stats = df.agg("nunique")
    constant_features = list(stats[stats == 1].index)
    return df.drop(columns=constant_features)

def eliminate_id_features(df: pd.DataFrame) -> pd.DataFrame:
    """If a feature has a unique value for *each* row, drop it."""
    stats = df.agg("nunique")
    id_features = list(stats[stats == df.shape[0]].index)
    return df.drop(columns=id_features)

def feature_selection_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    return eliminate_constant_features(eliminate_id_features(df))
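As a quick sanity check, here is what the pipeline does to a made-up toy DataFrame (the data is purely illustrative):

# "id" is unique per row and "const" never changes, so both should be dropped;
# "age" has repeated but non-constant values, so it survives.
toy = pd.DataFrame({"id": [1, 2, 3], "const": [7, 7, 7], "age": [25, 25, 40]})
print(feature_selection_pipeline(toy).columns.tolist())  # ['age']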
Those are two trivial examples of what a feature selection may look like. However, there is an ugly part that should strike us — the implementation of the feature_selection_pipeline. Can we do better? Here’s a straightforward example:
def feature_selection_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    step1 = eliminate_constant_features(df)
    step2 = eliminate_id_features(step1)
    return step2
This is a bit better, but as you add more steps and change their order, you risk introducing a bug if you don’t rename the intermediate variables correctly. Why should you even have those variables?
Reduce to the rescue
import functools
import typing

Transformation = typing.Callable[[pd.DataFrame], pd.DataFrame]
Pipeline = list[Transformation]

my_pipeline: Pipeline = [
    eliminate_constant_features,
    eliminate_id_features,
]

def apply_pipeline(df: pd.DataFrame, pipeline: Pipeline) -> pd.DataFrame:
    return functools.reduce(lambda df, func: func(df), pipeline, df)
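Applying the whole thing is then a single call; df here stands for whatever feature DataFrame you happen to have:

selected_df = apply_pipeline(df, my_pipeline)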
Though apply_pipeline itself is not as visually appealing, it lets us define my_pipeline as a plain list, with no intermediate variables to pass data around.
Can we do even better? Possibly — I have two concerns with this code:
- You still have to define each function in one place and then list it in another
- When a function has parameters other than just df (for example, thresholds or other configurable constants), you have to bind them via a lambda in the my_pipeline declaration, as sketched below, and the config ends up being pulled in from who-knows-where.
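Here is a minimal sketch of that second point, assuming a hypothetical remove_near_constant_features(df, threshold) step that needs an extra threshold parameter:

threshold_from_somewhere = 5  # the config value arrives from who-knows-where

my_pipeline: Pipeline = [
    eliminate_constant_features,
    eliminate_id_features,
    # the extra parameter has to be smuggled in through a lambda
    lambda df: remove_near_constant_features(df, threshold_from_somewhere),
]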
Decorate, not elaborate
Let’s address the repeated declaration first. We will introduce a decorator, @feature_filter, which will register the given function in a certain singleton. When that’s done, we’ll use it to execute all the steps, as shown below:
pipeline: list[Transformation] = []

def feature_filter(f: Transformation) -> Transformation:
    @functools.wraps(f)
    def wrapper(df: pd.DataFrame) -> pd.DataFrame:
        return f(df)
    pipeline.append(wrapper)
    return wrapper

def apply_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    return functools.reduce(lambda df, func: func(df), pipeline, df)
Now, we just prepend every def eliminate_<something>_features with @feature_filter (as shown below), and they all get applied automatically. However, we seem to have lost a bit of control just to get rid of a single parameter and a repeated declaration. How do we turn some steps off conditionally? How do we configure them?
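For reference, registering one of the earlier filters really is a one-line change; the body stays exactly as before:

@feature_filter
def eliminate_constant_features(df: pd.DataFrame) -> pd.DataFrame:
    """If a feature has a single constant value, drop it."""
    stats = df.agg("nunique")
    return df.drop(columns=list(stats[stats == 1].index))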
Pydantic to make the concept gigantic
Pydantic is a nice library to represent your configuration models in the code, so we will do that. Imagine we extend our removal of constant features to support “number of unique values is less than some_constant” instead. Here’s what that code will look like:
import pydantic

class NearConstantFeaturesConfig(pydantic.BaseModel):
    some_constant: int

def remove_near_constant_features(
    df: pd.DataFrame,
    near_constant_features_config: NearConstantFeaturesConfig,
) -> pd.DataFrame:
    stats = df.agg("nunique")
    constant_features = list(stats[
        stats < near_constant_features_config.some_constant
    ].index)
    return df.drop(columns=constant_features)

class PipelineConfig(pydantic.BaseModel):
    near_constant_features_config: typing.Optional[NearConstantFeaturesConfig] = None
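For illustration, one way to obtain such a config is to parse it from a plain dict, which could in turn come from a JSON or YAML file; the values here are made up:

raw_config = {"near_constant_features_config": {"some_constant": 3}}
pipeline_config = PipelineConfig(**raw_config)  # pydantic validates the nested model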
Huh, nice config object we have here. But how do we get it into the pipeline, especially since the steps are wrapped inside some arcane decorator? The point is that the decorator can do much more, as you can see below:
import inspect

AnyConfig = typing.TypeVar("AnyConfig", bound=pydantic.BaseModel)
Transformation = typing.Callable[[pd.DataFrame, AnyConfig], pd.DataFrame]
TransformationLifted = typing.Callable[[pd.DataFrame, PipelineConfig], pd.DataFrame]

pipeline: list[TransformationLifted] = []

def feature_filter(f: Transformation) -> Transformation:
    @functools.wraps(f)
    def wrapper(df: pd.DataFrame, config: PipelineConfig) -> pd.DataFrame:
        # look up the name of the function's own config argument
        specs = inspect.getfullargspec(f)
        arg_name = specs.args[1]
        # fetch the matching field from the big config object
        arg_val = config.__dict__[arg_name]
        if arg_val:
            return f(df, arg_val)
        else:
            # no config provided: skip this step
            return df
    pipeline.append(wrapper)
    return f
import dataclasses

@dataclasses.dataclass
class Accumulator:
    df: pd.DataFrame
    config: PipelineConfig

def apply_pipeline(df: pd.DataFrame, config: PipelineConfig) -> pd.DataFrame:
    return functools.reduce(
        lambda acc, func: dataclasses.replace(acc, df=func(acc.df, acc.config)),
        pipeline,
        Accumulator(df, config),
    ).df
Whoa, whoa! Lots of weirdo code! The change happens because we pass the big config object, PipelineConfig, to the pipeline, and it can selectively decide to apply individual registered functions based on whether their config was provided.
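For concreteness, here is a hedged end-to-end sketch; raw_df and the threshold value are assumptions made for illustration:

@feature_filter
def remove_near_constant_features(
    df: pd.DataFrame,
    near_constant_features_config: NearConstantFeaturesConfig,
) -> pd.DataFrame:
    ...  # same body as earlier, now registered through the decorator

config = PipelineConfig(
    near_constant_features_config=NearConstantFeaturesConfig(some_constant=3)
)
selected_df = apply_pipeline(raw_df, config)  # steps whose config is None are skipped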
Looks like a lot of code to achieve one small thing, right? Imagine you have about thirty of those transformation functions defined now — each with a non-trivial config. The general-purpose decorator does not change, and you don’t need to maintain the explicit feature transformations list.
You still need the large PipelineConfig object, but that would be required anyway if you want to read the config from somewhere. If you forget to add your new @feature_filter-decorated function's config field to PipelineConfig, your code will crash at the first unit test. Why? Because the wrapper will try to look up a nonexistent field on the config. Not as good as mypy blocking you up front, but better than nothing.
To boldly extend a bit further
Often, nightmares come when you have used a certain design broadly, say, you wrote 30 different feature selection pipelines, and then you need to extend their depth. For example, not just remove the features, but collect a report of which function removed which feature. In the case of non-robust design, this could entail going through all 30 functions and extending them individually. Is our design robust?
Verily. We have decoupled the combination-application logic from the functions themselves. Thus, we can apply those few changes to the core code without touching any of the individual pipeline members:
@dataclasses.dataclass
class Accumulator:
    df: pd.DataFrame
    config: PipelineConfig
    # maps a step's config field name to the set of features that step removed
    report: dict[str, set[str]] = dataclasses.field(default_factory=dict)
TransformationLifted = typing.Callable[[pd.DataFrame, PipelineConfig], tuple[pd.DataFrame, dict[str, set[str]]]]
def feature_filter(f: Transformation) -> Transformation:
    @functools.wraps(f)
    def wrapper(
        df: pd.DataFrame, config: PipelineConfig
    ) -> tuple[pd.DataFrame, dict[str, set[str]]]:
        specs = inspect.getfullargspec(f)
        arg_name = specs.args[1]
        arg_val = config.__dict__[arg_name]
        if arg_val:
            result = f(df, arg_val)
            # record which columns this step dropped
            removed = set(df.columns).difference(result.columns)
            return (result, {arg_name: removed})
        else:
            return (df, {})
    pipeline.append(wrapper)
    return f
def apply_pipeline(
    df: pd.DataFrame, config: PipelineConfig
) -> tuple[pd.DataFrame, dict[str, set[str]]]:
    def _step(acc: Accumulator, func: TransformationLifted) -> Accumulator:
        new_df, new_report = func(acc.df, acc.config)
        return Accumulator(new_df, acc.config, {**acc.report, **new_report})

    final = functools.reduce(_step, pipeline, Accumulator(df, config))
    return final.df, final.report
(In the real code, I would have introduced a few more data classes instead of tuples — here, I keep it brief).
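With that in place, a call might look like this; the variable names and the report contents are assumptions for illustration:

selected_df, report = apply_pipeline(raw_df, config)
# report maps each applied step's config field name to the columns it dropped, e.g.
# {"near_constant_features_config": {"almost_constant_col"}}
print(report)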
And we don't have to stop here. What if we exclude features based on how they correlate with the labels? We can introduce a new decorator, @feature_label_filter, with a different signature, and extend the internals of apply_pipeline and the Accumulator to handle it, without any changes to the existing functions.
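A possible shape for that extension; everything below is an assumption, sketched only to show that the already-registered filters stay untouched:

label_pipeline: list[typing.Callable] = []  # a separate registry for label-aware filters

def feature_label_filter(f):
    """Register a filter that also looks at the labels; mirrors feature_filter."""
    label_pipeline.append(f)
    return f

@feature_label_filter
def remove_uncorrelated_features(df: pd.DataFrame, labels: pd.Series) -> pd.DataFrame:
    ...  # hypothetical: drop features whose correlation with the labels is negligible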
Closing Thoughts
With a bit of hassle under the hood, one can end up with a mechanism that allows for a neat developer experience and can be extended in both depth and breadth. The core of the argument was about decoupling how the functions are combined and applied from how each of them is defined.
Technically, it was backed by a decorator doing registration under the hood and by a config object used for dependency injection and lookup. Both patterns find usage in other situations, not just data pipelines.
There is a certain categorical backstory behind all of this, but that can wait for another article (and perhaps another language).
The hassle is not worth it if you don’t expect those 30+ functions to be decorated and configured a lot. If all your pipeline will ever consist of is two functions applied in a sequence with reasonable defaults, you have my full blessing to stick to the sequential application.