189 8069 5689

如何分析deltalake表schema演进

如何分析delta lake表schema演进,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

创新互联主营铁东网站建设的网络公司,主营网站建设方案,成都App定制开发,铁东h5微信平台小程序开发搭建,铁东网站营销推广欢迎铁东等地区企业咨询

下面主要是深入探究一下delta lake的schema演变。

数据,就像我们的经验一样,总是在不断发展和积累。为了跟上时代的步伐,我们的思维模式必须适应新数据,其中一些包含新的维度-一种新的方式来查看我们以前从未想到的事物。这些思维模式与表的schema没有什么不同,它们定义了我们如何对新信息进行分类和处理。

随着业务问题和需求的发展,数据的结构也将随之变化。使用Delta Lake,随着数据的变化,合并新维度变得容易。用户可以使用简单的语义来控制其表的schema。这些工具包括schema校验(可防止用户因错误或垃圾数据而无意中污染其表)以及schema演进(也就是为了丰富数据而增加一些新的列)。

了解表schema

Apache Spark™中的每个DataFrame都包含一个schema,定义了数据的格式,例如数据类型和列以及元数据。使用Delta Lake,表的schema以JSON格式保存在事务日志中。

什么是schema校验?

schema校验是Delta Lake中的一种安全措施,它通过拒绝对表的schema不匹配的写入来确保数据质量。就像忙碌的餐厅的前台经理只接受预订一样,它会检查插入表中的数据中的每一列是否在其预期列的列表中(换句话说,每一列是否都有“预订”),以及拒绝所有不在列表中的列的写操作。

schema 校验如何工作?

Delta Lake 在write操作上使用schema验证,这意味着在写入时会检查对表的所有新写入是否与目标表的schema兼容。如果schema不兼容,则Delta Lake将完全取消事务(不写入任何数据),并引发异常以使用户知道不匹配的情况。

为了确定对表的写入是否兼容,Delta Lake使用以下规则。要写入的DataFrame:

  • 不能包含目标表的架构中不存在的任何其他列。相反输入的数据不包含表中的某些列是可以的,这些列将被简单地分配为空值。

  • 列的数据类型不能与目标表中的列数据类型不同。如果目标表的列包含StringType数据,但DataFrame中的相应列包含IntegerType数据,则schema强制实施将引发异常并阻止进行写操作。

  • 不能包含仅大小写不同的列名。这意味着不能在同一表中定义诸如“ Foo”和“ foo”之类的列。尽管Spark可用于区分大小写或不区分大小写(默认)模式,但是Delta Lake保留大小写,却在存储schema时不区分大小写。存储和返回列信息时,Parquet区分大小写。为了避免潜在的错误,数据损坏或丢失问题,才添加此限制。

为了说明,请看下面的代码,当试图将一些新计算的列追加到不兼容它们的delta lake表的时候,将发生什么。

# Generate a DataFrame of loans that we'll append to our Delta Lake tableloans = sql("""            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,            CAST(rand(10) * 10000 * count AS double) AS amount            FROM loan_by_state_delta            """)
# Show original DataFrame's schemaoriginal_loans.printSchema()
"""root  |-- addr_state: string (nullable = true)  |-- count: integer (nullable = true)"""
# Show new DataFrame's schemaloans.printSchema()
"""root  |-- addr_state: string (nullable = true)  |-- count: integer (nullable = true)  |-- amount: double (nullable = true) # new column"""
# Attempt to append new DataFrame (with new column) to existing tableloans.write.format("delta") \           .mode("append") \           .save(DELTALAKE_PATH)
""" Returns:
A schema mismatch detected when writing to the Delta table.
To enable schema migration, please set:'.option("mergeSchema", "true")\'
Table schema:root-- addr_state: string (nullable = true)-- count: long (nullable = true)

Data schema:root-- addr_state: string (nullable = true)-- count: long (nullable = true)-- amount: double (nullable = true)
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.
"""

Delta Lake不会自动添加新列,而是强制校验schema并阻止写入。为了帮助确定导致不匹配的列,Spark在堆栈跟踪中打印出了两种schema以进行比较。

模式校验有何用处?

由于这种检查非常严格,所以数据可以直接用于生产环境。常见的使用场景如下:

  • 机器学习算法

  • BI仪表板

  • 数据分析和可视化工具

  • 任何需要高度结构化,强类型语义schema的生产系统

防止数据稀疏

强制性的schema校验,可能会导致大家在编写spark任务的时候拘束比较多,一遇到schema不兼容任务就会崩溃,这个可能是会令人头疼。

但是假设不对schema进行校验,那么随时可能新增列,导致表变的越来越稀疏。其实,这也是一种性能消耗。

所以,schema校验也有防止数据变的越来越稀疏的作用。

什么是schema演变?

schema演变简单来数就是表的schema会随着数据的变化而变化。最常见的是,在执行附加或覆盖操作时使用它来自动调整schema以包括一个或多个新列。

schema演变如何工作?

配置很简单,通过添加 .option('mergeSchema', 'true')到您的.write或.writeStreamSpark命令来启动schema演变  。

# Add the mergeSchema optionloans.write.format("delta") \           .option("mergeSchema", "true") \           .mode("append") \           .save(DELTALAKE_SILVER_PATH)

执行下面的sql表达式:

# Create a plot with the new column to confirm the write was successful%sqlSELECT addr_state, sum(`amount`) AS amountFROM loan_by_state_deltaGROUP BY addr_stateORDER BY sum(`amount`)DESC LIMIT 10

可以绘制一张统计图:

如何分析delta lake表schema演进

通过mergeSchema设置为true,DataFrame中存在但目标表中不存在的所有列将作为写事务的一部分自动添加到schema的末尾。还可以添加嵌套字段,并且这些字段也将添加到其各自的struct列的末尾。

数据工程师和科学家可以使用此选项在其现有的机器学习生产表中添加新列(也许是新跟踪的指标,或本月销售数字的列),而不会破坏依赖旧列的现有模型。

在表追加或覆盖期间,以下类型的模式更改可用于schema演变:

  • 添加新列(这是最常见的情况)

  • 从NullType->任何其他类型更改数据类型,或从ByteType-> ShortType-> IntegerType更改数据

其他不适合架构演变的更改要求通过添加.option("overwriteSchema", "true")来覆盖schema和数据。例如,在“ Foo”列最初是integer数据类型,而新模式将是字符串数据类型的情况下,则需要重写所有Parquet(数据)文件。这些更改包括:

  • 删除列

  • 更改现有列的数据类型

  • 重命名仅因大小写而异的列名(例如“ Foo”和“ foo”)

最后,在Spark 3.0中,ALTER TABLE将完全支持显式DDL,从而允许用户对表schema执行以下操作:

  • 添加列

  • 更改列注释

  • 设置定义表行为的表属性,例如设置事务日志的保留期限

模式演化有何用处?

在打算更改表的schema时可以使用模式演变。这是迁移架构的最简单方法,因为它会自动添加正确的列名称和数据类型,而无需显式声明它们。

模式校验会拒绝与表不兼容的任何新列或其他模式更改。通过制定和遵守这些高标准,分析人员和工程师可以相信他们的数据具有最高的完整性,并且可以清晰地进行推理,从而使他们能够做出更好的业务决策。

在另一方面,schema演变通过使schema自动发生更改变得容易,从而补充了schema的强制校验。毕竟,添加一列并不难。

schema校验是架构演进的核心。当一起使用时,这些功能比以往任何时候都更容易阻止噪声的产生。

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。


当前标题:如何分析deltalake表schema演进
分享路径:http://jkwzsj.com/article/jopsoo.html

其他资讯