写点什么

GPkafka-Kafka 数据导入 GreenPlum 实践

  • 2019-08-30
  • 本文字数:5432 字

    阅读完需:约 18 分钟

GPkafka-Kafka 数据导入 GreenPlum 实践

本文经授权转载自 PostgreSQL 中文社区。


Kafka 是分布式消息订阅系统,有非常好的横向扩展性,可实时存储海量数据,是流数据处理中间件的事实标准。当通过 Kafka 和 greenplum 搭建流处理管道时,如何高速可靠的完成流数据加载,成为用户最关心的问题。从 5.10 开始,Greenplum 发布了新的工具 GPKafka,为 Greenplum 提供了流数据加载的能力。


GPkafka 工具:kafka —> Greenplum

一、安装准备

kafka 安装:版本为 kafka_2.11-2.1.0。


greenplum 安装:版本为 5.16

二、Kafka 数据导入 GreenPlum

  1. 启动 kafka


# 启动zookeeper
$ /opt/zookeeper-3.4.12/bin/zkServer.sh start
# 启动kafka
$/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon ../config/server.properties
复制代码


2.创建 gpss 扩展


在将 Kafka 消息数据加载到 Greenplum 数据库之前,必须在将 Kafka 数据写入 Greenplum 表的每个数据库中注册 Greenplum-Kafka 集成格式化程序函数;示例在 lottu 数据库


