「www.0222.com」澳门新葡亰手机版-澳门新葡亰网址
做最好的网站

一文理解ApacheSparkDataSourceV2诞生背景及入门实

作者: 汽车  发布:2019-08-16

  Data Source API 定义如何从存储系统进行读写的相关 API 接口,比如 Hadoop 的 InputFormat/OutputFormat,Hive 的 Serde 等。这些 API 非常适合用在 Spark 中使用 RDD 编程的时候使用。使用这些 API 进行编程虽然能够解决我们的问题,但是对用户来说使用成本还是挺高的,而且 Spark 也不能对其进行优化。为了解决这些问题,Spark 1.3 版本开始引入了 Data Source API V1,通过这个 API 我们可以很方便的读取各种来源的数据,而且 Spark 使用 SQL 组件的一些优化引擎对数据源的读取进行优化,比如列裁剪、过滤下

  Data Source API V1 为我们抽象了一系列的接口,使用这些接口可以实现大部分的场景,这些接口如下(参见erfaces.scala 文件):

  但是随着 Spark 的不断发展,以及使用的用户越来越多,这个版本的 Data Source API 开始暴露出一些问题。

  我们可以看到高层次的 API 随着时间的推移而发展。较低层次的数据源 API 依赖于高层次的 API 不是一个好主意。

  当前数据源 API 仅支持 filter 下推和列修剪(参见上面的 PrunedFilteredScan 接口的 buildScan 方法)。如果我们想添加其他优化, 比如添加 limiy 优化,那么我们需要添加其他接口:

  从上面的 buildScan API 可以看出,Spark 数据源进支持以行式的形式读取数据。即使 Spark 内部引擎支持列式数据表示,它也不会暴露给数据源。但是我们知道使用列式数据进行分析会有很多性能提升,所以 Spark 完全没必要读取列式数据的时候把其转换成行式,然后再再 Spark 里面转换成列式进行分析。

  物理存储信息(例如,分区和排序)不会从数据源传递到 Spark 计算引擎,因此不会在 Spark 优化器中使用。这对于像 HBase/Cassandra 这些针对分区访问进行了优化的数据库来说并不友好。在 Data Source V1 API 中,当 Spark 从这些数据源读取数据时,它不会尝试将处理与分区相关联,这将导致性能不佳。

  当前的写接口非常通用。它的构建主要是为了支持在 HDFS 等系统中存储数据。但是像数据库这样更复杂的 Sink 需要更多地控制数据写入。例如,当数据部分写入数据库并且作业出现异常时,Spark 数据源接口将不会清理这些行。这个在 HDFS 写文件不存在这个问题,因为写 HDFS 文件时,如果写成功将生成一个名为 _SUCCESS 的文件,但是这种机制在数据库中是不存在的。在这种情况下,会导致数据库里面的数据出现不一致的状态。这种情况通常可以引入事务进行处理,但是 Data Source V1 版本不支持这个功能。

  越来越多的场景需要流式处理,但是 DataSource API V1 不支持这个功能,这导致想 Kafka 这样的数据源不得不调用一些专用的内部 API 或者独自实现。

  大家再仔细思路可以看出,DataSource V2 把每种优化都写到单独的一个接口里面,这样我们需要哪个优化就可以加哪个,这样就可以排列组合出很多种用法,这明显比 DataSource V1 版本的 PrunedFilteredScan 要灵活很多。假如我们需要将 limit 下推,我们只需要定义一个类似于 SupportsPushDownLimit 接口即可,非常的灵活。

  到这里,我们需要定义每个分区具体是如何读取的,这里就是真实的数据读取实现逻辑,比如本文例子的实现如下:

  具体分区读取是需要实现 InputPartitionReader 接口的,大家可以看到,这里面就是真正的 MySQL 查询 SQL 的拼接,以及我们平时参见的 MySQL 数据查询方法。仔细的同学可以看出拼接的 SQL 中 where 条件里面的就是我们的算子下推逻辑;而 select 部分就是我们的列裁剪部分。

  到这里,我们已经使用 DataSource V2 API 定义了一个读取 MySQL 的类库,我们可以像正常 Spark 类库一样使用这个类库,如下:

  这条 SQL 没有使用到 select,所以会使用到表中所有的列,并且以为我们已经支持大于等算子下推,所以 id 10 这个应该是会下推到 MySQL 端执行的,具体的执行计划如下:

  从上面的 Physical Plan 可以看出,count#2 = 10 这个并没有推到数据源执行,以为我们这个例子里面没有实现大于等于算子的下推。本例我们使用了 select,并且指定了 id、ip 列,再加上没有推到 MySQL 端的列,所以这次执行只需要获取 id、ip 以及 count 三列即可,最后拼接后的 SQL 如下:

本文由www.0222.com于2019-08-16日发布