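I have the following tables stored as separate csv files:
customers(c_id, gender, address, birthdate)
meals(r_id, c_id, date) (i.e. a customer eats at a restaurant)
restaurants(type, r_id)
The restaurants table has 10,000 rows, meals has 1,000,000 and customers has 2,000,000.
I need the following MapReduce job: for all restaurants, show their number of meals, where the restaurant is of type 'bistro' and the customer is male.
This translates to the following SQL query: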
SELECT r.r_id, COUNT(*) AS count_meals
FROM restaurants r
INNER JOIN meals m ON r.r_id = m.r_id
INNER JOIN customers c ON m.c_id = c.c_id
WHERE c.gender = 'MALE' AND r.type = 'bistro'
GROUP BY r.r_id
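Before writing the MapReduce job, it can help to have a tiny oracle that computes what the SQL query returns, so the job's output can be checked against it. A minimal sketch in plain Python, assuming the three tables are already loaded as lists of tuples in the column order listed above; the sample rows and the function name `expected_counts` are hypothetical:

```python
from collections import Counter

# Hypothetical sample rows, in the column order given in the question.
customers = [  # (c_id, gender, address, birthdate)
    ("c1", "MALE", "addr1", "1990-01-01"),
    ("c2", "FEMALE", "addr2", "1985-05-05"),
    ("c3", "MALE", "addr3", "1970-07-07"),
]
meals = [  # (r_id, c_id, date)
    ("r1", "c1", "2023-01-01"),
    ("r1", "c2", "2023-01-02"),
    ("r1", "c3", "2023-01-03"),
    ("r2", "c1", "2023-01-04"),
    ("r3", "c3", "2023-01-05"),
]
restaurants = [  # (type, r_id)
    ("bistro", "r1"),
    ("pizzeria", "r2"),
    ("bistro", "r3"),
]

def expected_counts(customers, meals, restaurants):
    """Plain-Python equivalent of the SQL query: meals per bistro, male customers only."""
    male = {c_id for c_id, gender, _addr, _dob in customers if gender == "MALE"}
    bistros = {r_id for r_type, r_id in restaurants if r_type == "bistro"}
    counts = Counter(
        r_id for r_id, c_id, _date in meals if r_id in bistros and c_id in male
    )
    return dict(counts)

print(expected_counts(customers, meals, restaurants))  # {'r1': 2, 'r3': 1}
```

Here the filters become set lookups and `GROUP BY ... COUNT(*)` becomes a `Counter`; on the real data the same comparison could be run on a small sample of each file.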
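These conditions shrink the tables as follows: 300 restaurants and 900,000 customers, while meals stays the same.
The MapReduce job is launched with the following command: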
python3 mrCustomers.py < "data/restaurants.csv" "data/customers.csv" "data/meals.csv" > output.csv
To tell the files apart I rely on the number of columns in each row. My mapper looks like this:
from mrjob.job import MRJob
from mrjob.step import MRStep
from mr3px.csvprotocol import CsvProtocol
import csv

class MRCustomers(MRJob):
    OUTPUT_PROTOCOL = CsvProtocol

    def mapper(self, _, line):
        if line.startswith('c_id') or line.startswith('r_id') or line.startswith('type'):
            return
        reader = csv.reader([line])
        columns = next(reader)
        if len(columns) == 4:
            if str(columns[1]) != 'MALE':
                return
            c_id = columns[0]
            yield c_id, "customer"
        elif len(columns) == 3:
            r_id = columns[0]
            c_id = columns[1]
            yield r_id, ("M", c_id)
        else:
            type = columns[0]
            if type == 'bistro':
                r_id = columns[1]
                yield r_id, "restaurant"
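The problem is that in the reducer I always receive chunks of each table: for example, in the first round I get 30 customers and 20,000 meals, but no restaurants. If I don't get a matching set from each mapper round, I can't possibly perform the join. Is my mapper logic flawed? And how should I write the reducer?
In theory, the reducer should receive tuples aggregated under the same key, so
yield r_id, "restaurant"
and yield r_id, ("M", c_id)
should be grouped together and arrive in the reducer as one match, but that doesn't happen.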
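Each reducer call receives all values for one key, not a slice of every table. A small plain-Python simulation of the shuffle phase (grouping mapper output by key) shows why the mapper above cannot produce a three-way match in one step: restaurant and meal records are keyed by `r_id`, but customer records are keyed by `c_id`, so a customer never lands in the same group as a restaurant. The records and the `shuffle` helper are hypothetical, shaped like the mapper's output:

```python
from collections import defaultdict

def shuffle(pairs):
    """Group mapper output by key, as the framework does before the reduce phase."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

# Hypothetical mapper output, shaped like the mapper above.
mapped = [
    ("c1", "customer"),    # customers are keyed by c_id
    ("r1", ("M", "c1")),   # meals are keyed by r_id
    ("r1", "restaurant"),  # restaurants are keyed by r_id
]

groups = shuffle(mapped)
print(groups)  # {'c1': ['customer'], 'r1': [('M', 'c1'), 'restaurant']}
```

So a correct join needs two keyed passes: first join meals with restaurants on `r_id` and re-emit each surviving meal keyed by `c_id`, then join that with customers on `c_id`. Separately, in the launch command above only `data/restaurants.csv` is fed to stdin through `<`, while the other two paths are passed as arguments. mrjob reads stdin only when no input paths are given (or when `-` is one of them), so with that command the restaurants file is likely never read, which would fit seeing customers and meals but no restaurants; passing all three paths as arguments avoids this.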
Edit: in the first step I managed to join meals and restaurants on the reducer side like this:
def reducer1(self, key, values):
    joins = [x for x in values]
    if len(joins) > 1:
        if joins[0][0] == "restaurants":
            for tup in joins[1:]:
                c_id = tup[1]
                yield c_id, (tup[0], key)
    elif joins[0][0] == "customer":
        for customer in joins:
            yield key, ("customer", key)
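reducer1 compares `joins[0][0]` with `"restaurants"` and `"customer"`. If mapper1 still emits the bare strings `"restaurant"` and `"customer"` the way the mapper shown earlier does, `joins[0][0]` is only the first character of that string, so neither comparison can be true (note also the `restaurants`/`restaurant` spelling difference). A small illustration, using a hypothetical helper `tag_of` that reads the tag from either shape; emitting every value from the mapper as a `(tag, payload)` pair is the simpler fix:

```python
def tag_of(value):
    """Hypothetical helper: return the tag of a mapper value.

    Handles both a bare string tag ("restaurant") and a (tag, payload)
    pair; mrjob's default JSON internal protocol turns tuples into lists.
    """
    return value if isinstance(value, str) else value[0]

# Values for one r_id, shaped like the output of the mapper shown earlier.
joins = ["restaurant", ["M", "c1"]]

print(joins[0][0])                   # prints r (first character, not a tag)
print(tag_of(joins[0]))              # prints restaurant
print(tag_of(joins[1]))              # prints M
print(joins[0][0] == "restaurants")  # prints False
```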
Then, in the second step, I pass the results through the mapper unchanged and join them with customers in the reducer:
def mapper2(self, key, value):
    yield key, value

def reducer2(self, key, values):
    joins = [x for x in values]
    if len(joins) > 1:
        if joins[0][0] == "customer":
            for tup in joins[1:]:
                yield tup[1], 1

def reducer3(self, key, values):
    yield None, (key, sum(values))

def steps(self):
    first_step = MRStep(
        mapper=self.mapper1,
        reducer=self.reducer1,
    )
    second_step = MRStep(
        mapper=self.mapper2,
        reducer=self.reducer2,
    )
    third_step = MRStep(
        reducer=self.reducer3
    )
    return [first_step, second_step, third_step]
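Now I have the following problem: the tuples in the output are only part of the full result of the SQL query, so the solution is incomplete. I don't understand why the join doesn't work as expected.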
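Two things in the code above could plausibly explain the partial output, though without the data this is only a hedged reading. First, both reducers decide what to emit from `joins[0]` alone, so a whole group is dropped whenever the restaurant (step 1) or customer (step 2) record is not the first value; MapReduce groups values by key but does not promise an order within a group. Second, step 1 puts customers (keyed by `c_id`) and meals/restaurants (keyed by `r_id`) in one key space, so if the two ID ranges overlap (for example, both are small integers), a customer can share a group with a restaurant and be mishandled. The sketch below simulates the three steps in plain Python with order-independent reducers and prefixed keys (`"C:"` / `"R:"`). `run_step`, the tags, the prefixes and the sample rows are all hypothetical, and rows are tagged by table name rather than by column count for readability:

```python
from collections import defaultdict

def run_step(records, mapper, reducer):
    """Simulate one MRStep: map every record, group values by key, reduce each group."""
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return [pair for key, values in groups.items() for pair in reducer(key, values)]

def identity(pair):
    """Pass-through mapper, like mapper2 in the question."""
    yield pair

def mapper1(record):
    table, cols = record
    if table == "restaurants" and cols[0] == "bistro":
        yield "R:" + cols[1], ("restaurant", None)
    elif table == "meals":
        yield "R:" + cols[0], ("meal", cols[1])
    elif table == "customers" and cols[1] == "MALE":
        yield "C:" + cols[0], ("customer", None)

def reducer1(key, values):
    values = list(values)  # mrjob hands reducers a one-shot generator
    kind, ident = key.split(":", 1)
    if kind == "C":
        yield ident, ("customer", None)  # pass male customers on, keyed by c_id
    elif any(tag == "restaurant" for tag, _ in values):
        for tag, c_id in values:
            if tag == "meal":
                yield c_id, ("meal", ident)  # re-key bistro meals by c_id

def reducer2(c_id, values):
    values = list(values)
    if any(tag == "customer" for tag, _ in values):
        for tag, r_id in values:
            if tag == "meal":
                yield r_id, 1

def reducer3(r_id, ones):
    yield r_id, sum(ones)

rows = [  # hypothetical input; the restaurant row deliberately comes after its meals
    ("customers", ("c1", "MALE", "addr1", "1990-01-01")),
    ("customers", ("c2", "FEMALE", "addr2", "1985-05-05")),
    ("meals", ("r1", "c1", "2023-01-01")),
    ("meals", ("r1", "c2", "2023-01-02")),
    ("meals", ("r1", "c1", "2023-01-03")),
    ("meals", ("r2", "c1", "2023-01-04")),
    ("restaurants", ("bistro", "r1")),
    ("restaurants", ("pizzeria", "r2")),
]

step1 = run_step(rows, mapper1, reducer1)
step2 = run_step(step1, identity, reducer2)
result = dict(run_step(step2, identity, reducer3))
print(result)  # {'r1': 2}
```

Because each reducer scans all of its values for the tag it needs, the result no longer depends on which record happens to arrive first. In an actual mrjob job, reducer3 would yield `None, (r_id, total)` so that `CsvProtocol` writes `r_id,count_meals`.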
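This approach was "pure gold" for me: it made me understand how to join three tables in MapReduce for a very similar task I had to do. I hope it helps others too! Note: logging/printing the intermediate results (commented out in the code below) and then stopping the kernel also helped me a lot to fully understand how the MapReduce job progresses.
The original task
Write a MapReduce job with the three datasets as input and the following output:
For all customers with fashion_news_frequency = 'Regularly',
show their number of transactions with articles that have
graphical_appearance_name == 'Solid'
and color_group_name equal to 'Light Beige'. Make sure the final output has the following format:
customer_id,count_transactions
Written in a Jupyter notebook: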
%%file mymrjob3.py
# This will create a local file to run your MapReduce program
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.util import log_to_stream, log_to_null
from mr3px.csvprotocol import CsvProtocol
import csv
import logging
from collections import defaultdict

class MyMRJob3(MRJob):
    OUTPUT_PROTOCOL = CsvProtocol  # write output as CSV

    def mapper(self, _, line):
        # Parse with the csv module so that quoted fields containing commas stay intact
        data = next(csv.reader([line.strip()]))
        # 1. There are 25 column names for articles.csv
        # 2. There are 5 column names for transactions.csv: t_dat,customer_id,article_id,price,sales_channel_id
        # 3. There are 7 column names for customers.csv: customer_id,FN,Active,club_member_status,fashion_news_frequency,age,postal_code
        # Header rows either fail the filters below or never find a join partner in the reducer
        if len(data) == 5:
            customer_id = data[1]
            articles_id = data[2]
            yield articles_id, ("transaction", customer_id)
        elif len(data) == 7:
            if data[4] == 'Regularly':
                customer_id = data[0]
                yield customer_id, ("customer", 2)
        elif len(data) > 9:
            if data[7] == 'Solid' and data[9] == 'Light Beige':
                articles_id = data[0]
                yield articles_id, ("articles", 1)

    def reducer(self, key, value):
        logger = logging.getLogger('__main__')
        # First all values are collected within the same key
        joins = [x for x in value]
        # If there is more than one tuple, there must be matching article_ids
        if len(joins) > 1:
            # We only care about the matching article_ids from the articles.csv
            # This if-statement assures that no duplicate article_ids from the transactions.csv are taken
            # (it assumes the "articles" record is the first value for its key)
            if joins[0][0] == "articles":
                #logger.info(joins)
                for tup in joins[1:]:
                    #logger.info(tup)
                    customer_id = tup[1]
                    # Each matching transaction counts once; the first entry is the article itself
                    yield customer_id, (tup[0], 1)
        elif joins[0][0] == "customer":
            # A customer record sits alone under its customer_id: pass it on to the next step
            yield key, ("customer", key)

    def final_reducer(self, key, value):
        logger = logging.getLogger('__main__')
        customer_dict = defaultdict(int)
        joins = [x for x in value]
        # Same as above: the "customer" record must come first, followed by its transactions
        if len(joins) > 1:
            if joins[0][0] == "customer":
                #logger.info(joins[1:])
                for transaction, count in joins[1:]:
                    customer_dict[key] += count
                yield None, (key, customer_dict[key])

    def steps(self):
        first_step = MRStep(
            mapper=self.mapper,
            reducer=self.reducer
        )
        second_step = MRStep(
            reducer=self.final_reducer
        )
        #return [first_step]
        return [first_step, second_step]

if __name__ == '__main__':
    MyMRJob3.run()
Afterwards I simply run:
! python3 mymrjob3.py path/to/articles.csv path/to/transactions.csv path/to/customers.csv
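Two details of this job are worth keeping in mind. Each matching transaction should contribute exactly 1 to its customer's total, so that an article bought several times is not counted once per purchase for every buyer. And both reducers rely on the tagged record (`"articles"` in step 1, `"customer"` in step 2) being the first value in its group; that may hold locally (for instance if values are sorted as text, since `"articles"` and `"customer"` sort before `"transaction"`), but MapReduce only guarantees grouping by key. The sketch below simulates both steps in plain Python with reducers that scan all values instead of checking `joins[0]`. `run_step`, the tags and the rows are hypothetical, and articles are trimmed to `(article_id, graphical_appearance_name, colour_group_name)` instead of the full 25 columns:

```python
from collections import defaultdict

def run_step(records, mapper, reducer):
    """Simulate one step: map each record, group values by key, reduce each group."""
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return [pair for key, values in groups.items() for pair in reducer(key, values)]

def identity(pair):
    yield pair

def mapper1(record):
    table, cols = record
    if table == "transactions":
        yield cols[2], ("transaction", cols[1])  # key: article_id
    elif table == "customers" and cols[4] == "Regularly":
        yield cols[0], ("customer", None)        # key: customer_id
    elif table == "articles" and cols[1] == "Solid" and cols[2] == "Light Beige":
        yield cols[0], ("article", None)         # key: article_id

def reducer1(key, values):
    values = list(values)  # mrjob passes a one-shot generator
    tags = [tag for tag, _ in values]
    if "customer" in tags:
        yield key, ("customer", None)  # pass the customer through to step 2
    if "article" in tags:
        for tag, customer_id in values:
            if tag == "transaction":
                yield customer_id, ("transaction", 1)  # one per transaction

def reducer2(customer_id, values):
    values = list(values)
    if any(tag == "customer" for tag, _ in values):
        total = sum(n for tag, n in values if tag == "transaction")
        if total:
            yield customer_id, total

rows = [  # hypothetical rows; the article record comes after its transactions
    ("transactions", ("2020-09-01", "cust1", "a1", "0.05", "2")),
    ("transactions", ("2020-09-02", "cust1", "a1", "0.05", "2")),
    ("transactions", ("2020-09-03", "cust2", "a1", "0.05", "1")),
    ("transactions", ("2020-09-04", "cust1", "a2", "0.03", "2")),
    ("customers", ("cust1", "1.0", "1.0", "ACTIVE", "Regularly", "30", "p1")),
    ("customers", ("cust2", "", "", "ACTIVE", "NONE", "41", "p2")),
    ("articles", ("a1", "Solid", "Light Beige")),
    ("articles", ("a2", "Stripe", "Light Beige")),
]

step1 = run_step(rows, mapper1, reducer1)
result = dict(run_step(step1, identity, reducer2))
print(result)  # {'cust1': 2}
```

In the mrjob version, the second reducer would yield `None, (customer_id, total)` so that `CsvProtocol` writes `customer_id,count_transactions`, as the job above does.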