推荐系统(推荐引擎)是根据用户行为和兴趣点等信息去预测并推送用户当前需要或感兴趣的物品(服务)的一类应用。常见推荐系统包括电影、书籍、音乐或新闻文章推荐系统等。
基于不同的算法或技术,推荐系统有很多种类型,如协同过滤算法(Collaborative Filtering)推荐系统、基于内容(Content-Based)的推荐系统、混合推荐系统和基于向量的推荐系统。其中,基于向量的推荐系统使用向量空间来寻找(即推荐)数据库中最相似的产品或内容。而存储向量数据最有效的方法便是使用 Milvus 这样全球领先的向量数据库。
本文将介绍如何使用 Milvus 和 Python 搭建电影推荐系统。在搭建过程中,我们会使用 SentenceTransformers 将文本信息转换为向量,并将这些向量存储在 Milvus 中。搭建完成后,用户便可输入描述并在推荐系统中搜索到相似的电影。如需本教程中的所有代码,参考:
-
GitHub上的 Milvus Bootcamp 仓库
-
Jupyter Notebook
01.设置环境
开始前,请先安装:
-
Python 3.x
-
Python Package Manager (PIP)
-
Jupyter Notebook
-
Docker
-
至少 32 GB RAM 的硬件系统或 Zilliz Cloud 账号
使用Python安装所需工具和软件
$ python -m pip install pymilvus pandas sentence_transformers kaggle
向量数据库 Milvus
本教程中我们将用 Milvus 存储由电影信息转化而来的 Embedding 向量。由于使用到的数据集较大,推荐大家创建 Zilliz Cloud 集群来存储向量数据库。但如果仍想安装本地实例,可下载 docker-compose 配置文件并运行文件。
$ wget https://github.com/milvus-io/milvus/releases/download/v2.3.0/milvus-standalone-docker-compose.yml -O docker-compose.yml
$ docker-compose up -d
一切准备就绪后,便可以搭建电影推荐系统啦!
02.准备并预处理数据
首先,我们选择用 Kaggle上的电影数据集,该数据集包含 45000 部电影的元数据信息。大家可以直接下载数据集或通过 Python 使用 Kaggle API 下载数据集。如需通过 Python 下载,请先在 Kaggle.com 的 Profile 中下载 kaggle.json 文件。注意!一定要将此文件存储在 API 可以获取的路径中。
接着,设置环境变量以验证 Kaggle 身份。打开 Jupyter Notebook 输入以下代码:
%env KAGGLE_USERNAME=username
%env KAGGLE_KEY=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
%env TOKENIZERS_PARALLELISM=true
用 Kaggle 的 Python 依赖下载 Kaggle 上的电影数据集:
import kaggle dependency
import kaggle
kaggle.api.authenticate()
kaggle.api.dataset_download_files('rounakbanik/the-movies-dataset', path='dataset', unzip=True)
数据集下载完成后,使用 pandas 的 read_csv()
读取数据集数据:
import pandas
import pandas as pd
read csv data
movies=pd.read_csv('dataset/movies_metadata.csv',low_memory=False)
check shapeof data
movies.shape
上图展示了读取到 45466 条电影元数据。每条电影数据中包含 24 列。使用以下命令查看所有列的信息:
check column names
movies.columns
搭建电影推荐系统过程中,不需要使用到所有的列。通过以下代码筛选我们需要的列:
filter required columnstrimmed_movies = movies[["id", "title", "overview", "release_date", "genres"]]
trimmed_movies.head(5)
此外,一些数据中缺少部分字段。删除这些缺少字段的数据:
unclean_movies_dict = trimmed_movies.to_dict('records')
print('{} movies'.format(len(unclean_movies_dict)))
movies_dict = []for movie in unclean_movies_dict:if movie["overview"] == movie["overview"] and movie["release_date"] == movie["release_date"] and movie["genres"] == movie["genres"] and movie["title"] == movie["title"]:
movies_dict.append(movie)
03.搭建过程
连接 Milvus
处理完数据后,连接 Milvus 集群以导入数据。我们需要使用 URI 和 token 来连接 Milvus 集群,这些信息可以在 Zilliz Cloud 界面上找到。
使用以下命令通过 PyMilvus 连接 Milvus 服务器:
import milvus dependencyfrom pymilvus import *
connect to milvusmilvus_uri="YOUR_URI"token="YOUR_API_TOKEN"
connections.connect("default", uri=milvus_uri, token=token)
print("Connected!")
将电影信息转化为 Embedding 向量
接下来,把电影数据集转化为 Embedding 向量。首先,创建 1 个 Collection 用于存储电影 ID 和电影信息向量。创建 Collection 时还可以添加索引,使后续搜索变得更高效:
COLLECTION_NAME = 'film_vectors'
PARTITION_NAME = 'Movie'Here's our record schema"""
"title": Film title,
"overview": description,
"release_date": film release date,
"genres": film generes,
"embedding": embedding
"""
id = FieldSchema(name='title', dtype=DataType.VARCHAR, max_length=500, is_primary=True)
field = FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, dim=384)
schema = CollectionSchema(fields=[id, field], description="movie recommender: film vectors", enable_dynamic_field=True)
if utility.has_collection(COLLECTION_NAME): # drop the same collection created before
collection = Collection(COLLECTION_NAME)
collection.drop()
collection = Collection(name=COLLECTION_NAME, schema=schema)
print("Collection created.")
index_params = {"index_type": "IVF_FLAT","metric_type": "L2","params": {"nlist": 128},
}
collection.create_index(field_name="embedding", index_params=index_params)
collection.load()
print("Collection indexed!")
Collection创建完毕后,需要写一个函数来生成向量。电影向量中将包含电影简介、电影类型、发行日期等信息。用 SentenceTransformer 生成向量:
from sentence_transformers import SentenceTransformer
import ast
function to extract the text from genre columndef build_genres(data):
genres = data['genres']
genre_list = ""
entries= ast.literal_eval(genres)
genres = ""for entry in entries:
genre_list = genre_list + entry["name"] + ", "
genres += genre_list
genres = "".join(genres.rsplit(",", 1))return genres
create an object of SentenceTransformer
transformer = SentenceTransformer('all-MiniLM-L6-v2')
function to generate embeddingsdef embed_movie(data):
embed = "{} Released on {}. Genres are {}.".format(data["overview"], data["release_date"], build_genres(data)) embeddings = transformer.encode(embed)return embeddings
上述函数使用了 build_genres()
清除电影风格,并提取文本。随后创建了 SentenceTransformer 对象用于生成文本向量。最终用 encode()
方法生成电影向量。
将向量导入 Milvus
由于数据集过大,将数据逐条插入 Milvus 较为低效,且会导致网络流量增加。所以,我们推荐将数据批量导入 Milvus,1 次批量导入 5000 条数据。
Loop counter for batching and showing progressj = 0batch = []
for movie_dict in movies_dict:
try:
movie_dict["embedding"] = embed_movie(movie_dict)batch.append(movie_dict)
j += 1
if j % 5 == 0:
print("Embedded {} records".format(j))
collection.insert(batch)
print("Batch insert completed")batch=[]
except Exception as e:
print("Error inserting record {}".format(e))
pprint(batch)
break
collection.insert(movie_dict)
print("Final batch completed")
print("Finished with {} embeddings".format(j))
注意:大家可以根据自己的偏好和需求调整批量上传的数据量。同时,有一部分电影可能会由于其 ID 无法转换为整数而导入失败,我们可以相应调整 Schema 或者检查数据格式来避免导入失败的问题。
04.使用 Milvus 搜索和推荐电影
借助 Milvus 的近实时向量搜索能力为用户推荐合适的电影,创建以下两个函数:
-
embed_search()
用 Transformer 将用户搜索文本(字符串)转化为 Embedding 向量,Transformer 和前面的相同。 -
search_for_movies()
用于进行向量相似性搜索。
load collection memory before search
collection.load()
Set search parameters
topK = 5
SEARCH_PARAM = {
"metric_type":"L2",
"params":{"nprobe": 20},
}
convertsearch string to embeddings
def embed_search(search_string):
search_embeddings = transformer.encode(search_string)return search_embeddings
search similar embeddings for user's query
def search_for_movies(search_string):
user_vector = embed_search(search_string)
return collection.search([user_vector],"embedding",param=SEARCH_PARAM, limit=topK, expr=None, output_fields=['title', 'overview'])
上述代码中,我们设置了以下参数:
-
Top-K:
topK = 5
, 规定了搜索将返回 5 个最相似的向量 -
相似度类型:
metric_type
设置为欧氏距离(Euclidean/L2) -
nprobe:设置为 20,规定了搜索 20 个数据聚类
最后,用 search_for_movies()
函数根据用户搜索推荐相关电影:
from pprint import pprint
search_string = "A comedy from the 1990s set in a hospital. The main characters are in their 20s and are trying to stop a vampire."
results = search_for_movies(search_string)
check resultsfor hits in iter(results):for hit in hits:print(hit.entity.get('title'))print(hit.entity.get('overview'))print("-------------------------------")
上图显示,搜索到了 5 部相似电影。至此,我们已经成功用 Milvus 搭建了一个电影推荐系统。
本文最初发布于 The New Stack,已获得转载许可。