Linux 上使用 Hadoop 进行 MapReduce 简介

作者:Adam Monsen

当您的数据和工作增长时,而您仍然希望及时产生结果,您就开始考虑规模化。您的一台强大的服务器达到了极限。您需要一种方法将您的工作分散到多台计算机上。您真正需要横向扩展。

在拓荒时代,他们用牛来拉重物,当一头牛无法移动原木时,他们不会尝试饲养更大的牛。我们不应该尝试制造更大的计算机,而应该制造更多的计算机系统。——Grace Hopper

显然,集群计算早已不是新鲜事。 发生了什么变化? 今天

  • 我们收集的数据比以往任何时候都多。
  • 即使是中小企业也可以从 Hadoop 和 MapReduce 等工具中获益。
  • 您不必拥有博士学位即可创建和使用自己的集群。
  • 许多不错的自由/开源工具可以帮助您轻松地将商用硬件集群化。

让我从一些简单的例子开始,这些例子可以在一台机器上运行,并可以扩展以满足更大的需求。您可以在笔记本电脑上尝试它们,然后过渡到更大的集群——例如您使用商用 Linux 机器构建的集群、您公司或大学的 Hadoop 集群或 Amazon Elastic MapReduce。

并行问题

让我们从可以划分为更小的独立工作单元的问题开始。 这些问题大致分为“易于并行”,顾名思义,它们适合并行处理。 例子

  • 将电子邮件消息分类为垃圾邮件。
  • 转码视频。
  • 渲染地球规模的地图瓦片图像。
  • 计算与模式匹配的日志行数。
  • 计算特定应用程序每周每天的错误数。

现在艰苦的工作开始了。 并行计算很复杂。 竞争条件、部分故障和同步阻碍了我们的进步。 这就是 MapReduce 拯救我们口头上的培根的地方。

MapReduce 示例

MapReduce 是一种编码模式,它抽象了可扩展计算的许多棘手部分。 我们可以自由地专注于手头的问题,但这需要练习。 让我们练习一下!

假设您有来自某些自定义应用程序的 100 个 10GB 日志文件——大约 1PB 的数据。 您进行快速测试并估计您的桌面需要几天时间才能 grep 每行(假设您甚至可以将数据放在桌面上)。 而且,这还是在您添加按主机分组和计算总数的逻辑之前。 您久经考验的 shell 实用程序无济于事,但 MapReduce 可以轻松处理此问题。

首先,让我们看一下原始数据。 来自自定义应用程序的日志行如下所示


localhost: restarting
dsl5.example.com: invalid user 'bart'
dsl5.example.com: invalid user 'charlie'
dsl5.example.com: invalid user 'david'
dsl8.example.net: invalid password for user 'admin'
dsl8.example.net: user 'admin' logged in

日志格式是主机名、冒号、消息。 您的老板怀疑有人恶意尝试暴力攻击该应用程序。 同一台主机尝试许多不同的用户名可能表明存在攻击。 他想要按主机名分组的“无效用户”消息的总数。 过滤上面的日志行应该产生


dsl5.example.com        3

对于千兆字节的日志文件,您值得信赖的 shell 工具可以很好地工作。 对于太字节,需要更多功能。 这是 Hadoop 和 MapReduce 的工作。

在开始使用 Hadoop 之前,让我们调用一些 Python 并在小数据集上本地测试。 我假设您安装了最新的 Python。 我在 Ubuntu 12.10 上使用 Python 2.7.3 进行了测试。

要编写的第一个程序消耗来自我们自定义应用程序的日志行。 让我们称之为 map.py


#!/usr/bin/python
import sys
for line in sys.stdin:
  if 'invalid user' in line:
    host = line.split(':')[0]
    print '%s\t%s' % (host, 1)

map.py 在任何时候看到包含字符串“无效用户”的行时,都会打印主机名、制表符和数字 1。 将示例日志行写入 log.txt,然后测试 map.py


chmod 755 map.py
./map.py < log.txt

输出是


dsl5.example.com        1
dsl5.example.com        1
dsl5.example.com        1

map.py 的输出将通过管道传输到我们的下一个程序 reduce.py


