在 Slurm 下使用 MySQL 进行负载均衡和作业控制
和现在的大多数事物一样,现代大气科学完全是关于大数据的。无论是飞行器上搭载的仪器每秒多次拍摄图像,在为期两周的飞行任务中每天产生四分之三个 TB 的数据,还是卫星仪器在 10-15 年的寿命期内每天产生数百 GB 的光谱数据,数据量都是巨大的。仅仅分析一天的数据以跟踪基本仪器稳定性就非常消耗 CPU。完全处理一天的数据以检索大气状态或查看十年数据的趋势则更是如此。
高性能并行集群计算是关键。多年来,我一直在非常基本的层面上这样做,即在实验室周围的几台计算机上启动少量处理脚本副本,但在最近搬到一个新实验室后,我得到了第一个在真正的集群系统上工作的机会,处理来自名为 AIRS(参见“资源”)的星载高光谱探测器的数据。AIRS 是美国国家航空航天局 AQUA 卫星上搭载的仪器之一,该卫星于 2002 年底发射,此后一直在持续运行。来自 AIRS 和类似仪器的数据用于绘制全球大气温度和痕量气体的垂直剖面图,但我们必须首先能够处理这些数据。
这里的集群计算游戏严格来说是让大量计算机对大量数据做同样的事情,以便我们能够以比收集数据更快的速度处理数据(快得多当然更好)。由于几个月前我还是这个游戏的新手,我有很多东西要学习,关于集群计算以及如何设计算法和处理软件以利用多个 CPU 进行处理。这是我第一次拥有数百个 CPU 可供支配的经验,它真的改变了我处理数据的方式。我开始写这篇文章是为了描述我如何被展示并行化这种类型的数据处理,以及我整理出来的一种使该过程更加简洁的方法。
基本 Slurm这里的集群系统由 240 个计算节点组成,每个节点都配备双路 8 核处理器和 64GB 主内存,运行 Red Hat Enterprise Linux。集群作业通过 Slurm 工作负载管理器(参见“资源”)进行调度运行。简而言之,Slurm 是一套程序,其作用是在用户和计算作业之间分配计算机资源,并执行共享规则以确保每个人都有机会完成他们的工作。该套件中用于实际在系统上工作的两个最重要的程序是 sbatch
和 srun
。
sbatch
是 Slurm 调度器的入口点,它读取一个高级 Bash 控制脚本,该脚本指定作业参数(所需的节点数、每个进程的内存、预期运行时间等等),并通过调用 srun 生成请求数量的相同作业。
#!/bin/bash
# sbatch options
#SBATCH --job-name=RUN_CALFLAG
#SBATCH --partition=batch
#SBATCH --qos=short
#SBATCH --ntasks=20
#SBATCH --mem-per-cpu=18000
#SBATCH --cpus-per-task 1
#SBATCH --time=00:20:00
# matlab options
MATLAB=/usr/bin/matlab
MATOPT=' -nojvm -nodisplay -nosplash'
srun --output=~/run_calflag.log
↪$MATLAB $MATOPT -r "run_calflag; exit;"
清单 1 中的脚本要求 Slurm 分配 20 个处理器 (ntasks
/cpus-per-task
),为每个处理器分配 18GB 内存 (mem-per-cpu
),并在每个处理器上运行一个作业 (srun
...),该作业将大约需要 20 分钟 (time
) 才能运行。partition
和 qos
指令帮助系统管理其资源并设置用户允许的处理器数量、CPU 时间限制等等规则。job-name
指令为您的任务命名,以帮助您在系统队列的 squeue
列表中区分您的作业。
此处显示的 srun
请求是在每个分配的节点上运行 MATLAB 实例,并在每个实例中运行脚本 run_calflag
并退出。任何消息输出都将发送到由 output
参数指定的文件。run_calflag
可以是一个简单的“hello world”脚本,也可以是一个循环来处理一千个文件。它也不必是 MATLAB。MATLAB 是我们在这里选择的工具,本文中的示例以某种背景方式使用它。没有必要理解 MATLAB 才能继续阅读。
只要此请求不违反任何集群良好行为规则,例如占用处理器、占用内存等等,Slurm 就会将该请求排队,直到有处理器可用来运行它。当资源可用时,Slurm 通过获取 20 个 CPU,然后在每个 CPU 上启动一个 matlab
/run_calflag
副本开始处理。一旦控制脚本就绪,此请求将通过 sbatch
命令提交给 Slurm
sbatch run_calflag_batch.sh
Slurm 还管理一组环境变量,这些变量可用于将一些作业参数传递到处理脚本中。
基本数据分块清单 1 已经显示出足以看到并行化此类数据处理的一个问题:如何对数据进行分块以传递给 srun
启动的每个 run_calflag
实例?如果我想处理一年,我应该请求 365 个处理器并在每个处理器上处理一天吗?52 个处理器,每个处理器处理一周?每个处理器处理一个月?我该如何处理闰年?集群资源分配规则阻止我采用 365 个处理器的想法,但除此之外,没有明确的、唯一的答案。
作为集群计算的新手,我研究了我的同事们为设置他们的处理运行所做的工作。大多数人采用了三层系统来运行这些作业
-
一个带有
sbatch
指令和srun
调用的 bash 脚本,用于启动所有内容。 -
一个 MATLAB 脚本(由前一个脚本中的
srun
调用)在每个计算节点上运行,该脚本使用 Slurm 环境中的某个节点 ID(通常是SLURM_PROC_ID
)来索引处理到要处理的年/日/文件范围。最常见的方法是请求 12 个 CPU,并为每个 CPU 分配一个月,或请求 52 个 CPU,并为每个 CPU 分配一周。然后,此脚本循环遍历“分配”给此节点的年/日/文件。在此循环中,调用最终的 MATLAB 脚本,该脚本对序列中的每个年/日/文件执行实际处理。
这种方法当然可以工作,但它存在一些重大问题
-
数据的临时分块:一般来说,如何将要处理的“x”项分配到“y”个节点上?在实践中,这似乎意味着您必须编辑运行脚本,并针对您希望执行的几乎每次运行进行定制。(几乎可以肯定的是,您将在这个业务中进行多次运行:一次尝试处理连续的数据段,第二次重新运行现在不连续的、由于某种原因而失败的天数。)
-
未能充分利用分配的系统资源:按月并行化时,处理二月的节点几乎肯定会在总运行时间的 8-10% 内处于空闲状态,仅仅是因为它比其他月份短 2-3 天。或者,如果较低级别的处理失败并导致在该 CPU 上运行的整个进程崩溃,则该块的其余部分将需要在稍后重新处理,并且处理器在其余处理器完成时处于空闲状态。
-
未能充分利用我的时间:每当我需要更改处理作业的数量或将它们分散到的节点数量时,我都必须手动重新计算如何分散作业,并且很可能编辑我的控制脚本。我绝对讨厌保留一堆脚本,它们都做同样的事情,但每个脚本都针对一些特殊的边缘情况。
好的,那么,我们如何解决这个问题呢?
作业控制堆栈真正需要的是某种作业调度堆栈,人们可以在其中存储每个原子处理步骤所需的任何参数。可以将查找要处理的正确数据所需的任何信息推送到此列表中,然后,当节点空闲时,各种脚本可以弹出下一个未处理的项目。假设我们需要处理的数据是每天存档一个文件,并且我们想要处理几年的数据。作业控制堆栈的想法是,我们可以列出我们想要处理的文件。这可能就像运行 ls
或 find
一样简单。然后,我们可以将它们的详细信息推送到堆栈上并开始并行处理它们。我们要求 Slurm 为我们提供一些处理器,并在每个处理器上启动 MATLAB 以及我们的二级处理脚本。每个实例都是一个与之前类似的循环,但每个循环迭代都查询堆栈以查找下一个可用的未处理数据。如果有数据要处理,则将其传递给低级别处理脚本。如果没有,循环退出,MATLAB 终止。系统不在乎我们是否有 365 天要处理或 2,431 天要处理,或者我们是否想将这些天数分散到两个或 200 个处理器上。这意味着我们也不必在乎。
当主作业完成,并且我们发现某些天数没有正确处理时,我们只需列出失败的文件,将其推入作业堆栈并再次运行(当然,在修复初始失败的原因之后)。几乎没有理由编辑任何脚本来做到这一点。
这个想法似乎解决了所有重大问题
-
没有数据的临时分块:一个进程,一个文件。根据需要重复。
-
节点资源利用率更高且更均匀:如果处理器遇到一系列处理速度很快的天数,它只会获取更多天数来处理,直到没有更多天数为止。
-
如果节点死机——无论是由于系统错误还是由于其他未充分捕获的错误(文件丢失、文件/数组索引问题过短)——处理会在剩余节点上平衡。您丢失并需要重新处理一天的数据,但其余数据将在其他地方完成。
如果能够使用平面文件来实现简单性会很好,但这种方法导致了不同的处理器抓取相同数据进行处理时出现了一些严重问题。需要能够在处理器抓取数据后锁定数据,这促使我使用 MySQL 数据库表来实现此作业堆栈。
清单 2. SQLCREATE TABLE
脚本,用于定义基本作业控制表
CREATE TABLE `JobManagement` ( `task_id` int(11) NOT
↪NULL, `entry_id` int(11) NOT NULL AUTO_INCREMENT,
↪`node_id` int(11) DEFAULT NULL, `node_start` timestamp
↪NOT NULL DEFAULT '0000-00-00 00:00:00', `node_end`
↪timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
↪`datapath` varchar(256) DEFAULT NULL, `task_name`
↪varchar(128) NOT NULL, PRIMARY KEY (`task_id`,`entry_id`) )
↪ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;
清单 2 是一个 MySQL CREATE TABLE
命令,用于创建当前实现的作业控制表。此表中的大多数值在将运行推送到堆栈时填充。node_start
和 node_id
在作业从堆栈中弹出并开始活动处理时填充。node_end
在运行完成时填充。node_start
、node_end
和 node_id
不是特别必要,但它们收集了有关运行时性能的有用统计信息(尽管您可以从 Slurm 的 sacct
命令中获得相同的信息)。
push
函数。sDataPath
是要处理的数据文件的绝对路径。这被推送到作业控制表中,使其可用于处理。
function push_job_table(iTaskID, sJobName, sDataPath)
sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB';
sSQL = sprintf(['echo "insert into JobManagement (task_id, year, ' ...
'doy, task_name) values (%d, \\"%s\\", \\"%s\\");"'],
iTaskID, ...
sDataPath, sJobName);
[status, cmdout] = system([sSQL ' | ' sMYSQL]);
要运行的作业由如清单 3 所示的例程推送到堆栈。当作业被推送到堆栈时,task_id
采用 Slurm 分配给整个处理运行的进程 ID,而 entry_id
被分配一个从 1 开始的计数器。此计数器随着在 task_id
下添加的每个处理运行而递增。entry_id
和 task_id
共同构成表的主键,因此对于表中的每个记录都是唯一的。检索要处理的数据所需的实际信息存储在 datapath
中。datapath
指定需要处理的主数据文件的绝对路径,在大多数情况下可以直接从 ls
或 find
填充,但如果我们需要匹配多个输入文件或首先检查文件是否存在等等,也可以来自具有更复杂逻辑的例程。当处理器查询数据库以获取新作业时,如清单 4 所示,datapath
的值是返回的主要信息。
pop
函数。查询作业控制表以查找下一个可用的数据文件进行处理。如果数据可用,则返回其路径。如果不可用,则返回一个空字符串,导致进一步处理结束。11846l4.qrk
function stJobEntry = pop_job_table(taskid, entryid)
% determine which node we are on for table update (for performance
% tracking)
iNodeID = str2num(getenv('SLURM_PROCID'));
sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB';
% select one entry from the job management table that hasn't been
% done yet. Immediately update with this processor's node_id to try
% to lock the row out from further requests.
sPOPSQL = sprintf(['echo "set @B=%d; set @C=%d;' ...
'select @A:=entry_id as entry_id, datapath from ' ...
'JobManagement where task_id = @B and entry_id = @C' ...
'and node_id ' ...
'is null limit 1 for update;' ...
'update JobManagement set node_id = %d,' ...
'node_start = now() where task_id = @B and ' ...
'entry_id = @A;' ...
'set @A=0;"'], taskid, entryid, iNodeID);
% execute the table pop and get a day that is not being processed
[status, cmdout] = system([sPOPSQL ' | ' sMYSQL ' | head -2 | tail ' ...
'-1']);
stJobEntry = struct('entry',0, 'datapath','');
if length(cmdout) > 0
% parse cmdout to entry_id, datapath
iTokens = str2num(cmdout);
stJobEntry.entry = iTokens(1);
stJobEntry.datapath = iTokens(2);
end
清单 4 为我们提供了堆栈的 pop
函数。pop
中的想法是选择数据库中下一个可用的行并将其锁定以进行更新。通过使用抓取它的处理器的 node_id
以及处理的开始时间来更新记录,从而完全锁定记录。这在一个命令中完成,以最大限度地减少另一个处理器可以抓取同一记录的 CPU 时间。一旦 node_id
被设置,该记录将永远不会再次被考虑。
检索到的记录,或者更确切地说,存储在其中的 datapath,被传递到一个 MATLAB 结构中,该结构将被返回给调用函数,在那里它将被用于开始处理。push
和 pop
是真正需要的全部,但因为我们努力包含了 node_stop
来跟踪运行时,所以需要一个例程来关闭作业表条目。清单 5 显示了此关闭的一个版本,该版本只是更新记录以在 node_stop
字段中添加结束时间。这里不需要显式锁定。
function close_job_table_entry(iTaskID, iEntryID)
sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB';
% close out day by adding processing end time to db table
sSQL = sprintf(['echo "update JobManagement set node_end=now() ' ...
'where task_id = %d and entry_id = %d;"'], ...
iTaskID, iEntryID);
[status, cmdout] = system([sSQL ' | ' sMYSQL]);
一些读者可能想知道为什么我通过 shell 命令和 system()
转义而不是某些本机数据库访问来完成数据库工作。这实际上是出于三个原因。首先,我很懒,我无法使任何类型的 MATLAB 本机数据库连接工作。其次,这样我可以轻松地在命令行开发和测试,然后粘贴到我的代码中,最后,我发现这种方法更易于移植。如果我们决定成为 IDL 或 Python 商店,我可以尽快移植这些例程,就像我可以查找它们各自的系统调用一样快。系统 shell 转义可能较慢,但在这种用法中,它们只会为运行时间为数十小时的运行增加几分钟。花时间追逐这几分钟并没有多大意义。
希望您现在确信作业控制表是在 Slurm 下运行作业的最佳方式。这种方法显着提高了我们例行处理的几件事的处理时间。过去需要数天才能完成的运行现在一天即可完成,而且我根本没有花任何时间为每次运行重写脚本,因此它绝对对我有用。
资源AIRS: http://airs.jpl.nasa.gov