当前位置:   article > 正文

RoaringBitMap在ClickHouse和Spark之间的实践-解决数据仓库预计算多维分析问题_clickhouse roaringbitmap

clickhouse roaringbitmap


  • 数据量过大 很常见
  • 单条数据存储过大:比较少见,单个字段的单条数据很少会超过几百兆或几个 G
  • 单个Task计算逻辑非常复杂





* 统计基数的量
* 当前数据的散列程度



  • 只计算业务方需要的数据:但很难预判业务方的需求,毕竟需求阴晴不定。
  • 最大限度缩小无关联维度的组合:还没想到解决办法,先记个待办
  • 按需自助取数

这里我们针对第三点,前面已经把数据聚合到了DWS,也就是最细粒度的维度+RoaringBitMap< userid >,那我们不提供ADS,业务方需要的话,自己根据DWS查就行。

维度2RBM< user >
维度3RBM< user >


Spark RoaringBitMapUDAF聚合函数实现

 * @description roaring64NavigableMap 采用红黑树的RBM实现方法
public class RoaringBitMapNavigableUDAF extends AbstractGenericUDAFResolver implements Serializable {

     * @return 返回去重Buffer
     * @description UDAF初始化 仅仅支持一个参数 传进来的为TextWritable 读取Object可以强制转为Text
     * */
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        // TODO Auto-generated method stub
        if (parameters == null || parameters.length != 1) {
            throw new UDFArgumentTypeException(0, "仅支持一个参数!");
        return new RoaringBitMapNavigableUDAF.BitmapDistinctUDAFBuffer();

     * BitMap去重 静态内部类
     * 作为 COMPLETE(PARTIAL1) -> PARTIAL2 -> FINAL 几个过程的处理
     * */
    public static class BitmapDistinctUDAFBuffer  extends GenericUDAFEvaluator  {
        // 输入类型
        PrimitiveObjectInspector inputType;

         * @return 约定每个过程的输出类型。即告诉下个过程我将传入Byte[]数组过来
         * @description 初始化函数 如果为COMPLETE(PARTIAL1)过程 约定原始输入类型类PrimitiveObjectInspector(Text),用户传入字符串即可 其余模式下为Byte[]
         * */
        public ObjectInspector init(GenericUDAFEvaluator.Mode m, ObjectInspector[] parameters)
                throws HiveException {
            super.init(m, parameters);
            if(Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m)){
                inputType = (PrimitiveObjectInspector) parameters[0];
            return PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector;

         * @description 聚合的Buffer类
         * */
        public GenericUDAFEvaluator.AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new RoaringBitMapNavigableUDAF.BitmapAggrBuffer();

         * @description 重置清空buffer
         * */
        public void reset(GenericUDAFEvaluator.AggregationBuffer aggregationBuffer) throws HiveException {
            ((RoaringBitMapNavigableUDAF.BitmapAggrBuffer) aggregationBuffer).reset();

         * @param aggregationBuffer 聚合去重Buffer处理器
         * @param objects 对象列表
         * @description 由聚合Buffer处理器处理每一个object
         * */
        public void iterate(GenericUDAFEvaluator.AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {
            if(objects == null) {
            for(Object object: objects){
                if(object != null) {
                    Object input = inputType.getPrimitiveWritableObject(object);
                    RoaringBitMapNavigableUDAF.BitmapAggrBuffer bitmapAggrBufferr = (RoaringBitMapNavigableUDAF.BitmapAggrBuffer) aggregationBuffer;
                    bitmapAggrBufferr.calcResult((Text) input); // 这里强制转为了Text。注意BitMap只能处理数字,所以传进来的必须是数字 字符串 否则会报错

         * @param aggregationBuffer 聚合去重Buffer处理器
         * @description 将Buffer中的RoaringBitmap序列化成字节数组,输出给下游shuffle。
         * */
        public Object terminatePartial(GenericUDAFEvaluator.AggregationBuffer aggregationBuffer) throws HiveException {
            RoaringBitMapNavigableUDAF.BitmapAggrBuffer bitmapAggrBuffer = (RoaringBitMapNavigableUDAF.BitmapAggrBuffer) aggregationBuffer;
            // 约定字节输出流
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = null;
            try {
                objectOutputStream = new ObjectOutputStream(out);
                objectOutputStream.flush(); // 记得要flush一下
                return out.toByteArray();
            } catch (IOException e) {
            } finally {
                // 关闭流
                try {
                    if (objectOutputStream != null) {
                        objectOutputStream.close(); // 关闭外部流即可 内部流会自己关闭
                } catch (IOException e) {
            return new byte[0];

         * @param aggregationBuffer 聚合去重Buffer处理器
         * @param o 另一个需要待合并的 聚合去重Buffer处理器
         * @description 这里在下游聚合的时候,不同上游传了同一个Group key但是不同value,
         *  此时value已经是序列化之后的RoaringBitMap对象object,需要将每一个object反序列化成RoaringBitMap
         *  然后利用aggregationBuffer 将每个RoaringBitMap合并起来
         *  这里的合并采用OR操作,因为涉及到同一个key的value去重,只需要任何一个value的某个bit有值,那么就说明这个bit代表的值是有的,即把bit当做标记而已。
         * */
        public void merge(GenericUDAFEvaluator.AggregationBuffer aggregationBuffer, Object o) throws HiveException {
            RoaringBitMapNavigableUDAF.BitmapAggrBuffer bitmapAggrBuffer = (RoaringBitMapNavigableUDAF.BitmapAggrBuffer) aggregationBuffer;
            if (o != null) {
                byte[] bytes = (byte[]) o; // 将object转为byte[] 可能有坑
                ByteArrayInputStream in = new ByteArrayInputStream(bytes);
                ObjectInputStream inputStream = null;
                try {
                    Roaring64NavigableMap mergeInputStreamBitMap = new Roaring64NavigableMap();
                    inputStream = new ObjectInputStream(in);
                } catch (IOException | ClassNotFoundException e) {
                } finally {
                    try {
                        if(inputStream != null) {
                    } catch (IOException ex) {


         * @description 最终的结果输出 将aggregationBuffer里面的Roaring中的BitMap反序列化成Byte[]输出
         * */
        public Object terminate(GenericUDAFEvaluator.AggregationBuffer aggregationBuffer) throws HiveException {
            RoaringBitMapNavigableUDAF.BitmapAggrBuffer bitmapAggrBuffer = (RoaringBitMapNavigableUDAF.BitmapAggrBuffer) aggregationBuffer;
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = null;
            try {
                objectOutputStream = new ObjectOutputStream(out);
                return out.toByteArray();
            } catch (IOException e) {
            } finally {
                // 清理流的状态
                try {
                    if (objectOutputStream != null) {
                        objectOutputStream.close(); // 关闭外部流即可 内部流会自己关闭
                } catch (IOException e) {
            return new byte[0];

     * @description 聚合去重BitMap去重buffer处理器
     * */
    static class BitmapAggrBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer implements Serializable  {
        // Bitmap 64位实例化对象
        Roaring64NavigableMap roaring64NavigableMap = new Roaring64NavigableMap();
        // 重置
        public void reset() {
         * @param value 传入的为Text类型,如果不能转成数字 说明会出错
         * @description 计算结果
         * */
        public void calcResult(Text value) {
            String s = value.toString();
            if(s != null && !"".equals(s)) {
                } catch (Exception e){
         * @param o 传入的另一RoaringBitmap对象
         * @description 这里去重使用位图的OR操作,只要任何一个bitmap对象的某一个bit有值,说明这个bit代表的内容是有值的,也就是说合并之后的RoaringBitmap在这一位应该置为1
         * */
        public void merge(Object o){
            roaring64NavigableMap.or((Roaring64NavigableMap) o);

        public Long getResult() {
            if (roaring64NavigableMap.isEmpty()) {
                return null;
            return roaring64NavigableMap.getLongCardinality();

         * @description 将结果输出到output流里面
         * */
        public void writeBytes(ObjectOutputStream objectOutputStream){
            try {
            } catch (IOException e) {


        public void or(Roaring64NavigableMap roaring64NavigableMap){

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231


这里注意:我测试的是 ClickHouse 21.1 及以上版本,20 及之前版本的序列化格式会有所区别。参考 ClickHouse 中 RBM 实现的相关源码(共三处),我写了一个按 ClickHouse 存储格式做二进制序列化的 Java 版本。
ClickHouse VarInt实现

 * 将二进制的Roaring64NavigableMap对象先反序列化成对象
 * 这里对应的是ClickHouse21.1版本以上 将其定制序列化为ClickHouse的CRoaring
 * 序列化主要分为四部分
 * * 第一部分:一个byte。基数不超过32时用smallSet,写0;否则用RBM,写1
 * * 第二部分:实际数据需要的字节大小
 * * 第三部分:针对RBM实用的,RBM高位的字节数
 * * 第四部分:数据内容
 * 最后BASE64编码成字符串,最终写入到ClickHouse
public class BinToRbm64NavigableWithClickHouse extends UDF {
    public String evaluate(Object o) {
        String encodeResult = "";
        byte[] bytes = (byte[]) o;
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = null;
        Roaring64NavigableMap roaring64NavigableMap = new Roaring64NavigableMap();
        try {
            objectInputStream = new ObjectInputStream(in);
            // 先获取容量
            long longCardinality = roaring64NavigableMap.getLongCardinality();
            // 当基数小于32时,采用SmallSet存储
            if(longCardinality <= 32){
                // 分配缓冲区大小
                ByteBuffer initBuffer = ByteBuffer.allocate(2 + 8 * roaring64NavigableMap.getIntCardinality());
                ByteBuffer bosBuffer = null;
                if(initBuffer.order() == ByteOrder.LITTLE_ENDIAN){
                    bosBuffer = initBuffer;
                } else {
                    bosBuffer = initBuffer.slice().order(ByteOrder.LITTLE_ENDIAN);
                bosBuffer.put((new Integer(0)).byteValue());
                bosBuffer.put(new Integer((roaring64NavigableMap.getIntCardinality())).byteValue());
                long[] roaring64BitMapLong = roaring64NavigableMap.toArray();
                for (long l : roaring64BitMapLong) {
                encodeResult = Base64.getEncoder().encodeToString(bosBuffer.array());
            }else {
                //rb.serializedSizeInBytes() 需要序列化的字节数
                int serialByteSize = (int)roaring64NavigableMap.serializedSizeInBytes();
                int rbTotalSize = serialByteSize - 5 + 8;
                int varLongLen = VarIntParse.varIntSize(rbTotalSize);

                ByteBuffer initBuffer = ByteBuffer.allocate(varLongLen + 1 + rbTotalSize);
                ByteBuffer bosBuffer;
                if(initBuffer.order() == ByteOrder.LITTLE_ENDIAN){
                    bosBuffer = initBuffer;
                } else {
                    bosBuffer = initBuffer.slice().order(ByteOrder.LITTLE_ENDIAN);

                bosBuffer.put((new Integer(1)).byteValue());
                VarIntParse.putVarInt((int)serialByteSize, bosBuffer);
                // getHighToBitmap是Roaring64NavigableMap的私有方法 不能直接调用,利用反射调用即可
                Method method = roaring64NavigableMap.getClass().getDeclaredMethod("getHighToBitmap", null);
                NavigableMap<Integer, BitmapDataProvider> highToBitMap = (NavigableMap<Integer, BitmapDataProvider>) method.invoke(roaring64NavigableMap, null);

                ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
                roaring64NavigableMap.serialize(new DataOutputStream(byteOutputStream));
                byte[] outPutPre = byteOutputStream.toByteArray();
                byte[] outPutResult = Arrays.copyOfRange(outPutPre, 5, serialByteSize);
                encodeResult = Base64.getEncoder().encodeToString(bosBuffer.array());
        } catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
        } finally {
            try {
                if(objectInputStream != null) {
            } catch (IOException ex) {
        return encodeResult;


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84


 * @date 2022/11/09
 * @description VarInt解析 主要用来序列化和ClickHouse保持一致 详见https://github.com/bazelbuild/bazel/blob/master/src/main/java/com/google/devtools/build/lib/util/VarInt.java
/**
 * Variable-length integer (varint) encoding helpers: 7 payload bits per byte, least
 * significant group first, with the high bit of each byte set when more bytes follow.
 */
public class VarIntParse {

    /** Maximum encoded size of 32-bit positive integers (in bytes). */
    public static final int MAX_VARINT_SIZE = 5;

    /** Maximum encoded size of 64-bit longs, and negative 32-bit ints (in bytes). */
    public static final int MAX_VARLONG_SIZE = 10;

    private VarIntParse() { }

    /**
     * Returns the encoding size in bytes of its input value.
     *
     * @param i the integer to be measured
     * @return the encoding size in bytes of its input value
     */
    public static int varIntSize(int i) {
        int result = 0;
        do {
            result++;
            i >>>= 7;
        } while (i != 0);
        return result;
    }

    /**
     * Reads a varint from src, places its value into the first element of dst and returns
     * the offset in src of the first byte after the varint.
     *
     * @param src source buffer to retrieve from
     * @param offset offset within src
     * @param dst the resulting int value
     * @return the updated offset after reading the varint
     * @throws IndexOutOfBoundsException if the varint spans more than five bytes
     */
    public static int getVarInt(byte[] src, int offset, int[] dst) {
        int result = 0;
        int shift = 0;
        int b;
        do {
            if (shift >= 32) {
                // Out of range
                throw new IndexOutOfBoundsException("varint too long");
            }
            // Get 7 bits from next byte
            b = src[offset++];
            result |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        dst[0] = result;
        return offset;
    }

    /**
     * Reads a varint from the current position of the given ByteBuffer and returns the
     * decoded value as a 32-bit integer.
     *
     * <p>The position of the buffer is advanced to the first byte after the decoded varint.
     * Bytes beyond the fifth are consumed but ignored, so a varlong can be read when only
     * its low 32 bits are needed.
     *
     * @param src the ByteBuffer to get the var int from
     * @return the integer value of the decoded varint
     */
    public static int getVarInt(ByteBuffer src) {
        int result = 0;
        int shift = 0;
        byte b;
        do {
            b = src.get();
            if (shift < 32) {
                // Guard needed: int shifts wrap modulo 32, so overflow groups must be skipped.
                result |= (b & 0x7f) << shift;
            }
            shift += 7;
        } while (b < 0);
        return result;
    }

    /**
     * Reads a varint from the given InputStream and returns the decoded value as an int.
     *
     * @param inputStream the InputStream to read from
     * @return the decoded value
     * @throws IOException if reading fails or the stream ends inside the varint
     * @throws IndexOutOfBoundsException if the varint spans more than five bytes
     */
    public static int getVarInt(InputStream inputStream) throws IOException {
        int result = 0;
        int shift = 0;
        int b;
        do {
            if (shift >= 32) {
                // Out of range
                throw new IndexOutOfBoundsException("varint too long");
            }
            // Get 7 bits from next byte
            b = inputStream.read();
            if (b < 0) {
                // read() returns -1 at end of stream; without this check the -1 would be
                // decoded as payload bits.
                throw new java.io.EOFException("stream ended inside a varint");
            }
            result |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return result;
    }

    /**
     * Encodes an integer in a variable-length encoding, 7 bits per byte, into a destination
     * byte[], following the protocol buffer convention.
     *
     * @param v the int value to write to sink
     * @param sink the sink buffer to write to
     * @param offset the offset within sink to begin writing
     * @return the updated offset after writing the varint
     */
    public static int putVarInt(int v, byte[] sink, int offset) {
        do {
            // Encode next 7 bits + terminator bit
            int bits = v & 0x7F;
            v >>>= 7;
            byte b = (byte) (bits + ((v != 0) ? 0x80 : 0));
            sink[offset++] = b;
        } while (v != 0);
        return offset;
    }

    /**
     * Encodes an integer in a variable-length encoding, 7 bits per byte, to a ByteBuffer sink.
     *
     * @param v the value to encode
     * @param sink the ByteBuffer to add the encoded value
     */
    public static void putVarInt(int v, ByteBuffer sink) {
        while (true) {
            int bits = v & 0x7f;
            v >>>= 7;
            if (v == 0) {
                sink.put((byte) bits);
                return;
            }
            sink.put((byte) (bits | 0x80));
        }
    }

    /**
     * Encodes an integer in a variable-length encoding, 7 bits per byte, and writes it to
     * the given OutputStream.
     *
     * @param v the value to encode
     * @param outputStream the OutputStream to write to
     * @throws IOException if the write fails
     */
    public static void putVarInt(int v, OutputStream outputStream) throws IOException {
        byte[] bytes = new byte[varIntSize(v)];
        putVarInt(v, bytes, 0);
        outputStream.write(bytes);
    }

    /**
     * Returns the encoding size in bytes of its input value.
     *
     * @param v the long to be measured
     * @return the encoding size in bytes of a given long value
     */
    public static int varLongSize(long v) {
        int result = 0;
        do {
            result++;
            v >>>= 7;
        } while (v != 0);
        return result;
    }

    /**
     * Reads an up to 64-bit varint from the current position of the given ByteBuffer and
     * returns the decoded value as a long.
     *
     * <p>The position of the buffer is advanced to the first byte after the decoded varint;
     * at most ten bytes are consumed.
     *
     * @param src the ByteBuffer to get the var int from
     * @return the value of the decoded long varint
     */
    public static long getVarLong(ByteBuffer src) {
        long result = 0;
        // Ten groups of 7 bits (shifts 0..63) cover all 64 bits.
        for (int shift = 0; shift < 64; shift += 7) {
            byte b = src.get();
            result |= ((long) (b & 0x7f)) << shift;
            if (b >= 0) {
                break;
            }
        }
        return result;
    }

    /**
     * Encodes a long integer in a variable-length encoding, 7 bits per byte, to a
     * ByteBuffer sink.
     *
     * @param v the value to encode
     * @param sink the ByteBuffer to add the encoded value
     */
    public static void putVarLong(long v, ByteBuffer sink) {
        while (true) {
            int bits = ((int) v) & 0x7f;
            v >>>= 7;
            if (v == 0) {
                sink.put((byte) bits);
                return;
            }
            sink.put((byte) (bits | 0x80));
        }
    }

    /**
     * Encodes a long integer in a variable-length encoding, 7 bits per byte, and writes it
     * to the given OutputStream.
     *
     * @param v the value to encode
     * @param outputStream the OutputStream to write to
     * @throws IOException if the write fails
     */
    public static void putVarLong(long v, OutputStream outputStream) throws IOException {
        byte[] bytes = new byte[varLongSize(v)];
        ByteBuffer sink = ByteBuffer.wrap(bytes);
        putVarLong(v, sink);
        outputStream.write(bytes);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272


create table ...(
	dim1 string,
	dim2 string,
	encode string, -- 加密二进制RBM
	roaring_bitmap AggregateFunction(groupBitmap, UInt32)  
	            MATERIALIZED base64Decode(encode) -- 解密并反序列化成Croaring

from table
where dim1 = 'ak' and dim2 = 'ko
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13




