最新发布《数智时代的AI人才粮仓模型解读白皮书(2024版)》,立即领取! 了解详情
写点什么

Redis,Apache Spark 和 Python 入门

  • 2020-03-01
  • 本文字数:4719 字

    阅读完需:约 15 分钟

Redis,Apache Spark 和 Python 入门

Apache Spark 是创建分布式数据处理管道最受欢迎的框架之一,在这篇博文中,我们将介绍在使用 Spark 时如何结合 Redis 用作计算的数据存储库。Spark 的主要功能是管道(Java,Scala,Python 或 R 脚本)可以在本地(用于开发)和集群上运行,而无需更改任何源代码。


Spark 通过巧妙地使用延迟计算或在某些情况下称为惰性来提供这种灵活性。一切都始于类 RDD,DataFrame 和更新的 Dataset,它们分别是数据的分布式惰性表示。他们使用分布式文件系统,数据库或其他类似服务作为实际的存储后端。他们的操作(例如 map / select,filter / where 和 reduce / groupBy)并没有真正地执行计算。每个操作都会在执行计划中添加一个步骤,该步骤最终会在需要实际结果时再(例如,尝试将其打印到屏幕上时)运行。


在本地启动脚本时,所有计算都在计算机上进行。或者,在分布式集群上启动时,您的数据将被分区到不同的节点上。(在大多数情况下)Spark 集群中并行执行相同的操作。

关于 RDD,DataFrame 和数据集

随着时间的推移,Spark 开发了三种不同的 API 来处理分布式数据集。尽管每个新添加的功能都比以前增加了更多功能,但是没有哪个 API 可以完全替代以前的功能。按创建顺序(从最早到最新),总体概述如下:


  • RDD 提供了用于将编译时类型安全操作应用于数据的底层方法。使用 RDD,您可以在代码中表达“事情如何发生”,而不是采用声明性方法。

  • DataFrame 引入了一种类 SQL 的方法来表达计算(它甚至支持实际的 SQL 查询)。它的声明性语法使 Spark 可以构建优化的查询计划,从而比 RDD 运行更快。

  • 数据集 是 DataFrame for Java Virtual Machine(JVM)语言的改进。它引入了 DataFrame 缺乏的编译时类型安全性,以及行的优化表示形式,从而大大减少了内存使用。由于动态语言(Python,R)具有动态特性,因此它实际上并没有任何作用,因此您仍然可以使用它们(同时在内部重新实现为数据集)使用 DataFrame。

  • 有关更多详细信息,请参阅 Jules Damji 的“ 三个 Apache Spark API 的故事 ”。

关于 spark-redis

spark-redis 是一个开放源代码连接器,可以让您使用 Redis 存储数据。


将 Redis 用作后端的三个主要原因是:


  • DataFrame / set 和 Redis 专用的 RDD: spark-redis 既实现了更通用的接口,又公开了 Redis 闻名的数据结构的 RDD。这意味着您可以非常轻松地在 Redis 上部署现有脚本,并在需要完全控制时使用 Redis 特定的功能。

  • Redis Cluster:连接器使用 Redis Cluster API,并充分利用分片数据库,包括重新分片和故障转移。将数据放在 Redis 集群中,可以极大地提高性能,因为您的管道会增加数据的多个使用者。

  • Redis Streams:Spark Streaming 非常适合新的 Redis Streams 数据结构。Redis Streams 还使用消费组,使您可以优雅地调整并行度。

  • 在本文中,我们将重点介绍 Python 入门以及如何使用 DataFrame API。在撰写本文时,可以被视为 Spark 的“本机”语言的 Scala 可以访问集成的一些更高级的功能,例如 Redis RDD 和 Streams。由于 Scala 是一种 JVM 语言,因此 Java 也可以使用这些功能。使用 Python,我们需要坚持使用 DataFrames。

配置

我们的第一步是使用 pip 安装 pyspark。您还将需要在计算机上安装 Java 8。


$ pip install pyspark
复制代码


