Practical Guide to Using PyODPS for Flexible Data Processing
The article walks through a first‑time user’s experience with PyODPS, showing how its Python‑based DataFrame API offers more flexible JSON field statistics, multi‑condition filtering, and custom aggregations than traditional ODPS SQL, while noting a steep learning curve and syntax quirks.
This article shares the author’s first experience with PyODPS (the Python SDK for Alibaba MaxCompute) and compares it with traditional ODPS SQL. It demonstrates how PyODPS offers more flexibility for complex JSON field statistics and multi‑condition filtering.
Advantages highlighted include flexible row handling, the ability to load small tables to simplify logic, configurable keyword filtering, and reusable SQL‑style logic for different time windows.
Disadvantages noted are a steep learning curve, slower execution, incomplete documentation, and the fact that Python-style conditions (e.g., a in a_list, a is None) are silently ignored; one must instead use PyODPS methods such as a.isin(a_list) and a.isnull().
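The a is None pitfall is easy to reproduce in plain Python: is performs an identity check that no class can override, so applied to a column-expression object it is always False and the intended NULL check is silently lost. A minimal sketch using a hypothetical Column stand-in (not the real PyODPS expression class):

```python
class Column:
    """Hypothetical stand-in for a PyODPS column expression."""
    def isnull(self):
        # PyODPS builds an "IS NULL" filter expression; here we just
        # return a marker string to show the call is intercepted.
        return "IS NULL filter expression"

a = Column()

# `is` checks object identity and cannot be overridden: the expression
# object is never the None singleton, so this is always False.
print(a is None)       # False

# The PyODPS way: build the NULL filter through a method call.
print(a.isnull())      # IS NULL filter expression
```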
The article explains a common pitfall when mixing Python logical operators with PyODPS DataFrame filters. Below are examples of incorrect and correct condition syntax:
# Incorrect – Python's "and" cannot be overloaded; it returns its second
# operand, so only the source condition is applied
uv_table = visit_table[
    visit_table.key.isin(target_key_list)
    and (visit_table.source == "A")
].groupby(visit_table.target_id)
# Correct – both conditions are applied
uv_table = visit_table[
(visit_table.key.isin(target_key_list)) & (visit_table.source == "A")
].groupby(visit_table.target_id)

Recommended practice is to wrap each condition in parentheses and combine them with & (and) or | (or).
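Why Python's and/or misbehave here can be shown in a few lines: these operators cannot be overloaded, so x and y simply returns y when x is truthy, silently dropping the first condition, while an overloaded & builds a combined expression. A sketch with a hypothetical Cond stand-in (not the real PyODPS expression class):

```python
class Cond:
    """Hypothetical stand-in for a PyODPS column expression."""
    def __init__(self, name):
        self.name = name

    def __and__(self, other):
        # PyODPS overloads & to build a combined filter expression.
        return Cond(f"({self.name} AND {other.name})")

a = Cond("key IN (...)")
b = Cond("source = 'A'")

# Python `and` cannot be overloaded: with a truthy left operand it just
# returns the second operand, so the first condition is silently dropped.
dropped = a and b
print(dropped.name)    # source = 'A'

# The overloaded & operator keeps both conditions.
combined = a & b
print(combined.name)   # (key IN (...) AND source = 'A')
```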
A basic PyODPS script scaffold is provided:
from odps.df import DataFrame, Scalar, agg, func, output
# args is a built‑in object containing runtime parameters
bizdate = args["bizdate"]
output_table = "xxxx"
# Load the source table as a DataFrame
data_process_table = DataFrame(o.get_table("xxxx"))
# Load filter words from a MaxCompute resource
import json
filters_words = []
with o.get_resource('filters_words.txt').open('r', encoding='utf-8') as f:
filters_words = json.loads(f.read())
# Apply filters on the DataFrame: first keep rows whose content exactly
# matches a filter word, then keep rows whose content contains one
data_process_table = data_process_table[data_process_table.content.isin(filters_words)]
data_process_table = data_process_table.query(
    " or ".join([f"content.contains('{x}')" for x in filters_words])
)
@output(["content_len"], ["int64"])
def handle(row):
    # Row-wise processing
    yield len(row.content)

res_t = data_process_table[data_process_table, data_process_table.apply(handle, axis=1)]

class Agg(object):
    def buffer(self):
        return {"merge_length": 0}

    def __call__(self, buffer, content_len):
        if content_len is not None:
            buffer["merge_length"] += content_len

    def merge(self, buffer, pbuffer):
        buffer["merge_length"] += pbuffer["merge_length"]

    def getvalue(self, buffer):
        return buffer["merge_length"]
to_agg = agg([res_t.content_len], Agg, rtype="int64")
res_t = res_t.groupby("id").agg(value=to_agg)
# Debug output
res_t.head(10)
# Persist result
# res_t.persist(output_table, partition=f"ds='{bizdate}'", drop_partition=True, create_partition=True)

The article also discusses handling of list-type outputs and custom aggregation:
@output(["list_value"], ["list<string>"])
def handle_list_type(row):
    yield [["test1", "test2"]]
@output(["int_value", "string_value"], ["int64", "string"])
def handle_tuple(row):
    yield 10, "test"

Common errors, such as passing the source DataFrame's column to agg instead of first capturing the GroupBy object, are illustrated along with the correct pattern:
# Wrong – agg references a column of the original DataFrame
closely_count_table = data_process_table.groupby('content_len').agg(closely_count=data_process_table.content_len.count())

# Correct – first obtain the GroupBy object, then aggregate over its column
closely_count_table = data_process_table.groupby('content_len')
closely_count_table = closely_count_table.agg(closely_count=closely_count_table.content_len.count())

Finally, the author concludes that PyODPS provides a powerful alternative to pure SQL for large-scale data engineering, offering column-level operations, row-wise custom logic, and flexible aggregation, thus opening a new chapter for data engineers.
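Because the Agg class shown earlier is plain Python, its buffer/__call__/merge/getvalue protocol can be sanity-checked locally before submitting a job. The driver loop below is an assumption for illustration, not PyODPS internals; it mimics two workers aggregating slices of data and then merging their partial buffers:

```python
class Agg(object):
    """Custom aggregator from the article: sums non-NULL content lengths."""
    def buffer(self):
        return {"merge_length": 0}

    def __call__(self, buffer, content_len):
        if content_len is not None:
            buffer["merge_length"] += content_len

    def merge(self, buffer, pbuffer):
        buffer["merge_length"] += pbuffer["merge_length"]

    def getvalue(self, buffer):
        return buffer["merge_length"]

# Simulated driver: two workers each aggregate a slice, then merge.
aggregator = Agg()
buf_a, buf_b = aggregator.buffer(), aggregator.buffer()
for v in [3, None, 5]:            # worker A sees a NULL value
    aggregator(buf_a, v)
for v in [7, 2]:                  # worker B
    aggregator(buf_b, v)
aggregator.merge(buf_a, buf_b)    # combine partial results
print(aggregator.getvalue(buf_a)) # 17
```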
DaTaobao Tech
Official account of DaTaobao Technology