Client-side Complex Event Processing with Flink CEP and Python
The article describes how Xianyu’s recommendation system moves complex event processing from server-side Blink to client-side Python, reusing Flink CEP concepts. It details the NFA-based state and transition model, the pattern-building API, and aggregation support. It also reports sub-second execution with modest memory use, and outlines future optimizations such as NFA persistence, windowing, DSL-based script generation, and C++/TensorFlow Lite acceleration.
Background: Xianyu recommends content to users based on their profiles, but running the required event processing on server-side Blink is constrained by server resources.
Solution: Move CEP (Complex Event Processing) to the client side using Flink CEP concepts.
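CEP model: Flink CEP matches patterns in event streams with an NFA (Non-deterministic Finite Automaton) built from states and the transitions between them. Each transition carries one of three actions: take (consume the event and add it to the match), ignore (skip the event and stay in the current state), or proceed (move to the next state without consuming the event).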
State definition example:
class State(object):
    def __init__(self, name, state_type):
        self.__name = name  # node name, same as the Pattern name
        self.__state_type = state_type  # Start / Normal / Stop / Final
        self.__state_transitions = []  # outgoing edges
Transition definition example:
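class StateTransition:
    def __init__(self, source_state, action, target_state, condition):
        self.__source_state = source_state  # source node of the edge
        self.__action = action  # take / ignore / proceed
        self.__target_state = target_state  # target node of the edge
        self.__condition = condition  # guard condition evaluated on the event
Consuming strategies (STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, NOT_NEXT) determine how events that arrive between two pattern elements are treated. In Flink terms, STRICT (next) requires the very next event to match; SKIP_TILL_NEXT (followedBy) skips non-matching events until the next match; SKIP_TILL_ANY (followedByAny) may also skip matching events, yielding additional matches; NOT_NEXT (notNext) and NOT_FOLLOW (notFollowedBy) express negation.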
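To make the take / ignore / proceed actions and the two most common consuming strategies concrete, here is a minimal NFA sketch in plain Python. It is not the Xianyu engine: Action, run, two_step, optional_middle and the {"type": ...} event shape are hypothetical, attributes are public rather than name-mangled for brevity, and stale partial matches are never pruned because there is no window. two_step builds "start" then "end" with either STRICT contiguity (no ignore edge) or SKIP_TILL_NEXT (an ignore self-loop); optional_middle uses a proceed edge to model an optional element.

```python
from enum import Enum
from typing import Callable, Dict, List, Tuple

Event = Dict[str, str]
Condition = Callable[[Event], bool]


class Action(Enum):
    TAKE = "take"        # consume the event, append it to the match, move on
    IGNORE = "ignore"    # skip the event and stay in the same state
    PROCEED = "proceed"  # move to the target without consuming the event


class State:
    def __init__(self, name: str, state_type: str) -> None:
        self.name = name              # node name, same as the Pattern name
        self.state_type = state_type  # "Start" / "Normal" / "Final"
        self.transitions: List["StateTransition"] = []  # outgoing edges

    def add(self, action: Action, target: "State", condition: Condition) -> "State":
        self.transitions.append(StateTransition(self, action, target, condition))
        return self  # allow chaining several edges


class StateTransition:
    def __init__(self, source: State, action: Action, target: State,
                 condition: Condition) -> None:
        self.source, self.action = source, action
        self.target, self.condition = target, condition


Partial = Tuple[State, List[Event]]  # (current state, events taken so far)


def _step(state: State, taken: List[Event], event: Event,
          out: List[Partial], matches: List[List[Event]]) -> None:
    """Apply every edge of `state` whose guard accepts `event`."""
    for tr in state.transitions:
        if not tr.condition(event):
            continue
        if tr.action is Action.PROCEED:
            # Epsilon move: evaluate the same event again in the target state.
            _step(tr.target, taken, event, out, matches)
        elif tr.action is Action.TAKE:
            if tr.target.state_type == "Final":
                matches.append(taken + [event])
            else:
                out.append((tr.target, taken + [event]))
        else:  # Action.IGNORE keeps the partial match as it is
            out.append((state, taken))


def run(start: State, events: List[Event]) -> List[List[Event]]:
    """Feed events through the NFA and return every completed match."""
    partials: List[Partial] = []
    matches: List[List[Event]] = []
    for event in events:
        advanced: List[Partial] = []
        # Existing partial matches advance, and every event may open a new one.
        for state, taken in partials + [(start, [])]:
            _step(state, taken, event, advanced, matches)
        partials = advanced
    return matches


def is_(kind: str) -> Condition:
    return lambda e: e["type"] == kind


def two_step(relaxed: bool) -> State:
    """start then end, with STRICT (next) or SKIP_TILL_NEXT (followed_by)."""
    final = State("$end$", "Final")
    end = State("end", "Normal").add(Action.TAKE, final, is_("b"))
    if relaxed:  # SKIP_TILL_NEXT: ignore non-matching events while waiting
        end.add(Action.IGNORE, end, lambda e: not is_("b")(e))
    return State("start", "Start").add(Action.TAKE, end, is_("a"))


def optional_middle() -> State:
    """a, then an optional x, then b, all with strict contiguity."""
    final = State("$end$", "Final")
    last = State("last", "Normal").add(Action.TAKE, final, is_("b"))
    mid = (State("mid", "Normal").add(Action.TAKE, last, is_("x"))
           .add(Action.PROCEED, last, lambda e: True))
    return State("start", "Start").add(Action.TAKE, mid, is_("a"))


stream = [{"type": "a"}, {"type": "x"}, {"type": "b"}]
print(len(run(two_step(relaxed=False), stream)))  # 0: x breaks strict contiguity
print(len(run(two_step(relaxed=True), stream)))   # 1: x is ignored
```

With the stream a, x, b, the strict NFA drops its partial match at x, while the relaxed one ignores x and completes on b. The proceed edge lets optional_middle match both a, b and a, x, b without consuming an event when the optional element is skipped.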
The Python Pattern API builds a linked list of Pattern objects, which the compiler converts into an NFA.
Example pattern building:
Pattern.begin("start").where(SimpleCondition()) \
    .followed_by("middle").where(SimpleCondition()) \
    .next_("end").where(SimpleCondition())
The compiler first creates the Final node, then the intermediate Normal/Stop nodes in reverse pattern order, and finally the Start node, linking them with StateTransition objects.
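The reverse-order compilation can be sketched as follows. Everything here is hypothetical: the Pattern class only mimics the method names in the example (begin, where, followed_by, next_), plain lambdas stand in for SimpleCondition(), edges are stored as (action, target, condition) tuples, and Stop nodes (used for negation) and the proceed edges needed by optional or looping patterns are left out. Each Pattern keeps a reference to its predecessor, as Flink's Pattern does, so the compiler naturally walks from the last pattern back to the first.

```python
from typing import Callable, Dict, List, Optional, Tuple

Event = Dict[str, str]
Condition = Callable[[Event], bool]


class Pattern:
    """Hypothetical pattern node; `previous` links back to its predecessor."""

    def __init__(self, name: str, previous: Optional["Pattern"], strategy: str) -> None:
        self.name = name
        self.previous = previous
        self.strategy = strategy  # contiguity with the previous pattern
        self.condition: Condition = lambda e: True

    @staticmethod
    def begin(name: str) -> "Pattern":
        return Pattern(name, None, "STRICT")

    def where(self, condition: Condition) -> "Pattern":
        self.condition = condition
        return self

    def followed_by(self, name: str) -> "Pattern":
        return Pattern(name, self, "SKIP_TILL_NEXT")

    def next_(self, name: str) -> "Pattern":
        return Pattern(name, self, "STRICT")


class State:
    def __init__(self, name: str, state_type: str) -> None:
        self.name, self.state_type = name, state_type
        self.transitions: List[Tuple[str, "State", Condition]] = []


def compile_pattern(last: Pattern) -> List[State]:
    """Create the Final node first, then walk the patterns backwards."""
    target = State("$end$", "Final")
    states = [target]
    node: Optional[Pattern] = last
    while node is not None:
        kind = "Start" if node.previous is None else "Normal"
        state = State(node.name, kind)
        state.transitions.append(("take", target, node.condition))
        if node.strategy == "SKIP_TILL_NEXT":
            # Relaxed contiguity: stay here on events that do not match.
            state.transitions.append(
                ("ignore", state, lambda e, c=node.condition: not c(e)))
        states.append(state)
        target = state  # the preceding pattern's take edge points here
        node = node.previous
    return states  # creation order: Final, ..., Start


pattern = (Pattern.begin("start").where(lambda e: e["type"] == "a")
           .followed_by("middle").where(lambda e: e["type"] == "b")
           .next_("end").where(lambda e: e["type"] == "c"))
states = compile_pattern(pattern)
print([(s.name, s.state_type) for s in states])
# [('$end$', 'Final'), ('end', 'Normal'), ('middle', 'Normal'), ('start', 'Start')]
```

Building back to front means each new state's take edge can point at a state that already exists, so the Start node, created last, becomes the entry point of the finished NFA.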
Aggregation: the Python CEP engine inserts an AggregationState between the $end$ node and a group_by node, so that aggregations are computed over the matched events.
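The article does not show the aggregation API, so the following is only a rough guess at the idea: matches that reach the $end$ node are handed to an aggregation step, which buckets them by a group_by key and applies an aggregate function per bucket. AggregationState's methods, reading the key from each match's first event, and the sample item field are all assumptions.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Event = Dict[str, str]
Match = List[Event]


class AggregationState:
    """Hypothetical node placed after $end$: buckets matches by a group_by key."""

    def __init__(self, group_by: str) -> None:
        self.group_by = group_by
        self.groups: Dict[str, List[Match]] = defaultdict(list)

    def accept(self, match: Match) -> None:
        # Assumption: the key is read from the first event of the match.
        self.groups[match[0][self.group_by]].append(match)

    def aggregate(self, fn: Callable[[List[Match]], int]) -> Dict[str, int]:
        # Apply the aggregate function to each group of matches.
        return {key: fn(matches) for key, matches in self.groups.items()}


agg = AggregationState(group_by="item")
for m in ([{"item": "1001"}], [{"item": "1002"}], [{"item": "1001"}, {"item": "1001"}]):
    agg.accept(m)
print(agg.aggregate(len))  # {'1001': 2, '1002': 1}
print(agg.aggregate(lambda ms: sum(len(m) for m in ms)))  # {'1001': 3, '1002': 1}
```

Keeping grouping separate from matching means the NFA itself stays unchanged; aggregation only sees completed matches.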
Sample Python CEP script:
# Pattern "e1": three events whose 'scene' is 'Page_xyItemDetail' (KVCondition presumably tests one key/value pair)
_pattern = Pattern.begin("e1").where(KVCondition('scene', 'Page_xyItemDetail')).times(3)
# Apply the pattern to the batch's event sequence
_cep = CEP.pattern(_batch_data['eventSeq'], _pattern)
def select_function(data):
    # Placeholder: handle each match here
    pass
self.result = _cep.select(select_function)
Performance: end-to-end client-side execution, including one network request, stays under 1 s. A single script runs in about 100 ms with a peak memory footprint of about 15 MB. Optimizations under discussion include persisting NFA state to avoid repeated computation and reimplementing the engine in C++ for higher throughput.
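The sample script depends on engine classes that the article does not show. As a rough, hypothetical sketch of what KVCondition('scene', 'Page_xyItemDetail') and .times(3) might do, the code below treats KVCondition as an equality test on one event field and implements times(n) with relaxed contiguity and no after-match skipping (roughly mirroring Flink's defaults for times()), so every matching event starts a new run of n. match_times, select_function's body, the item field, and the Page_xyHome event are made up for the example.

```python
from typing import Callable, Dict, List

Event = Dict[str, str]


class KVCondition:
    """Hypothetical guard: true when event[key] equals value."""

    def __init__(self, key: str, value: str) -> None:
        self.key, self.value = key, value

    def __call__(self, event: Event) -> bool:
        return event.get(self.key) == self.value


def match_times(events: List[Event], cond: Callable[[Event], bool], n: int) -> List[List[Event]]:
    """Every run of n matching events, skipping non-matching events in between."""
    hits = [e for e in events if cond(e)]
    return [hits[i:i + n] for i in range(len(hits) - n + 1)]


def select_function(match: List[Event]) -> str:
    # Hypothetical select step: summarize the items viewed in one match.
    return ",".join(e["item"] for e in match)


events = [
    {"scene": "Page_xyItemDetail", "item": "1"},
    {"scene": "Page_xyHome", "item": "-"},
    {"scene": "Page_xyItemDetail", "item": "2"},
    {"scene": "Page_xyItemDetail", "item": "3"},
    {"scene": "Page_xyItemDetail", "item": "4"},
]
matches = match_times(events, KVCondition("scene", "Page_xyItemDetail"), 3)
print([select_function(m) for m in matches])  # ['1,2,3', '2,3,4']
```

The non-detail event is skipped rather than breaking the run, which is what relaxed contiguity means here; a strict variant would reset the count on any non-matching event.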
Future work: reduce repeated NFA creation, introduce window mechanisms, support group patterns, generate scripts via a DSL, and explore TensorFlow Lite for parameter‑level optimization.
Xianyu Technology
Official account of the Xianyu technology team