用 MapReduce 解决与云计算相关的 Big Data 问题

2017 年 9 月 21 日 全球人工智能

“全球人工智能”拥有十多万AI产业用户,10000多名AI技术专家。主要来自:北大,清华,中科院,麻省理工,卡内基梅隆,斯坦福,哈佛,牛津,剑桥...以及谷歌,腾讯,百度,脸谱,微软,阿里,海康威视,英伟达......等全球名校和名企。


-免费加入AI高管投资群>>

-免费加入AI技术专家群>>

微观经济学指出,比起让大多数成员来执行要在该系统中存在所必需的活动,基于专业分工的系统生产力更高。换句话说,对于每个任务,全能的人的生产力比不上专门从事特定任务的人。这称为比较优势 — 如果一个人相当精通某种服务,而不熟悉其他服务,他在提供这种服务方面就具有优势。专业分工可以促进特定技能的提升。(Robert Frank 和 Ben Bernanke 所著的 Principles of Microeconomics 很好地论述了这一现象;有一个故事说,Peace Corps 的一位志愿者在尼泊尔雇佣了一位名叫 Birkhaman 的厨师;这位厨师几乎无所不能;从宰羊到修理闹钟,他什么都会干。在尼泊尔,即使是水平最低的工人也能够提供很多种服务。)

云计算就是比较优势原理的直接示例。在本文中,我将讨论 MapReduce 编程模式(最初为对并行的复杂性进行抽象而设计)为什么非常适合云计算,尤其是在处理涉及大量数据的问题时。

通过让把两个数字相加的位置变得透明和不相关,可以在 MapReduce 抽象的基础上完美进行云计算。在研究示例之前,我们先看看 MapReduce 为什么很成功。

分布式计算中的 Erlang

云计算的潮流催生了许多新东西,包括 Erlang。Erlang 是一种独特的编程语言,它提供了用来描述操作系统的许多特性。这些特殊的特性让它成为构建大型分布式系统的理想语言。很自然,许多分布式算法的 “云” 实现都是用 Erlang 编写的,比如 CouchDB 或 Disco。甚至在云这个词出现之前,就已经使用 Erlang 构建云系统了。

为什么要在云中使用 MapReduce

MapReduce 编程模式是在 Google 开发出来的。Google 工程师发表的文章 "MapReduce: Simplified Data Processing on Large Clusters" 清楚地解释了 MapReduce 的工作方式。这篇文章导致的结果是,从 2004 年到现在出现了许多开放源码的 MapReduce 实现。

MapReduce 系统获得成功的原因之一是,它为编写需要大规模并行处理的代码提供了简单的编程模式。它受到了 Lisp 的函数编程特性和其他函数式语言的启发。

现在,讨论 MapReduce 和云计算为什么非常相配。MapReduce 的关键特点是它能够对开发人员隐藏操作并行语义 — 并行编程的具体工作方式。

即使您的公司拥有数千台计算机(这几乎不可能),这个特点也非常有意义。即使组织有多余的处理能力,要想在组织内建立网格,也常常要克服许多技术、行政和后勤障碍。

突然之间,云计算成了非常引人注目的思想。

有了云,开发人员就可以通过编写脚本供应任意数量的计算机,运行 MapReduce 作业,而且只按照使用每个系统的时间付费。使用时间可以是 10 分钟,也可以是 10 个月,无论是哪种情况,都同样简便。

这种模式的精彩案例出现在 Yelp("Real people. Real reviews®: A review site for local businesses")。在此公司的工程博客上,最近发表了一篇关于 如何使用 MapReduce 增强特性 的文章,题目为 “People Who Viewed This Also Viewed...”。这是一个典型的 Big Data 问题,因为 Yelp 每天生成 100GB 的日志数据。

工程师最初建立了自己的 Hadoop 集群,但是最终他们编写了自己的 MapReduce 框架 mrjob,它在 Amazon 的 Elastic MapReduce 服务上运行。Yelp 的搜索和数据挖掘工程师 Dave M 说:

“我们如何增强 People Who Viewed this Also Viewed... 特性?正如您所猜测的,我们使用 MapReduce。MapReduce 是把大任务分解为小任务的最简单的方法。Mapper 读取输入行并返回 (key, value) 元组。把每个键和对应的所有值发送给一个 Reducer ... 我们在 mrjob Python 框架中编写了这个简单的 MapReduce 作业,它执行单词频率统计。”

Dave M 还说:

“我们以前所做的像许多公司一样运行 Hadoop 集群一样 ... 当我们把代码放到 web 服务器上时,就已经把它送到 Hadoop 计算机上。

这不错,因为我们的作业可以引用代码库中的任何其他代码。

