
Client-side Complex Event Processing with Flink CEP and Python

This article describes how Xianyu's recommendation system moves complex event processing (CEP) from server-side Blink to the client, using a Python engine modeled on Flink CEP. It covers the NFA-based state and transition model, the pattern-building API and aggregation support. It reports sub-second execution with modest memory use, and it outlines future optimizations such as NFA persistence, windowing, DSL-based script generation, and acceleration through C++ and TensorFlow Lite.

Xianyu Technology

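Background: Xianyu recommends content to users based on their profiles. Running the required event processing on the server-side Blink platform (Alibaba's internal fork of Apache Flink) is constrained by server resources.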

Solution: Move CEP (Complex Event Processing) to the client side, reimplementing the concepts of Flink CEP in a Python engine that runs on the client.

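CEP model: Flink CEP matches patterns in event streams with an NFA (non-deterministic finite automaton). The model consists of states and the transitions between them. Each transition carries one of three actions:
- take: consume the current event and add it to the match.
- ignore: discard the current event and stay in the same state.
- proceed: move to the target state without consuming the event, so the same event is evaluated again there.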

State definition example:

class State(object):
    def __init__(self, name, state_type):
        self.__name = name  # node name, same as Pattern name
        self.__state_type = state_type  # Start/Normal/Stop/Final
        self.__state_transitions = []  # outgoing edges

Transition definition example:

class StateTransition:
    def __init__(self, source_state, action, target_state, condition):
        self.__source_state = source_state  # start node
        self.__action = action  # take / ignore / proceed
        self.__target_state = target_state  # end node
        self.__condition = condition  # guard condition
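The sketch below shows how one event could advance a single partial match through transitions like these. It is a minimal illustration under stated assumptions, not the engine's actual code. `Transition`, `Partial` and `step` are hypothetical simplifications of the classes above, and states are referenced by name. `PROCEED` edges are assumed not to form cycles.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

# Hypothetical action constants mirroring take / ignore / proceed
TAKE, IGNORE, PROCEED = "take", "ignore", "proceed"


@dataclass
class Transition:
    action: str                        # TAKE, IGNORE or PROCEED
    target: str                        # name of the target state
    condition: Callable[[dict], bool]  # guard evaluated against the event


@dataclass
class Partial:
    state: str                                        # current NFA state name
    events: List[dict] = field(default_factory=list)  # events taken so far


def step(partial: Partial, event: dict,
         transitions: Dict[str, List[Transition]]) -> List[Partial]:
    """Feed one event to a partial match; return the partial matches it becomes.

    A partial match with no applicable transition is dropped.
    """
    results: List[Partial] = []
    pending = [partial]
    while pending:
        current = pending.pop()
        for t in transitions.get(current.state, []):
            if not t.condition(event):
                continue
            if t.action == TAKE:
                # consume the event and move to the target state
                results.append(Partial(t.target, current.events + [event]))
            elif t.action == IGNORE:
                # ignore edges are self-loops: skip the event, keep the state
                results.append(Partial(t.target, list(current.events)))
            elif t.action == PROCEED:
                # move without consuming; the same event is re-evaluated there
                pending.append(Partial(t.target, list(current.events)))
    return results
```

Consider a `b` state with a take edge to `$end$` and an ignore self-loop for non-`b` events. Feeding it an unrelated event leaves the partial match unchanged, and feeding it a `b` event completes the match.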

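Consuming strategies determine how events that arrive between matched events are treated. In Flink's Java API they correspond to the following:
- STRICT (next()): the matching event must immediately follow the previous one.
- SKIP_TILL_NEXT (followedBy()): non-matching events are skipped until the next matching one.
- SKIP_TILL_ANY (followedByAny()): matching events may also be skipped, so every combination is reported.
- NOT_NEXT and NOT_FOLLOW (notNext() and notFollowedBy()): these express negative patterns.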
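The next sketch contrasts the three positive strategies on a two-step pattern. It assumes a pattern of the form "a event, then b event" and the sequence `a, c, b1, b2`. It enumerates matches by brute force rather than with an NFA. `match_pair` is a hypothetical helper, and the negative strategies are omitted.

```python
from typing import Callable, List, Sequence, Tuple, TypeVar

E = TypeVar("E")

STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY = "STRICT", "SKIP_TILL_NEXT", "SKIP_TILL_ANY"


def match_pair(events: Sequence[E], first: Callable[[E], bool],
               second: Callable[[E], bool], strategy: str) -> List[Tuple[E, E]]:
    """Enumerate matches of the pattern `first` -> `second` under a consuming strategy."""
    matches: List[Tuple[E, E]] = []
    for i, head in enumerate(events):
        if not first(head):
            continue
        for candidate in events[i + 1:]:
            if second(candidate):
                matches.append((head, candidate))
                if strategy != SKIP_TILL_ANY:
                    break  # STRICT and SKIP_TILL_NEXT stop at the first match
            elif strategy == STRICT:
                break      # a non-matching event in between breaks strict contiguity
            # otherwise relaxed contiguity simply skips the non-matching event
    return matches
```

On `["a", "c", "b1", "b2"]`, STRICT finds nothing because `c` sits between `a` and `b1`. SKIP_TILL_NEXT yields only `("a", "b1")`. SKIP_TILL_ANY yields both `("a", "b1")` and `("a", "b2")`.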

Pattern API: as in Flink, the Python Pattern API builds a linked list of Pattern objects, with each pattern holding a reference to its predecessor. A compiler then converts this list into an NFA.
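The linked-list structure can be sketched as follows, assuming a builder whose methods return the newly appended pattern. This `Pattern` class is a hypothetical minimal stand-in, not the engine's real class. Its `strategy` field records the consuming strategy relative to the previous pattern.

```python
from typing import Callable, List, Optional, Tuple


class Pattern:
    """Hypothetical minimal pattern node: each node links to its predecessor."""

    def __init__(self, name: str, previous: Optional["Pattern"] = None,
                 strategy: Optional[str] = None):
        self.name = name
        self.previous = previous  # preceding pattern (None for the first one)
        self.strategy = strategy  # consuming strategy relative to `previous`
        self.condition: Optional[Callable[[dict], bool]] = None

    @staticmethod
    def begin(name: str) -> "Pattern":
        return Pattern(name)

    def where(self, condition: Callable[[dict], bool]) -> "Pattern":
        self.condition = condition  # guard for events matched by this pattern
        return self

    def followed_by(self, name: str) -> "Pattern":
        return Pattern(name, previous=self, strategy="SKIP_TILL_NEXT")

    def next_(self, name: str) -> "Pattern":
        return Pattern(name, previous=self, strategy="STRICT")


def walk(last: Pattern) -> List[Tuple[str, Optional[str]]]:
    """Follow the `previous` links from the last pattern back to the first."""
    chain: List[Tuple[str, Optional[str]]] = []
    node: Optional[Pattern] = last
    while node is not None:
        chain.append((node.name, node.strategy))
        node = node.previous
    chain.reverse()
    return chain
```

Because every builder call returns the newest node, a chained expression evaluates to the last pattern. A compiler therefore naturally starts from the end of the list.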

Example pattern building:

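# SimpleCondition() stands in for a concrete guard condition.
# As in Flink, followed_by() adds a pattern with relaxed contiguity and next_() one with strict contiguity.
pattern = (Pattern.begin("start").where(SimpleCondition())
           .followed_by("middle").where(SimpleCondition())
           .next_("end").where(SimpleCondition()))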

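The compiler walks this list backwards from the last pattern. It first creates the Final node. It then creates the intermediate Normal nodes in reverse order, together with any Stop nodes, which Flink uses for negative patterns. Finally it creates the Start node for the first pattern, and it links consecutive nodes with StateTransition objects.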
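The back-to-front construction can be sketched for the simplest case. The sketch assumes plain positive patterns, so no Stop nodes are created, and only take edges are built, with no ignore or proceed edges. `PatternNode`, `State` and `compile_nfa` are hypothetical names. The Final state is named `$end$` to match the aggregation description.

```python
from typing import List, Optional, Tuple


class PatternNode:
    """Hypothetical linked-list node: a pattern name plus a link to its predecessor."""

    def __init__(self, name: str, previous: Optional["PatternNode"] = None):
        self.name = name
        self.previous = previous


class State:
    def __init__(self, name: str, state_type: str):
        self.name = name
        self.state_type = state_type                      # Start / Normal / Final
        self.transitions: List[Tuple[str, "State"]] = []  # (action, target state)


def compile_nfa(last: PatternNode) -> Tuple[State, List[State]]:
    """Build states back to front: Final first, then Normal states, then Start."""
    final = State("$end$", "Final")
    created = [final]
    successor = final
    node = last
    while node.previous is not None:  # every pattern except the first
        state = State(node.name, "Normal")
        state.transitions.append(("take", successor))
        created.append(state)
        successor = state
        node = node.previous
    start = State(node.name, "Start")  # the first pattern becomes the Start state
    start.transitions.append(("take", successor))
    created.append(start)
    return start, created
```

Building in reverse means each new state's successor already exists when its take edge is created. This is why walking the list from its tail is convenient.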

To support aggregation, the Python CEP engine inserts an AggregationState between the $end$ node and a group_by node. The events collected by a completed match can then be grouped and aggregated.
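The grouping step after a match completes might look like the sketch below. It assumes each completed match is a list of event dicts and that aggregation means grouping by a key function and reducing each group. `aggregate_matches` is a hypothetical helper and is not the engine's AggregationState.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, List


def aggregate_matches(matches: List[List[dict]],
                      key: Callable[[dict], Hashable],
                      agg: Callable[[List[dict]], Any]) -> Dict[Hashable, Any]:
    """Group the events of completed matches by `key`, then reduce each group with `agg`."""
    groups: Dict[Hashable, List[dict]] = defaultdict(list)
    for match in matches:  # each match is the event list that reached $end$
        for event in match:
            groups[key(event)].append(event)
    return {k: agg(v) for k, v in groups.items()}
```

Passing `len` as `agg` counts matched events per group. A lambda such as `lambda evs: sum(e["price"] for e in evs)` sums a field instead.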

Sample Python CEP script:

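# Match three events whose 'scene' field equals 'Page_xyItemDetail'
_pattern = Pattern.begin("e1").where(KVCondition('scene', 'Page_xyItemDetail')).times(3)
# Run the pattern over the batch's event sequence (_batch_data is presumably injected by the host runtime)
_cep = CEP.pattern(_batch_data['eventSeq'], _pattern)

def select_function(data):
    # Receives the matched events; the body is left empty in the original example
    pass

# The script presumably runs inside a host object, hence the assignment to self.result
self.result = _cep.select(select_function)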
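The sketch below illustrates what the `times(3)` pattern above would select. It assumes relaxed contiguity between repetitions, so non-matching events are skipped, and it assumes overlapping matches are all reported. These are assumptions about the engine's defaults. `kv_condition` and `match_times` are hypothetical stand-ins for `KVCondition` and the engine's matching.

```python
from typing import Callable, List


def kv_condition(key: str, value: str) -> Callable[[dict], bool]:
    """Hypothetical stand-in for KVCondition: event[key] must equal value."""
    return lambda event: event.get(key) == value


def match_times(events: List[dict], condition: Callable[[dict], bool],
                n: int) -> List[List[dict]]:
    """Return every run of n matching events, skipping non-matching events
    in between (relaxed contiguity) and allowing overlapping matches."""
    hits = [e for e in events if condition(e)]
    return [hits[i:i + n] for i in range(len(hits) - n + 1)]
```

Consider four item-detail events with an unrelated page view between the first and second. The function then returns two overlapping matches: the first three detail events and the last three.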

Performance: end-to-end client-side execution, including one network request, stays under 1 s. A single script runs in about 100 ms, with peak memory of about 15 MB. Optimizations under discussion include persisting NFA state to avoid repeated computation and reimplementing the engine in C++ for higher throughput.
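The idea of persisting NFA state can be illustrated with a minimal sketch. It assumes a `times(n)`-style pattern with relaxed contiguity. Partial matches are serialized to JSON after each batch and restored on the next call, so earlier events need not be reprocessed. `process` and the JSON state format are hypothetical, and events must be JSON-serializable.

```python
import json
from typing import Callable, List, Optional, Tuple


def process(events: List[dict], condition: Callable[[dict], bool], n: int,
            state_json: Optional[str] = None) -> Tuple[List[List[dict]], str]:
    """Feed a batch to a times(n) matcher, resuming from serialized partial matches.

    Returns the matches completed in this batch and the state for the next call.
    """
    partials: List[List[dict]] = json.loads(state_json) if state_json else []
    completed: List[List[dict]] = []
    for event in events:
        if not condition(event):
            continue  # relaxed contiguity: skip non-matching events
        # extend every open partial match and start a new one at this event
        partials = [p + [event] for p in partials] + [[event]]
        completed += [p for p in partials if len(p) == n]
        partials = [p for p in partials if len(p) < n]
    return completed, json.dumps(partials)
```

Splitting a stream into two batches and carrying the serialized state between them yields the same matches as processing it in one pass.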

Future work: reduce repeated NFA creation, introduce window mechanisms, support group patterns, generate scripts via a DSL, and explore TensorFlow Lite for parameter‑level optimization.
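A window mechanism, similar in spirit to Flink's `within()`, could discard partial matches whose first event is too old. The sketch below assumes each event carries a numeric `ts` field and that a match may span at most `window` time units from its first event. `match_times_within` is a hypothetical name, and the pruning rule is an assumption rather than the engine's design.

```python
from typing import Callable, List


def match_times_within(events: List[dict], condition: Callable[[dict], bool],
                       n: int, window: float) -> List[List[dict]]:
    """times(n) matcher that drops partial matches older than `window`."""
    partials: List[List[dict]] = []
    completed: List[List[dict]] = []
    for event in events:
        now = event["ts"]
        # prune partial matches whose first event has fallen out of the window
        partials = [p for p in partials if now - p[0]["ts"] <= window]
        if not condition(event):
            continue
        partials = [p + [event] for p in partials] + [[event]]
        completed += [p for p in partials if len(p) == n]
        partials = [p for p in partials if len(p) < n]
    return completed
```

Pruning during processing, rather than filtering completed matches afterwards, also bounds how many partial matches the client has to keep in memory.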

Written by

Xianyu Technology

Official account of the Xianyu technology team
