SoatDev IT Consulting
June 20, 2023

This short article shows how Python decorators can be used to implement a pipeline abstraction, using a feature selection pipeline as the running example. We do it in a way that is easy to extend, both by adding new pipeline steps and by broadening what each individual step can do.

Starting implementation

Let's first see what we can do without any syntactic sugar. A feature selection step is a function we run on our dataset of features, and it tells us: "Hey, features A and B are lousy, drop them now to have better ML models later." There are various feature selection techniques, so ideally you employ several of them in sequence:

import pandas as pd

def eliminate_constant_features(df: pd.DataFrame) -> pd.DataFrame:
    """If a feature has a single constant value, drop it."""
    stats = df.agg("nunique")
    constant_features = list(stats[stats == 1].index)
    return df.drop(columns=constant_features)

def eliminate_id_features(df: pd.DataFrame) -> pd.DataFrame:
    """If a feature has a unique value for *each* row, drop it."""
    stats = df.agg("nunique")
    id_features = list(stats[stats == df.shape[0]].index)
    return df.drop(columns=id_features)

def feature_selection_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    return eliminate_constant_features(eliminate_id_features(df))

Those are two trivial examples of what feature selection steps may look like. However, one ugly part should strike us: the implementation of feature_selection_pipeline. Can we do better? Here's a straightforward attempt:

def feature_selection_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    step1 = eliminate_constant_features(df)
    step2 = eliminate_id_features(step1)
    return step2

This is a bit better, but as you add more steps or change their order, you risk introducing a bug if you don't rename the intermediate variables consistently. Why have those variables at all?

Reduce to the rescue

import functools
import typing

Transformation = typing.Callable[[pd.DataFrame], pd.DataFrame]
Pipeline = list[Transformation]

my_pipeline: Pipeline = [
    eliminate_constant_features,
    eliminate_id_features,
]

def apply_pipeline(df: pd.DataFrame, pipeline: Pipeline) -> pd.DataFrame:
    return functools.reduce(lambda df, func: func(df), pipeline, df)

Though apply_pipeline is not as visually appealing, it offers a rather nice my_pipeline definition in a list without intermediate variables to pass data.
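As a sanity check, the whole thing can be run end to end. Here is a minimal, self-contained sketch with toy data (the DataFrame and its column names are illustrative, not from the article):

```python
import functools
import typing

import pandas as pd

Transformation = typing.Callable[[pd.DataFrame], pd.DataFrame]
Pipeline = list[Transformation]

def eliminate_constant_features(df: pd.DataFrame) -> pd.DataFrame:
    stats = df.agg("nunique")
    return df.drop(columns=list(stats[stats == 1].index))

def eliminate_id_features(df: pd.DataFrame) -> pd.DataFrame:
    stats = df.agg("nunique")
    return df.drop(columns=list(stats[stats == df.shape[0]].index))

my_pipeline: Pipeline = [eliminate_constant_features, eliminate_id_features]

def apply_pipeline(df: pd.DataFrame, pipeline: Pipeline) -> pd.DataFrame:
    return functools.reduce(lambda df, func: func(df), pipeline, df)

df = pd.DataFrame({
    "user_id": [1, 2, 3],           # unique per row -> dropped by the id filter
    "country": ["FR", "FR", "FR"],  # constant -> dropped by the constant filter
    "age": [25, 30, 25],            # informative -> kept
})
result = apply_pipeline(df, my_pipeline)
print(list(result.columns))  # ['age']
```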

Can we do even better? Possibly. I have two concerns with this code:

  • You still have to define each function in one place and then reference it in another.
  • When a function takes parameters beyond df, say thresholds or other configurable constants, you have to smuggle them in via a lambda in the my_pipeline declaration, and the config then gets read who-knows-where.

Decorate, not elaborate

Let's address the repeated declaration first. We will introduce a decorator, @feature_filter, which registers the given function in a singleton list. That list is then used to execute all the steps, as shown below:

pipeline: list[Transformation] = []

def feature_filter(f: Transformation) -> Transformation:
    @functools.wraps(f)
    def wrapper(df: pd.DataFrame) -> pd.DataFrame:
        return f(df)

    pipeline.append(wrapper)
    return wrapper

def apply_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    return functools.reduce(lambda df, func: func(df), pipeline, df)

Now, we just prepend every def eliminate_<something>_features with @feature_filter, and they all get applied automatically. However, we seem to have lost a bit of control just to get rid of a single parameter and a repeated declaration. How do we turn some steps off conditionally? How do we configure them?
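Before tackling those questions, the registration mechanism can be exercised end to end. A runnable sketch with toy data (column names are illustrative):

```python
import functools
import typing

import pandas as pd

Transformation = typing.Callable[[pd.DataFrame], pd.DataFrame]
pipeline: list[Transformation] = []

def feature_filter(f: Transformation) -> Transformation:
    @functools.wraps(f)
    def wrapper(df: pd.DataFrame) -> pd.DataFrame:
        return f(df)
    pipeline.append(wrapper)   # registration happens at import time
    return wrapper

@feature_filter
def eliminate_constant_features(df: pd.DataFrame) -> pd.DataFrame:
    stats = df.agg("nunique")
    return df.drop(columns=list(stats[stats == 1].index))

@feature_filter
def eliminate_id_features(df: pd.DataFrame) -> pd.DataFrame:
    stats = df.agg("nunique")
    return df.drop(columns=list(stats[stats == df.shape[0]].index))

def apply_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    return functools.reduce(lambda df, func: func(df), pipeline, df)

df = pd.DataFrame({"id": [1, 2, 3], "const": [0, 0, 0], "x": [5, 7, 5]})
out = apply_pipeline(df)
print(list(out.columns))  # ['x']
```

Note that merely importing the module that defines the decorated functions is enough to populate the pipeline.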

Pydantic to make the concept gigantic

Pydantic is a nice library for representing your configuration models in code, so we will use it here. Imagine we extend our removal of constant features to instead drop features whose number of unique values is less than some_constant. Here's what that code looks like:

import pydantic

class NearConstantFeaturesConfig(pydantic.BaseModel):
    some_constant: int

def remove_near_constant_features(
    df: pd.DataFrame,
    near_constant_features_config: NearConstantFeaturesConfig,
) -> pd.DataFrame:
    stats = df.agg("nunique")
    constant_features = list(stats[
        stats < near_constant_features_config.some_constant
    ].index)
    return df.drop(columns=constant_features)

class PipelineConfig(pydantic.BaseModel):
    near_constant_features_config: typing.Optional[NearConstantFeaturesConfig] = None

Huh, a nice config object we have here. But how do we get it into the pipeline, especially since the plumbing lives inside some arcane decorator? The point is that the decorator can do much more, as you can see below:

import dataclasses
import inspect

AnyConfig = typing.TypeVar("AnyConfig", bound=pydantic.BaseModel)
Transformation = typing.Callable[[pd.DataFrame, AnyConfig], pd.DataFrame]
TransformationLifted = typing.Callable[[pd.DataFrame, PipelineConfig], pd.DataFrame]

pipeline: list[TransformationLifted] = []

def feature_filter(f: Transformation) -> Transformation:
    @functools.wraps(f)
    def wrapper(df: pd.DataFrame, config: PipelineConfig) -> pd.DataFrame:
        specs = inspect.getfullargspec(f)
        arg_name = specs.args[1]
        arg_val = config.__dict__[arg_name]

        if arg_val:
            return f(df, arg_val)
        else:
            return df

    pipeline.append(wrapper)
    return f

@dataclasses.dataclass
class Accumulator:
    df: pd.DataFrame
    config: PipelineConfig

def apply_pipeline(df: pd.DataFrame, config: PipelineConfig) -> pd.DataFrame:
    return functools.reduce(
        lambda acc, func: dataclasses.replace(acc, df=func(acc.df, acc.config)),
        pipeline,
        Accumulator(df, config),
    ).df

Whoa, whoa! Lots of weird code! The change is that we now pass the big config object, PipelineConfig, through the pipeline, and each registered function is applied selectively, based on whether its config was provided.
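To see the selective application in action, here is a condensed, runnable sketch. Two deliberate simplifications: plain dataclasses stand in for the pydantic models so the sketch has no dependency beyond pandas, and since the config never changes mid-run, it is folded into the reduce directly instead of via the Accumulator:

```python
import dataclasses
import functools
import inspect
import typing

import pandas as pd

@dataclasses.dataclass
class NearConstantFeaturesConfig:
    some_constant: int

@dataclasses.dataclass
class PipelineConfig:
    near_constant_features_config: typing.Optional[NearConstantFeaturesConfig] = None

pipeline = []

def feature_filter(f):
    @functools.wraps(f)
    def wrapper(df: pd.DataFrame, config: PipelineConfig) -> pd.DataFrame:
        # look up this step's config by the name of its second parameter
        arg_name = inspect.getfullargspec(f).args[1]
        arg_val = config.__dict__[arg_name]
        return f(df, arg_val) if arg_val else df  # skip the step when unset
    pipeline.append(wrapper)
    return f

@feature_filter
def remove_near_constant_features(df, near_constant_features_config):
    stats = df.agg("nunique")
    drop = list(stats[stats < near_constant_features_config.some_constant].index)
    return df.drop(columns=drop)

def apply_pipeline(df, config):
    return functools.reduce(lambda acc, func: func(acc, config), pipeline, df)

df = pd.DataFrame({"a": [1, 1, 2], "b": [1, 2, 3]})
on = apply_pipeline(df, PipelineConfig(NearConstantFeaturesConfig(some_constant=3)))
off = apply_pipeline(df, PipelineConfig())  # config unset: step is skipped
print(list(on.columns), list(off.columns))  # ['b'] ['a', 'b']
```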

Looks like a lot of code to achieve one small thing, right? But imagine you have about thirty of those transformation functions, each with a non-trivial config. The general-purpose decorator does not change, and you no longer maintain an explicit list of feature transformations.

You still need the large PipelineConfig object, but that would be required anyway if you want to read the config from somewhere. If you forget to add your new @feature_filter-decorated function's config to PipelineConfig, your code will crash at the first unit test. Why? Because the decorator will attempt to access a nonexistent attribute. Not as good as a mypy error at type-check time, but better than nothing.
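That crash can be demonstrated in a few lines (a hedged sketch with hypothetical names; the misnamed_config parameter is deliberately absent from the config class):

```python
import dataclasses
import functools
import inspect

import pandas as pd

@dataclasses.dataclass
class PipelineConfig:
    pass  # the new step's config field was forgotten

pipeline = []

def feature_filter(f):
    @functools.wraps(f)
    def wrapper(df, config):
        arg_name = inspect.getfullargspec(f).args[1]
        arg_val = config.__dict__[arg_name]  # KeyError if the field is missing
        return f(df, arg_val) if arg_val else df
    pipeline.append(wrapper)
    return f

@feature_filter
def new_step(df, misnamed_config):
    return df

err = None
try:
    pipeline[0](pd.DataFrame({"a": [1]}), PipelineConfig())
except KeyError as e:
    err = e
print("crashed:", err)  # crashed: 'misnamed_config'
```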

To boldly extend a bit further

Often, nightmares come once a design has been used broadly: say you wrote 30 different feature selection functions, and then you need to extend them in depth. For example, not just remove the features, but also collect a report of which function removed which feature. With a non-robust design, this could mean going through all 30 functions and extending them individually. Is our design robust?

Verily. We have decoupled the combination-and-application logic from the functions themselves, so we can make these few changes in the core code without touching any individual pipeline member:

@dataclasses.dataclass
class Accumulator:
    df: pd.DataFrame
    config: PipelineConfig
    report: dict[str, set[str]] = dataclasses.field(default_factory=dict)

TransformationLifted = typing.Callable[
    [pd.DataFrame, PipelineConfig],
    tuple[pd.DataFrame, dict[str, set[str]]],
]

def feature_filter(f: Transformation) -> Transformation:
    @functools.wraps(f)
    def wrapper(
        df: pd.DataFrame, config: PipelineConfig
    ) -> tuple[pd.DataFrame, dict[str, set[str]]]:
        specs = inspect.getfullargspec(f)
        arg_name = specs.args[1]
        arg_val = config.__dict__[arg_name]

        if arg_val:
            result = f(df, arg_val)
            removed = set(df.columns).difference(result.columns)
            return (result, {arg_name: removed})
        else:
            return (df, {})

    pipeline.append(wrapper)
    return f

def apply_pipeline(
    df: pd.DataFrame, config: PipelineConfig
) -> tuple[pd.DataFrame, dict[str, set[str]]]:
    def _step(acc: Accumulator, func: TransformationLifted) -> Accumulator:
        result = func(acc.df, acc.config)
        return Accumulator(result[0], acc.config, {**acc.report, **result[1]})

    acc = functools.reduce(_step, pipeline, Accumulator(df, config))
    return (acc.df, acc.report)

(In real code, I would introduce a few more data classes instead of the tuples; here, I keep it brief.)
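Here is the reporting pipeline condensed into a runnable sketch (plain dataclasses stand in for the pydantic models, and the step and config names are illustrative):

```python
import dataclasses
import functools
import inspect
import typing

import pandas as pd

@dataclasses.dataclass
class ConstCfg:
    enabled: bool = True

@dataclasses.dataclass
class PipelineConfig:
    const_cfg: typing.Optional[ConstCfg] = None

@dataclasses.dataclass
class Accumulator:
    df: pd.DataFrame
    config: PipelineConfig
    report: dict[str, set[str]] = dataclasses.field(default_factory=dict)

pipeline = []

def feature_filter(f):
    @functools.wraps(f)
    def wrapper(df, config):
        arg_name = inspect.getfullargspec(f).args[1]
        arg_val = config.__dict__[arg_name]
        if not arg_val:
            return df, {}
        result = f(df, arg_val)
        # record which columns this step removed, keyed by its config name
        return result, {arg_name: set(df.columns) - set(result.columns)}
    pipeline.append(wrapper)
    return f

@feature_filter
def drop_constant(df, const_cfg):
    stats = df.agg("nunique")
    return df.drop(columns=list(stats[stats == 1].index))

def apply_pipeline(df, config):
    def _step(acc, func):
        new_df, partial = func(acc.df, acc.config)
        return Accumulator(new_df, acc.config, {**acc.report, **partial})
    acc = functools.reduce(_step, pipeline, Accumulator(df, config))
    return acc.df, acc.report

df = pd.DataFrame({"c": [1, 1], "x": [1, 2]})
out, report = apply_pipeline(df, PipelineConfig(const_cfg=ConstCfg()))
print(report)  # {'const_cfg': {'c'}}
```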

And we don't have to stop here: what if we exclude features based on how they correlate with the labels? We can introduce a new decorator, @feature_label_filter, with a different signature and extend the internals of apply_pipeline and Accumulator to handle it, without any changes to the existing functions.
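One hedged sketch of what such an extension might look like. The decorator name comes from the article, but its body, the second registry, and the correlation threshold are all hypothetical:

```python
import functools
import typing

import pandas as pd

LabelTransformation = typing.Callable[[pd.DataFrame, pd.Series], pd.DataFrame]
label_pipeline: list[LabelTransformation] = []  # hypothetical second registry

def feature_label_filter(f: LabelTransformation) -> LabelTransformation:
    """Register a step with the signature (df, labels) -> df."""
    @functools.wraps(f)
    def wrapper(df: pd.DataFrame, labels: pd.Series) -> pd.DataFrame:
        return f(df, labels)
    label_pipeline.append(wrapper)
    return wrapper

@feature_label_filter
def drop_label_correlated(df: pd.DataFrame, labels: pd.Series) -> pd.DataFrame:
    # drop features that correlate (near) perfectly with the labels
    corr = df.corrwith(labels).abs()
    return df.drop(columns=list(corr[corr > 0.99].index))

def apply_label_pipeline(df: pd.DataFrame, labels: pd.Series) -> pd.DataFrame:
    return functools.reduce(lambda acc, func: func(acc, labels), label_pipeline, df)

df = pd.DataFrame({"leak": [0, 1, 0, 1], "x": [3, 1, 4, 1]})
labels = pd.Series([0, 1, 0, 1])
kept = apply_label_pipeline(df, labels)
print(list(kept.columns))  # ['x']
```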

Closing Thoughts

With a bit of hassle under the hood, one ends up with a mechanism that allows a neat developer experience and can be extended both in depth and in breadth. The core of the argument was decoupling how the functions are combined and applied from how they are defined.

Technically, it was backed by a decorator doing registration under the hood and by a config object used as a dependency injection/lookup. Both patterns find usage in other situations, not just data pipelines.

There is a certain categorical backstory behind all of this, but that can wait for another article (and perhaps another language).

The hassle is not worth it if you don't expect to end up with 30-odd functions to decorate and configure. If all your pipeline will ever consist of is two functions applied in sequence with reasonable defaults, you have my full blessing to stick to the sequential application.


Python Case Study: Decorators and Pipelines was originally published in Better Programming on Medium, where people are continuing the conversation by highlighting and responding to this story.
