QCon 演讲火热征集中,快来分享技术实践与洞见!222222 了解详情
写点什么

Redis,Apache Spark 和 Python 入门

  • 2020-03-01
  • 本文字数:4719 字

    阅读完需:约 15 分钟

Redis,Apache Spark 和 Python 入门

Apache Spark 是创建分布式数据处理管道最受欢迎的框架之一,在这篇博文中,我们将介绍在使用 Spark 时如何结合 Redis 用作计算的数据存储库。Spark 的主要功能是管道(Java,Scala,Python 或 R 脚本)可以在本地(用于开发)和集群上运行,而无需更改任何源代码。


Spark 通过巧妙地使用延迟计算或在某些情况下称为惰性来提供这种灵活性。一切都始于类 RDD,DataFrame 和更新的 Dataset,它们分别是数据的分布式惰性表示。他们使用分布式文件系统,数据库或其他类似服务作为实际的存储后端。他们的操作(例如 map / select,filter / where 和 reduce / groupBy)并没有真正地执行计算。每个操作都会在执行计划中添加一个步骤,该步骤最终会在需要实际结果时再(例如,尝试将其打印到屏幕上时)运行。


在本地启动脚本时,所有计算都在计算机上进行。或者,在分布式集群上启动时,您的数据将被分区到不同的节点上。(在大多数情况下)Spark 集群中并行执行相同的操作。

关于 RDD,DataFrame 和数据集

随着时间的推移,Spark 开发了三种不同的 API 来处理分布式数据集。尽管每个新添加的功能都比以前增加了更多功能,但是没有哪个 API 可以完全替代以前的功能。按创建顺序(从最早到最新),总体概述如下:


  • RDD 提供了用于将编译时类型安全操作应用于数据的底层方法。使用 RDD,您可以在代码中表达“事情如何发生”,而不是采用声明性方法。

  • DataFrame 引入了一种类 SQL 的方法来表达计算(它甚至支持实际的 SQL 查询)。它的声明性语法使 Spark 可以构建优化的查询计划,从而比 RDD 运行更快。

  • 数据集 是 DataFrame for Java Virtual Machine(JVM)语言的改进。它引入了 DataFrame 缺乏的编译时类型安全性,以及行的优化表示形式,从而大大减少了内存使用。由于动态语言(Python,R)具有动态特性,因此它实际上并没有任何作用,因此您仍然可以使用它们(同时在内部重新实现为数据集)使用 DataFrame。

  • 有关更多详细信息,请参阅 Jules Damji 的“ 三个 Apache Spark API 的故事 ”。

关于 spark-redis

spark-redis 是一个开放源代码连接器,可以让您使用 Redis 存储数据。


将 Redis 用作后端的三个主要原因是:


  • DataFrame / set 和 Redis 专用的 RDD: spark-redis 既实现了更通用的接口,又公开了 Redis 闻名的数据结构的 RDD。这意味着您可以非常轻松地在 Redis 上部署现有脚本,并在需要完全控制时使用 Redis 特定的功能。

  • Redis Cluster:连接器使用 Redis Cluster API,并充分利用分片数据库,包括重新分片和故障转移。将数据放在 Redis 集群中,可以极大地提高性能,因为您的管道会增加数据的多个使用者。

  • Redis Streams:Spark Streaming 非常适合新的 Redis Streams 数据结构。Redis Streams 还使用消费组,使您可以优雅地调整并行度。

  • 在本文中,我们将重点介绍 Python 入门以及如何使用 DataFrame API。在撰写本文时,可以被视为 Spark 的“本机”语言的 Scala 可以访问集成的一些更高级的功能,例如 Redis RDD 和 Streams。由于 Scala 是一种 JVM 语言,因此 Java 也可以使用这些功能。使用 Python,我们需要坚持使用 DataFrames。

配置

我们的第一步是使用 pip 安装 pyspark。您还将需要在计算机上安装 Java 8。