这也很不好。在把作业放进生产环境之前,根本无法确定它是否有效。最糟糕的是,我们的集群在大多数时候空闲着,甚至时常出现一个非常大的作业,会占用所有节点,所有其他任务不得不等待。”

在 Amazon 云上运行的 MapReduce 帮助 Yelp 淘汰了 Hadoop 集群。经过一年时间,Yelp 的 mrjob 框架现在非常稳定了,所以 Yelp 现在在 GitHub 上共享它。

云计算和 MapReduce 的组合看起来非常适合处理 Big Data 作业。现在,讲解如何处理大量日志数据。

真实环境中的日志文件处理

许多人都要面对的一个真实问题是如何处理大量日志数据。清单 1 中的代码示例(也可以下载)演示我如何只使用 Python 的多处理模块汇总 6.3GB 的 Internet Information Services (IIS) 日志文件。在一台 MacBook Pro 笔记本上,它只用大约 2 分钟就运行完了,结果是生成了最常出现的 25 个 IP 地址。

清单 1. 使用 Python 的 MP 模块汇总 6.3GB 的日志文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
Code Listing:  iis_map_reduce_ipsum.py
"""N-Core Map Reduce Log Parser/Summation"""
 
from collections import defaultdict
from operator import itemgetter
from glob import glob
from multiprocessing import Pool, current_process
from itertools import chain
 
def ip_start_mapper(logfile):
     log = open(logfile)
     for line in log:
         yield line.split()
 
def ip_cut(lines):
     for line in lines:
         try:
             ip = line[8]
         except IndexError:
             continue
         yield ip, 1
 
def mapper(logfile):
     print "Processing Log File: %s-%s" % (current_process().name, logfile)
     lines = ip_start_mapper(logfile)
     cut_lines = ip_cut(lines)
     return ip_partition(cut_lines)
 
def ip_partition(lines):
     partitioned_data = defaultdict(list)
     for ip, count in lines:
         partitioned_data[ip].append(count)
     return partitioned_data.items()       
 
def reducer(ip_key_val):
     ip, count = ip_key_val
     return (ip, sum(sum(count,[])))
 
def start_mr(mapper_func, reducer_func, files, processes=8, chunksize=1):
     pool = Pool(processes)
     map_output = pool.map(mapper_func, files, chunksize)
     partitioned_data = ip_partition(chain(*map_output))
     reduced_output = pool.map(reducer_func, partitioned_data)
     return reduced_output
 
def print_report(sort_list, num=25):
     for items in sort_list[0:num]:
         print "%s, %s" % (items[0], items[1])
def run():
     files = glob("*.log")
     ip_stats = start_mr(mapper, reducer, files)
     sorted_ip_stats = sorted(ip_stats, key=itemgetter(1), reverse=True)
     print_report(sorted_ip_stats)
     
if __name__ == "__main__":
     run()

图 1 以图形方式说明操作过程。


图 1. IIS 日志文件 MapReduce 图

我们来研究一下代码。您可以看到它非常简短,只有大约 50 行:

  • mapper 函数提取出每行中的 IP 地址并返回它和值 1。是 (key,value) 提取阶段,这在生成的每个进程中执行。当产生结果时,把结果收集到一个链式的迭代器中(参见 more on chain(*iterables)and other Python itertools),为缩减阶段做好准备。这称为数据分区。

  • 在 MapReduce 生命周期中,下一步是浓缩和汇总所有中间结果。这由示例中的缩减函数完成,包含缩减阶段。

  • 最后,生成一个很大的列表并输出前 25 个结果。

使用多处理模块是为了便于解释 MapReduce,但是这段代码只需稍加修改,就可以在其他一些 MapReduce 云上运行。这个作业的完整输出见清单 2。


清单 2. 运行的清单 1 的完整输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
lion% time python iisparse.py
Processing Log File: PoolWorker-1-ex100812.log
Processing Log File: PoolWorker-2-ex100813.log
Processing Log File: PoolWorker-3-ex100814.log
Processing Log File: PoolWorker-4-ex100815.log
Processing Log File: PoolWorker-5-ex100816.log
Processing Log File: PoolWorker-6-ex100817.log
Processing Log File: PoolWorker-7-ex100818.log
Processing Log File: PoolWorker-8-ex100819.log
Processing Log File: PoolWorker-7-ex100820.log
Processing Log File: PoolWorker-3-ex100821.log
Processing Log File: PoolWorker-8-ex100822.log
Processing Log File: PoolWorker-4-ex100823.log
Processing Log File: PoolWorker-6-ex100824.log
Processing Log File: PoolWorker-1-ex100825.log
Processing Log File: PoolWorker-2-ex100826.log
10.0.1.1, 24047
10.0.1.2, 22667
10.0.1.4, 20234
10.0.1.5, 18180
[...output supressed for space, and IP addresses changed for privacy]
python iisparse.py  57.40s user 7.48s system 54% cpu 1:59.47 total


