Building a Production-Ready Recommendation System with Python, LLR, and Elasticsearch
This tutorial walks through building a recommendation system with Python and open-source big-data tools: loading transaction data, constructing sparse user-item and item-item matrices, scoring item similarity with the Log-Likelihood Ratio (LLR), and indexing the results into Elasticsearch for real-time serving.
Recommendation systems learn user preferences from interaction data and suggest items automatically; for example, a shoe a user browsed on an e-commerce site may later be recommended to them while they browse a social or video app.
The typical data flow includes data sources (e.g., e‑commerce, news, video), data collection (SDK, Nginx logs, crawlers), storage/cleaning (HDFS, Hive), recommendation modeling (collaborative filtering, content filtering, user matrix), and finally delivering results to target users.
Dependencies: install the required Python packages, for example:

```shell
pip install numpy
pip install scipy
pip install pandas
pip install jupyter
pip install requests
```

Using Anaconda3 is recommended to simplify environment setup.
Data loading: read the event CSV, inspect the first rows, and identify the columns timestamp, visitorid, event, itemid, and transactionid. Filter the data to keep only transaction events:

```python
trans = df[df['event'] == 'transaction']
```

Extract the unique users and items, then limit each user to their 50 most recent transactions to reduce the sample size while preserving accuracy:

```python
trans2 = trans.groupby(['visitorid']).head(50)
```

Map the original IDs to integer indices for matrix construction:
```python
trans2['visitors'] = trans2['visitorid'].apply(lambda x: np.argwhere(visitors == x)[0][0])
trans2['items'] = trans2['itemid'].apply(lambda x: np.argwhere(items == x)[0][0])
```

Matrix construction: build a sparse user-item occurrence matrix with scipy.sparse.csr_matrix:

```python
occurences = csr_matrix((visitors.shape[0], items.shape[0]), dtype='int8')

def set_occurences(visitor, item):
    occurences[visitor, item] += 1

trans2.apply(lambda row: set_occurences(row['visitors'], row['items']), axis=1)
```

Compute the item-item co-occurrence matrix and zero out the diagonal (an item trivially co-occurs with itself, which carries no similarity information):
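As a performance aside before that step: mutating a sparse matrix cell by cell is slow for large datasets. A common equivalent construction (sketched here on hypothetical toy index arrays, not the real dataset) builds the whole occurrence matrix in one call, because coo_matrix sums duplicate (row, column) pairs when converted to CSR:

```python
import numpy as np
from scipy.sparse import coo_matrix

# Hypothetical integer-encoded interactions: 3 users, 4 items.
user_idx = np.array([0, 0, 1, 2, 2, 2])
item_idx = np.array([1, 1, 3, 0, 2, 2])

# Duplicate (row, col) pairs are summed on conversion to CSR, which
# reproduces the "+= 1 per transaction" loop in a single vectorized call.
occ = coo_matrix(
    (np.ones_like(user_idx), (user_idx, item_idx)), shape=(3, 4)
).tocsr()

print(occ.toarray().tolist())  # [[0, 2, 0, 0], [0, 0, 0, 1], [1, 0, 2, 0]]
```

Either construction yields the same occurrence matrix for the transpose-product that follows.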
```python
cooc = occurences.transpose().dot(occurences)
cooc.setdiag(0)
```

LLR similarity: implement the Log-Likelihood Ratio to score item pairs; higher scores indicate co-occurrence stronger than chance. The helper functions are:
```python
def xLogX(x):
    return x * np.log(x) if x != 0 else 0.0

def entropy(x1, x2=0, x3=0, x4=0):
    return xLogX(x1 + x2 + x3 + x4) - xLogX(x1) - xLogX(x2) - xLogX(x3) - xLogX(x4)

def LLR(k11, k12, k21, k22):
    rowEntropy = entropy(k11 + k12, k21 + k22)
    colEntropy = entropy(k11 + k21, k12 + k22)
    matEntropy = entropy(k11, k12, k21, k22)
    if rowEntropy + colEntropy < matEntropy:
        # Guard against tiny negative values from floating-point rounding.
        return 0.0
    return 2.0 * (rowEntropy + colEntropy - matEntropy)

def rootLLR(k11, k12, k21, k22):
    llr = LLR(k11, k12, k21, k22)
    sqrt = np.sqrt(llr)
    if k11 * 1.0 / (k11 + k12) < k21 * 1.0 / (k21 + k22):
        # A negative sign marks pairs that co-occur less than expected.
        sqrt = -sqrt
    return sqrt
```

Iterate over the non-zero entries of the co-occurrence matrix to compute an LLR score for each item pair:
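Before running that loop over real data, the helpers can be sanity-checked on a hand-computable 2×2 contingency table (the counts below are made up for illustration; the functions are restated so the snippet runs standalone):

```python
import numpy as np

# Restated from the helpers above so this snippet is self-contained.
def xLogX(x):
    return x * np.log(x) if x != 0 else 0.0

def entropy(x1, x2=0, x3=0, x4=0):
    return xLogX(x1 + x2 + x3 + x4) - xLogX(x1) - xLogX(x2) - xLogX(x3) - xLogX(x4)

def LLR(k11, k12, k21, k22):
    rowEntropy = entropy(k11 + k12, k21 + k22)
    colEntropy = entropy(k11 + k21, k12 + k22)
    matEntropy = entropy(k11, k12, k21, k22)
    if rowEntropy + colEntropy < matEntropy:
        return 0.0
    return 2.0 * (rowEntropy + colEntropy - matEntropy)

# Perfect association: the items always co-occur (k11 = 10, k22 = 10).
# By hand this works out to exactly 40 * ln(2) ≈ 27.73.
print(round(LLR(10, 0, 0, 10), 2))  # 27.73

# Independence: counts equal the product of the margins, so LLR ≈ 0.
print(round(LLR(5, 5, 5, 5), 2))    # 0.0
```

With the scores behaving as expected, the loop below applies rootLLR to every non-zero co-occurrence pair.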
```python
# Marginals of the co-occurrence matrix and the grand total.
# cooc is symmetric, so the axis=0 and axis=1 sums coincide.
row_sum = np.sum(cooc, axis=0).A.flatten()
column_sum = np.sum(cooc, axis=1).A.flatten()
total = np.sum(row_sum)

pp_score = csr_matrix((cooc.shape[0], cooc.shape[1]), dtype='double')
cooc_coo = cooc.tocoo()  # convert once, not on every loop attribute access
for i, j, v in zip(cooc_coo.row, cooc_coo.col, cooc_coo.data):
    if v != 0:
        k11 = v                        # co-occurrences of items i and j
        k12 = row_sum[i] - k11         # occurrences of i without j
        k21 = column_sum[j] - k11      # occurrences of j without i
        k22 = total - k11 - k12 - k21  # everything else
        # Element-wise assignment into CSR is slow (scipy emits a
        # SparseEfficiencyWarning); lil_matrix is faster for this pattern.
        pp_score[i, j] = rootLLR(k11, k12, k21, k22)
```

Sort each row to keep the top similarity indicators (e.g., at most 50 per item, with LLR scores below 5 zeroed out) and prepare bulk update actions for Elasticsearch:
```python
result = np.flip(np.sort(pp_score.A, axis=1), axis=1)             # scores, descending
result_indices = np.flip(np.argsort(pp_score.A, axis=1), axis=1)  # matching item indices
minLLR = 5
indicators = result[:, :50]            # keep at most 50 indicators per item
indicators[indicators < minLLR] = 0.0  # drop weak associations
indicators_indices = result_indices[:, :50]
```
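To see what the sorting and thresholding step produces, here is the same idiom on a toy score matrix (made-up values; np.sort and np.argsort are ascending, hence the flips):

```python
import numpy as np

scores = np.array([[3.0, 9.0, 6.0],
                   [1.0, 2.0, 8.0]])

# Descending sort of scores and of the matching column indices.
result = np.flip(np.sort(scores, axis=1), axis=1)
result_indices = np.flip(np.argsort(scores, axis=1), axis=1)

# Keep the top 2 per row and zero out anything below a threshold of 5.
top2 = result[:, :2].copy()
top2[top2 < 5.0] = 0.0

print(top2.tolist())                   # [[9.0, 6.0], [8.0, 0.0]]
print(result_indices[:, :2].tolist())  # [[1, 2], [2, 1]]
```

Row 1 illustrates the thresholding: its second-best score (2.0) falls below the cutoff and is zeroed, so only one indicator survives for that item.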
```python
import json
import requests

# Build bulk actions: one action line plus one source line per item.
actions = []
for i in range(indicators.shape[0]):
    # Number of indicators that survived the LLR threshold for this item.
    length = indicators[i].nonzero()[0].shape[0]
    # Map matrix indices back to the original item IDs.
    real_inds = items[indicators_indices[i, :length]].astype('int').tolist()
    item_id = items[i]
    action = {"index": {"_index": "items2", "_id": str(item_id)}}
    data = {"id": int(item_id), "indicators": real_inds}
    actions.append(json.dumps(action))
    actions.append(json.dumps(data))
    # Flush to Elasticsearch in batches of 100 documents (200 NDJSON lines).
    if len(actions) == 200:
        actions_string = "\n".join(actions) + "\n"
        requests.post("http://127.0.0.1:9200/_bulk/",
                      headers={"Content-Type": "application/x-ndjson"},
                      data=actions_string)
        actions = []

# Flush any remaining actions.
if actions:
    actions_string = "\n".join(actions) + "\n"
    requests.post("http://127.0.0.1:9200/_bulk/",
                  headers={"Content-Type": "application/x-ndjson"},
                  data=actions_string)
```

After indexing, the recommendation data can be queried through standard Elasticsearch endpoints, such as http://127.0.0.1:9200/items2/_count to verify the document count or http://127.0.0.1:9200/items2/_doc/{item_id} to retrieve the similar-item indicators for a given item.
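The NDJSON body that the loop sends can be assembled and checked without a running cluster; the item ID and indicator values below are made up for illustration:

```python
import json

# One action line plus one source line per document, as in the loop above.
action = {"index": {"_index": "items2", "_id": "42"}}
data = {"id": 42, "indicators": [7, 19, 23]}

# The bulk API expects newline-delimited JSON with a trailing newline.
body = "\n".join(json.dumps(x) for x in (action, data)) + "\n"

lines = body.strip().split("\n")
print(json.loads(lines[0])["index"]["_index"])  # items2
print(json.loads(lines[1])["indicators"])       # [7, 19, 23]
```

Inspecting the payload this way is a cheap check that each action line pairs with exactly one source line before posting real batches.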
Conclusion: by combining open-source big-data components (Hadoop, Hive, Kafka, Elasticsearch) with Python-based matrix processing and LLR similarity, a scalable, production-grade recommendation system can be built without excessive complexity.
DataFunTalk
Dedicated to sharing and discussing big data and AI technology applications, aiming to empower a million data scientists. Regularly hosts live tech talks and curates articles on big data, recommendation/search algorithms, advertising algorithms, NLP, intelligent risk control, autonomous driving, and machine learning/deep learning.