
Building a Production‑Ready Recommendation System with Python, LLR, and Elasticsearch

This tutorial walks through building a recommendation system with Python and open‑source big‑data tools: loading transaction data, constructing sparse user‑item and item‑item matrices, scoring item similarity with the Log‑Likelihood Ratio (LLR), and indexing the results into Elasticsearch for real‑time serving.

DataFunTalk

Recommendation systems learn user preferences from interaction data and automatically suggest relevant items. A familiar example: a pair of shoes a user browsed on an e‑commerce site later reappears as a suggestion while they scroll a social or video app.

The typical data flow includes data sources (e.g., e‑commerce, news, video), data collection (SDK, Nginx logs, crawlers), storage/cleaning (HDFS, Hive), recommendation modeling (collaborative filtering, content filtering, user matrix), and finally delivering results to target users.

Dependencies: install the required Python packages, for example:

pip install numpy
pip install scipy
pip install pandas
pip install jupyter
pip install requests

Using Anaconda3 is recommended to simplify the environment.

Data loading: read the event CSV, inspect the first rows, and identify the columns timestamp, visitorid, event, itemid, and transactionid. Filter the data to keep only transaction events:

trans = df[df['event'] == 'transaction']
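For context, a minimal self‑contained sketch of the loading‑and‑filtering step. The inline DataFrame stands in for the real events CSV (in practice it would come from something like pd.read_csv('events.csv'); the path and all values here are made up, only the column names match the ones above):

```python
import pandas as pd

# Stand-in for the events file; timestamps, IDs, and transaction IDs are invented
df = pd.DataFrame({
    'timestamp': [1433221332117, 1433221350000, 1433221400000, 1433221500000],
    'visitorid': [257597, 257597, 992329, 111016],
    'event': ['view', 'transaction', 'transaction', 'view'],
    'itemid': [355908, 355908, 248676, 318965],
    'transactionid': [None, 11117.0, 11118.0, None],
})

# Keep only purchase events, as in the tutorial
trans = df[df['event'] == 'transaction']
print(len(trans))
```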

Extract the unique users and items, then keep at most 50 transactions per user to reduce the sample size (note that head alone takes the first rows in each group; sort by timestamp first if the most recent transactions are wanted):

trans2 = trans.groupby(['visitorid']).head(50)

Map the original IDs to integer indices for matrix construction (here visitors and items are the arrays of unique IDs extracted above):

# Map each original ID to its positional index in the unique-ID arrays
trans2['visitors'] = trans2['visitorid'].apply(lambda x: np.argwhere(visitors == x)[0][0])
trans2['items'] = trans2['itemid'].apply(lambda x: np.argwhere(items == x)[0][0])
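The argwhere lookup above scans the whole unique‑ID array for every row. For large datasets a dictionary map is a common, faster equivalent; a sketch with made‑up IDs, where visitors and items play the same role as above:

```python
import numpy as np
import pandas as pd

visitors = np.array([257597, 992329])   # unique visitor IDs (example values)
items = np.array([355908, 248676])      # unique item IDs (example values)
trans2 = pd.DataFrame({'visitorid': [257597, 257597, 992329],
                       'itemid': [355908, 248676, 248676]})

# O(1) dictionary lookups instead of an O(n) argwhere scan per row
visitor_index = {v: i for i, v in enumerate(visitors)}
item_index = {it: i for i, it in enumerate(items)}
trans2['visitors'] = trans2['visitorid'].map(visitor_index)
trans2['items'] = trans2['itemid'].map(item_index)
```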

Matrix construction: build a sparse user‑item occurrence matrix using scipy.sparse.csr_matrix:

from scipy.sparse import csr_matrix

# NOTE: element-wise increments on a CSR matrix work but are slow and emit
# SparseEfficiencyWarning; acceptable for this sample, but a vectorized
# construction is preferable at scale
occurences = csr_matrix((visitors.shape[0], items.shape[0]), dtype='int8')

def set_occurences(visitor, item):
    occurences[visitor, item] += 1

trans2.apply(lambda row: set_occurences(row['visitors'], row['items']), axis=1)

Compute the item‑item co‑occurrence matrix and zero out the diagonal:

cooc = occurences.transpose().dot(occurences)
cooc.setdiag(0)
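On a toy example the two steps above look like this. The counts are invented, and the matrix is built in one vectorized csr_matrix call rather than per‑row increments, which is the idiomatic approach for large data:

```python
import numpy as np
from scipy.sparse import csr_matrix

# Three visitors x three items; each (row, col) pair is one transaction
rows = np.array([0, 0, 1, 1, 2])   # visitor indices
cols = np.array([0, 1, 0, 1, 2])   # item indices
occurences = csr_matrix((np.ones(len(rows), dtype='int8'), (rows, cols)),
                        shape=(3, 3))

# Item-item co-occurrence, diagonal zeroed as in the tutorial
cooc = occurences.transpose().dot(occurences)
cooc.setdiag(0)
print(cooc.toarray())
```

Items 0 and 1 were bought together by two visitors, so their co‑occurrence count is 2; item 2 never co‑occurs with anything.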

LLR similarity: implement the Log‑Likelihood Ratio to score item pairs. The helper functions are:

import numpy as np

def xLogX(x):
    return x * np.log(x) if x != 0 else 0.0

def entropy(x1, x2=0, x3=0, x4=0):
    # Unnormalized entropy: (sum) log(sum) minus the individual x log x terms
    return xLogX(x1 + x2 + x3 + x4) - xLogX(x1) - xLogX(x2) - xLogX(x3) - xLogX(x4)

def LLR(k11, k12, k21, k22):
    # G-test statistic for the 2x2 contingency table [[k11, k12], [k21, k22]]
    rowEntropy = entropy(k11 + k12, k21 + k22)
    colEntropy = entropy(k11 + k21, k12 + k22)
    matEntropy = entropy(k11, k12, k21, k22)
    if rowEntropy + colEntropy < matEntropy:
        # Guard against tiny negative values from floating-point rounding
        return 0.0
    return 2.0 * (rowEntropy + colEntropy - matEntropy)

def rootLLR(k11, k12, k21, k22):
    # Signed square root: negative when the items co-occur less than expected
    llr = LLR(k11, k12, k21, k22)
    sqrt = np.sqrt(llr)
    if k11 * 1.0 / (k11 + k12) < k21 * 1.0 / (k21 + k22):
        sqrt = -sqrt
    return sqrt
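In closed form, these helpers compute the G‑test statistic for the 2×2 contingency table of an item pair, where $k_{11}$ counts users who interacted with both items, $R_i$ and $C_j$ are the row and column sums, and $N$ is the total:

```latex
G^2 = 2 \sum_{i,j} k_{ij} \ln \frac{k_{ij}\, N}{R_i\, C_j}
```

Expanding the logarithm shows this equals $2(\text{rowEntropy} + \text{colEntropy} - \text{matEntropy})$, exactly as coded above.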

Iterate over non‑zero entries of the co‑occurrence matrix to compute LLR scores:

from scipy.sparse import lil_matrix

row_sum = np.sum(cooc, axis=0).A.flatten()
column_sum = np.sum(cooc, axis=1).A.flatten()
total = np.sum(row_sum)
# lil_matrix supports efficient element-wise assignment; convert once at the end
pp_score = lil_matrix((cooc.shape[0], cooc.shape[1]), dtype='double')
cooc_coo = cooc.tocoo()  # materialize the COO view once instead of per element
for i, j, v in zip(cooc_coo.row, cooc_coo.col, cooc_coo.data):
    if v != 0:
        k11 = v
        k12 = row_sum[i] - k11
        k21 = column_sum[j] - k11
        k22 = total - k11 - k12 - k21
        pp_score[i, j] = rootLLR(k11, k12, k21, k22)
pp_score = pp_score.tocsr()

Sort each row to keep the top similarity indicators (e.g., the top 50 with LLR ≥ 5) and prepare bulk update actions for Elasticsearch:

import json
import requests

result = np.flip(np.sort(pp_score.A, axis=1), axis=1)             # scores, descending
result_indices = np.flip(np.argsort(pp_score.A, axis=1), axis=1)  # matching item indices
minLLR = 5
indicators = result[:, :50]
indicators[indicators < minLLR] = 0.0
indicators_indices = result_indices[:, :50]

# Build newline-delimited bulk actions and flush every 100 documents
actions = []
for i in range(indicators.shape[0]):
    length = indicators[i].nonzero()[0].shape[0]
    real_inds = items[indicators_indices[i, :length]].astype('int').tolist()
    item_id = items[i]
    action = {"index": {"_index": "items2", "_id": str(item_id)}}
    data = {"id": int(item_id), "indicators": real_inds}
    actions.append(json.dumps(action))
    actions.append(json.dumps(data))
    if len(actions) >= 200:
        actions_string = "\n".join(actions) + "\n"
        requests.post("http://127.0.0.1:9200/_bulk",
                      headers={"Content-Type": "application/x-ndjson"},
                      data=actions_string)
        actions = []
if actions:  # flush the final partial batch
    actions_string = "\n".join(actions) + "\n"
    requests.post("http://127.0.0.1:9200/_bulk",
                  headers={"Content-Type": "application/x-ndjson"},
                  data=actions_string)
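The bulk body must be NDJSON: one action line, one document line per item, and a mandatory trailing newline. A quick offline check of the payload format, with made‑up IDs and no server required:

```python
import json

# One action/document pair as sent to the _bulk endpoint (example IDs)
action = {"index": {"_index": "items2", "_id": "355908"}}
data = {"id": 355908, "indicators": [248676, 318965]}
payload = "\n".join([json.dumps(action), json.dumps(data)]) + "\n"

print(payload)
```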

After indexing, the data can be verified via Elasticsearch endpoints such as http://127.0.0.1:9200/items2/_count, and a single item's indicator list can be retrieved with http://127.0.0.1:9200/items2/_doc/{item_id} (the _doc path applies to Elasticsearch 7 and later).

Conclusion: By combining open‑source big‑data components (Hadoop, Hive, Kafka, Elasticsearch) with Python‑based matrix processing and LLR similarity, a scalable, production‑grade recommendation system can be built without excessive complexity.

Tags: Python, Data Processing, recommendation system, Elasticsearch, LLR, sparse matrix
Written by

DataFunTalk

Dedicated to sharing and discussing big data and AI technology applications, aiming to empower a million data scientists. Regularly hosts live tech talks and curates articles on big data, recommendation/search algorithms, advertising algorithms, NLP, intelligent risk control, autonomous driving, and machine learning/deep learning.