结束语

严格地说,云计算可以意味着许多活动,包括在数据中心中的虚拟机上运行顺序的脚本。在本文中,我应用 MapReduce 和云计算背后的一些理论解决汇总大量数据这个实际问题。

基于云的 MapReduce 系统既有开放源码的,也有商用产品。您可以应用从本文学到的知识处理数 PB 的日志文件;因此,MapReduce 抽象是一种非常有用的工具,尤其是在云环境中。

下一步

当然,要看一下本文的 参考资料。您可能希望重点关注 “自然语言处理” 和 “进一步了解本文中的主题”。另外,下载 Yelp 的 mrjob 和 Apache Hadoop 的 IBM 发行版并做实验。

AI专家问答平台 

热门文章推荐

华裔女科学家钱璐璐,发明仅20纳米的DNA机器人!

Geoffrey Hinton提出capsule 概念,推翻反向传播!

2017年7大最受欢迎的AI编程语言:Python第一!

重磅|中国首家人工智能技术学院在京揭牌开学!

厉害 | 南京大学周志华教授当选欧洲科学院外籍院士!

5个月市值涨了1200亿,首次突破3100亿市值!

华为扔下这枚“AI芯弹”,全世界的智能手机都卡(慢)死了!

用57行代码搞定花8000万美元采购车牌识别项目

厉害|百度28位离职技术大牛和他们创建的AI公司!

一AI工程师下载200万GB色情内容,只为学习Python!

登录查看更多
0

相关内容

MapReduce 是 Google 提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。
【硬核书】可扩展机器学习:并行分布式方法
专知会员服务
80+阅读 · 2020年5月23日
Python分布式计算,171页pdf,Distributed Computing with Python
专知会员服务
105+阅读 · 2020年5月3日
【经典书】数据结构与算法C++,第二版,738页pdf
专知会员服务
165+阅读 · 2020年3月27日
【电子书】大数据挖掘,Mining of Massive Datasets,附513页PDF
专知会员服务
101+阅读 · 2020年3月22日
近期必读的12篇KDD 2019【图神经网络(GNN)】相关论文
专知会员服务
62+阅读 · 2020年1月10日
【大规模数据系统,552页ppt】Large-scale Data Systems
专知会员服务
58+阅读 · 2019年12月21日
21个必须知道的机器学习开源工具!
AI100
13+阅读 · 2019年9月13日
Tensorflow框架是如何支持分布式训练的?
AI100
9+阅读 · 2019年3月26日
深度学习开发必备开源框架
九章算法
12+阅读 · 2018年5月30日
五位专家跟你讲讲为啥Python更适合做AI/机器学习
全球人工智能
3+阅读 · 2018年3月18日
基于 Storm 的实时数据处理方案
开源中国
4+阅读 · 2018年3月15日
TensorFlow 相关论文与研究汇总
云栖社区
4+阅读 · 2018年1月7日
【强烈推荐】:关于系统学习数据挖掘(Data Mining)的一些建议!!
机器学习算法与Python学习
8+阅读 · 2017年12月2日
干货|7步掌握基于Keras的深度学习!
全球人工智能
4+阅读 · 2017年11月14日
手把手教你安装深度学习软件环境(附代码)
数据派THU
4+阅读 · 2017年10月4日
Signed Graph Attention Networks
Arxiv
7+阅读 · 2019年9月5日
Arxiv
8+阅读 · 2019年3月21日
Arxiv
7+阅读 · 2018年1月10日
Arxiv
5+阅读 · 2016年1月15日
VIP会员
相关资讯
21个必须知道的机器学习开源工具!
AI100
13+阅读 · 2019年9月13日
Tensorflow框架是如何支持分布式训练的?
AI100
9+阅读 · 2019年3月26日
深度学习开发必备开源框架
九章算法
12+阅读 · 2018年5月30日
五位专家跟你讲讲为啥Python更适合做AI/机器学习
全球人工智能
3+阅读 · 2018年3月18日
基于 Storm 的实时数据处理方案
开源中国
4+阅读 · 2018年3月15日
TensorFlow 相关论文与研究汇总
云栖社区
4+阅读 · 2018年1月7日
【强烈推荐】:关于系统学习数据挖掘(Data Mining)的一些建议!!
机器学习算法与Python学习
8+阅读 · 2017年12月2日
干货|7步掌握基于Keras的深度学习!
全球人工智能
4+阅读 · 2017年11月14日
手把手教你安装深度学习软件环境(附代码)
数据派THU
4+阅读 · 2017年10月4日
相关论文
Top
微信扫码咨询专知VIP会员