开始使用 AWS Glue 数据质量动态规则来构建 ETL 管道 大数据博客
开始使用 AWS Glue 数据质量动态规则进行 ETL 管道
作者:Prasad Nadig Edward Cho Mahammadali Saheb Rahul Sharma McDaniel Tyler日期:2024年5月23日分类:分析、AWS Glue、中级 (200)、技术操作指南永久链接评论
关键要点
数据集成管道可以提取和转换数据数据质量规则确保提取的数据达到业务决策的高质量标准动态规则根据历史数据自动调整,不必定期更新静态规则本文介绍如何创建 AWS Glue 作业,以动态规则测量和监控数据质量成千上万的组织正在构建数据集成管道,以提取和转换数据。通过设定数据质量规则,确保所提取的数据能够满足高质量标准,从而进行准确的商业决策。这些规则根据固定标准评估数据,但当商业环境变化时,数据属性也随之改变,这会使这些固定标准过时,从而影响数据质量。
例如,某零售公司数据工程师建立了一个规则,要求每日电商销售额必须超过100万美元。然而,几个月后每日销售额超过200万美元,这使得该阈值变得过时。由于缺乏通知和手动分析与更新规则的工作量,数据工程师无法及时更新规则。随后,业务用户发现销售额下降了25。经过几个小时的调查,数据工程师发现负责从某些商店提取数据的 ETL 管道在没有生成错误的情况下失败了。陷入过时阈值规则的作业仍然成功运行,但未能识别出这一问题。如果数据工程师能够设置动态阈值,随商业数据的变化而自动调整,那会怎样呢?
我们欣喜地介绍如何使用 动态规则,这是 AWS Glue 数据质量 的一项新功能。现在,您可以定义动态规则,而不必定期更新静态规则,以适应数据趋势的变化。该功能使您能够编写动态规则,将当前指标与历史值进行比较。这种历史比较通过在表达式中使用 last(k) 操作符来实现。例如,不再需要编写像 RowCount gt 1000 这样的静态规则随着数据量增长可能变得过时,而可以用 RowCount gt min(last(3)) 替代。当当前运行的行数大于最近三次运行中最低的行数时,该动态规则将成功。
这是七部分系列文章的第七篇,旨在解释 AWS Glue 数据质量的工作原理。您可以查看系列文章中的其他内容:
第1部分:从 AWS Glue 数据目录开始使用 AWS Glue 数据质量第2部分:开始使用 AWS Glue 数据质量进行 ETL 管道第3部分:使用 AWS Glue 数据质量在多个数据集上设置数据质量规则第4部分:设置警报并配置 AWS Glue 数据质量规则第5部分:可视化 AWS Glue 数据质量生成的数据质量分数和指标第6部分:衡量 AWS Glue 数据质量在 ETL 管道中的性能第7部分:开始使用 AWS Glue 数据质量动态规则进行 ETL 管道前面的文章解释了如何编写静态数据质量规则。在本文中,我们将展示如何创建 AWS Glue 作业,使用动态规则来测量和监控数据管道的数据质量,并基于数据质量结果采取行动。
解决方案概述
让我们考虑一个示例数据质量管道,其中数据工程师从原始区中获取数据并将其加载到数据湖的整理区。数据工程师的任务不仅是提取、转换和加载数据,还要识别与历史运行的数据质量统计数据相比的异常情况。
在本篇文章中,您将学习如何在您的 AWS Glue 作业中编写动态规则,以便根据结果采取适当的行动。
本文使用的数据来源于 纽约市黄色出租车行程数据。黄色出租车行程记录包含多个字段,包括提取和卸载日期和时间、提取和卸载位置、行程距离、分项费用、费率类型、付款类型以及司机报告的乘客人数。以下截图展示了数据的示例。