接下来,我们将需要 Maven 构建 spark-redis。您可以从官方网站或使用软件包管理器(例如 macOS 上的自制软件)来获取它。


从 GitHub 下载 spark-redis(git clone 或以 zip 下载),并使用 Maven 进行构建。


$ cd spark-redis$ mvnclean package -DskipTests
复制代码


在里面 target/子目录中,您将找到已编译的 jar 文件。


需要一个正在运行的 Redis 服务器进行连接。您可以通过多种方式下载它:从官方网站上的软件包管理器(apt-get 或者 brew install redis)或 Docker Hub。


一旦启动并运行,就可以启动 pyspark。请注意,修改 VERSION 值成您需要从 GitHub 下载的版本。


$ pyspark –jars target /spark-redis-VERSION-jar-with-dependencies.jar
复制代码


如果您的 Redis 服务器运行在容器中或已启用身份验证,则将这些开关添加到上一次调用中(并更改值以适合您的情况)。


–conf“ spark.redis.host =localhost” –conf“ spark.redis.port = 6379” –conf“ spark.redis.auth = passwd”
复制代码

使用样本数据集运行

现在我们有了一个可以正常工作的 pyspark shell,可以将数据存储在 Redis 上,让我们来运行这个著名的 people 数据集。

开始

下载 TSV 文件后,让我们将其作为 Spark DataFrame 加载。


>>> full_df =  spark.read.csv("pantheon.tsv", sep="\t",  quote="", header=True, inferSchema=True)    >>> full_df.dtypes    [('en_curid', 'int'), ('name', 'string'), ('numlangs',  'int'), ('birthcity', 'string'), ('birthstate', 'string'), ('countryName',  'string'), ('countryCode', 'string'), ('countryCode3', 'string'), ('LAT',  'double'), ('LON', 'double'), ('continentName', 'string'), ('birthyear',  'string'), ('gender', 'string'), ('occupation', 'string'), ('industry',  'string'), ('domain', 'string'), ('TotalPageViews', 'int'), ('L_star',  'double'), ('StdDevPageViews', 'double'), ('PageViewsEnglish', 'int'),  ('PageViewsNonEnglish', 'int'), ('AverageViews', 'double'), ('HPI',  'double')]
复制代码


现在,调用.dtypes 显示数据集中所有列(和相对类型)的列表。在此数据集中,有许多事情可能值得研究,但出于本示例的目的,让我们着重于为每个国家/地区查找到名人最常见的职业。


让我们从仅保留与我们的目标相关的列开始。


>>> data =  full_df.select("en_curid", "countryCode",  "occupation")    >>> data.show(2)    +--------+-----------+-----------+    |en_curid|countryCode| occupation|+--------+-----------+-----------+    |     307|         US| POLITICIAN||     308|         GR|PHILOSOPHER|+--------+-----------+-----------+    only showing top 2 rows
复制代码


这将创建原始 DataFrame 的副本,该副本仅包含三列:每个人的唯一 ID,他们的国家和他们的职业。


我们首先下载了一个小的数据集,但在现实生活中,如果您使用的是 Spark,则该数据集可能会更大并且可以远程托管。出于这个原因,让我们通过将数据加载到 Redis 来使下一步变得更加现实:


>>> data.write.format("org.apache.spark.sql.redis").option("table","people").option("key.column", "en_curid").save()
复制代码


此命令会将我们的数据集加载到 Redis 中。我们现在将看到,我们指定的两个选项有助于定义 Redis 中的数据布局。

Redis 上的 DataFrames

让我们使用 redis-cli 查询下,看看 DataFrame 是如何存储在 Redis 上的:


$ redis-cli> SCAN 0 MATCH people:* COUNT 31) "2048"2) 1) "people:2113653"2)"people:44849"3)"people:399280"4)"people:101393"
复制代码


SCAN 查询出我们加载到 Redis 中的 Key 和数据。您可以立即看到我们之前给出的选项如何用于定义键名:


