用Spark Streaming实时计算海量用户UV

提出需求

实时统计业务系统(web,APP之类)的访问人数,即所谓UV,或者DAU指标.

这个需求怕是流计算最最最常见的需求了.

计算UV的关键点就在于去重,即同一个人访问两次是只计一个UV的.在离线计算中统计UV比较容易想到的方法就是用group或distinct机制来去重.但是在实时计算场景,还用group就不太科学了,一个是全量数据的group是比较费时的,第二个是全量数据的group是很费内存和CPU的.特别是当用户量巨大的时候,还要做到秒级更新就更难了.

总结起来,需求就是:海量用户场景UV实时计算.

接受挑战

不难发现,问题的主要难点就是去重.

Spark Streaming目前没有给出内置方案(这个其实可以有),但是海量数据去重问题早就有解决办法了.
所以Spark Streaming程序完全可以利用其他系统的现有方案解决去重问题,比如Redis.

Redis的海量去重计数方案

Bitmap方案

所谓的Bitmap就是用一个bit位来标记某个元素对应的Value,比如ID为2的用户,就用第2个bit位来表示,然后用该位的值来表示该用户是否访问过.如果要计算UV,那就只要数一下有多少个1就行啦.

假设我们有40亿用户,使用Bitmap需要2^32个bit位,算下来也就500M左右.

你可能没想到的是,Redis中最常用的数据结构string,就可以实现bitmap算法.

Redis提供了如下命令

1
2
3
4
5
6
// 插入
setbit key offset value
//获取
getbit key offset
//计数
BITCOUNT key [start] [end]

这里offset最大值就是2^32.
比如ID为2的用户,可以setbit uv 2 1,来记录.
最后要计算UV,就直接 BITCOUNT uv. 这步计数非常快,复杂度是O(1).

HyperLogLog方案

若要计算很多页面的UV,用bitmap还是比较费空间的,N个页面就得有N个500M.这时候HyperLogLog结构就是一个比较好的选择.

Redis 在 2.8.9 版本添加了 HyperLogLog 结构。
Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。
在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基 数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。
但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。

也就是说HyperLogLog是一种基数统计算法,计算结果是近似值, 12 KB 内存就可以计算2^64 个不同元素的基数.

Redis命令如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

redis 127.0.0.1:6379> PFADD runoobkey "redis"

1) (integer) 1

redis 127.0.0.1:6379> PFADD runoobkey "mongodb"

1) (integer) 1

redis 127.0.0.1:6379> PFADD runoobkey "mysql"

1) (integer) 1

redis 127.0.0.1:6379> PFCOUNT runoobkey

(integer) 3

代码实现

下面给出HyperLogLog方案的参考实现:

1
2
3
4
5
6
7
8
9
10
11
12
stream.foreachRDD { rdd =>
//统计人数
rdd.foreachPartition { partition =>
//从分区所属executor的redis线程池获取一个连接.
val redis = RedisUtil.getRedis
partition.foreach { case (date, userId) =>
//统计当前userId
redis.pfadd(s"uv:$date", userId)
}
redis.close()
}
}

关于Redis的连接,如果是用java或scala可以使用JedisPool,注意处理序列化即可.