本文基于 Apache Livy-0.8.0-incubating-SNAPSHOT,Apache Spark-3.3.1,Apache Hadoop-3.3.4
读者需要自行编译相关版本

Apache Livy

livy 的默认端口为 8998

Apache Livy 是一项服务,可通过 REST 接口与 Spark 集群轻松交互。 它支持通过简单的 REST 接口或 RPC 客户端库轻松提交 Spark 作业或 Spark 代码片段、同步或异步结果检索以及 Spark 上下文管理。Apache Livy 还简化了 Spark 与应用程序服务器之间的交互,从而使 Spark 能够用于交互式 Web/移动应用程序。

livy 的相关配置使用还请读者自行阅读。

Apache Spark

Apache Spark 是一种多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习。

spark sql 的相关使用配置还请读者自行阅读。

livy 结果返回格式说明

所有操作均可通过接口测试工具 PostMan 进行

我们主要会想要获取每个 statement 的执行结果,statement 的完整结构体可以在 Statement 查看,所涉及的接口为 GET /sessions/{sessionId}/statements/{statementId}

statement 的结构体中有 output(Statement Output) 子结构体,output 中有 data 子结构体,该结构体返回默认协议为 text/plain,此结构体还有两个可选的协议,分别为 application/vnd.livy.table.v1+jsonapplication/json

text/plain

该协议下是将 Spark Shell 返回的结果直接返回,具有较差的可读性,无法进行程序解析,示例如下(本例使用了 spark 读取 excel 的组件):

请求参数为:

{
"code":"import org.apache.spark.sql.types.{StructType,StructField,StringType,IntergerType}"
}
{
"code":"val schema = StructType(List(tructField(\"id\", StringType, nullable = false),StructField(\"name\", StringType, nullable = false),StructField(\"age\", IntegerType, nullable = false),StructField(\"gender\", StringType, nullable = false),StructField(\"cls\", StringType, nullable = false)))"
}
{
"code":"val excelDF = spark.read.format(\"com.crealytics.spark.excel\").option(\"dataAddress\", \"'学生信息'!A2:E6\").option(\"useHeader\", \"false\") .schema(schema).load(filePath)"
}
{
"code":"excelDF.show"
}

其中 filepath 是实现上传到 hdfs 的 excel 文件。excel 文件内容如下:

编号 姓名 年龄 性别 班级
1 a 12 202101
2 b 12 202101

返回值为:

{
"id": 3,
"code": "excelDF.show",
"state": "available",
"output": {
"status": "ok",
"execution_count": 3,
"data": {
"text/plain": "+----+----+---+------+------+\n| id|name|age|gender| cls|\n+----+----+---+------+------+\n|??|??| 0| ??| ??|\n| 1| a| 12| ?|202101|\n| 2| b| 13| ?|202101|\n| 3| c| 14| ?|202101|\n| 4| d| 15| ?|202101|\n| 5| e| 16| ?|202101|\n+----+----+---+------+------+\n\n"
}
},
"progress": 1.0,
"started": 1679553013036,
"completed": 1679553052620
}

application/json

该协议下 livy 会将 Spark Shell 返回的结果转换为 json 返回,为程序解析提供可能。但需要注意的是此协议并非适用所有的场景,该协议只适用于返回的是 ArrayList 数据结构,否则会报无法转换为 json 数据结构的错误,此处为官方示例。我的示例如下(本例使用了 spark 读取 excel 的组件):

请求参数:

{
"code":"import org.apache.spark.sql.types.{StructType,StructField,StringType,IntergerType}"
}
{
"code":"val schema = StructType(List(tructField(\"id\", StringType, nullable = false),StructField(\"name\", StringType, nullable = false),StructField(\"age\", IntegerType, nullable = false),StructField(\"gender\", StringType, nullable = false),StructField(\"cls\", StringType, nullable = false)))"
}
{
"code":"val excelDF = spark.read.format(\"com.crealytics.spark.excel\").option(\"dataAddress\", \"'学生信息'!A2:E6\").option(\"useHeader\", \"false\") .schema(schema).load(filePath)"
}
{
"code":"val result = excelDF.collect\n%json result"
}

其中 filepath 是实现上传到 hdfs 的 excel 文件。excel 文件内容如下:

编号 姓名 年龄 性别 班级
1 a 12 202101
2 b 12 202101

返回值为(内容有删减):

{
"id": 17,
"code": "val result = excelDF.collect\n%json a",
"state": "available",
"output": {
"status": "ok",
"execution_count": 17,
"data": {
"application/json": [
{
"schema": [
{
"name": "id",
"dataType": {},
"nullable": false,
"metadata": {
"map": {}
}
},
{
"name": "name",
"dataType": {},
"nullable": false,
"metadata": {
"map": {}
}
},
{
"name": "age",
"dataType": {},
"nullable": false,
"metadata": {
"map": {}
}
},
{
"name": "gender",
"dataType": {},
"nullable": false,
"metadata": {
"map": {}
}
},
{
"name": "cls",
"dataType": {},
"nullable": false,
"metadata": {
"map": {}
}
}
],
"values": [
"编号",
"姓名",
0,
"性别",
"班级"
]
},
{
"schema": [
{
"name": "id",
"dataType": {},
"nullable": false,
"metadata": {
"map": {}
}
},
{
"name": "name",
"dataType": {},
"nullable": false,
"metadata": {
"map": {}
}
},
{
"name": "age",
"dataType": {},
"nullable": false,
"metadata": {
"map": {}
}
},
{
"name": "gender",
"dataType": {},
"nullable": false,
"metadata": {
"map": {}
}
},
{
"name": "cls",
"dataType": {},
"nullable": false,
"metadata": {
"map": {}
}
}
],
"values": [
"1",
"a",
12,
"男",
"202101"
]
}
]
}
},
"progress": 1.0,
"started": 1675671786024,
"completed": 1675671786321
}

application/vnd.livy.table.v1+json

此协议使用条件较为苛刻,返回值的类型需要为 List,且数据格式需要一致,如果不符合数据结构的数据被接收到则会报 table rows have different types 错。此处为官方示例。我的示例如下:

输入参数为:

{
"code":"val a = List((1,\"a\",0.12),(3,\"b\",0.63))\n%table a"
}

返回值为:

{
"id": 7,
"code": "val a = List((1,\"a\",0.12),(3,\"b\",0.63))\n%table a",
"state": "available",
"output": {
"status": "ok",
"execution_count": 7,
"data": {
"application/vnd.livy.table.v1+json": {
"headers": [
{
"type": "BIGINT_TYPE",
"name": "_1"
},
{
"type": "STRING_TYPE",
"name": "_2"
},
{
"type": "DOUBLE_TYPE",
"name": "_3"
}
],
"data": [
[
1,
"a",
0.12
],
[
3,
"b",
0.63
]
]
}
}
},
"progress": 1.0,
"started": 1679553786494,
"completed": 1679553789663
}

总结

如果想 livy 把 SparkSQL 的结果按 json 结果返回只需要在相应的 spark.sql().collectdf.collect 语句后面追加上 \n%json <变量名> 即可。

注意:任何赋值、无返回值的语句、返回值不符合规范的语句使用 \n%json <变量名> 将会报 Failed to convert value into JSON value 错。