“table”, “people” 为表示此 DataFrame 的键定义一个公共前缀,并且


“ key.column”, “ en_curid” 为我们的 DataFrame 定义了主键。


让我们随机取一个 key 看一下具体的内容:


> HGETALL people:21136531) "countryCode"2) "DE"3) "occupation"4) "SOCCER PLAYER"
复制代码


现在,我们已经了解了数据如何存储在 Redis 上,让我们跳回到 pyspark,看看我们如何实际编写管道获取每个国家的名人最常见的职业。如您所见,DataFrame 的每一行都变成了 Redis 哈希结构,其中包含 countryCode 和 occupation。如前所述,en_curid 用作主键,因此它成为键名的一部分。


从 Redis DataFrame 执行计算


即使我们应该将数据仍然加载到内存中,也可以从 Redis 加载它,以便编写与您在现实生活中将要执行的操作更相似的代码。


>>> df = spark.read.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").load()>>> df.show(2)+--------+-----------+----------+|en_curid|countryCode|occupation|+--------+-----------+----------+|  915950|         ZW|   SWIMMER||  726159|         UY|POLITICIAN|+--------+-----------+----------+only showing top 2 rows
复制代码


这就是您的 Spark 管道开始的方式,因此让我们最后进行计算!


>>> counts = df.groupby("countryCode", "occupation").agg({"en_curid": "count"})>>> counts.show(2)+-----------+-------------+---------------+|countryCode|   occupation|count(en_curid)|+-----------+-------------+---------------+|         FR|MATHEMATICIAN|             34||         IT|SOCCER PLAYER|             81|+-----------+-------------+---------------+only showing top 2 rows
复制代码


现在,每一行代表所有当前(国家,职业)组合的计数。下一步,我们只需要选择每个国家/地区计数最高的职业。


首先,导入所需的一些新模块,然后使用 windows 定义代码以选择最常见的职业:


>>> from pyspark.sql.window import Window>>> from pyspark.sql.functions import count, col, row_number>>> w = Window().partitionBy("countryCode").orderBy(col("count(en_curid)").desc())>>> result = counts.withColumn("rn", row_number().over(w)).where(col("rn") == 1).select("countryCode", "occupation")>>> result.show(5)+-----------+-------------+|countryCode|   occupation|+-----------+-------------+|         DZ|   POLITICIAN||         LT|   POLITICIAN||         MM|   POLITICIAN||         CI|SOCCER PLAYER||         AZ|   POLITICIAN|+-----------+-------------+only showing top 5 rows
复制代码


此代码将原始行分组为 countryCode,按 count(en_curid)在降序排列每个组的内容,并且仅采用第一个元素。如您所见,在这个小样本中,政客似乎是最常见的职业。


让我们看看有多少个国家是这样:


>>> result.where(col("occupation") == "POLITICIAN").count()150
复制代码


哇,考虑到今天全球有 195 个国家,这真是很多。现在,让我们将其余国家/地区保存在 Redis 中:


>>>no_pol = result.where(col("occupation") != "POLITICIAN")>>> no_pol.write.format("org.apache.spark.sql.redis").option("table","occupation").option("key.column","countryCode").save()
复制代码


如果现在进入 redis-cli,您将能够看到新数据:


$ redis-cli> HGETALL occupation:IT1) "occupation"2) "RELIGIOUS FIGURE"> HGETALL occupation:US1) "occupation"2) "ACTOR"
复制代码


如果您想练习更多,请检查原始数据集,看看是否发现其他引起您兴趣的细节。

关于 Spark 数据类型和 Redis 集群 API 的补充

值得重申的非常重要的一点是,RDD 或 DataFrame / set 对象上的每个操作都将分布在多个节点上。如果我们的例子不仅仅涉及名人,那么一开始我们将有数千万行。在这种情况下,Spark 将扩展计算。但是,如果只有一个 Redis 实例,则将有 N 个节点在其上运行,很可能会成为网络带宽的瓶颈。


