Spark & PySpark 使用手册

栏目: 编程工具 · 发布时间: 4年前

内容简介:最近公司有一个安全方面的业务,需要实时监控所有访客的情况。之前是定时去查询这类方法仅仅是定义逻辑,并不会立即执行,即lazy特性。目的是将一个RDD转为新的RDD。不会产生新的RDD,而是直接运行,得到我们想要的结果。

最近公司有一个安全方面的业务,需要实时监控所有访客的情况。之前是定时去查询 Elasticsearch 的接口进行统计分析,但是这个时间间隔不好把握,并且 Elasticsearch 并不适合特别实时的查询操作。实时的分布式流计算引擎首推 Spark ,它与 Hadoop 等相比的优势在 这里 讲得比较清楚了。

try...catch...
pip install pyspark

基本运算

  • 下面是所有运算方法的集合,其中有些方法仅用于键值对,有些方法仅用于数据流

Transformation(转换)

这类方法仅仅是定义逻辑,并不会立即执行,即lazy特性。目的是将一个RDD转为新的RDD。

datas.map(lambda a, (a, 1))

Action(执行)

不会产生新的RDD,而是直接运行,得到我们想要的结果。

  • collect(): 以数组的形式,返回数据集中所有的元素
  • count(): 返回数据集中元素的个数
  • take(n): 返回数据集的前N个元素
  • takeOrdered(n): 升序排列,取出前N个元素
  • takeOrdered(n, lambda x: -x): 降序排列,取出前N个元素
  • first(): 返回数据集的第一个元素
  • min(): 取出最小值
  • max(): 取出最大值
  • stdev(): 计算标准差
  • sum(): 求和
  • mean(): 平均值
  • countByKey(): 统计各个key值对应的数据的条数
  • lookup(key): 根据传入的key值来查找对应的Value值
  • foreach(func): 对集合中每个元素应用func

Persistence(持久化)

  • persist(): 将数据按默认的方式进行持久化
  • unpersist(): 取消持久化
  • saveAsTextFile(path): 将数据集保存至文件

应用场景

创建简单的RDD

from pyspark.sql import SparkConf, SparkContext
rdd = sc.parallelize(['abc', def'])	// 直接创建rdd

读取CSV文件

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("test") \
    .config("spark.some.config.option", "一些设置") \
    .getOrCreate()

df = spark.read.csv("/home/Users/haofly/test.csv", header=True, sep="|")	# 读取文件
print(df.collect())

Spark Streaming流计算

Spark Streaming
StreamingContext

从文件流读取数据

conf = SparkConf().setAppName("test").setMaster("local[2]")	# 表示运行在本地模式,并且启动2个工作线程
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)	# 每隔10秒钟自动进行一次流计算

lines = ssc.textFileStream('file:///Users/haofly/log')
words = lines.map(lambda line: line.strip())
words.pprint()
ssc.start()
ssc.awaitTermination()

从kafka读取数据

首先得从 maven仓库 下载对应的版本,注意这里需要下载 assembly 的包,这里的2.11是scala的版本,2.4.3是pyspark的版本号,也是spark的版号,如果下载后的包不能用,那就尝试换一个版本吧。可以通过这篇文章搭建测试用的kafka集群

# 指定spark-streaming-kafka的jar包
os.environ[
    "PYSPARK_SUBMIT_ARGS"
] = "--jars /test/jars/kafka/spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar pyspark-shell"

conf = SparkConf().setAppName("test").setMaster("local[2]")	# 表示运行在本地模式,并且启动2个工作线程
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)	# 每隔10秒钟自动进行一次流计算

zkQuorum, topic = "zookeeper:2181", "test"
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])

def myadd(a, b):	# 只能在传入的函数中捕获异常
  try:
	  return a+b
  catch:
    pass	# tolog

def myadd_inv(a, b):
  return a-b

rdd = lines.map(lambda x: (x, 1)).reduceByKeyAndWIndow(myadd, myadd_inv, 60)	# 统计时间窗口60秒内的数据
rdd.pprint()	# 每次统计都打印rdd的数据

ssc.start()		# 异步执行
ssc.awaitTermination()	# 等待终止信号

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

算法设计

算法设计

Jon Kleinberg、Éva Tardos / 张立昂、屈婉玲 / 清华大学出版社 / 2007-3-1 / 75.00元

算法设计,ISBN:9787302143352,作者:(美)克林伯格(Kleinberg,J.),()塔多斯(Tardos,E.) 著,张立昂,屈婉玲 译一起来看看 《算法设计》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器