A real-time job needs to compute newly added users. Loading the whole user table from Hive into memory would hold tens of millions of rows, wasting resources and making lookups slow. Storing the users in a Redis bitmap instead cuts memory usage sharply and speeds up lookups. Here Spark does the querying and a Redis bitmap does the storage.
The setup below is a test with Scala, Spark in local mode, and a single Redis node.
I tested several hash algorithms. No single hash was satisfactory in either accuracy or speed: multiple collisions already appeared between the 10K and 100K scale. Running several algorithms through multiple rounds of hashing did not help much either; at the tens-of-millions scale there were still many collisions.
I also tested Redisson: at a comparable collision rate it was far slower (hour-level for tens of millions of records).
I tried several prime combinations: (61), (7, 61), (7, 11, 13, 61), (5, 7, 11, 13, 61), (61, 83, 113, 151, 211), (61, 83, 113, 151, 211, 379)...
In the end I settled on MurmurHash, applied 5 times with seeds (37, 41, 61, 79, 83): 40M records stored in 256 MB with 31 collisions, which is acceptable.
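This sizing can be sanity-checked against the textbook Bloom-filter false-positive estimate (1 - e^(-kn/m))^k. A minimal sketch, where m, n, k come from the setup above and the formula is the standard estimate, not anything from the original measurements:

// Sanity check of the bitmap sizing with the textbook
// Bloom-filter false-positive formula.
object BloomSizing {
  def main(args: Array[String]): Unit = {
    val m = math.pow(2, 31) // bits in the bitmap (2^31 bits = 256 MB)
    val n = 4e7             // expected elements (~40M users)
    val k = 5               // number of hash seeds
    val fpp = math.pow(1 - math.exp(-k * n / m), k)
    println(f"estimated false-positive rate: $fpp%.1e") // ~5.6e-6
  }
}

At ~5.6e-6, roughly a couple hundred false positives would be the worst-case expectation over 40M lookups against a full filter, so 31 observed collisions is well within budget.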
Main class
import java.util.Properties

import com.sm.bloom.Bloom
import com.sm.constants.Constants
import com.sm.utils.RedisUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import redis.clients.jedis.Jedis

/**
 * Query the user table with Spark and import it into the Redis bitmap.
 *
 * create by LiuJinHe 2020/3/3
 */
object ImportDataToRedis {
  private val warehouseLocation = Constants.WAREHOUSE
  private val logger = LoggerFactory.getLogger(this.getClass)
  private val prop: Properties = new Properties
  private var jedis: Jedis = _
  private var startDate: String = _
  private var endDate: String = _
  private var count: Long = 0L

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.spark_project.jetty").setLevel(Level.WARN)

    // prime seeds for the 5 hash functions
    val seeds = Array(37, 41, 61, 79, 83)

    if (args.length == 2) {
      startDate = args(0)
      endDate = args(1)
    } else {
      logger.error("usage: ImportDataToRedis <startDate> <endDate>")
      sys.exit(1)
    }

    val spark = initSparkSession

    logger.info(s"========== full import of new-user table into Redis ==========")

    val start = System.currentTimeMillis()

    // load the Redis config and initialize jedis
    val redisProp = loadProp(Constants.REDIS_PROP)
    val host = redisProp.getProperty("redis.host")
    val port = redisProp.getProperty("redis.port").toInt
    val timeout = redisProp.getProperty("redis.timeout").toInt
    jedis = RedisUtils.init(host, port, timeout)

    // user bitmap
    // key: user table name; value: bitmap
    // (offset = hash(core_account + game_id + package_id))
    val userBit = redisProp.getProperty("redis.user.bit")

    // initialize the Bloom filter: 2^31 bits = 256 MB
    // (the shift must be done on a Long; 1 << 31 overflows Int)
    lazy val bloom = new Bloom(1L << 31)

    // loop-local temporaries
    var userId: String = ""
    var offset: Array[Long] = null
    var isExist: Boolean = true

    // read the user table
    val sqlStr =
      s"""
         |select
         |  core_account, game_id, package_id, time
         |from sm_data.dws_new_user_id
         |where add_date >= '$startDate' and add_date < '$endDate'
      """.stripMargin

    val dataFrame = spark.sql(sqlStr)
    // note: mutating driver-side state (jedis, count) inside foreach only
    // behaves as expected in local mode, where tasks share the driver JVM
    dataFrame.foreach(line => {
      // combine the three fields into one userId
      userId = line.getInt(1) + "," + line.getInt(2) + "," + line.getString(0)

      // hash the userId once per seed
      offset = bloom.murmurHash3(userId, seeds)

      // check whether all bits are already set in the Redis bitmap
      isExist = bloom.getbit(jedis, userBit, offset)
      if (!isExist) {
        // not present yet: set the corresponding bits to 1
        bloom.setbit(jedis, userBit, offset)
        count += 1
      }
    })

    spark.stop()
    jedis.close()

    val end = System.currentTimeMillis()
    logger.info(s"========== new users total: $count ==========")
    logger.info(s"========== elapsed: ${(end - start) / 1000} s ==========")
  }

  // minimal helper to load a .properties file from the classpath
  // (the original post calls loadProp but does not show its body)
  private def loadProp(path: String): Properties = {
    prop.load(this.getClass.getClassLoader.getResourceAsStream(path))
    prop
  }

  def initSparkSession: SparkSession = SparkSession.builder()
    .appName(this.getClass.getSimpleName)
    .master(Constants.SPARK_LOCAL_MODE)
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer", "1024m")
    .config("spark.kryoserializer.buffer.max", "2046m")
    .config("spark.io.compression.codec", "snappy")
    .enableHiveSupport()
    .getOrCreate()
}
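As the comment in the loop notes, this only works because the test runs in local mode. On a real cluster the closure executes on executors, so the driver-side `count` and `jedis` would not behave this way. A hedged sketch of a cluster-safe replacement for the `dataFrame.foreach` block, using `foreachPartition`, one connection per partition, and a `LongAccumulator` (same `host`, `port`, `timeout`, `userBit` and `seeds` values as above; this is my variant, not code from the original post):

// Cluster-safe sketch: per-partition connection plus an accumulator
// instead of driver-side vars.
import org.apache.spark.sql.Row

val newUserAcc = spark.sparkContext.longAccumulator("newUsers")

dataFrame.foreachPartition { rows: Iterator[Row] =>
  val jedis = RedisUtils.init(host, port, timeout) // created on the executor
  val bloom = new Bloom(1L << 31)
  rows.foreach { line =>
    val userId = line.getInt(1) + "," + line.getInt(2) + "," + line.getString(0)
    val offsets = bloom.murmurHash3(userId, seeds)
    if (!bloom.getbit(jedis, userBit, offsets)) {
      bloom.setbit(jedis, userBit, offsets)
      newUserAcc.add(1L)
    }
  }
  jedis.close()
}

logger.info(s"new users: ${newUserAcc.value}")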

RedisUtils
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{Jedis, JedisPool}

/**
 * create by LiuJinHe 2020/3/3
 */
object RedisUtils extends Serializable {

  // @transient is a field modifier: the marked field is skipped during
  // serialization, so the pool is never shipped inside a task closure
  @transient private var redisPool: JedisPool = _

  def init(host: String, port: Int, timeout: Int): Jedis = {
    redisPool = new JedisPool(new GenericObjectPoolConfig, host, port, timeout)
    redisPool.getResource
  }
}
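Note that `init` builds a brand-new `JedisPool` on every call and hands out a single resource, so the pool itself adds no connection reuse. A minimal sketch of reusing one pool per JVM instead (my own variant under that assumption, not from the original post):

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{Jedis, JedisPool}

// Sketch: lazily create one pool per JVM and reuse it across calls.
object PooledRedisUtils extends Serializable {
  @transient private var pool: JedisPool = _

  def getResource(host: String, port: Int, timeout: Int): Jedis = synchronized {
    if (pool == null)
      pool = new JedisPool(new GenericObjectPoolConfig, host, port, timeout)
    pool.getResource
  }
}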

Bloom class
import redis.clients.jedis.Jedis
import scala.util.hashing.MurmurHash3

/**
 * create by LiuJinHe 2020/3/3
 */
class Bloom(size: Long) extends Serializable {
  // bitmap capacity in bits; defaults to 2^31 bits = 256 MB
  // (must be a power of two so that `& (cap - 1)` works as a modulus)
  private val cap = if (size > 0) size else 1L << 31
  private val seeds = Array(37, 41, 61, 79, 83)

  /**
   * BKDRHash
   * @param str  the string to hash
   * @param seed multiplier seed
   * @return hash value in [0, cap)
   */
  def hash(str: String, seed: Int): Long = {
    var result = 0L
    for (i <- 0 until str.length) {
      result = result * seed + str.charAt(i)
    }
    result & (cap - 1)
  }

  /**
   * MurmurHash with the default seed array.
   * @param value the string to hash
   * @return one bit offset per default seed
   */
  def murmurHash3(value: String): Array[Long] = murmurHash3(value, seeds)

  /**
   * MurmurHash with caller-supplied seeds.
   * @param value the string to hash
   * @param seeds seeds passed to MurmurHash3, one hash per seed
   * @return one bit offset per seed
   */
  def murmurHash3(value: String, seeds: Array[Int]): Array[Long] = {
    val indices = new Array[Long](seeds.length)
    if (value != null && !value.isEmpty) {
      for (i <- seeds.indices) {
        // mask to a non-negative value (Math.abs(Int.MinValue) stays negative)
        indices(i) = (MurmurHash3.stringHash(value, seeds(i)) & 0x7fffffffL) % cap
      }
    }
    indices
  }

  def setbit(jedis: Jedis, key: String, indices: Array[Long]): Unit = {
    for (index <- indices) {
      jedis.setbit(key, index, true)
    }
  }

  def getbit(jedis: Jedis, key: String, indices: Array[Long]): Boolean = {
    if (key == null || key.isEmpty) return false
    // present only if every bit is already set
    indices.forall(index => jedis.getbit(key, index))
  }
}
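For reference, a small standalone sketch of how the class is used against Redis (it assumes a local redis-server on 127.0.0.1:6379; the key name and userId are made up for the demo):

import redis.clients.jedis.Jedis

// Demo: insert one userId into the bitmap and re-check it.
object BloomDemo {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("127.0.0.1", 6379)
    val bloom = new Bloom(1L << 31) // 2^31 bits = 256 MB
    val seeds = Array(37, 41, 61, 79, 83)
    val key = "test:user:bit"       // made-up key for the demo

    val userId = "1001,2002,demo_account" // game_id,package_id,core_account
    val offsets = bloom.murmurHash3(userId, seeds)

    if (!bloom.getbit(jedis, key, offsets)) {
      bloom.setbit(jedis, key, offsets)
      println(s"$userId inserted")
    } else {
      println(s"$userId already present (or a false positive)")
    }
    jedis.close()
  }
}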

pom.xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <!-- kept consistent with scala.version (the original resolved to 2.11.8) -->
    <scala.complete.version>${scala.version}</scala.complete.version>
    <hadoop.version>2.7.6</hadoop.version>
    <spark.version>2.4.0</spark.version>
    <slf4j.version>1.7.20</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.1.0</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <!-- compiles the Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
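With this pom, the job builds into a shaded jar via `mvn clean package` and can then be launched with `spark-submit --class <main-class> <shaded-jar> <startDate> <endDate>`; no `--master` flag is needed because the code sets the master via `Constants.SPARK_LOCAL_MODE`. The exact jar and package names depend on the project coordinates, which the post does not show.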

Redis RDB file
(the original post showed a screenshot of the generated RDB file here)

Results
5 hash functions (MurmurHash), seeds {37, 41, 61, 79, 83}, 256 MB bitmap

source table    imported        difference
17,440,258      17,440,227      31
 6,675,384       6,675,384       0
11,730,951      11,730,951       0
 3,489,212       3,489,212       0

total
39,335,805      39,335,774      31
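That is 31 collisions out of 39,335,805 inserts, a measured false-positive ratio of about 7.9e-7, comfortably below the ~5.6e-6 end-state estimate from the sizing sketch above; the measured figure is lower largely because the bitmap fills gradually, so most lookups run against a sparser filter.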