#!/usr/bin/python
import sys
last_host = None
last_count = 0
host = None
for line in sys.stdin:
  host, count = line.split('\t')
  count = int(count)
  if last_host == host:
    last_count += count
  else:
    if last_host:
      print '%s\t%s' % (last_host, last_count)
    last_host = host
    last_count = count
if last_host == host:
  print '%s\t%s' % (last_host, last_count)

reduce.py 汇总特定主机的连续行。 让我们假设行按主机名分组。 如果我们看到相同的主机名,我们会增加总数。 如果我们遇到不同的主机名,我们将打印到目前为止的总数,并重置总数和主机名。 当我们耗尽标准输入时,我们会在必要时打印总数。 这假设具有相同主机名的行始终连续出现。 它们会的,稍后我会解释原因。 通过像这样将其与 map.py 管道连接在一起来测试


chmod 755 reduce.py
./map.py < log.txt | sort | ./reduce.py

稍后,我将解释为什么我在管道中添加了 sort。 这打印


dsl5.example.com        3

正是我们想要的。 测试成功! 我们的测试日志行包含主机 dsl5.example.com 的三个“无效用户”消息。 稍后我们将让这个本地测试在 Hadoop 集群上运行。

让我们更深入地研究一下。 map.py 到底做了什么? 它将非结构化日志数据转换为制表符分隔的键值对。 它为键发出主机名,为值发出制表符和数字 1(同样,仅适用于包含“无效用户”消息的行)。 请注意,任意数量的日志行可以馈送到任意数量的 map.py 程序实例——每行都可以独立检查。 同样,map.py 的每一行输出都可以独立检查。

map.py 的输出成为 reduce.py 的输入。 reduce.py 的输出(主机名、制表符、数字)看起来非常类似于其输入。 这是经过设计的。 键值对可能会被多次减少,因此 reduce.py 必须优雅地处理此问题。 如果我们要重新减少我们的最终答案,我们将得到完全相同的结果。 reduce.py 的这种可重复的、可预测的行为称为幂等性。

我们刚刚使用了一个 reduce.py 实例进行了测试,但您可以想象许多 reduce.py 实例处理来自 map.py 的许多行输出。 请注意,这仅在具有相同主机名的行连续出现时才有效。 在我们的测试中,我们通过在管道中添加 sort 来强制执行此约束。 这模拟了我们的代码在 Hadoop MapReduce 中的行为方式。 Hadoop 将以类似方式对 reduce.py 的输入进行分组和排序。

我们不必担心执行将如何进行以及将运行多少个 map.py 和 reduce.py 实例。 我们只需遵循 MapReduce 模式,Hadoop 就会完成剩下的工作。

使用 Hadoop 的 MapReduce

Hadoop 主要是一个 Java 框架,但神奇的 Streaming 实用程序允许我们使用其他语言编写的程序。 该程序只需遵守标准输入和输出的某些约定(我们已经做到了)。

您需要 Java 1.6.x 或更高版本(我使用了 OpenJDK 7)。 其余操作可以并且应该以非 root 用户身份执行。

下载最新的稳定 Hadoop tarball(请参阅“资源”)。 不要使用特定于发行版的(.rpm 或 .deb)软件包。 我假设您下载了 hadoop-1.0.4.tar.gz。 解压缩它并更改到 hadoop-1.0.4 目录。 目录 hadoop-1.0.4 以及文件 map.py、reduce.py 和 log.txt 应位于 /tmp 中。 如果没有,请根据需要在下面的示例中调整路径。

像这样在 Hadoop 上运行作业


cd /tmp/hadoop-1.0.4
bin/hadoop jar \
  contrib/streaming/hadoop-streaming-1.0.4.jar \
  -mapper /tmp/map.py -reducer /tmp/reduce.py \
  -input /tmp/log.txt -output /tmp/output

Hadoop 会将一些内容记录到控制台。 查找以下内容


...
INFO streaming.StreamJob:  map 0%  reduce 0%
INFO streaming.StreamJob:  map 100%  reduce 0%
INFO streaming.StreamJob:  map 100%  reduce 100%
INFO streaming.StreamJob: Output: /tmp/output

这意味着我们的作业已成功完成。 我看到一个名为 /tmp/output/part-00000 的文件,其中包含我们期望的内容


dsl5.example.com        3

