Reading a Text File into a DataFrame in Spark | QIMING.INFO

Reading a Text File into a DataFrame in Spark

The core API of Spark ML is now DataFrame-based rather than RDD-based, so we want the data we read to end up as a DataFrame. The obvious route is Spark's CSV reader, but there is a catch: when the fields in each line of a text file are separated by a variable number of spaces, we cannot treat those spaces as a CSV delimiter, because Spark's CSV reader does not accept a regular expression as its separator. A common workaround is to read the data as an RDD, build a tuple per line with map, and call toDF to get a DataFrame; however, building tuples becomes unwieldy when there are many columns. This article presents three ways to read a multi-column text file in Spark and convert it into a DataFrame for downstream use.
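For concreteness, the tuple-based workaround mentioned above looks roughly like this (a sketch only; the input path and the three-column shape are placeholders, and the per-column tuple fields are exactly what makes this tedious at 60 columns):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._ // brings toDF into scope

// Build one tuple per line, then name the columns with toDF —
// manageable for three columns, painful for sixty.
val df = spark.sparkContext.textFile("data.txt") // placeholder path
  .map(_.trim.split("\\s+"))
  .map(a => (a(0).toDouble, a(1).toDouble, a(2).toDouble))
  .toDF("col1", "col2", "col3")
```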

1 About the Data

We use the Synthetic Control Chart Time Series dataset, synthetic_control.data, which contains 600 data points (rows), each with 60 attributes. For details, see:
http://archive.ics.uci.edu/ml/databases/synthetic_control/

In this file, the attributes of each data point are separated by a variable number of spaces. To handle this, this article presents two methods (since updated to three).
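To see why the whitespace matters, here is a minimal plain-Scala sketch of splitting such a line (the sample values are illustrative; note that a leading space would otherwise produce an empty first field, since Java's String.split keeps leading empty strings):

```scala
// A line whose fields are separated by a variable number of spaces/tabs
val line = " 28.7812  34.4632\t31.3381"

// Splitting directly keeps an empty first element because of the leading space
val naive = line.split("\\s+") // Array("", "28.7812", "34.4632", "31.3381")

// Trimming first yields exactly the attribute values
val fields = line.trim.split("\\s+").map(_.toDouble)
// fields: Array(28.7812, 34.4632, 31.3381)
```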
Update (2018-08-17): I have since found a new method that is much simpler than the original second one; see section 4 below.

2 The Clumsy Way

2.1 Basic Idea

This method, which I came up with before seeing the second one, is cumbersome and fairly inefficient. The idea is:

  1. Read the data directly into an RDD of Strings
  2. In each line, match the runs of spaces with a regular expression and replace them with ","
  3. Save the processed RDD to a temporary directory
  4. Read that temporary directory back as CSV, which yields a multi-column DataFrame directly
  5. Finally, cast every column of the DataFrame to Double

2.2 Code

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}

def readData(spark: SparkSession, path: String): DataFrame = {

  // Read the data and replace every separator (a run of whitespace) with ","
  // (trim first so leading whitespace cannot create an empty first column)
  val tmpRdd = spark.sparkContext.textFile(path).map(_.trim.replaceAll("\\s+", ","))

  // Save the converted data to a temporary directory
  val tmpPathStr = "file:///home/xuqm/ML_Data/input/tmp"
  // If the temporary directory already exists, delete it first
  val tmpPath: Path = new Path(tmpPathStr)
  val fs: FileSystem = tmpPath.getFileSystem(new Configuration())
  if (fs.exists(tmpPath)) {
    fs.delete(tmpPath, true)
  }
  // Save
  tmpRdd.saveAsTextFile(tmpPathStr)

  // Read the data back from the temporary directory as CSV
  val df = spark.read.csv(tmpPathStr)
  // Cast every column of the result to Double
  val cols = df.columns.map(f => col(f).cast(DoubleType))
  val data = df.select(cols: _*)

  data
}
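A minimal sketch of driving the method above (the SparkSession configuration and app name are assumptions for a local run; the input path is the one used throughout this article):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ReadTxtAsDataFrame") // the app name is illustrative
  .master("local[*]")
  .getOrCreate()

val data = readData(spark, "file:///home/xuqm/ML_Data/input/synthetic_control.data")
data.printSchema() // every column should now be of type double
```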

3 The Better Way

3.1 Code and Notes

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Read the data into an RDD for now
val rdd = sc.textFile("file:///home/xuqm/ML_Data/input/synthetic_control.data")
// Use the first line to determine how many columns the resulting
// DataFrame should have, and give each column a name
val colsLength = rdd.first.trim.split("\\s+").length
val colNames = new Array[String](colsLength)
for (i <- 0 until colsLength) {
  colNames(i) = "col" + (i + 1)
}
// Convert the RDD to a DataFrame dynamically:
// define the DataFrame's schema
val schema = StructType(colNames.map(fieldName => StructField(fieldName, DoubleType)))
// process each line (trim guards against leading whitespace)
val rowRDD = rdd.map(_.trim.split("\\s+").map(_.toDouble)).map(p => Row(p: _*))
// combine the rows with the schema to create the DataFrame
val data = spark.createDataFrame(rowRDD, schema)

3.2 Result

scala> val data = spark.createDataFrame(rowRDD, schema)
data: org.apache.spark.sql.DataFrame = [col1: double, col2: double ... 58 more fields]
scala> data.show(2)
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| col1| col2| col3| col4| col5| col6| col7| col8| col9| col10| col11| col12| col13| col14| col15| col16| col17| col18| col19| col20| col21| col22| col23| col24| col25| col26| col27| col28| col29| col30| col31| col32| col33| col34| col35| col36| col37| col38| col39| col40| col41| col42| col43| col44| col45| col46| col47| col48| col49| col50| col51| col52| col53| col54| col55| col56| col57| col58| col59| col60|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|28.7812|34.4632|31.3381|31.2834|28.9207|33.7596|25.3969|27.7849|35.2479|27.1159|32.8717|29.2171|36.0253|32.337|34.5249|32.8717|34.1173|26.5235|27.6623|26.3693|25.7744| 29.27|30.7326|29.5054|33.0292| 25.04|28.9167|24.3437|26.1203|34.9424|25.0293|26.6311|35.6541|28.4353|29.1495|28.1584|26.1927|33.3182|30.9772|27.0443|35.5344|26.2353|28.9964|32.0036|31.0558|34.2553|28.0721|28.9402|35.4973| 29.747|31.4333|24.5556|33.7431|25.0466|34.9318|34.9879|32.4721|33.3759|25.4652|25.8717|
|24.8923| 25.741|27.5532|32.8217|27.8789|31.5926|31.4861|35.5469|27.9516|31.6595|27.5415|31.1887|27.4867|31.391| 27.811| 24.488|27.5918|35.6273|35.4102|31.4167|30.7447|24.1311|35.1422|30.4719|31.9874|33.6615|25.5511|30.4686|33.6472|25.0701|34.0765|32.5981|28.3038|26.1471|26.9414|31.5203|33.1089|24.1491|28.5157|25.7906|35.9519|26.5301|24.8578|25.9562|32.8357|28.5322|26.3458|30.6213|28.9861|29.4047|32.5577|31.0205|26.6418|28.4331|33.6564|26.4244|28.4661|34.2484|32.1005| 26.691|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
only showing top 2 rows

4 The Best Way

4.1 Basic Idea

  1. Read the raw file and split each sample's attribute values with a regular expression, producing an RDD of Array[String]
  2. Use the LabeledPoint class from Spark ML to convert the data into an RDD of LabeledPoint.
    A LabeledPoint has a label column and a features column. The label is a Double; since this data has not been labeled yet, any number will do. The features column holds a vector, and since every value here is a feature, we pack them all into a vector with the Vectors class.
  3. Convert the LabeledPoint RDD to a DataFrame and select only its features column. The resulting DataFrame is ready for machine learning algorithms (e.g. KMeans).

4.2 Code

import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors

// Read the data and split each sample's attribute values,
// yielding an RDD of Array[String] (trim guards against leading whitespace)
val rdd = sc.textFile("file:///home/xuqm/ML_Data/input/synthetic_control.data").map(_.trim.split("\\s+"))
// Convert the RDD into an RDD of LabeledPoint
val LabeledPointRdd = rdd.map(x => LabeledPoint(0, Vectors.dense(x.map(_.toDouble))))
// Convert to a DataFrame and keep only the "features" column
val data = spark.createDataFrame(LabeledPointRdd).select("features")

4.3 Result

scala> val data = spark.createDataFrame(LabeledPointRdd).select("features")
data: org.apache.spark.sql.DataFrame = [features: vector]

scala> data.show(2,false)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[28.7812,34.4632,31.3381,31.2834,28.9207,33.7596,25.3969,27.7849,35.2479,27.1159,32.8717,29.2171,36.0253,32.337,34.5249,32.8717,34.1173,26.5235,27.6623,26.3693,25.7744,29.27,30.7326,29.5054,33.0292,25.04,28.9167,24.3437,26.1203,34.9424,25.0293,26.6311,35.6541,28.4353,29.1495,28.1584,26.1927,33.3182,30.9772,27.0443,35.5344,26.2353,28.9964,32.0036,31.0558,34.2553,28.0721,28.9402,35.4973,29.747,31.4333,24.5556,33.7431,25.0466,34.9318,34.9879,32.4721,33.3759,25.4652,25.8717] |
|[24.8923,25.741,27.5532,32.8217,27.8789,31.5926,31.4861,35.5469,27.9516,31.6595,27.5415,31.1887,27.4867,31.391,27.811,24.488,27.5918,35.6273,35.4102,31.4167,30.7447,24.1311,35.1422,30.4719,31.9874,33.6615,25.5511,30.4686,33.6472,25.0701,34.0765,32.5981,28.3038,26.1471,26.9414,31.5203,33.1089,24.1491,28.5157,25.7906,35.9519,26.5301,24.8578,25.9562,32.8357,28.5322,26.3458,30.6213,28.9861,29.4047,32.5577,31.0205,26.6418,28.4331,33.6564,26.4244,28.4661,34.2484,32.1005,26.691]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 2 rows
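As section 4.1 notes, the resulting features DataFrame can go straight into a Spark ML algorithm. A hedged sketch with KMeans follows (choosing k = 6 is an assumption, based on the six control-chart pattern classes commonly attributed to this dataset):

```scala
import org.apache.spark.ml.clustering.KMeans

// Cluster the feature vectors; k = 6 and the seed are assumptions
val kmeans = new KMeans().setK(6).setSeed(1L).setFeaturesCol("features")
val model = kmeans.fit(data)
// transform appends a "prediction" column with each point's cluster index
model.transform(data).show(2)
```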