$ pip install pyspark
复制代码


接下来,我们将需要 Maven 构建 spark-redis。您可以从官方网站或使用软件包管理器(例如 macOS 上的自制软件)来获取它。


从 GitHub 下载 spark-redis(git clone 或以 zip 下载),并使用 Maven 进行构建。


$ cd spark-redis$ mvnclean package -DskipTests
复制代码


在里面 target/子目录中,您将找到已编译的 jar 文件。


需要一个正在运行的 Redis 服务器进行连接。您可以通过多种方式下载它:从官方网站上的软件包管理器(apt-get 或者 brew install redis)或 Docker Hub。


一旦启动并运行,就可以启动 pyspark。请注意,修改 VERSION 值成您需要从 GitHub 下载的版本。


$ pyspark –jars target /spark-redis-VERSION-jar-with-dependencies.jar
复制代码


如果您的 Redis 服务器运行在容器中或已启用身份验证,则将这些开关添加到上一次调用中(并更改值以适合您的情况)。


–conf“ spark.redis.host =localhost” –conf“ spark.redis.port = 6379” –conf“ spark.redis.auth = passwd”
复制代码

使用样本数据集运行

现在我们有了一个可以正常工作的 pyspark shell,可以将数据存储在 Redis 上,让我们来运行这个著名的 people 数据集。

开始

下载 TSV 文件后,让我们将其作为 Spark DataFrame 加载。


>>> full_df =  spark.read.csv("pantheon.tsv", sep="\t",  quote="", header=True, inferSchema=True)    >>> full_df.dtypes    [('en_curid', 'int'), ('name', 'string'), ('numlangs',  'int'), ('birthcity', 'string'), ('birthstate', 'string'), ('countryName',  'string'), ('countryCode', 'string'), ('countryCode3', 'string'), ('LAT',  'double'), ('LON', 'double'), ('continentName', 'string'), ('birthyear',  'string'), ('gender', 'string'), ('occupation', 'string'), ('industry',  'string'), ('domain', 'string'), ('TotalPageViews', 'int'), ('L_star',  'double'), ('StdDevPageViews', 'double'), ('PageViewsEnglish', 'int'),  ('PageViewsNonEnglish', 'int'), ('AverageViews', 'double'), ('HPI',  'double')]
复制代码


现在,调用.dtypes 显示数据集中所有列(和相对类型)的列表。在此数据集中,有许多事情可能值得研究,但出于本示例的目的,让我们着重于为每个国家/地区查找到名人最常见的职业。


让我们从仅保留与我们的目标相关的列开始。


>>> data =  full_df.select("en_curid", "countryCode",  "occupation")    >>> data.show(2)    +--------+-----------+-----------+    |en_curid|countryCode| occupation|+--------+-----------+-----------+    |     307|         US| POLITICIAN||     308|         GR|PHILOSOPHER|+--------+-----------+-----------+    only showing top 2 rows
复制代码


这将创建原始 DataFrame 的副本,该副本仅包含三列:每个人的唯一 ID,他们的国家和他们的职业。


我们首先下载了一个小的数据集,但在现实生活中,如果您使用的是 Spark,则该数据集可能会更大并且可以远程托管。出于这个原因,让我们通过将数据加载到 Redis 来使下一步变得更加现实:


>>> data.write.format("org.apache.spark.sql.redis").option("table","people").option("key.column", "en_curid").save()
复制代码


此命令会将我们的数据集加载到 Redis 中。我们现在将看到,我们指定的两个选项有助于定义 Redis 中的数据布局。

Redis 上的 DataFrames

让我们使用 redis-cli 查询下,看看 DataFrame 是如何存储在 Redis 上的:


$ redis-cli> SCAN 0 MATCH people:* COUNT 31) "2048"2) 1) "people:2113653"2)"people:44849"3)"people:399280"4)"people:101393"
复制代码