现在是暂停、微笑并用四份特大冰焦糖玛奇朵奖励自己的好时机。 你是个摇滚明星。

图 1. 这是我们在 map 和 reduce 步骤中所做的。 我们执行的转换使我们能够在任意数量的机器上运行任意数量的 mapper 和 reducer。 Hadoop 负责处理繁琐的细节。 它启动 mapper 和 reducer,在它们之间传递数据并输出答案。

集群 MapReduce

如果您到目前为止一切正常,也请尝试启动您自己的集群! 在具有多个 Java 虚拟机的单个物理机上运行 Hadoop 称为伪分布式操作。

伪分布式操作需要一些配置。 您运行 Hadoop 的用户还必须能够与 localhost 建立 SSH 无密码连接。 安装和配置超出了本文的范围,但您将在“资源”中提到的“单节点设置”教程中找到更多信息。 如果您从上面推荐的 1.0.4 tarball 版本开始,则本教程应在任何标准 GNU/Linux 发行版上逐字逐句地工作。

如果您设置了伪分布式(或分布式)Hadoop,您将获得两个简陋但有用的 Web 界面的好处。 NameNode Web 界面允许您浏览日志并浏览 Hadoop 分布式文件系统。 JobTracker Web 界面允许您监视 MapReduce 作业和调试问题。

图 2. NameNode Web 界面

图 3. JobTracker Web 界面

极其简单的 Python MapReduce

您可能想知道为什么 reduce.py(上面)是一个复杂的迷你状态机。 这是因为主机名在 Hadoop 提供的输入行中会发生变化。 Dumbo Python 库(请参阅“资源”)隐藏了 Hadoop 的这个细节。 Dumbo 让我们更加专注于我们的映射和减少。

在 Dumbo 中,我们的 MapReduce 实现变为


def mapper(key, value):
  if 'invalid user' in value:
    yield value.split(':')[0], 1

def reducer(key, values):
  yield key, sum(values)

if __name__ == '__main__':
  import dumbo
  dumbo.run(mapper, reducer)

状态机消失了。 Dumbo 负责按键(主机名)分组。

将上面的代码保存在名为 /tmp/smart.py 的文件中。 安装 Dumbo。 有关链接,请参阅“资源”,请放心,这很容易。 安装 Dumbo 后,运行代码


cd /tmp
dumbo start smart.py -hadoop hadoop-1.0.4 \
  -input log.txt -output totals \
  -outputformat text

最后,检查输出


cat totals/part-00000

内容应与我们之前从 Hadoop Streaming 获得的结果相匹配。

非用例

Hadoop 非常适合一次性作业和离线批处理,尤其是在数据已在 Hadoop 文件系统中并且将被多次读取的情况下。 如果您假设这一点,我的第一个例子更有意义。 也许这项工作必须每天运行,并且必须在几分钟内完成。

考虑 Hadoop 是错误工具的一些情况。 小数据集? 别费心了。 在火箭和踏板车之间的一米赛跑中,踏板车在火箭发动机启动之前就消失了。 网站的事务数据存储? 请尝试 MySQL 或 MongoDB。

Hadoop 也不会帮助您在数据到达时处理数据。 这通常被称为“实时”或“流式传输”。 为此,请考虑 Storm(有关更多信息,请参阅“资源”)。

通过实践,您将很快能够辨别何时 Hadoop 是工作的正确工具。

资源

您可以从 下载最新的稳定 Hadoop tarball。

请参阅 https://hadoop.apache.ac.cn/docs/current/single_node_setup.html 以获取有关如何运行伪分布式 Hadoop 集群的信息。

如果您想在 Python 中使用 MapReduce 做更多事情,请查看 Dumbo:http://projects.dumbotics.com/dumbo。 有关安装说明,请参阅 https://github.com/klbostee/dumbo/wiki/Building-and-installing,有关优秀教程,请参阅 https://github.com/klbostee/dumbo/wiki/Short-tutorial

请参阅 https://github.com/nathanmarz/storm 以获取有关 Storm(一种实时分布式计算系统)的信息。

要运行 Storm 和 Hadoop 并集中管理两者,请查看 Mesos 项目:http://www.mesosproject.org

加载 Disqus 评论