为了充分利用 Redis,您需要使用 Redis Cluster API 适当地扩展它。这将确保您的所有计算节点在读取时不会得不到数据,在写入时不会阻塞。

结论

在本文中,我们探讨了如何下载,编译和部署 spark-redis,以便将 Redis 用作 Spark DataFrames 的后端。Redis 提供了对 DataFrame API 的全面支持,因此移植现有脚本并开始享受 Redis 提供的更高速度应该非常容易。如果您想了解更多信息,请查看 GitHub 上关于 spark-redis 的文档。


本文转载自 中间件小哥 公众号。


原文链接:https://mp.weixin.qq.com/s/GOu3EQyqfG3vRqy9mdHKyw


2020-03-01 21:431041

评论

发布
暂无评论
发现更多内容

架构纠缠系列:简单与复杂的技术方案如何取舍

凌晞

架构设计 架构决策

为什么zookeeper不满足线性一致性依然可以实现分布式锁?

Jerry Tse

zookeeper zookeeper分布式锁 分布式锁原理

ChatGPT“与图对话”初体验

无人之路

ChatGPT

火山引擎ByteHouse:如何提升18000节点的ClickHouse可用性?

字节跳动数据平台

数据库 大数据 数据仓库 云原生 企业号9月PK榜

Mac平台PDF编辑和阅读工具 PDF Expert 3

展初云

PDF Mac软件 PDF Expert for Mac

蓝易云:Linux系统命令:du与df的区别!

百度搜索:蓝易云

Linux 磁盘 du df 硬盘

蓝易云:linux系统切换IP实现HTTP代理教程。

百度搜索:蓝易云

云计算 Linux IP HTTP 云服务器

神器 CodeWhisperer

亚马逊云科技 (Amazon Web Services)

人工智能

Python 列表操作指南2

小万哥

Python 程序员 软件 后端 开发

icon图标设计制作 Image2icon最新激活版中文

胖墩儿不胖y

图标制作 图标工具 icon

非一般中国“千年霓裳 遇见运河”通州大运河畔再现传统文化盛景

联营汇聚

文心一言 VS 讯飞星火 VS chatgpt (105)-- 算法导论10.1 3题

福大大架构师每日一题

福大大架构师每日一题

简单的解压缩软件 Bandizip Archiver激活中文最新版

胖墩儿不胖y

Mac软件 解压缩工具 压缩软件

公链如何开发?怎么开发一条可靠的公链

V\TG【ch3nguang】

04. 人工智能核心基础 - 导论(3)

茶桁

人工智能

DxO PhotoLab 7 for mac(专业raw图像处理软件) 7.0.1.31永久激活版

mac

图像处理软件 苹果mac Windows软件 DxO PhotoLab 7

专业好用的照片处理和编辑软件 DxO PhotoLab 7 for mac

展初云

Mac软件 DxO PhotoLab 7 raw智能照片处理工具

钱包发展的未来:ERC-20代币的趋势和创新

区块链软件开发推广运营

交易所开发 dapp开发 区块链开发 链游开发 NFT开发

ERC20代币开发如何影响区块链环境

区块链软件开发推广运营

数字藏品开发 dapp开发 区块链开发 链游开发 NFT开发

人生最优解:体验最极致的人生

少油少糖八分饱

人生 回忆 体验 阅读笔记 死前归零

PDF Expert for mac(pdf编辑工具) v3.5.2激活密钥版

mac

PDF Expert 苹果mac Windows软件 PDF编辑软件

Lightroom Classic 2023 for Mac(摄影后期图像编辑软件) v12.4完美激活版

mac

图像处理软件 苹果mac Windows软件 Lightroom Classic 2023

简单易用的程序创建工具:VMware InstallBuilder Enterprise激活版

mac大玩家j

软件推荐 Mac软件

探索Redis与MySQL的双写问题:挑战与解决方案

Java随想录

Java MySQL redis

Redis,Apache Spark 和 Python 入门_行业深度_翻译自redis.io_InfoQ精选文章