SCAN 查询出我们加载到 Redis 中的 Key 和数据。您可以立即看到我们之前给出的选项如何用于定义键名:


“table”, “people” 为表示此 DataFrame 的键定义一个公共前缀,并且


“ key.column”, “ en_curid” 为我们的 DataFrame 定义了主键。


让我们随机取一个 key 看一下具体的内容:


> HGETALL people:21136531) "countryCode"2) "DE"3) "occupation"4) "SOCCER PLAYER"
复制代码


现在,我们已经了解了数据如何存储在 Redis 上,让我们跳回到 pyspark,看看我们如何实际编写管道获取每个国家的名人最常见的职业。如您所见,DataFrame 的每一行都变成了 Redis 哈希结构,其中包含 countryCode 和 occupation。如前所述,en_curid 用作主键,因此它成为键名的一部分。


从 Redis DataFrame 执行计算


即使我们应该将数据仍然加载到内存中,也可以从 Redis 加载它,以便编写与您在现实生活中将要执行的操作更相似的代码。


>>> df = spark.read.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").load()>>> df.show(2)+--------+-----------+----------+|en_curid|countryCode|occupation|+--------+-----------+----------+|  915950|         ZW|   SWIMMER||  726159|         UY|POLITICIAN|+--------+-----------+----------+only showing top 2 rows
复制代码


这就是您的 Spark 管道开始的方式,因此让我们最后进行计算!


>>> counts = df.groupby("countryCode", "occupation").agg({"en_curid": "count"})>>> counts.show(2)+-----------+-------------+---------------+|countryCode|   occupation|count(en_curid)|+-----------+-------------+---------------+|         FR|MATHEMATICIAN|             34||         IT|SOCCER PLAYER|             81|+-----------+-------------+---------------+only showing top 2 rows
复制代码


现在,每一行代表所有当前(国家,职业)组合的计数。下一步,我们只需要选择每个国家/地区计数最高的职业。


首先,导入所需的一些新模块,然后使用 windows 定义代码以选择最常见的职业:


>>> from pyspark.sql.window import Window>>> from pyspark.sql.functions import count, col, row_number>>> w = Window().partitionBy("countryCode").orderBy(col("count(en_curid)").desc())>>> result = counts.withColumn("rn", row_number().over(w)).where(col("rn") == 1).select("countryCode", "occupation")>>> result.show(5)+-----------+-------------+|countryCode|   occupation|+-----------+-------------+|         DZ|   POLITICIAN||         LT|   POLITICIAN||         MM|   POLITICIAN||         CI|SOCCER PLAYER||         AZ|   POLITICIAN|+-----------+-------------+only showing top 5 rows
复制代码


此代码将原始行分组为 countryCode,按 count(en_curid)在降序排列每个组的内容,并且仅采用第一个元素。如您所见,在这个小样本中,政客似乎是最常见的职业。


让我们看看有多少个国家是这样:


>>> result.where(col("occupation") == "POLITICIAN").count()150
复制代码


哇,考虑到今天全球有 195 个国家,这真是很多。现在,让我们将其余国家/地区保存在 Redis 中:


>>>no_pol = result.where(col("occupation") != "POLITICIAN")>>> no_pol.write.format("org.apache.spark.sql.redis").option("table","occupation").option("key.column","countryCode").save()
复制代码


如果现在进入 redis-cli,您将能够看到新数据:


$ redis-cli> HGETALL occupation:IT1) "occupation"2) "RELIGIOUS FIGURE"> HGETALL occupation:US1) "occupation"2) "ACTOR"
复制代码


如果您想练习更多,请检查原始数据集,看看是否发现其他引起您兴趣的细节。

关于 Spark 数据类型和 Redis 集群 API 的补充

