使用解释计划调试 Apache Spark 性能

13,855次阅读
没有评论

共计 6324 个字符,预计需要花费 16 分钟才能阅读完成。

在数据处理领域,Apache Spark 已成为一个强大且多功能的框架。然而,随着数据量和复杂性不断增长,确保最佳性能变得至关重要。

在这篇博文中,我们将探讨解释 计划如何成为调试和优化 Spark 应用程序的秘密武器。我们将深入探讨 Spark Scala 的基础知识并提供清晰的示例,以帮助您了解如何利用这个有价值的工具。

解释计划是什么?

解释计划是 Spark 处理数据所遵循的逻辑和物理执行步骤的全面细分。将其视为指导您完成 Spark 作业内部运作的路线图。

Spark 解释计划的两个重要组成部分是:

  1. 逻辑计划:逻辑计划代表 Spark 应用程序中指定的高级转换和操作。它是对您想要对数据执行的操作的抽象描述。

  2. 物理计划:另一方面,物理计划提供了 Spark 如何将逻辑计划转化为一组具体操作的具体细节。它揭示了 Spark 如何优化您的工作以获得性能。

Explain API 还有一些其他重载方法:

  • explain()- 打印物理计划。

  • explain(extended: Boolean)- 打印计划(逻辑和物理)。

  • explain(mode: String)- 使用给定解释模式指定的格式打印计划(逻辑和物理):

    • simple 仅打印物理计划。

    • extended:打印逻辑计划和物理计划。

    • codegen:打印物理计划并生成代码(如果可用)。

    • cost:打印逻辑计划和统计数据(如果有)。

    • formatted:将解释输出分为两部分:物理计划大纲和节点详细信息。

解释计划的使用

我们可以使用 explain()DataFrame 或 Dataset 上的方法来做到这一点。这是使用解释计划的一个简单示例:

import org.apache.spark.sql.SparkSession

// 创建 SparkSession
val spark = SparkSession.builder()
  .appName("ExplainPlanExample")
  .getOrCreate()

// 为员工创建一个虚拟 DataFrame
val employeesData = Seq((1, "Alice", "HR"),
  (2, "Bob", "Engineering"),
  (3, "Charlie", "Sales"),
  (4, "David", "Engineering")
)

val employeesDF = employeesData.toDF("employee_id", "employee_name", "department")

// 为工资创建另一个虚拟 DataFrame
val salariesData = Seq((1, 50000),
  (2, 60000),
  (3, 55000),
  (4, 62000)
)

val salariesDF = salariesData.toDF("employee_id", "salary")

// 将 DataFrame 注册为 SQL 临时表
employeesDF.createOrReplaceTempView("employees")
salariesDF.createOrReplaceTempView("salaries")

// 使用 Spark SQL 计算每个部门的平均工资
val avgSalaryDF = spark.sql("""
  SELECT department, AVG(salary) as avg_salary
  FROM employees e
  JOIN salaries s ON e.employee_id = s.employee_id
  GROUP BY department
""")


// 使用扩展模式调用解释计划来打印物理和逻辑计划
avgSalaryDF.explain(true)

// 停止 SparkSession
spark.stop()

在上面的示例中,我们创建了一个实例 employeeData 和 salariesData  DataFrame,并执行连接,然后进行聚合以获得部门的平均工资。以下是给定数据框的解释计划。

