
Redis Bloom Filter: Storing and Querying Tens of Millions of User Records

A real-time job needs to identify new users. Loading the user table from Hive into memory works, but tens of millions of rows consume a lot of memory and look-ups are slow. Storing the users in a Redis bitmap instead sharply reduces memory usage and speeds up queries. Spark performs the query; Redis holds the bitmap.
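To see why the bitmap wins, a rough back-of-the-envelope estimate (my numbers, not from the original article): a 2^31-bit bitmap occupies 2^31 / 8 bytes = 256 MB no matter how many users it marks, whereas keeping ~40 million composite IDs (roughly 25 bytes each) in a Redis set would cost about 1 GB of payload before per-entry overhead, and an in-memory Scala HashSet considerably more.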

The setup below is Scala + Spark in local mode + a single Redis node, used for testing.

I tested several hash algorithms. No single hash was satisfactory in either accuracy or speed: collisions already appeared at the tens-of-thousands to hundreds-of-thousands scale. Combining several algorithms and hashing multiple times did not help much either; at tens of millions of records there were still many collisions.

I also tested Redisson. At a comparable collision rate, Redisson was much slower (on the order of hours for tens of millions of records).
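For reference, a minimal sketch of what a Redisson Bloom filter comparison looks like (the address, key name, and sizing parameters here are my assumptions, not the original test code; the redisson dependency is not in the pom.xml below):

import org.redisson.Redisson
import org.redisson.api.RBloomFilter
import org.redisson.config.Config

object RedissonBloomSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical single-node address; adjust to your Redis instance
    val config = new Config()
    config.useSingleServer().setAddress("redis://127.0.0.1:6379")
    val redisson = Redisson.create(config)

    // Redisson sizes the filter from expected insertions and target error rate
    val bloom: RBloomFilter[String] = redisson.getBloomFilter("user:bloom")
    bloom.tryInit(40000000L, 0.000001) // 40M entries, 1e-6 false-positive rate

    val userId = "1001,2002,some_account" // hypothetical composite key
    if (!bloom.contains(userId)) {
      bloom.add(userId) // each call is a network round trip
    }
    redisson.shutdown()
  }
}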

I tested several prime-seed combinations: (61), (7, 61), (7, 11, 13, 61), (5, 7, 11, 13, 61), (61, 83, 113, 151, 211), (61, 83, 113, 151, 211, 379), and so on.

In the end I settled on MurmurHash with 5 hash rounds (seeds 37, 41, 61, 79, 83). A 256 MB bitmap storing about 40 million records produced 31 collisions, which is acceptable.
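As a sanity check, the standard Bloom filter false-positive formula (my calculation, not part of the original test) agrees with these numbers. With m = 2^31 bits, n ≈ 3.93 × 10^7 entries, and k = 5 hash functions:

p = (1 − e^(−kn/m))^k ≈ (1 − e^(−0.0916))^5 ≈ 5 × 10^−6

That p applies once the filter is full; while it is filling, the false-positive rate grows roughly like the k-th power of the fill ratio, so the expected number of entries wrongly skipped during the import is on the order of n·p / (k+1) ≈ 34, close to the 31 actually observed.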

Main class

import java.util.Properties

import com.sm.bloom.Bloom
import com.sm.constants.Constants
import com.sm.utils.{DateUtils, RedisUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import redis.clients.jedis.Jedis

/**
 * Spark queries the user table and loads it into the Redis cache.
 *
 * create by LiuJinHe 2020/3/3
 */
object ImportDataToRedis {
  private val warehouseLocation = Constants.WAREHOUSE
  private val logger = LoggerFactory.getLogger(this.getClass)
  private var prop: Properties = new Properties
  private var jedis: Jedis = _
  private var startDate: String = _
  private var endDate: String = _
  private var count: Long = 0L

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.spark_project.jetty").setLevel(Level.WARN)

    // prime seeds for the hash functions
    val seeds = Array(37, 41, 61, 79, 83)

    if (args.length == 2) {
      startDate = args(0)
      endDate = args(1)
    }

    val spark = initSparkSession

    logger.info(s"========== Full import of new-user table into Redis ==========")
    val start = System.currentTimeMillis()

    // load Redis config and initialize jedis (loadProp is a project helper, not shown)
    val redisProp = loadProp(Constants.REDIS_PROP)
    val host = redisProp.getProperty("redis.host")
    val port = redisProp.getProperty("redis.port").toInt
    val timeout = redisProp.getProperty("redis.timeout").toInt
    jedis = RedisUtils.init(host, port, timeout)

    // user bitmap
    // key: user table name; value: bitmap (offset = hash(core_account + game_id + package_id))
    val userBit = redisProp.getProperty("redis.user.bit")

    // initialize the Bloom filter: 2^31 bits = 256 MB
    // note: 1 << 31 overflows Int, so the size must be written as 1L << 31
    lazy val bloom = new Bloom(1L << 31)

    // temporary variables
    var data: Array[String] = null
    var gamePackage: String = ""
    var userId: String = ""
    var offset: Array[Long] = null
    var isExist: Boolean = true

    // read the user table
    val sqlStr =
      s"""
         |select
         |  core_account, game_id, package_id, time
         |from sm_data.dws_new_user_id
         |where add_date >= '$startDate' and add_date < '$endDate'
      """.stripMargin
    val dataFrame = spark.sql(sqlStr)

    // note: this foreach runs inside Spark tasks; see the caveat after this listing
    dataFrame.foreach(line => {
      // combine fields into a userId
      userId = line.getInt(1) + "," + line.getInt(2) + "," + line.getString(0)
      // hash to k bit positions
      offset = bloom.murmurHash3(userId, seeds)
      // check whether all positions are already set in the Redis bitmap
      isExist = bloom.getbit(jedis, userBit, offset)
      if (!isExist) {
        // not yet present: set the corresponding bits to 1
        bloom.setbit(jedis, userBit, offset)
        count += 1
      }
    })

    spark.stop()
    jedis.close()

    val end = System.currentTimeMillis()
    logger.info(s"========== New users total: $count ==========")
    logger.info(s"========== Elapsed: ${(end - start) / 1000} s ==========")
  }

  def initSparkSession: SparkSession = SparkSession.builder()
    .appName(this.getClass.getSimpleName)
    .master(Constants.SPARK_LOCAL_MODE)
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer", "1024m")
    .config("spark.kryoserializer.buffer.max", "2046m")
    .config("spark.io.compression.codec", "snappy")
    .enableHiveSupport()
    .getOrCreate()
}
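One caveat in the code above: dataFrame.foreach runs inside Spark tasks, so sharing the singleton jedis connection and the count variable only works in local mode, where driver and executors live in the same JVM. On a cluster, the usual pattern is a per-partition connection plus an accumulator. A sketch of how the loop could be rewritten (same helper names as above; this adaptation is mine, not the original code):

// inside main(), replacing the dataFrame.foreach(...) block
import org.apache.spark.sql.Row

val newUsers = spark.sparkContext.longAccumulator("newUsers")

dataFrame.foreachPartition { rows: Iterator[Row] =>
  // executor-side connection and filter, created once per partition
  val jedis = new Jedis(host, port, timeout)
  val bloom = new Bloom(1L << 31)
  try {
    rows.foreach { line =>
      val userId = line.getInt(1) + "," + line.getInt(2) + "," + line.getString(0)
      val offset = bloom.murmurHash3(userId, seeds)
      if (!bloom.getbit(jedis, userBit, offset)) {
        bloom.setbit(jedis, userBit, offset)
        newUsers.add(1L) // accumulators propagate back to the driver
      }
    }
  } finally {
    jedis.close()
  }
}

logger.info(s"========== New users total: ${newUsers.value} ==========")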

RedisUtils

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{Jedis, JedisPool}

/**
 * create by LiuJinHe 2020/3/3
 */
object RedisUtils extends Serializable {
  // @transient marks a field so it is skipped during serialization
  @transient private var redisPool: JedisPool = _

  var host: String = _
  var port: Int = _
  var timeout: Int = _

  // note: each call creates a fresh pool and hands out one connection from it
  def init(host: String, port: Int, timeout: Int): Jedis = {
    redisPool = new JedisPool(new GenericObjectPoolConfig, host, port, timeout)
    redisPool.getResource
  }
}

Bloom class

import java.security.MessageDigest

import redis.clients.jedis.Jedis
import scala.util.hashing.MurmurHash3

/**
 * create by LiuJinHe 2020/3/3
 */
class Bloom(size: Long) extends Serializable {
  // bitmap size in bits; default 2^31 bits = 256 MB
  // (must be a power of two so that `& (cap - 1)` works as a modulus)
  private val cap = if (size > 0) size else 1L << 31
  private val seeds = Array(37, 41, 61, 79, 83)
  private var indices: Array[Long] = _

  /**
   * BKDRHash over the md5 hex digest of the input
   * @param str  string to hash
   * @param seed multiplier seed
   * @return bit position
   */
  def hash(str: String, seed: Int): Long = {
    var result = 0L
    val value = md5Hash(str)
    for (i <- 0 until value.length) {
      result = result * seed + value.charAt(i)
    }
    result & (cap - 1)
  }

  // hex-encoded md5 digest used as the BKDR input
  private def md5Hash(str: String): String =
    MessageDigest.getInstance("MD5")
      .digest(str.getBytes("UTF-8"))
      .map("%02x".format(_)).mkString

  /**
   * murmurHash with a single default seed
   * @param value string to hash
   * @return one bit position
   */
  def murmurHash3(value: String): Long = {
    var result = 0L
    if (value != null && !value.isEmpty) {
      // mask keeps the value non-negative (Math.abs(Int.MinValue) is negative)
      result = (MurmurHash3.stringHash(value, seeds.head) & 0x7fffffffL) % cap
    }
    result
  }

  /**
   * murmurHash with k seeds
   * @param value string to hash
   * @param seeds seeds passed to MurmurHash3
   * @return array of k bit positions
   */
  def murmurHash3(value: String, seeds: Array[Int]): Array[Long] = {
    indices = new Array[Long](seeds.length)
    if (value != null && !value.isEmpty) {
      for (i <- seeds.indices) {
        indices(i) = (MurmurHash3.stringHash(value, seeds(i)) & 0x7fffffffL) % cap
      }
    }
    indices
  }

  def setbit(jedis: Jedis, key: String, indices: Array[Long]): Unit = {
    for (index <- indices) {
      jedis.setbit(key, index, true)
    }
  }

  def getbit(jedis: Jedis, key: String, indices: Array[Long]): Boolean = {
    if (key == null || key.isEmpty) return false
    // present only if every bit is set
    indices.forall(index => jedis.getbit(key, index))
  }
}
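Each record above costs k = 5 GETBIT round trips, plus 5 SETBIT round trips for new users. A minimal sketch of batching these with a Jedis pipeline (an optimization sketched on top of the class above, not part of the original code):

import redis.clients.jedis.Jedis

// pipelined variants: one round trip for all k bits of a record
class PipelinedBloom(size: Long) extends Bloom(size) {

  def getbitPipelined(jedis: Jedis, key: String, indices: Array[Long]): Boolean = {
    if (key == null || key.isEmpty) return false
    val pipeline = jedis.pipelined()
    val responses = indices.map(index => pipeline.getbit(key, index))
    pipeline.sync() // flush and wait for all replies at once
    responses.forall(_.get())
  }

  def setbitPipelined(jedis: Jedis, key: String, indices: Array[Long]): Unit = {
    val pipeline = jedis.pipelined()
    indices.foreach(index => pipeline.setbit(key, index, true))
    pipeline.sync()
  }
}

With 5 bit positions per record, this turns 10 round trips per new user into 2, which should matter at tens of millions of records.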

pom.xml

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <scala.version>2.11.12</scala.version>
  <scala.binary.version>2.11</scala.binary.version>
  <scala.complete.version>${scala.binary.version}.8</scala.complete.version>
  <hadoop.version>2.7.6</hadoop.version>
  <spark.version>2.4.0</spark.version>
  <slf4j.version>1.7.20</slf4j.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.1.0</version>
  </dependency>
</dependencies>

<build>
  <pluginManagement>
    <plugins>
      <!-- compiles Scala sources into class files -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
      </plugin>
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <phase>process-resources</phase>
          <goals>
            <goal>add-source</goal>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <phase>process-test-resources</phase>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <executions>
        <execution>
          <phase>compile</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Redis

[screenshot: the Redis RDB file after the import]

Results

5 hash rounds {37, 41, 61, 79, 83}, 256 MB bitmap

Source table    Imported      Difference (collisions)
17,440,258      17,440,227    31
6,675,384       6,675,384     0
11,730,951      11,730,951    0
3,489,212       3,489,212     0
Total:
39,335,805      39,335,774    31