值得重申的非常重要的一点是,RDD 或 DataFrame / set 对象上的每个操作都将分布在多个节点上。如果我们的例子不仅仅涉及名人,那么一开始我们将有数千万行。在这种情况下,Spark 将扩展计算。但是,如果只有一个 Redis 实例,则将有 N 个节点在其上运行,很可能会成为网络带宽的瓶颈。


为了充分利用 Redis,您需要使用 Redis Cluster API 适当地扩展它。这将确保您的所有计算节点在读取时不会得不到数据,在写入时不会阻塞。

结论

在本文中,我们探讨了如何下载,编译和部署 spark-redis,以便将 Redis 用作 Spark DataFrames 的后端。Redis 提供了对 DataFrame API 的全面支持,因此移植现有脚本并开始享受 Redis 提供的更高速度应该非常容易。如果您想了解更多信息,请查看 GitHub 上关于 spark-redis 的文档。


本文转载自 中间件小哥 公众号。


原文链接:https://mp.weixin.qq.com/s/GOu3EQyqfG3vRqy9mdHKyw


2020-03-01 21:431297

评论

发布
暂无评论
发现更多内容

Spark 源码阅读 02:从 Spark-Submit 到 Driver 启动

程序员赤小豆

spark 技术 Spark 源码

Vue进阶(八十四):Computed 和 Watch 使用与区别

No Silver Bullet

Vue 8月日更

在线JSON转Schema工具

入门小站

百度地图开发-在地图上实现路线导航 09

Andy阿辉

android Android 小菜鸟 Android端 8月日更

java毕设开发经典选题

清风

毕业设计

如何找到靠谱的工长?

escray

生活记录 8月日更 装修记

Rust从0到1-高级特性-类型进阶

rust 高级特性 类型 Types

详解可观测性监控系统中的“金三角”

尔达Erda

开源 微服务 运维 云原生 APM

索引下推,这个点你肯定不知道!

艾小仙

MySQL MySQL 高可用

Lua 入门到精通( 01 Lua 简介以及软件安装)《做一个脚本高手》

陈皮的JavaLib

lua Linux 运维 脚本语言 8月日更

html创建表格有那些小技巧,表单中真的有这么多功能吗

你好bk

html html5 大前端 html/css JavaScrip

你知道 JavaScript 中的 Arguments 对象都有哪些用途吗?

编程三昧

JavaScript 大前端 函数 8月日更 Arguments

微信业务架构&学生系统管理系统设计

Geek_dae

架构实战营

情窦初开,原来喜欢这么可爱的

4ye

Python 后端 8月日更 词云

技术调研,IDEA 插件怎么开发「脚手架、低代码可视化编排、接口生成测试」?

小傅哥

Java 小傅哥 低代码 IDEA 脚手架

面向多场景而设计的 Erda Pipeline

尔达Erda

开源 微服务 云原生 企业数字化转型 Go 语言

架构训练营 模块六

小卷儿

模块六作业 - 电商系统微服务

babos

#架构实战营

架构实战营第一期--模块六作业

clay

架构实战营

架构实战营模块 6 作业

蔸蔸

拆分电商系统为微服务

木云先森

架构实战营

云原生时代的 APM

尔达Erda

微服务 运维 云原生 APM 应用性能管理

苏宁精准测试方案探索和实践

薛飞

精准测试

用户体验再升级!Erda 1.2 版本正式发布

尔达Erda

云计算 开源 开发者 云原生 Go 语言

模块六作业

Mr.He

架构实战营

Spark 源码阅读 01:环境搭建

程序员赤小豆

spark 技术 Spark 源码

从头配置阿里云服务器

阿Q说代码

SSL证书 8月日更 阿里云服务器 域名备案

架构训练营模块六作业

喻高咏        

架构实战营

Linux之fgrep命令

入门小站

Linux

【架构训练营】模块六作业

zclau

电商系统微服务拆分-模块6

小牧ah

架构实战营

Redis,Apache Spark 和 Python 入门_行业深度_翻译自redis.io_InfoQ精选文章