[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.
lottu=# CREATE EXTENSION gpss;
复制代码


3.创建示例表


kafka 的数据格式 json 形式;样式:


{  "time": 1550198435941,  "type": "type_mobileinfo",  "phone_imei": "861738033581011",  "phone_imsi": "",  "phone_mac": "00:27:1c:95:47:09",  "appkey": "307A5C626E6C2F6472636E6E6A2F736460656473",  "phone_udid": "8F137BFFB2289784A5EA2DCADCE519C2",  "phone_udid2": "744DD04CE29652F4F1D2DFFC8D3204A9",  "appUdid": "D21C76419E54B18DDBB94BF2E6990183",  "phone_resolution": "1280*720",  "phone_apn": "",  "phone_model": "BF T26",  "phone_firmware_version": "5.1",  "phone_softversion": "3.19.0",  "phone_softname": "com.esbook.reader",  "sdk_version": "3.1.8",  "cpid": "blp1375_13621_001",  "currentnetworktype": "wifi",  "phone_city": "",  "os": "android",  "install_path": "\/data\/app\/com.esbook.reader-1\/base.apk",  "last_cpid": "",  "package_name": "com.esbook.reader",  "src_code": "WIFIMAC:00:27:1c:95:47:09"} 
复制代码


我需要其中的 package_name,appkey ,time, phone_udid,os, idfa,phone_imei,cpid,last_cpid,phone_number 字段;所以我创建的表语句


CREATE TABLE tbl_novel_mobile_log (
package_name text,
appkey text,
ts bigint,
phone_udid text,
os character varying(20),
idfa character varying(64),
phone_imei character varying(20),
cpid text,
last_cpid text,
phone_number character varying(20)
) ;
复制代码


4.创建 gpkafka.yaml 配置文件


gpkafka_mobile_yaml文件内容:
DATABASE: lottu
USER: gpadmin
HOST: oracle166
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: kafkaip:9092
TOPIC: mobile_info
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 10
OUTPUT:
TABLE: tbl_novel_mobile_log
MAPPING:
- NAME: package_name
EXPRESSION: (jdata->>'package_name')::text
- NAME: appkey
EXPRESSION: (jdata->>'appkey')::text
- NAME: ts
EXPRESSION: (jdata->>'time')::bigint
- NAME: phone_udid
EXPRESSION: (jdata->>'phone_udid')::text
- NAME: os
EXPRESSION: (jdata->>'os')::text
- NAME: idfa
EXPRESSION: (jdata->>'idfa')::text
- NAME: phone_imei
EXPRESSION: (jdata->>'phone_imei')::text
- NAME: cpid
EXPRESSION: (jdata->>'cpid')::text
- NAME: last_cpid
EXPRESSION: (jdata->>'last_cpid')::text
- NAME: phone_number
EXPRESSION: (jdata->>'phone_number')::text
COMMIT:
MAX_ROW: 1000
复制代码


5.创建 mobile_info topic


/opt/kafka/kafka_2.11-2.1.0/bin/kafka-topics.sh --create --zookeeper kafkaIp:2181 --replication-factor 1 --partitions 1  --topic mobile_info
复制代码


6.创建 kafka 的发布者


执行下列命令;并添加 kafka 记录


[root@oracle166 ~]# /opt/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh  --broker-list kafkaIP:9092 --topic mobile_info
复制代码


>{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BFT26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}
{"time":1550198437885,"type":"type_mobileinfo","phone_imei":"862245038046551","phone_imsi":"","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626F2F76646B74606F2F736460656473","phone_udid":"A3BB70A0218AEFC7908B1D79C0C02D77","phone_udid2":"E3976E0453010FC7F32B6143AA3A164E","appUdid":"4FBEF77BC076254ED0407CAD653E6954","phone_resolution":"1920*1080","phone_apn":"","phone_model":"LeX620","phone_firmware_version":"6.0","phone_softversion":"1.9.0","phone_softname":"cn.wejuan.reader","sdk_version":"3.1.8","cpid":"blf1298_14411_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/cn.wejuan.reader1\/base.apk","last_cpid":"","package_name":"cn.wejuan.reader","src_code":"ffffffff-9063-8e34-0000-00007efffeff"}
{"time":1550198438311,"type":"type_mobileinfo","phone_number":"","phone_imei":"867520045576831","phone_imsi":"460001122544742","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"A00407EF9D6EBCC207A514CDA452EB76","phone_udid2":"A00407EF9D6EBCC207A514CDA452EB76","appUdid":"1C35633F4EB8218789EFD8666C763485","phone_resolution":"2086*1080","phone_apn":"CMCC","phone_model":"ONEPLUSA6000","phone_firmware_version":"9","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.readerTlgFCk6ANgEDRnXDCem8uQ==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460001122544742"}
{"time":1550198433102,"type":"type_mobileinfo","phone_number":"15077113477","phone_imei":"860364049874919","phone_imsi":"460023771256711","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"EEF566CB5253AA62B653347A203815C3","phone_udid2":"0845931539AE39B3B0D4EB42B85D98EC","appUdid":"9570DCA2D574E6B69B24137035209D42","phone_resolution":"2340*1080","phone_apn":"CHINAMOBILE","phone_model":"PBEM00","phone_firmware_version":"8.1.0","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.readerNBToXQo14TOeNuPxo_aA4w==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"13598c2d-efc4-4957-8d4d-22eb145d15fd"}
{"time":1550198440577,"type":"type_mobileinfo","phone_imei":"869800021106037","phone_imsi":"","phone_mac":"2c:5b:b8:fb:79:af","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"2BC16C4AC07070BA9608BBD0EE2EE320","phone_udid2":"A7F9FA4772D31FADEECFDB445BA3BEBB","appUdid":"DC6BEE2F6E5D6A133E26131887AE788A","phone_resolution":"960*540","phone_apn":"","phone_model":"OPPOA33","phone_firmware_version":"5.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_14526_003","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:2c:5b:b8:fb:79:af"}
{"time":1506944701166,"type":"type_mobileinfo","phone_number":"+8618602699126","phone_imei":"865902038154143","phone_imsi":"460012690618403","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"388015DA70C0AEA6D59D3CE37B0C4BA2","phone_udid2":"388015DA70C0AEA6D59D3CE37B0C4BA2","appUdid":"EC0A105297D55075526018078A4A1B84","phone_resolution":"1920*1080","phone_apn":"中国联通","phone_model":"MIMAX2","phone_firmware_version":"7.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_10928_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460012690618403"}

复制代码


7.执行 gpkafka 加载数据


[gpadmin@oracle166 ~]$ gpkafka load --quit-at-eof ./gpkafka_mobile_yamlPartitionID StartTime EndTime BeginOffset EndOffset0 2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z 0 5Job dcd0d159282c0ef39f182cabeef23ee6 stopped normally at 2019-02-27 09:26:29.442874281 +0000 UTC 
复制代码


8.检查加载操作的进度(非必要)


[gpadmin@oracle166 ~]$ gpkafka check ./gpkafka_mobile_yamlPartitionID StartTime EndTime BeginOffset EndOffset0 2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z 0 5
复制代码


9.查看表数据


[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.


lottu=# select * from tbl_novel_mobile_log ;
package_name | appkey | ts | phone_udid | os | idfa | phone_imei | cpid | last_cpid | p
hone_number
-------------------+------------------------------------------+---------------+----------------------------------+---------+------+-----------------+--
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198435941 | 8F137BFFB2289784A5EA2DCADCE519C2 | android | | 861738033581011 | blp1375_13621_001 | |
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198438311 | A00407EF9D6EBCC207A514CDA452EB76 | android | | 867520045576831 | blf1298_12242_001 | |
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198433102 | EEF566CB5253AA62B653347A203815C3 | android | | 860364049874919 | blf1298_12242_001 | | 1
5077113477
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198440577 | 2BC16C4AC07070BA9608BBD0EE2EE320 | android | | 869800021106037 | blp1375_14526_003 | |
cn.wejuan.reader | 307A5C626F2F76646B74606F2F736460656473 | 1550198437885 | A3BB70A0218AEFC7908B1D79C0C02D77 | android | | 862245038046551 | blf1298_14411_001 | |
(5 rows)

复制代码

三、后记

编辑本文初衷是:公司计划为北京 ES 小说作投放计划,需要类似热云数据平台作为投放数据支持,使投放更加精准可靠。北京小说部门数据存放于 kafka 中,需要将 kafka 中的数据导入深圳后台数据库中,虽然最后平台未采用 gpkafka 方式,但不失一种方案,由于种种原因后台数据库选 PG9.6 版本,采用 java 代码实现 kafka 数据实时导入 PG。最后祝 PG,GP 越来越好,也期待 pgkafka 工具诞生。

四、参考文献

1、gpkafka 更多用法


https://gpdb.docs.pivotal.io/5120/greenplum-kafka/intro.html


2、BottledWater-PG:PostgreSQL 集成 Kafka 的实时数据交换平台


https://www.jianshu.com/p/c3659f49bf94


作者介绍:


Lottu,就职于深圳宜搜科技有限公司,担任数据库 DBA,主要承担 PostgreSQL、Oracle 数据库维护工作以及数据库去 O 工作。


原文链接:


https://mp.weixin.qq.com/s/HuYYvKtV8RfNxtrrJwR6SQ


2019-08-30 09:426948

评论

发布
暂无评论
发现更多内容

一叶红船见百年!百度大脑助力南湖红船泛起国人心中红色情怀

百度大脑

模块一作业

lhp

架构实战营

面试官问的那些Android原理你都懂吗?快来收藏!

欢喜学安卓

android 程序员 面试 移动开发

全网首发!“阿里爸爸”最新出品SpringBoot高级笔记(内部笔记!)

Java spring

程序员外包避坑指南?

孙叫兽

程序员 外包

5分钟速读之Rust权威指南(三十六)模式匹配

wzx

rust

数字货币大趋势,DC EP出征,带老百姓进入新时代!

CECBC

辞职1000小时后,我走进字节跳动拿了offer

Java 程序员 面试 java编程

人民网发文:区块链如何跨越未来10年

CECBC

RedHat7.2 切换yum源记录

Bruce Xiong

redhat yum源

数据准备的能力,决定企业AI研发的边界

百度大脑

人工智能

云计算还有多久能够替代高性能计算?

北鲲云

YOLOV1解读

re-执着

数据安全法下,企业如何平衡数据安全合规与业务性能?

腾讯安全云鼎实验室

数据安全 数据安全法

重磅!不容错过的阿里内部微服务速成手册也太赞了(2021版)

Java 程序员 面试 java编程

Java 8 新特性

Bf-Bus

关于数据安全

奔向架构师

大数据 数据安全

iOS端屏幕录制开发指南

anyRTC开发者

音视频 WebRTC ios开发 屏幕录制

CODING 助力推进腾讯游戏国际化进程

CODING DevOps

DevOps 开发工具 腾讯游戏 软件研发

InnoDB存储引擎-锁

CodeWithBuff

MySQL innodb

《持之以恒的从事运动》三

Changing Lin

当法律纽带变成“机器红线”,能让自动驾驶汽车更安全吗?

脑极体

就是它,帮我斩获了8家大厂offer,由于太全被各大厂要求Github连夜下架

Java架构师迁哥

算法面试通关

buchila11

面试

模块一作业:微信业务架构图和毕设架构设计

Felix

[架构实战营][模块一作业]

KK_TTN

#架构实战营

为了对抗内卷,我“偷”了阿里两份笔记:JDK源码+Java并发图册

Java架构师迁哥

自制深度学习照片数据集

re-执着

面试官问的那些Android原理你都懂吗?值得一看

欢喜学安卓

android 程序员 面试 移动开发

区块链互操作性:大规模应用的关键

CECBC

第一周作业

Morphling

#架构实战营

GPkafka-Kafka 数据导入 GreenPlum 实践_大数据_Lottu_InfoQ精选文章