The combination of pandas and parquet is the backbone of many data pipelines. However, it can at times perform poorly. I would like to show one such case here.
Introduction
Pandas and parquet are both primarily column-oriented, analytical tools. They shine the most when your pipeline is, for example, an aggregation of columns like return df.col1.sum(), df.col2.mean(), df.col3.nunique(), etc., and your data is just numbers. This is because parquet stores data by column, so you can exploit columnar filters, and pandas lays out data in memory by column and runs vectorized operations on it, avoiding both the GIL and Python object overhead.
What are examples of pipelines that are row-oriented? A prime one is the building of features for every row. You want to take the columns, combine or transform them in some way, and output a full row. In that case, the columnar layout gives you more cache misses and does not vectorize well anyway. Numpy is more open-minded regarding memory layout, but it is less happy with heterogeneous data, e.g., when your data is more than just a numerical matrix.
Toy case study
I will focus on a toy example: our data consists of two columns, one being a random string of fixed length (uuid4), and the other also a string but always having the value ‘static’. Our pipeline calculates the crc32 hash of each column and sums the two hashes for every row; this is our “feature.” To simulate the “training,” I sum all the features in a dataset.
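The per-row computation described above can be sketched in a few lines of plain Python. This is only an illustration: the names row_feature and train are hypothetical, and the sample rows are generated on the fly rather than read from any file.

```python
import zlib
from uuid import uuid4


def row_feature(f1: bytes, f2: bytes) -> int:
    """Per-row feature: sum of the CRC32 hashes of both columns."""
    return zlib.crc32(f1) + zlib.crc32(f2)


def train(rows) -> int:
    """Stand-in for training: sum the feature over all rows."""
    return sum(row_feature(f1, f2) for f1, f2 in rows)


# Hypothetical sample data in the shape described in the text:
# a random uuid4 string and the constant value 'static'.
rows = [(str(uuid4()).encode(), b"static") for _ in range(1000)]
total = train(rows)
```

Every approach benchmarked below computes this same quantity; only the way the rows are stored and read differs.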
While the operation is a bit quirky, hashing of string data is a legitimate technique to project them to a fixed-sized type (see feature hashing for more context). In the real world, there would be more features, and their combinations would be more complex. Similarly, training would be a way more involved procedure. But for demonstrating a speedup over pandas+parquet, this is sufficient.
Data format
Let us first address how to store data. I will use a protobuf message with just two bytes fields holding the strings.
I will serialize the messages into a file sequentially: for each one, first two bytes holding the length of the message, then the message’s bytes themselves.
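To make the framing concrete, here is a minimal sketch of length-prefixed writing and reading. It assumes opaque byte payloads in place of serialized protobuf messages, and little-endian lengths as in the listing at the end of the article; write_framed and read_framed are hypothetical helper names.

```python
import io


def write_framed(out, payload: bytes) -> None:
    """Write a 2-byte little-endian length followed by the payload."""
    if len(payload) > 0xFFFF:
        raise ValueError("payload does not fit a 2-byte length prefix")
    out.write(len(payload).to_bytes(2, "little"))
    out.write(payload)


def read_framed(inp):
    """Yield payloads one by one until the stream is exhausted."""
    while True:
        header = inp.read(2)
        if not header:
            return
        if len(header) < 2:
            raise ValueError("truncated length prefix")
        size = int.from_bytes(header, "little")
        payload = inp.read(size)
        if len(payload) < size:
            raise ValueError("truncated payload")
        yield payload


# Round trip through an in-memory buffer standing in for the file.
buf = io.BytesIO()
for msg in (b"first", b"", b"third message"):
    write_framed(buf, msg)
buf.seek(0)
decoded = list(read_framed(buf))
```

A two-byte prefix caps each message at 65,535 bytes, which is plenty for the two short fields used here.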
There are two options we will benchmark:
- gzip the whole file
- gzip every block of 1k messages
Lastly, I will create an index file containing the starting position of every thousandth message.
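The second option, together with the index, might look roughly like the sketch below. It is not the article's generator: it treats messages as opaque bytes, keeps everything in memory, and (unlike the listing at the end) also records an index entry for a trailing partial block. The names write_blocks and read_block are hypothetical.

```python
import gzip
import io

BLOCK = 1000  # messages per compressed block, as in the text


def write_blocks(out, messages, block=BLOCK):
    """Gzip every `block` messages separately; return the block start offsets."""
    index, head, chunk = [], 0, []

    def flush():
        nonlocal head
        compressed = gzip.compress(b"".join(chunk))
        out.write(compressed)
        index.append(head)       # where this block starts in the file
        head += len(compressed)  # where the next block will start
        chunk.clear()

    for m in messages:
        chunk.append(m)
        if len(chunk) == block:
            flush()
    if chunk:  # trailing partial block
        flush()
    return index


def read_block(data: bytes, index, i: int) -> bytes:
    """Decompress only block `i`, located through the index."""
    start = index[i]
    end = index[i + 1] if i + 1 < len(index) else len(data)
    return gzip.decompress(data[start:end])


out = io.BytesIO()
messages = [b"%04d" % n for n in range(2500)]
index = write_blocks(out, messages)
data = out.getvalue()
middle = read_block(data, index, 1)  # messages 1000..1999, nothing else touched
```

The index is what makes random access possible: any block can be decompressed without reading the ones before it.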
For one million messages, this custom format takes 22M with either option, while parquet takes 35M with snappy compression and 22M with gzip plus declaring the second column as categorical. If more columns were mutually similar, the columnar compression would perform comparatively worse; if columns have nothing in common, there is no gain from gzipping them all together (as can be seen in this example!).
Processing
With pandas, the case is quite simple: we just call apply(axis='columns') with our crc32 + addition, and then sum(). In an ideal world, we would never use apply in pandas and would rely only on vectorized operations. Alas, feature building is often not expressible via the standard operations, especially when we have complex data types (lists, dictionaries) rather than just numbers, and when we invoke specific functions like hashing. The runtime on the one-million-row dataset is about five seconds, independent of the compression algorithm (understandably, since the time inside the apply dominates).
With our custom data format, we explore multiple options:
- sequentially reading message by message: 1.5 seconds
- reading the whole file into memory, then iterating through the array: 1.0 seconds
- reading the whole file into shared memory, processing every chunk of 1k messages independently in a process pool of 4 processes: 0.5 seconds
- memory mapping the whole file, processing every chunk of 1k messages in a process pool: 0.29 seconds
Even the first, most naive option demonstrates quite a gain over parquet+pandas. Five seconds does not look like a big delay in the first place, but with hundreds of features, this would be a very different situation.
The follow-up improvements each demonstrate a particular trick worth knowing. Reading the whole file at once is a boon because we make fewer I/O calls. With larger files, we could not read everything at once but would instead read in chunks of, e.g., 1kB.
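Chunked reading needs a little care, because a message can straddle two chunks. The following is a minimal sketch, assuming the uncompressed two-byte little-endian framing and opaque payloads; iter_messages is a hypothetical name and chunk_size is an arbitrary parameter.

```python
import io


def iter_messages(stream, chunk_size=1024):
    """Yield length-prefixed messages while reading `chunk_size` bytes at a time."""
    pending = b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        pending += chunk
        pos = 0
        # Emit every message that is fully contained in `pending`.
        while len(pending) - pos >= 2:
            size = int.from_bytes(pending[pos:pos + 2], "little")
            if len(pending) - pos - 2 < size:
                break  # the message continues in the next chunk
            yield pending[pos + 2:pos + 2 + size]
            pos += 2 + size
        pending = pending[pos:]  # keep the incomplete tail for the next round
    if pending:
        raise ValueError("stream ended in the middle of a message")


# A small in-memory stream read in deliberately tiny chunks.
sample = b"".join(len(p).to_bytes(2, "little") + p for p in (b"abc", b"", b"defgh"))
parsed = list(iter_messages(io.BytesIO(sample), chunk_size=4))
```

The chunk size trades memory for the number of read calls; the parsing logic stays the same for any positive value.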
The shared memory + pool exploits the fact that we have created the index file as well. That allows us to assign each worker a region of the file to process independently of the others. Since read-only shared memory poses no big overhead and we start only a few processes at the very start of the processing via a fork, this effectively nullifies any GIL disadvantage. I also tried multiprocessing.Array, but I was not able to get any good performance with that.
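How the index turns into independent work items can be sketched as follows. This is a simplified illustration, not the benchmarked variant: the hypothetical worker process_range re-reads its region from the file instead of using shared memory, and only reports how many bytes it saw. The fork start method is assumed to be available (it is not on Windows).

```python
from multiprocessing import get_context


def block_ranges(index, file_size):
    """Turn block start offsets into (start, end) byte ranges."""
    return list(zip(index, index[1:] + [file_size]))


def process_range(path, start, end):
    """Hypothetical worker: reads its own region and returns its size."""
    with open(path, "rb") as f:
        f.seek(start)
        return len(f.read(end - start))


def run(path, index, file_size, processes=4):
    """Hand every region to a pool of forked worker processes."""
    tasks = [(path, s, e) for s, e in block_ranges(index, file_size)]
    with get_context("fork").Pool(processes) as pool:
        return pool.starmap(process_range, tasks)


# Three blocks starting at offsets 0, 10 and 25 in a 40-byte file.
ranges = block_ranges([0, 10, 25], 40)
```

Because every range is self-contained, the workers never need to coordinate; the parent only sums up what they return.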
Lastly, memory mapping is the fastest because we skip the “read the whole file into memory at the start” step. Instead, we get served the contents on demand, keeping both I/O and CPU busy. To make this work, we had to use the more complicated format, gzipping every block of 1k messages independently, since we can’t transparently unzip via mmap.
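The interplay of mmap and per-block gzip can be shown in isolation. The sketch below builds a hypothetical two-block file in a temporary location and decompresses each block through a read-only mapping; it uses the portable access=mmap.ACCESS_READ argument rather than the Unix-only prot flag, and read_block is an illustrative name.

```python
import gzip
import mmap
import os
import tempfile


def read_block(path, start, end):
    """Map the file read-only and decompress only the bytes in [start, end)."""
    with open(path, "rb") as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
            # Slicing copies just this region out of the mapping.
            return gzip.decompress(mapped[start:end])


# Hypothetical file with two independently compressed blocks.
blocks = [gzip.compress(b"block one"), gzip.compress(b"block two")]
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"".join(blocks))
path = tmp.name
try:
    split = len(blocks[0])
    first = read_block(path, 0, split)
    second = read_block(path, split, split + len(blocks[1]))
finally:
    os.remove(path)
```

With a single gzip stream over the whole file, the second block could not be decompressed without first processing the first one, which is why the per-block format is needed here.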
Alternatives
While coming up with a custom data format may sound like a big deal, the bulk of the work is done by protobuf and gzip here. We do a bit of additional bookkeeping.
I have previously explored Avro — but the Python library’s performance was terrible.
As far as my skill and understanding reach, Arrow approaches this situation much like pandas does, and the same goes for Polars.
TensorFlow’s Dataset looks like a viable alternative (efficient representation, row-oriented), but I found it quite clunky to get working: it imposes Tensor abstractions all over the place, focuses primarily on numerical data (as numpy does), and is overall too tied to the tf ecosystem.
Conclusion
I was pleasantly surprised that the performance gains were so large without doing any Rust/Cython. In a further installment, I would explore how much one can squeeze with not-that-much-extra-Rust from the pandas-based (or arrow-based) solution but avoiding the Pythonic apply, and how that would compare to the protobuf-based one with the message2feature handled by Rust as well. A purely Rust solution would not be that interesting — I fear the cohort of Python-only folks will remain large still in the ages to come.
That said, this article still suggests staying with parquet and pandas for now. I’m sticking to it as the default starting solution for every data pipeline due to how flexible, simple, reliable, powerful, and well-implemented it is. The point is that if you have a situation where performance matters and that is not a good fit for parquet+pandas in the first place, you can gain a lot by ditching it even without going too low level.
Code for the best solution
Note: I have not optimized the data generation part in any way. Readability, modularity, and reliability are also on the “benchmark experiment” level.
# // protobuf schema:
# syntax = "proto3";
# message Message {
#   bytes f1 = 1;
#   bytes f2 = 2;
# }

# data generation
from uuid import uuid4
import gzip
from proto.data_pb2 import Message


def build_message(e: bytes):
    pb = Message()
    pb.f1 = e  # already bytes, encoded in generate_data
    pb.f2 = 'static'.encode()
    s = pb.SerializeToString()
    # two-byte little-endian length prefix
    l = len(s).to_bytes(2, 'little')
    return l, s


def generate_data(n: int, path: str):
    raw = [str(uuid4()).encode() for _ in range(n)]
    with open(path, 'wb') as f:
        index = []
        head = 0
        buffer = b""
        for i, e in enumerate(raw, start=1):
            l, s = build_message(e)
            buffer += l
            buffer += s
            if i % 1000 == 0:
                # gzip every block of 1k messages on its own
                buffer_compressed = gzip.compress(buffer)
                f.write(buffer_compressed)
                buffer = b""
                index.append(head)  # start offset of the block just written
                head += len(buffer_compressed)
        if buffer:
            # trailing partial block: it gets no index entry, so the reader
            # decompresses it together with the preceding block
            # (gzip.decompress handles concatenated members)
            f.write(gzip.compress(buffer))
    if not index:
        index = [0]
    with open(path + ".index", 'w') as f:
        for e in index:
            f.write(f"{e}\n")

# data pipeline
from mmap import mmap, PROT_READ
from multiprocessing import set_start_method, pool
import zlib


def worker(path: str, start: int, end: int) -> int:
    with open(path, 'rb') as f:
        data = mmap(f.fileno(), 0, prot=PROT_READ)
        # decompress only the block(s) assigned to this task
        deco = gzip.decompress(data[start:end])
    i = 0
    end = len(deco)
    s = 0
    while i < end:
        # two-byte length prefix, then the serialized message
        l = int.from_bytes(deco[i:i+2], 'little')
        p = Message()
        p.ParseFromString(deco[i+2:i+2+l])
        v = zlib.crc32(p.f1) + zlib.crc32(p.f2)
        s += v
        i += l + 2
    return s


def compute(path: str) -> int:
    # note: set_start_method may be called only once per program
    set_start_method('fork')
    with open(path, 'rb') as f:
        data = mmap(f.fileno(), 0, prot=PROT_READ)
    with open(path + ".index", 'r') as f:
        indices = [int(e) for e in f.read().split("\n")[:-1]]
    with pool.Pool(4) as pl:
        L = len(data)
        # one task per indexed block; the last one runs to the end of the file
        args = [
            (path, indices[i], indices[i+1] if i+1 < len(indices) else L)
            for i in range(len(indices))
        ]
        results = pl.starmap(worker, args)
    s = sum(results)
    return s
Data Formats for Row-Oriented Processing was originally published in Better Programming on Medium.