Linux 集群与 Ruby 队列:小即是美

作者:Ara T. Howard

我的朋友 Dave Clements 总是乐于参与头脑风暴会议,尤其是在我请客喝咖啡的情况下。最近,我们在老地方见面,在第一杯咖啡的工夫,我向他解释了我的问题。我的办公室里有一堆 Linux 节点闲置着,还有一堆工作排队等着它们处理,但我们没有办法将工作分配给它们。而且,项目完成的最后期限迫在眉睫。

在第二杯咖啡时,我讲述了自己如何评估了几个软件包,例如 openMosixSun 的 Grid Engine,但最终决定不使用它们。这一切都归结为一点:我想要比我见过的所有东西都更精简的东西,一些快速而简单的东西,而不是一个庞大的软件系统,需要花费数周的时间来安装和配置。

喝完第三杯咖啡后,我们有了主意:为什么不简单地创建一个 NFS 挂载的 优先级队列,让节点尽可能快地从中拉取作业呢?没有调度器,没有进程迁移,没有中央控制器,没有内核模块——仅仅是一组计算节点尽可能快地工作以完成任务列表。但有一个大问题:从多个节点并发访问 NFS 挂载的队列是否安全可行?我拿起了我最喜欢的开发工具——一个名为 Vim 的出色 IDE 和 Ruby 编程语言——我决心找出答案。

历史

我在 国家地球物理数据中心 (NGDC) 的 太阳-地球物理学部 (STP) 的 国防气象卫星计划 (DMSP) 小组工作。我的老板 Chris Elvidge 和我们小组的其他科学家研究来自太空的地球夜间灯光。我们收到的数据帮助研究人员了解人类人口的变化和森林火灾的移动等等。完成这类工作所需的基础设施令人震惊。下图显示了北美部分地区夜间灯光的平均强度,它需要超过 100 吉字节的输入数据和 142 太字节的中间文件才能生成。超过 50,000 个独立的进程分布在 18 个计算节点上,耗时一周的时钟时间才完成其制作。

Linux Clustering with Ruby Queue: Small Is Beautiful

Linux 集群已成为新的超级计算机。在研究经费日益减少的当前环境下,基于商用硬件构建的万亿次浮点运算性能的经济性是不容忽视的。然而,集群构建的一个关键方面,即编排,经常被购买者所忽视。开发人员在使用集群系统时面临的问题,类似于购房者只能负担得起一块地和一些砖块——他还有很多建筑工作要做。

用有限的预算建造一座小砖房

Yukihiro Matsumoto,又名 Matz,曾说过“Ruby 的目的是最大化编程乐趣”,经验告诉我,享受创造过程可以带来更快的开发和更高质量的代码。Ruby 具有强大的面向对象抽象技术、极端的动态性、易于扩展性以及大量的实用库。它是一把名副其实的瑞士军刀,正是人们应该带入未知领域(例如我试图构建的 NFS 挂载的优先级队列)的那种工具。

创建 Ruby Queue (rq) 时,我面临的第一个任务是解决并发访问 NFS 共享存储的问题,而我必须跨越的第一座桥梁是如何从 Ruby 内部实现 NFS 安全锁定。Ruby 有一个类似于 Perl 的 fcntl 接口,并且像 Perl 一样,该接口要求您使用 struct 参数打包缓冲区。这绝对安全,但不幸的是,不具备可移植性。我以前就对这个疏忽感到疑惑,并决定通过编写一个小型的 C 扩展 posixlock 来解决这个问题,该扩展为 Ruby 的内置 File 类添加了一个方法,用于将 fcntl 或 POSIX 风格的咨询锁应用于 File 对象。以下是 posixlock.c 中的大部分代码

