spark 创建 dataframe

栏目: Scala · 发布时间: 5年前

内容简介:dataframe类似于关系型数据库的表,从dataframe中查询数据,需要调用api来实现,到目前为止spark支持的语言scala,java,r,python。4,数据的追加

dataframe类似于关系型数据库的表,从dataframe中查询数据,需要调用api来实现,到目前为止spark支持的语言scala,java,r,python。

一,启动spark-shell

spark-shell --master yarn

二,创建SparkSession

scala> import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.builder().appName("Spark SQL basic example").enableHiveSupport().getOrCreate()
2019-01-04 11:44:21 WARN SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2565b097

三,toDF方法创建dataframe

1,将seq序列转换成dataframe

scala> import spark.implicits._
import spark.implicits._

scala> val df = Seq(
 | (1, "tank", 25),
 | (2, "zhang", 26)
 | ).toDF("id", "name", "age")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1| tank| 25|
| 2|zhang| 26|
+---+-----+---+

2,列表转换成dataframe

scala> val array = List((1, "tank1", 25),(2, "tank2", 26),(3, "tank3", 29))
array: List[(Int, String, Int)] = List((1,tank1,25), (2,tank2,26), (3,tank3,29))

scala> val df1 = array.toDF("id", "name", "age")
df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df1.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|tank1| 25|
| 2|tank2| 26|
| 3|tank3| 29|
+---+-----+---+

四,通过createDataFrame创建dataframe

scala> val schema = StructType(List(
 | StructField("id", IntegerType, nullable = false),
 | StructField("name", StringType, nullable = true),
 | StructField("age", IntegerType, nullable = true),
 | StructField("birthday", DateType, nullable = true)
 | ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(birthday,DateType,true))

scala> val rdd = spark.sparkContext.parallelize(Seq(
 | Row(1, "tank1", 25, java.sql.Date.valueOf("1982-07-07")),
 | Row(2, "zhang", 26, java.sql.Date.valueOf("1983-02-19"))
 | ))
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[107] at parallelize at <console>:39

scala> val df2 = spark.createDataFrame(rdd, schema)
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df2.show()
+---+-----+---+----------+
| id| name|age| birthday|
+---+-----+---+----------+
| 1|tank1| 25|1982-07-06|
| 2|zhang| 26|1983-02-18|
+---+-----+---+----------+

创建的dataframe,以及里面的数据,是没有落地的

五,存储数据到hdfs

1,加载存储包

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

2,不分区存储

scala> df.write.parquet("hdfs://bigserver1:9000/test/spark/tank");

scala> var test = spark.read.load("hdfs://bigserver1:9000/test/spark/tank");
test: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test.show();
+---+-----+---+
| id| name|age|
+---+-----+---+
| 2|zhang| 26|
| 1| tank| 25|
+---+-----+---+
spark 创建 dataframe

spark dataframe 不分区

3,根据id分区存储

scala> df2.write.partitionBy("id").mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1");

scala> var test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1");
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show();
+-----+---+----------+---+
| name|age| birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18| 2|
|tank1| 25|1982-07-06| 1|
+-----+---+----------+---+
spark 创建 dataframe

spark dataframe 分区存储

4,数据的追加

scala> df1.write.mode(SaveMode.Append).parquet("hdfs://bigserver1:9000/test/spark/tank");

scala> var test2 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank");
test2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> test2.show();
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|tank1| 25|
| 2|zhang| 26|
| 1| tank| 25|
| 2|tank2| 26|
| 3|tank3| 29|
+---+-----+---+

5,数据的修改,分区的dataframe才有效

scala> test1.show();
+-----+---+----------+---+
| name|age| birthday| id|
+-----+---+----------+---+
|zhang| 26|1983-02-18| 2|
|tank1| 25|1982-07-06| 1|
+-----+---+----------+---+

scala> val df4 = Seq(
 | (2, "zhangying", 37, java.sql.Date.valueOf("1986-11-11"))
 | ).toDF("id", "name", "age","birthday")
df4: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> df4.write.mode(SaveMode.Overwrite).parquet("hdfs://bigserver1:9000/test/spark/tank1/id=2");

scala> var test1 = spark.read.load("hdfs://bigserver1:9000/test/spark/tank1")
test1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> test1.show();
+---------+---+----------+---+
| name|age| birthday| id|
+---------+---+----------+---+
|zhangying| 37|1986-11-11| 2|
| tank1| 25|1982-07-06| 1|
+---------+---+----------+---+

对数据处理操作非常的多,这里只是简单介绍。这种数据存取的方式,实时性差


以上所述就是小编给大家介绍的《spark 创建 dataframe》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

再启动

再启动

[日] 大前研一 / 田龙姬、金枫 / 中华工商联合出版社有限责任公司 / 2010-1 / 29.00元

1、“全球管理大师”、“日本战略之父”大前研一,职场励志最新巨作。 2、2010年1月中华工商联合出版社有限责任公司与日知公司继《货币战争2》《中国大趋势》之后,再度联手,重磅推出。 3、震撼中国职场的宗师级巨作,势必引领2010年中国职场4、世界著名出版商小学馆授予独家中文简体出版权。 5、试问,哪个老板不希望自己的员工不断实现自身的“再启动”呢? 6、只有不断激励鞭策自......一起来看看 《再启动》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具