《指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计.docx》由会员分享,可在线阅读,更多相关《指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计.docx(12页珍藏版)》请在taowenge.com淘文阁网|工程机械CAD图纸|机械工程制图|CAD装配图下载|SolidWorks_CaTia_CAD_UG_PROE_设计图分享下载上搜索。
1、指标统计:基于流计算Oceanus(Flink)实现实时UVPV统计吴云涛腾讯CSIG高级工程师导语|最近梳理了一下怎样用Flink来实现实时的UV、PV指标的统计并以及公司内微视部门的同事沟通。然后针对该场景做了简化并发现使用FlinkSQL来实现这些指标的统计会更加便捷。一、解决方案描绘1.1概述本方案结合本地自建Kafka集群、腾讯云流计算OceanusFlink)、云数据库Redis对boke、购物等网站UV、PV指标进展实时可视化分析。分析指标包含网站的独立访客数量UV、产品的点击量PV、转化率转化率成交次数/点击量等。相关概念介绍UVUniqueVisitor独立访客数量。访问您网
2、站的一台客户端为一个访客如用户对同一页面访问了5次那么该页面的UV只加1因为UV统计的是去重后的用户数而不是访问次数。PVPageView点击量或者页面阅读量。如用户对同一页面访问了5次那么该页面的PV会加5。1.2方案架构及优势根据以上实时指标统计场景设计了如下架构图涉及产品列表本地数据中心IDC的自建Kafka集群私有网络VPC专线接入/云联网/VPN连接/对等连接流计算Oceanus(Flink)云数据库Redis二、前置准备购置所需的腾讯云资源并打通网络。自建的Kafka集群需根据集群所在区域需采用VPN连接、专线连接或者对等连接的方式来实现网络互通互联。2.1创立私有网络VPC私有网
3、络VPC是一块在腾讯云上自定义的逻辑隔离网络空间在构建Oceanus集群、Redis组件等效劳时选择的网络建议选择同一个VPC网络才能互通。否那么需要使用对等连接、NAT网关、VPN等方式打通网络。私有网络创立步骤请参考帮助文档s:/cloud.tencent/document/product/215/36515。2.2创立Oceanus集群流计算Oceanus是大数据产品生态体系的实时化分析利器是基于ApacheFlink构建的具备一站开发、无缝连接、亚秒延时、低廉本钱、平安稳定等特点的企业级实时大数据分析平台。流计算Oceanus以实现企业数据价值最大化为目的加速企业实时化数字化的建立进程
4、。在Oceanus控制台的【集群管理】-【新建集群】页面创立集群选择地域、可用区、VPC、日志、存储设置初始密码等。VPC及子网使用刚刚创立好的网络。创立完后Flink的集群如下2.3创立Redis集群在Redis控制台(s:/console.cloud.tencent/redis#/)的【新建实例】页面创立集群选择与其他组件同一地域同区域的同一私有网络VPC这里还选择同一子网。2.4配置自建Kafka集群2.4.1修改自建Kafka集群配置自建Kafka集群连接时bootstrap-servers参数常常使用hostname而不是ip来连接。但用自建Kafka集群连接腾讯云上的Oceanus
5、集群为全托管集群Oceanus集群的节点上无法解析自建集群的hostname与ip的映射关系所以需要改监听器地址由hostname为ip地址连接的形式。将config/server.properties配置文件中advertised.listeners参数配置为IP地址。例如#0.10.X及以后版本advertised.listenersPLAINTEXT:/10.1.0.10:9092#0.10.X之前版本advertised.host.namePLAINTEXT:/10.1.0.10:9092修改后重启Kafka集群。!假设在云上使用到自建的zookeeper地址也需要将zk配置中的hos
6、tname修改IP地址形式。2.4.2模拟发送数据到topic本案例使用topic为topic为uvpv-demo。1Kafka客户端进入自建Kafka集群节点启动Kafka客户端模拟发送数据。./bin/kafka-console-producer.sh-broker-list10.1.0.10:9092-topicuvpv-demorecord_type:0,user_id:2,client_ip:100.0.0.2,product_id:101,create_time:2021-09-0816:20:00record_type:0,user_id:3,client_ip:100.0.0.
7、3,product_id:101,create_time:2021-09-0816:20:00record_type:1,user_id:2,client_ip:100.0.0.1,product_id:101,create_time:2021-09-0816:20:002使用脚本发送脚本一Java代码参考s:/cloud.tencent/document/product/597/54834脚本二Python脚本。参考之前案例中python脚本进展适当修改即可?视频直播实时数据可视化分析?2.5打通自建IDC集群到腾讯云网络通信自建Kafka集群联通腾讯云网络可通过以下前3种方式打通自建IDC
8、到腾讯云的网络通信。专线接入s:/cloud.tencent/document/product/216适用于本地数据中心IDC与腾讯云网络打通。云联网s:/cloud.tencent/document/product/877适用于本地数据中心IDC与腾讯云网络打通可以用于云上不同地域间私有网络VPC打通。VPN连接s:/cloud.tencent/document/product/554适用于本地数据中心IDC与腾讯云网络打通。对等连接NAT网关对等连接s:/cloud.tencent/document/product/553NAT网关s:/cloud.tencent/document/pro
9、duct/552合适云上不同地域间私有网络VPC打通不合适本地IDC到腾讯云网络。本方案中使用了VPN连接的方式实现本地IDC以及云上网络的通信。参考链接建立VPC到IDC的连接路由表(s:/cloud.tencent/document/product/554/52854)根据方案绘制了下面的网络架构图三方案实现3.1业务目的利用流计算Oceanus实现网站UV、PV、转化率指标的实时统计这里只列取以下3种统计指标网站的独立访客数量UV。Oceanus处理后在Redis中通过set类型存储独立访客数量同时也到达了对同一访客的数据去重的目的。网站商品页面的点击量PV。Oceanus处理后在Red
10、is中使用list类型存储页面点击量。转化率转化率成交次数/点击量。Oceanus处理后在Redis中用String存储即可。3.2源数据格式Kafkatopicuvpv-demo阅读记录字段类型含义record_typeint客户号user_idvarchar客户ip地址client_ipvarchar房间号product_idInt进入房间时间create_timetimestamp创立时间Kafka内部采用json格式存储数据格式如下#阅读记录record_type:0,#0表示阅读记录user_id:6,client_ip:100.0.0.6,product_id:101,create
11、_time:2021-09-0616:00:00#购置记录record_type:1,#1表示购置记录user_id:6,client_ip:100.0.0.8,product_id:101,create_time:2021-09-0818:00:003.3编写FlinkSQL作业例如中实现了UV、PV以及转化率3个指标的获取逻辑并写入Sink端。1、定义SourceCREATETABLEinput_web_record(record_typeINT,user_idINT,client_ipVARCHAR,product_idINT,create_timeTIMESTAMP,timesAScr
12、eate_time,WATERMARKFORtimesAStimes-INTERVAL10MINUTE)WITH(connectorkafka,-可选kafka,kafka-0.11.注意选择对应的内置Connectortopicuvpv-demo,scan.startup.modeearliest-offset,-properties.bootstrap.servers82.157.27.147:9092,properties.bootstrap.servers10.1.0.10:9092,properties.group.idWebRecordGroup,-必选参数,一定要指定GroupI
13、Dformatjson,json.ignore-parse-errorstrue,-忽略JSON构造解析异常json.fail-on-missing-fieldfalse-假如设置为true,那么遇到缺失字段会报错设置为false那么缺失字段设置为null);2、定义Sink-UVsinkCREATETABLEoutput_uv(useridsSTRING,user_idSTRING)WITH(connectorredis,commandsadd,-使用集合保存uv支持命令set、lpush、sadd、hset、zaddnodes192.28.28.217:6379,-redis连接地址,集群
14、形式多个节点使用,分隔。-additional-keykey,-用于指定hset以及zadd的key。hset、zadd必须设置。passwordyourpassword-PVsinkCREATETABLEoutput_pv(pagevisitsSTRING,product_idSTRING,hour_countBIGINT)WITH(connectorredis,commandlpush,-使用列表保存pv支持命令set、lpush、sadd、hset、zaddnodes192.28.28.217:6379,-redis连接地址,集群形式多个节点使用,分隔。-additional-keyke
15、y,-用于指定hset以及zadd的key。hset、zadd必须设置。passwordyourpassword-转化率sinkCREATETABLEoutput_conversion_rate(conversion_rateSTRING,rateSTRING)WITH(connectorredis,commandset,-使用列表保存pv支持命令set、lpush、sadd、hset、zaddnodes192.28.28.217:6379,-redis连接地址,集群形式多个节点使用,分隔。-additional-keykey,-用于指定hset以及zadd的key。hset、zadd必须设置
16、。passwordyourpassword);3、业务逻辑-加工得到UV指标统计所有时间内的UVINSERTINTOoutput_uvSELECTuseridsASuserids,CAST(user_idASstring)ASuser_idFROMinput_web_record;-加工并得到PV指标统计每10分钟内的PVINSERTINTOoutput_pvSELECTpagevisitsASpagevisits,CAST(product_idASstring)ASproduct_id,SUM(product_id)AShour_countFROMinput_web_recordWHEREr
17、ecord_type0GROUPBYHOP(times,INTERVAL5MINUTE,INTERVAL10MINUTE),product_id,user_id;-加工并得到转化率指标统计每10分钟内的转化率INSERTINTOoutput_conversion_rateSELECTconversion_rateASconversion_rate,CAST(SELECTCOUNT(1)FROMinput_web_recordWHERErecord_type0)*1.0)/SUM(a.product_id)asstring)FROM(SELECT*FROMinput_web_recordwher
18、erecord_type1)ASaGROUPBYHOP(times,INTERVAL5MINUTE,INTERVAL10MINUTE),product_id;3.4结果验证通常情况会通过Web网站来展示统计到的UV、PV指标这里为了简单直接在Redis控制台(s:/console.cloud.tencent/redis#/)登录进展查询userids:存储UVpagevisits:存储PVconversion_rate:存储转化率即购置商品次数/总页面点击量。四总结通过自建Kafka集群收集数据在流计算Oceanus(Flink)中实时进展字段累加、窗口聚合等操作将加工后的数据存储在云数据库Redis统计到实时刷新的UV、PV等指标。这个方案在Kafkajson格式设计时为了简便易懂做了简化处理将阅读记录以及产品购置记录都放在了同一个topic中重点通过打通自建IDC以及腾讯云产品间的网络来展现整个方案。针对超大规模的UV去重微视的同事采用了Redishyperloglog方式来实现UV统计。相比直接使用set类型方式有极小的内存空间占用的优点详情见链接s:/cloud.tencent/developer/article/1889162。点击文末浏览原文解析腾讯云流计算Oceanus更多信息腾讯云大数据长按二维码关注我们