static int
posixlock (fd, operation)
     int fd;
     int operation;
{
  struct flock lock;
  switch (operation & ~LOCK_NB)
    {
    case LOCK_SH:
      lock.l_type = F_RDLCK;
      break;
    case LOCK_EX:
      lock.l_type = F_WRLCK;
      break;
    case LOCK_UN:
      lock.l_type = F_UNLCK;
      break;
    default:
      errno = EINVAL;
      return -1;
    }
  lock.l_whence = SEEK_SET;
  lock.l_start = lock.l_len = 0L;
  return fcntl (fd,
		(operation & LOCK_NB) ? F_SETLK :
		F_SETLKW, &lock);
}

static VALUE
rb_file_posixlock (obj, operation)
     VALUE obj;
     VALUE operation;
{
  OpenFile *fptr;
  int ret;
  rb_secure (2);
  GetOpenFile (obj, fptr);
  if (fptr->mode & FMODE_WRITABLE)
    {
      fflush (GetWriteFile (fptr));
    }
retry:
  TRAP_BEG;
  ret =
    posixlock (fileno (fptr->f),
	       NUM2INT (operation));
  TRAP_END;
  if (ret < 0)
    {
      switch (errno)
	{
	case EAGAIN:
	case EACCES:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
	case EWOULDBLOCK:
#endif
	  return Qfalse;
	case EINTR:
#if defined(ERESTART)
	case ERESTART:
#endif
	  goto retry;
	}
      rb_sys_fail (fptr->path);
    }

void
Init_posixlock ()
{
  rb_define_method (rb_cFile, "posixlock",
		    rb_file_posixlock, 1);
}

诚然,它有点丑陋,但 C 代码几乎总是如此。然而,真正令人印象深刻的 Ruby 的优点之一是解释器本身的代码非常易读。源代码包括 array.c、hash.c 和 object.c——即使像我这样的人也能理解这些文件。事实上,我能够从 file.c 中定义的 Ruby 的 File.flock 实现中窃取大约 98% 的上述代码。

除了 posixlock.c 之外,我还需要编写一个 extconf.rb(扩展配置)文件,Ruby 会自动将其转换为 Makefile。以下是 posixlock 扩展使用的完整 extconf.rb 文件

require 'mkmf' and create_makefile 'posixlock'

扩展的用法镜像了 Ruby 自己的 File.flock 调用,但这对于 NFS 挂载的文件是安全的。以下示例可以从多个 NFS 客户端同时运行

require 'socket'
require 'posixlock'

host = Socket::gethostname
puts "test running on host #{ host }"

File::open('nfs/fcntl_locking.test','a+') do |file|
  file.sync = true
  loop do
    file.posixlock File::LOCK_EX
    file.puts "host : #{ host }"
    file.puts "locked : #{ Time::now }"
    file.posixlock File::LOCK_UN
    sleep 0.42
  end
end

Atail -fNFS 挂载的文件 fcntl_locking.test 的输出显示,该文件正在以安全的方式并发访问。请注意缺少错误检查:Ruby 是一种基于异常的语言,因此任何未成功的方法都会引发错误,并且详细的堆栈跟踪会打印在标准错误上。

关于这个扩展,需要注意的一点是,我实际上能够向 Ruby 的内置 File 类添加一个方法。Ruby 的类是开放的——您可以随时扩展任何类,甚至是内置类。显然,扩展内置类应该非常谨慎,但有时这样做是有道理的,Ruby 并不会阻止您在有意义的地方这样做。这里的重点不是您必须扩展 Ruby,而是您可以扩展。而且这样做并不困难。

在解决了我的锁定难题之后,我必须做出的下一个设计选择是使用什么格式来存储队列。Ruby 能够将任何对象序列化到磁盘,并且它包含一个基于事务的文件支持对象存储类 PStore,我已成功地将其用作许多 CGI 程序的迷你数据库。我首先在此类上实现了一个包装器,该包装器使用 posixlock 模块来确保 NFS 安全事务,并支持诸如 insert_job、delete_job 和 find_job 之类的方法。我立刻开始感觉自己像是在编写一个小数据库。

我不喜欢重新发明轮子(至少不太频繁),所以我决定使用 SQLite 嵌入式数据库和 Jamis Buck 编写的出色的 Ruby 绑定 作为存储后端。这真的帮助项目向前推进,因为我从编写大量类似数据库的功能中解放了出来。

许多数据库 API 都选择了返回哈希或数组来表示数据库元组(行)。当元组表示为哈希时,您可以编写易于阅读的代码,例如这样

ssn = tuple['ssn']

然而,您无法编写自然的代码,例如

sql = 
  "insert into jobs values ( #{ tuple.join ',' } )"

primary_key, rest = tuple

而使用数组表示,您最终会得到难以理解的代码,例如这样

field = tuple[7]

现在,字段 7 是什么来着?

当我第一次开始使用 Ruby 的 SQLite 绑定时,所有元组都作为哈希返回,并且我有很多稍微冗长的代码,用于将元组从哈希转换为数组,然后再转换回来。任何花了很多时间使用 Ruby 的人都知道,Ruby 的优雅会激励您使自己的代码更加优雅。所有这些转换不仅不优雅,而且效率低下。我想要的是一个元组类,它是一个数组,但允许关键字字段访问,以提高可读性和优雅性。

对于 Ruby 来说,这项任务没有问题。我编写了一个纯 Ruby 模块 ArrayFields,它允许任何数组完全做到这一点。在 Ruby 中,模块不仅是一个命名空间,而且可以混合到其他类中以赋予功能。效果类似于多重继承,但不太令人困惑。事实上,Ruby 类不仅可以以这种方式扩展,而且 Ruby 对象本身的实例也可以使用模块的功能动态扩展——而同一类的其他实例则不受影响。这是一个使用 Ruby 的 Observable 模块的示例,该模块实现了发布/订阅设计模式

require 'observer'
publisher = Publisher::new
publisher.extend Observable

在这个例子中,只有 Publisher 类的这个特定实例被扩展了 Observable 的方法。

Jamis 非常乐意与我合作,将 ArrayFields 支持添加到他的 SQLite 包中。它的工作方式很简单:如果在运行时检测到 ArrayFields 模块,则查询返回的元组会被动态扩展以支持命名字段访问。内存中的其他数组对象都不会被触及,只有作为元组返回的数组才会被 ArrayFields 扩展。

最后,我能够编写可读的代码,如下所示

require 'arrayfields'
require 'sqlite'

...

query = 'select * from jobs order by submitted asc'

tuples = db.execute query 

tuples.each do |tuple|

  jid, command = job['jid'], job['command']

  run command

  job['state'] = 'finished'

 # quoted list of job's fields 

  values = job.map{|val| "'#{ val }'" }.join ','

  sql = "insert into done values( #{ values } )"

  db.execute sql

end

以及优雅的代码,例如这样

jobs.sort_by{ |job| job['submitted'] }

这种扩展提供的不仅仅是便利;使用数组而不是哈希更快,内存占用减少约 30%,并且使元组上的许多操作更自然地编码。允许关键字访问数组使代码更具可读性,并使开发人员无需记住字段位置,或者更糟糕的是,如果数据库模式的更改应该更改字段顺序,则无需更新代码。最后,减少代码总行数几乎总是有助于开发和维护。

砌墙

使用 posixlock 和 SQLite 使编码持久化的 NFS 安全优先级队列类变得相对简单。当然,还有性能问题需要解决。添加了一个基于租约的锁定系统,以检测我在 SQLite 邮件列表中听说的可能的 lockd 饥饿问题。在这个开发阶段,我在 NFS 邮件列表中发布了很多问题,Trond Myklebust 等开发人员为我提供了宝贵的资源。

在猜测我自己编写的程序的状态方面,我不太聪明。明智的程序员知道,良好的日志记录是无可替代的。Ruby 附带了一个内置的 Logger 类,它提供了诸如自动日志滚动之类的功能。使用这个类作为基础,我能够抽象出一个小型模块,Ruby Queue 中的所有类都使用该模块,只需几行代码即可为其所有对象提供一致、可配置和普遍的日志记录。能够利用内置库来抽象日志记录等重要的构建块,可以节省时间和精力。

如果您仍然使用 XML 作为数据序列化格式,并且渴望更简单、更易读的东西,我强烈建议您查看 YAML。Ruby Queue 广泛使用 YAML 作为输入和输出格式。例如,rq 命令行工具将标记为“重要”的作业显示为

-
  jid: 1
  priority: 0
  state: pending
  submitted: 2004-11-12 15:06:49.514387
  started:
  finished:
  elapsed: 
  submitter: redfish
  runner: 
  pid: 
  exit_status: 
  tag: important
  command: my_job.sh
-
  jid: 2
  priority: 42 
  state: finished 
  submitted: 2004-11-12 17:37:10.312094
  started: 2004-11-12 17:37:13.132700
  finished: 2004-11-12 17:37:13.739824
  elapsed: 0.015724 
  submitter: redfish
  runner: bluefish
  pid: 5477 
  exit_status: 0 
  tag: important
  command: my_high_priority_job.sh

这种格式易于人类阅读,并且对 Linux 命令(如 egrep(1))友好。但最棒的是,上面的文档作为命令的输入时,可以使用单个命令加载到 Ruby 中,作为哈希数组

require 'yaml'
jobs = YAML::load STDIN

然后,它可以作为原生 Ruby 对象使用,而无需复杂的 API

jobs.each do |job|
  priority = job['priority']
  ...
end

也许对 Ruby 的 YAML 最好的总结是由它的作者 "_why" 提供的。他写道:“真的,它太棒了。像黄油涂在面包上一样,直接铺在你的 Rubyware 上!”

屋顶

当一个细微的错误突然出现时,我实际上已经有一个 Ruby Queue (rq) 的原型在生产环境中运行了,这是我们在 DMSP 小组中经常做的一个步骤。NFS 有一个称为“愚蠢重命名”的功能。当两个客户端打开一个 NFS 文件,其中一个客户端删除它时,就会发生这种情况,导致 NFS 服务器将文件重命名为类似 “.nfs123456789” 的名称,直到第二个客户端完成操作并且文件真正可以被删除。

rq 在处理队列(从中运行作业)时的一般操作模式是启动 SQLite 数据库上的事务,找到要运行的作业,fork 一个子进程来运行作业,使用作业的 pid 等信息更新数据库,并结束事务。事实证明,SQLite 中的事务涉及一些临时文件,这些文件在事务结束时会被删除。问题是我在事务中间进行了 fork,导致临时文件的文件句柄在子进程和父进程中都处于打开状态。当父进程然后在事务结束时删除临时文件时,就会发生愚蠢重命名,以便子进程的文件句柄仍然有效。我开始看到大量的这些愚蠢文件堆满了我的队列目录;它们最终会消失,但它们很丑陋,并且让用户感到不安。

最初,我研究了在 fork 后以某种方式关闭文件句柄的可能性,但我从 SQLite 的创建者 Richard Hipp 博士那里收到了坏消息。他说在事务中间进行 fork 会导致“未定义”的行为,并且不建议这样做。

这是一个坏消息,因为我的设计严重依赖于在事务中进行 fork,以便保留启动作业和更新其状态的原子性。我需要能够做到的是在不 fork 的情况下进行 fork。更具体地说,我需要另一个进程来 fork、运行作业并代表我等待它。现在,建立一个协进程并使用 IPC 来实现这种“fork 与 fork”的想法让我感到不寒而栗。幸运的是,Ruby 提供了一个无刺的解决方案。

DRb 或分布式 Ruby 是一个用于处理远程对象的内置库。它类似于 Java RMI 或 SOAP,只是 DRb 的上手难度要低一百万倍。但是,远程对象与在另一个进程中进行 fork 有什么关系呢?我所做的是编写了一个小型类,它为我执行 fork、作业运行和等待。然后可以将这个类的实例设置为子进程中的本地 DRb 服务器。通信通过 UNIX 域套接字透明地完成。换句话说,DRb 服务器是协进程,它为我执行所有 fork 和等待操作。与此对象交互类似于与任何其他 Ruby 对象交互。整个 JobRunnerDaemon 类包含 101 行代码,包括子进程设置。以下是 Feeder 类的一些摘录,其中显示了其用法的关键点。

JobRunnerDaemon 的实例在子进程中启动,并返回该远程(但在 localhost 上)对象的句柄

jrd = JobRunnerDaemon::daemon

为作业创建 JobRunner 对象,并且 JobRunner 是通过在稍后用于运行作业的 JobRunnerDaemon 进程中预先 fork 一个子进程来创建的。实际的 fork 发生在子进程中,因此它不会影响父进程的事务

runner = jrd.runner job
pid = runner.pid 
runner.run

稍后,可以使用 JobRunnerDaemon 上的 DRb 句柄来等待子进程。即使我们正在等待完全不同进程的子进程,这也会像正常的等待一样阻塞。

cid, status = jrd.waitpid2 -1, Process::WUNTRACED

在我的小组中,我们经常经历像这样的“运行它。破坏它。修复它。”周期,理念是生产环境才是最好的测试。与我合作最密切的科学家 Kim Baugh 和 Jeff Safran 非常乐于让程序在他们面前崩溃,如果最终结果是更好、更可靠的代码。用 Ruby 等动态语言编写的程序使我能够快速修复错误,这保持了他们对测试的高度热情。结合起来的效果是快速的进化式开发周期。

入住

在这里,我将逐步介绍用于设置由四个节点组成的即时 Linux 集群的 rq 命令的实际顺序。我们将要使用的节点称为 onefish、twofish、redfish 和 bluefish。每个主机都在下面的提示符中标识。在每个主机上的我的主目录中,我都有指向公共 NFS 目录的符号链接 ~/nfs。

我们首先要做的就是初始化队列

redfish:~/nfs > rq queue create
created <~/nfs/queue>

接下来,我们在所有四个主机上启动 feeder 守护进程

onefish:~/nfs > rq queue feed --daemon -l=~/rq.log
twofish:~/nfs > rq queue feed --daemon -l=~/rq.log
redfish:~/nfs > rq queue feed --daemon -l=~/rq.log
bluefish:~/nfs > rq queue feed --daemon -l=~/rq.log

在实践中,您不会希望在每个节点上手动启动 feeder,因此 rq 支持通过 crontab 条目保持活动状态。当 rq 在守护进程模式下运行时,它会获取一个锁文件,该锁文件有效地将其限制为每个主机、每个队列一个 feeding 进程。如果另一个守护进程已经在同一队列上 feeding,则启动 feeder 守护进程只会失败。因此,像这样的 crontab 条目

15/* * * * * rq queue feed --daemon --log=log

每 15 分钟检查一次是否正在运行守护进程,并且仅当没有守护进程已经在运行时才启动守护进程。通过这种方式,普通用户可以设置一个始终运行的进程,即使在机器重启后也是如此。

可以从命令行、输入文件或 Linux 传统中,从标准输入作为进程管道的一部分提交作业。当使用输入文件或 stdin 时,格式可以是 YAML(例如其他 can rq 命令的输出)或简单的作业列表,每行一个作业。格式是自动检测的。任何看到队列的主机都可以在其上运行命令

onefish:~/nfs > cat joblist 
echo 'job 0' && sleep 0
echo 'job 1' && sleep 1
echo 'job 2' && sleep 2
echo 'job 3' && sleep 3

onefish:~/nfs > cat joblist | rq queue submit
-
  jid: 1
  priority: 0
  state: pending
  submitted: 2004-11-12 20:14:13.360397
  started: 
  finished: 
  elapsed: 
  submitter: onefish
  runner: 
  pid: 
  exit_status: 
  tag: 
  command: echo 'job 0' && sleep 0
-
  jid: 2
  priority: 0
  state: pending
  submitted: 2004-11-12 20:14:13.360397
  started: 
  finished: 
  elapsed: 
  submitter: onefish
  runner: 
  pid: 
  exit_status: 
  tag: 
  command: echo 'job 1' && sleep 1 
-
  jid: 3
  priority: 0
  state: pending
  submitted: 2004-11-12 20:14:13.360397
  started: 
  finished: 
  elapsed: 
  submitter: onefish
  runner: 
  pid: 
  exit_status: 
  tag: 
  command: echo 'job 2' && sleep 2 
-
  jid: 4
  priority: 0
  state: pending
  submitted: 2004-11-12 20:14:13.360397
  started: 
  finished: 
  elapsed: 
  submitter: onefish
  runner: 
  pid: 
  exit_status: 
  tag: 
  command: echo 'job 3' && sleep 3

我们在提交到队列的输出中以 YAML 格式看到了关于每个作业的所有信息。当作业完成时,所有字段都会被填写。此时,我们检查队列的状态

redfish:~/nfs > rq queue status
---
pending : 2
running : 2
finished : 0
dead : 0

由此,我们看到其中两个作业已被节点拾取并正在运行。我们可以使用此输入找出哪些节点正在运行我们的作业

onefish:~/nfs > rq queue list running | egrep 'jid|runner'
 jid: 1
 runner: redfish
 jid: 2
 runner: bluefish

已完成作业的记录会保留在队列中,直到被删除,因为用户通常希望收集此信息。此时,我们期望所有作业都已完成,因此我们检查每个作业的退出状态

bluefish:~/nfs > rq queue list finished | egrep 'jid|command|exit_status'
 jid: 1
 exit_status: 0
 command: echo 'job 0' && sleep 0
 jid: 2
 exit_status: 0
 command: echo 'job 1' && sleep 1
 jid: 3
 exit_status: 0
 command: echo 'job 2' && sleep 2
 jid: 4
 exit_status: 0
 command: echo 'job 3' && sleep 3

所有命令都已成功完成。我们现在可以从队列中删除任何成功完成的作业

twofish:~/nfs > rq queue query exit_status=0 | rq queue delete
---
- 1
- 2
- 3
- 4

Ruby Queue 可以执行相当多的其他有用操作。有关完整说明,请键入rq help.

回顾与展望

选择自己动手构建工具始终是一个艰难的决定,因为它违反了程序员规则第 42 条,该规则明确规定:“每个问题都已解决。它是开源的。并且它是 Google 上的第一个链接。”

当您决定违反规则第 42 条时,拥有像 Ruby 这样的工具至关重要,而像 Ruby Queue 这样的项目可以用 3,292 行代码编写完成这一事实就证明了这一点。由于只计划进行一些主要的增强,因此随着代码库的改进和完善,此代码行总数很可能不会大幅增加。rq 的目标仍然是简单性和易用性。

Ruby Queue 旨在降低科学家们为了实现 Linux 集群的强大功能而必须克服的障碍。提供一个简单易懂的工具来利用多个 CPU 的强大功能,使他们能够将注意力从复杂分布式计算系统的繁琐细节转移回实际进行科学研究的任务。有时,小即是美。

Ara T. Howard 是环境科学合作研究所的研究助理。他将时间用于编写 Ruby 程序,或与妻子 Jennifer 以及三只边境牧羊犬——Eli、Joey 和 Zipper 一起进行山地自行车和滑雪运动。

加载 Disqus 评论