scala> avgSalaryDF.explain(true)
== Parsed Logical Plan ==
'Aggregate ['department], ['department,'AVG('salary) AS avg_salary#150]
+- 'Join Inner, ('e.employee_id = 's.employee_id)
   :- 'SubqueryAlias e
   :  +- 'UnresolvedRelation [employees], [], false
   +- 'SubqueryAlias s
      +- 'UnresolvedRelation [salaries], [], false

== Analyzed Logical Plan ==
department: string, avg_salary: double
Aggregate [department#135], [department#135, avg(salary#147) AS avg_salary#150]
+- Join Inner, (employee_id#133 = employee_id#146)
   :- SubqueryAlias e
   :  +- SubqueryAlias employees
   :     +- View (`employees`, [employee_id#133,employee_name#134,department#135])
   :        +- Project [_1#126 AS employee_id#133, _2#127 AS employee_name#134, _3#128 AS department#135]
   :           +- LocalRelation [_1#126, _2#127, _3#128]
   +- SubqueryAlias s
      +- SubqueryAlias salaries
         +- View (`salaries`, [employee_id#146,salary#147])
            +- Project [_1#141 AS employee_id#146, _2#142 AS salary#147]
               +- LocalRelation [_1#141, _2#142]

== Optimized Logical Plan ==
Aggregate [department#135], [department#135, avg(salary#147) AS avg_salary#150]
+- Project [department#135, salary#147]
   +- Join Inner, (employee_id#133 = employee_id#146)
      :- LocalRelation [employee_id#133, department#135]
      +- LocalRelation [employee_id#146, salary#147]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[department#135], functions=[avg(salary#147)], output=[department#135, avg_salary#150])
   +- Exchange hashpartitioning(department#135, 200), ENSURE_REQUIREMENTS, [plan_id=271]
      +- HashAggregate(keys=[department#135], functions=[partial_avg(salary#147)], output=[department#135, sum#162, count#163L])
         +- Project [department#135, salary#147]
            +- BroadcastHashJoin [employee_id#133], [employee_id#146], Inner, BuildRight, false
               :- LocalTableScan [employee_id#133, department#135]
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=266]
                  +- LocalTableScan [employee_id#146, salary#147]


scala>

正如您在上面看到的,当 extended 标志设置为 时 true,我们有解析的逻辑计划、分析的逻辑计划、优化的逻辑计划和物理计划。

在尝试理解计划之前,我们需要自下而上地阅读所有计划。因此我们将在底部看到任何数据帧的创建或读取。

我们将了解其中的每一项:

解析逻辑计划

这是 Spark 解析用户提供的 SQL 或 DataFrame 操作并创建查询的解析表示的初始阶段。使用 Spark SQL 查询时,任何语法错误都会在此处捕获。如果我们在这里观察,列名称尚未解析。

初始阶段示意图

在上面解析的逻辑计划中,我们可以看到 UnresolvedRelation。这意味着架构尚未解决。解析后的逻辑计划概述了查询的逻辑结构,包括聚合和连接操作。employees 它还定义别名并标识和 DataFrame 的源 salaries。未解析的关系将在查询执行期间解析为其实际数据源。

分析逻辑计划

解析之后,Spark 会经历一个称为语义分析或解析的过程。在此阶段,Spark 根据可用表和列的目录检查查询,解析列名称,验证数据类型,并确保查询在语义上正确。结果是经过分析的逻辑计划,其中包含有关所涉及的表和列的元数据和信息。

语义分析或解析的过程

该计划表示解析和语义分析后查询的初始逻辑结构。它显示了包含两列的结果架构,department 和 avg_salary。该计划由两个子查询 e, 和组成 s,它们对应于 employees 和 salariesDataFrame。employee_id 内连接操作作为连接键应用于这些子查询之间。该计划还包括别名 (e 和 s) 以及用于选择特定列的投影操作。

优化的逻辑计划

一旦分析了查询并且 Spark 对数据和模式有了清晰的了解,它就会继续优化查询。在优化过程中,Spark 会对查询计划应用各种逻辑优化来提高性能。这可能涉及谓词下推、常量折叠和基于规则的转换等技术。优化的逻辑计划代表了在数据检索和处理方面更高效的查询版本。

1.png

优化阶段简化了计划以获得更好的性能。在这种情况下,计划被简化以删除子查询和不必要的投影操作。它使用连接键直接连接两个本地关系 (employees 和 salaries) employee_id,然后应用聚合来计算每个部门的平均工资。

物理计划

物理计划也称为执行计划,是查询优化的最后阶段。此时,Spark 会生成一个关于如何在集群上物理执行查询的计划。它考虑了数据分区、数据洗牌和跨节点的任务分配等因素。物理计划是实际执行查询的蓝图,它考虑了可用资源和并行性以有效地执行查询。

2.png

物理计划概述了 Spark 执行查询所采取的实际执行步骤。它涉及聚合、联接和数据扫描,以及广播联接等优化技术以提高效率。该计划反映了 Spark 将遵循的执行策略来计算查询结果。现在,让我们仔细检查每一行以更深入地了解(从下到上)。

  • LocalTableScan:这些是本地表的扫描。在本例中,它们代表表或 DataFrames employee_id#133、department#135、employee_id#146 和 salary#147。这些扫描从本地分区检索数据。

  • BroadcastExchange:此操作将较小的 DataFrame (employee_id#146 和 salary#147) 广播到所有工作节点以进行广播连接。它将广播模式指定为 HashedRelationBroadcastMode 并指示应广播输入数据。

  • BroadcastHashJoin:这是两个数据源(employee_id#133 和 employee_id#146)之间使用 inner join. 它构建连接的右侧,因为它被标记为“BuildRight”。此操作执行广播连接,这意味着它将较小的 DataFrame(右侧)广播到较大的 DataFrame(左侧)所在的所有节点。当一个 DataFrame 明显小于另一个 DataFrame 时,这样做是为了优化目的。

  • 项目:此操作从数据中选择 department 和列。salary

  • HashAggregate (partial_avg):这是一个部分聚合操作,计算每个部门的平均工资。它包括附加列 sum#162 和 count#163L,分别表示工资总和和记录数。

  • Exchange hashpartitioning:此操作基于列对数据执行哈希分区 department#135。它的目标是将数据均匀分布在 200 个分区中。该 ENSURE_REQUIREMENTS 属性表明该操作保证了后续操作的要求。

  • HashAggregate:avg(salary#147) 这是一个聚合操作,计算列中每个唯一值的平均工资 () department。输出包括两列:department#135 和 avg_salary#150。

  • AdaptiveSparkPlan:这表示 Spark 查询的顶级执行计划。该属性 isFinalPlan=false 表明该计划尚未最终确定,这表明 Spark 可能会在执行期间根据运行时统计数据调整该计划。

结论

了解 Spark SQL 生成的执行计划对于开发人员来说非常有价值,体现在以下几个方面:

  • 查询优化:通过检查物理计划,开发人员可以深入了解 Spark 如何优化其 SQL 查询。它可以帮助他们查看查询是否有效地使用可用资源、分区和联接。

  • 性能调优:开发人员可以识别计划中潜在的性能瓶颈。例如,如果他们注意到不必要的改组或数据重新分配,他们可以修改查询或调整 Spark 配置以提高性能。

  • 调试:当查询未产生预期结果或发生错误时,物理计划可以提供有关问题可能出在哪里的线索。开发人员可以查明计划中存在问题的阶段或转换。

  • 高效连接:了解广播连接等连接策略可以帮助开发人员就广播哪些表做出明智的决定。这可以显着减少 shuffle 并提高查询性能。

文章来源地址 https://www.toymoban.com/diary/problem/479.html

到此这篇关于使用解释计划调试 Apache Spark 性能的文章就介绍到这了, 更多相关内容可以在右上角搜索或继续浏览下面的相关文章,希望大家以后多多支持 TOY 模板网!

    正文完
     0
    Yojack
    版权声明:本篇文章由 Yojack 于1970-01-01发表,共计6324字。
    转载说明:
    1 本网站名称:优杰开发笔记
    2 本站永久网址:https://yojack.cn
    3 本网站的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,请联系站长进行删除处理。
    4 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
    5 本站所有内容均可转载及分享, 但请注明出处
    6 我们始终尊重原创作者的版权,所有文章在发布时,均尽可能注明出处与作者。
    7 站长邮箱:laylwenl@gmail.com
    评论(没有评论)