使用 AWS CloudFormation 设置资源
本篇文章提供了一个 AWS CloudFormation 模板,以帮助您快速进行设置。您可以审查并根据需要进行自定义。
CloudFormation 模板生成以下资源:
资源说明一个 Amazon Simple Storage Service 的桶 (gluedataqualitydynamicrules)用于存储 ETL 过程中生成的数据一个 AWS Lambda将在上述 Amazon S3 桶中创建以下文件夹结构: rawsrc/ landing/nytaxi/ processed/nytaxi/ dqresults/nytaxi/AWS 身份与访问管理 (IAM) 用户、角色和策略IAM 角色 GlueDataQuality 拥有在 S3 桶上的 AWS Glue 运行权限以及读取和写入权限创建资源,请完成以下步骤:
登录到 AWS CloudFormation 控制台,选择 useast1 区域。选择 Launch Stack 选择 我承认 AWS CloudFormation 可能会创建 IAM 资源。选择 创建堆栈,并等待堆栈创建步骤完成。上传示例数据
下载数据集 到您的本地计算机。解压文件,将 Parquet 文件提取到本地文件夹。将 parquet 文件上传到 Amazon S3 桶中的 rawsrc/ 前缀下 (gluedataqualitydynamicrules)实施解决方案
开始配置解决方案,请完成以下步骤:
在 AWS Glue Studio 控制台 中,导航到导航窗格中的 ETL Jobs,选择 Visual ETL。进入 作业详细信息 标签以配置作业。在 名称 中输入 GlueDataQualityDynamicRules在 IAM 角色 中,选择以 GlueDataQuality 开头的角色。在 作业书签 中,选择 启用。启用此选项可以使作业增量运行。有关作业书签的更多信息,请参考 使用作业书签跟踪处理过的数据。
保留所有其他设置为默认值。选择 保存。作业保存后,导航到 可视化 标签,在 来源 菜单中选择 Amazon S3。在 数据源属性 S3 面板中,对于 S3 来源类型,选择 S3 位置。选择 浏览 S3,并导航到以 gluedataqualitydynamicrules 开头的 S3 桶中的 /landing/nytaxi/ 的前缀。对于 数据格式,选择 Parquet,并选择 推断架构。在 转换 菜单中,选择 评估数据质量。现在,您能够在处理过程中实现验证逻辑,以识别源数据中可能的数据质量问题。
为实现此目的,请在 规则集编辑器 标签中指定以下 DQDL 规则:
plaintextCustomSql select vendorid from primary where passengercount gt 0 with threshold gt 09Mean tripdistance lt max(last(3)) 150Sum totalamount between min(last(3)) 08 and max(last(3)) 12RowCount between min(last(3)) 09 and max(last(3)) 12Completeness fareamount gt= avg(last(3)) 09DistinctValuesCount ratecodeid between avg(last(3))1 and avg(last(3))2DistinctValuesCount pulocationid gt avg(last(3)) 08ColumnCount = max(last(2))
选择 原始数据 以输出源的原始输入数据,并在 评估数据质量 节点下添加新节点。选择 添加新列以指示数据质量错误,向输出架构添加四个新列。选择 数据质量结果 以捕获每个配置规则的状态,并在 评估数据质量 节点下添加新节点。在选择 rowLevelOutcomes 节点后,选择 Amazon S3 在 目标 菜单中。将 S3 目标位置配置为桶名以 gluedataqualitydynamicrules 开头的 /processed/nytaxi/,将输出格式设置为 Parquet,压缩类型设置为 Snappy。在选择 ruleOutcomes 节点后,选择 Amazon S3 在 目标 菜单中。将 S3 目标位置配置为桶名以 gluedataqualitydynamicrules 开头的 /dqresults/。将输出格式设置为 Parquet,压缩类型设置为 Snappy。选择 保存。到目前为止,您已成功设置 AWS Glue 作业,指定了管道的动态规则,并配置了原始源数据和 AWS Glue 数据质量结果将被写入 Amazon S3 的目标位置。接下来,让我们探讨动态规则及其工作原理,并解释我们在作业中使用的每一条规则。
动态规则
现在,您可以编写动态规则,将当前规则生成的指标与其历史值进行比较。这种历史比较允许通过 last() 操作符在表达式中实现。例如,规则 RowCount gt max(last(1)) 将在当前运行中的行数大于前一个相同数据集的行数时成功。last() 接受一个可选的自然数参数,用于描述要考虑多少之前的指标;last(k),其中 k gt= 1,将引用最近的 k 个指标。该规则具有以下条件:
如果没有可用的数据点,last(k) 将返回默认值 00如果可用的指标少于 k 个,last(k) 将返回所有的先前指标例如,如果之前运行的值为 (5 3 2 1 4),则 max(last(3)) 将返回 5。
AWS Glue 支持超过 15 种类型的动态规则,提供了一整套强大的数据质量验证能力。有关更多信息,请参阅 动态规则。本节演示了多种规则类型,以展示其功能,并使您能够在自己的用例中应用这些功能。
小熊加速器下载官网CustomSQL
CustomSQL 规则允许您对数据集运行自定义 SQL 语句,并根据给定表达式检查返回值。
以下示例规则使用 SQL 语句,您在 SELECT 语句中指定列名,并与某个条件进行比较以获取行级结果。阈值条件表达式定义了必须有多少记录失败,以致整个规则失败。在此示例中,超过 90 的记录应包含 passengercount 大于 0 才能通过规则:
plaintextCustomSql select vendorid from primary where passengercount gt 0 with threshold gt 09
注意:Custom SQL 也支持动态规则,以下是如何在您的作业中使用的示例:
plaintextCustomSql select count() from primary between min(last(3)) 09 and max(last(3)) 12
Mean
Mean 规则检查某列所有值的均值平均值是否满足给定表达式。
以下示例规则检查 tripdistance 的均值是否小于过去三次运行的列值的最大值乘以 15:
plaintextMean tripdistance lt max(last(3)) 150
Sum
Sum 规则检查某列所有值的总和是否满足给定表达式。
以下示例规则检查 totalamount 的总和是否在过去三次运行的最小值的 80 和最大值的 120 之间:
plaintextSum totalamount between min(last(3)) 08 and max(last(3)) 12
RowCount
RowCount 规则检查数据集的行数是否满足给定表达式。在表达式中,您可以使用操作符如 gt 和 lt 来指定行的数量或范围。
以下示例规则检查行数是否在过去三次运行的最小值的 90 和最大值的 120 之间不包括当前运行。该规则适用于整个数据集。
plaintextRowCount between min(last(3)) 09 and max(last(3)) 12
在您的 IDE 中自定义 Amazon Q 开发者使用您的私人代码库 新闻博客
自定义 Amazon Q 开发者在你的 IDE 中与私有代码库重点摘要今天,我们正式推出了 Amazon Q 开发者的定制功能,支持代码编辑器中的内联代码完成,并推出了聊天定制的预览版。现在,用户可以在 IDE 中以及聊天中,自定义 Amazon Q 以从私有代码库生成特定的代码建议。Amazon ...