当前位置:   article > 正文

PostgreSQL源码学习——手动增加信息熵聚合函数_hashfn.h

hashfn.h

Get Your Hands Dirty!

这是对于中国人民大学信息学院研究生课——PostgreSQL源码分析课程作业的回顾。
PostgreSQL的源码中定义了很多内建的函数(build-in function),这些函数是在数据库启动时就可以使用的,一般是一些比较重要和常用的函数。作为内核学习的第一步,我选择了自己实现一个内建的聚合函数,并重新编译运行数据库。
此文要综合用到前三篇博客的内容:

一、明确目标

我们的目标是增加一些聚合函数,功能为计算信息熵相关的值(很有DB for AI)的感觉了。
具体来讲,我们打算要实现以下四个函数:

1.1 entropy

H = − ∑ i = 1 N p ( x i ) ⋅ l o g p ( x i ) H=-\sum^{N}_{i=1}p(x_{i})·{\rm log} p(x_i) H=i=1Np(xi)logp(xi)

1.2 cross_entropy

H ( p , q ) = − ∑ i = 1 N p ( x i ) ⋅ l o g q ( x i ) H(p,q)=-\sum^{N}_{i=1}p(x_{i})·{\rm log} q(x_i) H(p,q)=i=1Np(xi)logq(xi)

1.3 kl_divergence

D K L ( p ∣ ∣ q ) = ∑ i = 1 N P ( x i ) ⋅ ( l o g p ( x i ) − l o g q ( x i ) ) D_{KL}(p||q)=\sum^{N}_{i=1}P(x_{i})·({\rm log} p(x_i)-{\rm log} q(x_i)) DKL(pq)=i=1NP(xi)(logp(xi)logq(xi))

1.4 js_divergence

D J S ( p ∣ ∣ q ) = 1 2 D K L ( P ∣ ∣ M ) + 1 2 D K L ( M ∣ ∣ Q ) D_{JS}(p||q)=\frac{1}{2} D_{KL}(P||M)+\frac{1}{2} D_{KL}(M||Q) DJS(pq)=21DKL(PM)+21DKL(MQ)

这四个函数的参数可以允许多种类型,目前我们设计允许int8(即一字节大小的整型数字)、text(PostgreSQL中的特殊类型,用来存储字符串),text底层实现就是一个包含长度信息和一个柔性数组的结构体(柔性数组的讲解看这里)。这一来,一共要实现8个聚合函数,分别为:

输入为int的聚合函数输入为text的聚合函数
entropy(int, int)entropy(text, text)
cross_entropy(int, int)cross_entropy(text, text)
kl_divergence(int, int)kl_divergence(text, text)
js_divergence(int, int)js_divergence(text, text)

通过查阅文档,我们可以看到,一个聚合函数至少有两个函数来实现:

  1. 聚集转换函数(aggregate transition function):该函数接收聚合函数的输入参数以及初始,并且返回一个转换后的状态。
  2. 聚集最终函数(aggregate final function):该函数接受转换完成之后的状态,并且计算最终结果当作聚集函数的返回值。

此外,还可以添加聚集结合函数(aggregate combine function):这个函数接收两个状态,合并为一个状态,并且返回,主要是当并行计算时,可能多组同时计算得到多个中间状态,需要通过这样的函数去合并这些状态。

进行下面步骤之前,我们先用源码安装好PostgreSQL(教程见这里)。在学习如何新增函数的过程中,少不了对原有的函数的学习,这个过程中需要调试来辅助学习,因此还应该先学习这篇文章:【如何调试PG】。

二、新增一个.c文件

我们新增一个.c文件,具体加在{PGPATH}/src/backend/utils/adt/entropy.c这个位置。{PGPATH}是你的PostgreSQL源码位置。具体为什么是这里呢,因为我们看源码发现,这个adt文件夹下面,都是针对内建的数据类型使用的各种函数的实现,比如求和、求方差、求平均数、求斜率等函数都实现在这个文件夹下。这里的adt,其实就是我们数据结构中抽象数据类型(Abstract Data Type)的缩写。所以大胆猜想,文件应该加在这里。
里面具体的一些写法,如果看不懂没关系,很多是PG的宏定义,或者一些PG系统的特性(比如context的切换)。只需要打开几个其他文件,看懂算法,然后照抄即可,毕竟我们首要目标是实现我们的功能。

下面的代码看起来虽长,但结构比较清晰,自我感觉还是很好懂的:

2.1 定义及实现函数

实现了上面8个函数每一个的转换函数、组合函数、最终函数。实现的函数有:
主要的函数,这些函数都需要之后写入PostgreSQL的初始函数表:
1.1. Datum int_entropy_accum(PG_FUNCTION_ARGS)
1.2. Datum text_entropy_accum(PG_FUNCTION_ARGS)
1.3. Datum entropy_combine(PG_FUNCTION_ARGS)
1.4. Datum entropy_final(PG_FUNCTION_ARGS)
1.5. Datum cross_int_entropy_accum(PG_FUNCTION_ARGS)
1.6. Datum cross_text_entropy_accum(PG_FUNCTION_ARGS)
1.7. Datum cross_entropy_combine(PG_FUNCTION_ARGS)
1.8. Datum cross_entropy_final(PG_FUNCTION_ARGS)
1.9. Datum kld_final(PG_FUNCTION_ARGS)
1.10. Datum jsd_final(PG_FUNCTION_ARGS)

辅助的函数,这里的函数只在该模块内部调用:
1.11. int APHash(char *str)
1.12. static ItemPs *makeItemPsAggState(FunctionCallInfo fcinfo)
1.13. static TwoItemPs *makeTwoItemPsAggState(FunctionCallInfo fcinfo)
1.14. void freeItemPs(ItemPs *state)
1.15. void freeTwoItemPs(TwoItemPs *state)
1.16. int is_in(int value, int *arr, int length)
1.17. void add_value_to_state(ItemPs *state, int value, int count, bool N_add)

2.2 定义了两种结构体

用ItemPs和TwoItemPs,去保存聚合函数的中间状态。由于我们要计算熵,ItemPs状态需要保存要计算的元素总数N、保存各个元素的值category、各个元素的统计数目counts,num记录元素的种类数,也即category和counts保存值的个数,size记录总的开辟空间的大小。TwoItemPs由两个ItemPs构成。
具体定义如下:

typedef struct ItemPs
{
   int N;         // 元素数量统计
   int num;       // 已经有的种类个数
   int size;      // 总的长度
   int *counts;   // 各个元素的个数统计
   int *category; // 各个元素的大小
} ItemPs;

typedef struct TwoItemPs
{
   ItemPs *x;
   ItemPs *y;
} TwoItemPs;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2.3 内存管理

在写C代码中,要注意内存的开辟与释放。PostgreSQL有一套自己的内存上下文管理系统,所以要用PostgreSQL提供的palloc、pfree、repalloc等函数,以及分配内存时要注意使用MemoryContextSwitchTo函数,此处不详细说明,用的时候只需要照抄别的函数即可。

2.4 对输入返回的说明

输入的PG_FUNCTION_ARGS其实是一个宏定义,就是FunctionCallInfo fcinfo,保存了调用这个函数的函数的信息以及输入的参数列表。
返回值中的Datum其实就是一个long long int,用来存储一个地址,代表一个指针,定义及相关函数操作有一个专门的模块datum.c,也在adt文件夹下面。

/*-------------------------------------------------------------------------
 - entropy.c
 - a module contain some functions defined by Qinliang Xue
 - 
 - TODO: 
 - - mse function
 - - Wasserstein distance
 - 
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <ctype.h>
#include <float.h>
#include <limits.h>
#include <math.h>
#include <string.h>

#include "catalog/pg_type.h"
#include "common/hashfn.h"
#include "common/int.h"
#include "funcapi.h"
#include "lib/hyperloglog.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "nodes/supportnodes.h"
#include "utils/array.h"
#include "utils/arrayaccess.h"
#include "utils/datum.h"
#include "utils/float.h"
#include "utils/guc.h"
#include "utils/int8.h"
#include "utils/numeric.h"
#include "utils/sortsupport.h"
#include "utils/builtins.h"

#define INIT_NUM 10

typedef struct ItemPs
{
    int N;         // 元素数量统计
    int num;       // 已经有的种类个数
    int size;      // 总的长度
    int *counts;   // 各个元素的个数统计
    int *category; // 各个元素的大小
} ItemPs;

typedef struct TwoItemPs
{
    ItemPs *x;
    ItemPs *y;
} TwoItemPs;

/* APHash - convert a string to a int
 * used in text entropy functions
 */
int APHash(char *str)
{
    int hash = 0;
    int i;

    for (i = 0; *str; i++)
    {
        if ((i & 1) == 0)
        {
            hash ^= ((hash << 7) ^ (*str++) ^ (hash >> 3));
        }
        else
        {
            hash ^= (~((hash << 11) ^ (*str++) ^ (hash >> 5)));
        }
    }

    return (hash & 0x7FFFFFFF);
}


/* makeItemPsAggState
 * Init an ItemPs state and return it,
 * the input parameter fcinfo contains the called function's information
 */
static ItemPs *
makeItemPsAggState(FunctionCallInfo fcinfo)
{
    ItemPs *state;
    MemoryContext aggcontext;
    MemoryContext oldcontext;

    if (!AggCheckCallContext(fcinfo, &aggcontext))
    {
        /* cannot be called directly because of internal-type argument */
        elog(ERROR, "aggregate function entropy called in non-aggregate context");
    }

    /*
	 * Create state in aggregate context.  It'll stay there across subsequent
	 * calls.
	 */
    oldcontext = MemoryContextSwitchTo(aggcontext);
    state = (ItemPs *)palloc(sizeof(ItemPs));
    state->category = (int *)palloc(sizeof(int) * INIT_NUM);
    state->counts = (int *)palloc(sizeof(int) * INIT_NUM);
    state->size = INIT_NUM;
    state->num = 0;
    state->N = 0;
    MemoryContextSwitchTo(oldcontext);

    return state;
}

/* makeTowItemPsAggState
 * Init a TwoItemPs state and return it,
 * the input parameter fcinfo contains the called function's information
 */
static TwoItemPs *
makeTwoItemPsAggState(FunctionCallInfo fcinfo)
{
    TwoItemPs *state;
    MemoryContext aggcontext;
    MemoryContext oldcontext;

    if (!AggCheckCallContext(fcinfo, &aggcontext))
    {
        /* cannot be called directly because of internal-type argument */
        elog(ERROR, "aggregate function entropy called in non-aggregate context");
    }

    /*
	 * Create state in aggregate context.  It'll stay there across subsequent
	 * calls.
	 */
    oldcontext = MemoryContextSwitchTo(aggcontext);
    state = (TwoItemPs *)palloc(sizeof(TwoItemPs));
    state->x = makeItemPsAggState(fcinfo);
    state->y = makeItemPsAggState(fcinfo);
    MemoryContextSwitchTo(oldcontext);

    return state;
}

void freeItemPs(ItemPs *state)
{
    pfree(state->category);
    pfree(state->counts);
    pfree(state);
}

void freeTwoItemPs(TwoItemPs *state)
{
    freeItemPs(state->x);
    freeItemPs(state->y);
    pfree(state);
}

/*
 * is_in - check a item in an array
 * the simplest way to find in order
 */
int is_in(int value, int *arr, int length)
{
    for (int i = 0; i < length; i++)
    {
        if (arr[i] == value)
        {
            return i;
        }
    }
    return -1;
}

void add_value_to_state(ItemPs *state, int value, int count, bool N_add)
{
    int index = is_in(value, state->category, state->num);
    if (index >= 0)
    {
        state->counts[index] += count;
    }
    else
    {
        if (state->num == state->size)
        {
            state->size += INIT_NUM;
            state->category = (int *)repalloc(state->category, sizeof(int) * INIT_NUM);
            state->counts = (int *)repalloc(state->counts, sizeof(int) * INIT_NUM);
        }
        state->category[state->num] = value;
        state->counts[state->num++] = count;
    }
    if (N_add)
        state->N += count;
}

/*
 * int_entropy_accum - the aggregate transition function of entropy(int, int)
 * input params: PG_FUNCTION_ARGS is a marco definition of FunctionCallInfo fcinfo, containing the actural args
 * we can get args using PG_GETARG_{type}(args_order), for example, PG_GETARG_DATUM(1) means get the second datum args.
 * return: Datum is a long long int, this is a address of pointer
 */
Datum
    int_entropy_accum(PG_FUNCTION_ARGS)
{
    ItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (ItemPs *)PG_GETARG_POINTER(0);
    if (state == NULL)
    {
        state = makeItemPsAggState(fcinfo);
    }
    int value = PG_GETARG_DATUM(1);
    add_value_to_state(state, value, 1, true); // first add, so count is 1
    PG_RETURN_POINTER(state);
}

Datum
    text_entropy_accum(PG_FUNCTION_ARGS)
{
    ItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (ItemPs *)PG_GETARG_POINTER(0);
    if (state == NULL)
    {
        state = makeItemPsAggState(fcinfo);
    }
    int value = APHash(text_to_cstring(PG_GETARG_TEXT_PP(1))); // text need hash func to int
    add_value_to_state(state, value, 1, true);                 // first add, so count is 1
    PG_RETURN_POINTER(state);
}

Datum
    entropy_combine(PG_FUNCTION_ARGS)
{
    ItemPs *state1;
    ItemPs *state2;
    state1 = (ItemPs *)PG_GETARG_POINTER(0);
    state2 = (ItemPs *)PG_GETARG_POINTER(1);
    int i;
    for (i = 0; i < state2->num; i++)
    {
        add_value_to_state(state1, state2->category[i], state2->counts[i], true);
    }
    PG_RETURN_POINTER(state1);
}

Datum
    entropy_final(PG_FUNCTION_ARGS)
{
    ItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (ItemPs *)PG_GETARG_POINTER(0);
    float8 entro = 0;
    int cate_num = state->num;
    int *counts = state->counts;
    int *category = state->category;
    int N = state->N;
    int j;
    for (j = 0; j < cate_num; j++)
    {
        float8 p = (float8)(1.0 * counts[j] / N);
        entro += -p * log(p);
    }
    freeItemPs(state);
    PG_RETURN_FLOAT8(entro);
}

Datum
    cross_int_entropy_accum(PG_FUNCTION_ARGS)
{
    TwoItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (TwoItemPs *)PG_GETARG_POINTER(0);
    if (state == NULL)
    {
        state = makeTwoItemPsAggState(fcinfo);
    }
    int value1 = PG_GETARG_DATUM(1);
    int value2 = PG_GETARG_DATUM(2);
    int index1 = is_in(value1, state->x->category, state->x->num);
    int index12 = is_in(value1, state->y->category, state->y->num);
    int index2 = is_in(value2, state->y->category, state->y->num);
    int index21 = is_in(value2, state->x->category, state->x->num);

    add_value_to_state(state->x, value1, 1, true);
    add_value_to_state(state->y, value1, 0, false);
    add_value_to_state(state->y, value2, 1, true);
    add_value_to_state(state->x, value2, 0, false);

    PG_RETURN_POINTER(state);
}

Datum
    cross_text_entropy_accum(PG_FUNCTION_ARGS)
{
    TwoItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (TwoItemPs *)PG_GETARG_POINTER(0);
    if (state == NULL)
    {
        state = makeTwoItemPsAggState(fcinfo);
    }
    int value1 = APHash(text_to_cstring(PG_GETARG_TEXT_PP(1)));
    int value2 = APHash(text_to_cstring(PG_GETARG_TEXT_PP(2)));
    int index1 = is_in(value1, state->x->category, state->x->num);
    int index12 = is_in(value1, state->y->category, state->y->num);
    int index2 = is_in(value2, state->y->category, state->y->num);
    int index21 = is_in(value2, state->x->category, state->x->num);

    add_value_to_state(state->x, value1, 1, true);
    add_value_to_state(state->y, value1, 0, false);
    add_value_to_state(state->y, value2, 1, true);
    add_value_to_state(state->x, value2, 0, false);
    PG_RETURN_POINTER(state);
}

Datum
    cross_entropy_combine(PG_FUNCTION_ARGS)
{
    TwoItemPs *state1;
    TwoItemPs *state2;
    state1 = (TwoItemPs *)PG_GETARG_POINTER(0);
    state2 = (TwoItemPs *)PG_GETARG_POINTER(1);
    int i;
    for (i = 0; i < state2->x->num; i++)
    {
        add_value_to_state(state1->x, state2->x->category[i], state2->x->counts[i], true);
        add_value_to_state(state1->y, state2->x->category[i], 0, false);
        add_value_to_state(state1->y, state2->y->category[i], state2->y->counts[i], true);
        add_value_to_state(state1->x, state2->y->category[i], 0, false);
    }
    PG_RETURN_POINTER(state1);
}

Datum
    cross_entropy_final(PG_FUNCTION_ARGS)
{
    TwoItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (TwoItemPs *)PG_GETARG_POINTER(0);
    float8 entro = 0;
    if (state->x->num != state->y->num)
    {
        PG_RETURN_NULL();
    }
    if (state->x->N != state->y->N)
    {
        PG_RETURN_NULL();
    }
    int cate_num = state->x->num;
    int i;
    for (i = 0; i < cate_num; i++)
    {
        if (state->x->category[i] != state->y->category[i])
        {
            PG_RETURN_NULL();
        }
    }
    int *counts1 = state->x->counts;
    int *counts2 = state->y->counts;
    int N = state->x->N;
    int j;
    for (j = 0; j < cate_num; j++)
    {
        float8 p = (float8)(1.0 * counts1[j] / N);
        float8 q = (float8)(1.0 * counts2[j] / N);
        entro += -p * log(q);
    }
    freeTwoItemPs(state);
    PG_RETURN_FLOAT8(entro);
}

Datum
    kld_final(PG_FUNCTION_ARGS)
{
    TwoItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (TwoItemPs *)PG_GETARG_POINTER(0);
    float8 entro = 0;
    if (state->x->num != state->y->num)
    {
        PG_RETURN_NULL();
    }
    if (state->x->N != state->y->N)
    {
        PG_RETURN_NULL();
    }
    int cate_num = state->x->num;
    int i;
    for (i = 0; i < cate_num; i++)
    {
        if (state->x->category[i] != state->y->category[i])
        {
            PG_RETURN_NULL();
        }
    }
    int *counts1 = state->x->counts;
    int *counts2 = state->y->counts;
    int N = state->x->N;
    int j;
    for (j = 0; j < cate_num; j++)
    {
        float8 p = (float8)(1.0 * counts1[j] / N);
        float8 q = (float8)(1.0 * counts2[j] / N);
        entro += p * (log(p) - log(q));
    }
    freeTwoItemPs(state);
    PG_RETURN_FLOAT8(entro);
}

Datum
    jsd_final(PG_FUNCTION_ARGS)
{
    TwoItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (TwoItemPs *)PG_GETARG_POINTER(0);
    float8 entro = 0;
    if (state->x->num != state->y->num)
    {
        PG_RETURN_NULL();
    }
    if (state->x->N != state->y->N)
    {
        PG_RETURN_NULL();
    }
    int cate_num = state->x->num;
    int i;
    for (i = 0; i < cate_num; i++)
    {
        if (state->x->category[i] != state->y->category[i])
        {
            PG_RETURN_NULL();
        }
    }
    int *counts1 = state->x->counts;
    int *counts2 = state->y->counts;
    int N = state->x->N;
    int j;
    for (j = 0; j < cate_num; j++)
    {
        float8 p = (float8)(1.0 * counts1[j] / N);
        float8 q = (float8)(1.0 * counts2[j] / N);
        float8 m = (p + q) / 2;
        float8 e1 = p * (log(p) - log(m));
        float8 e2 = q * (log(q) - log(m));
        entro += ((e1 + e2) / 2);
    }
    freeTwoItemPs(state);
    PG_RETURN_FLOAT8(entro);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380
  • 381
  • 382
  • 383
  • 384
  • 385
  • 386
  • 387
  • 388
  • 389
  • 390
  • 391
  • 392
  • 393
  • 394
  • 395
  • 396
  • 397
  • 398
  • 399
  • 400
  • 401
  • 402
  • 403
  • 404
  • 405
  • 406
  • 407
  • 408
  • 409
  • 410
  • 411
  • 412
  • 413
  • 414
  • 415
  • 416
  • 417
  • 418
  • 419
  • 420
  • 421
  • 422
  • 423
  • 424
  • 425
  • 426
  • 427
  • 428
  • 429
  • 430
  • 431
  • 432
  • 433
  • 434
  • 435
  • 436
  • 437
  • 438
  • 439
  • 440
  • 441

三、修改builtins.h文件

这个文件位置是{PGPATH}/src/include/utils/builtins.h为什么builtin.h文件要修改呢?这里其实偷了个懒,按理说我们应该新增一个.h文件,但由于实现的函数很少,就直接在builtins.h中添加了,可以达到同样的效果。
随便找个地方,加入以下几行,也就是我们上面实现的几个要用到的函数。这里是实现函数的声明。

/* entropy.c */
extern Datum int_entropy_accum(PG_FUNCTION_ARGS);
extern Datum text_entropy_accum(PG_FUNCTION_ARGS);
extern Datum entropy_combine(PG_FUNCTION_ARGS);
extern Datum entropy_final(PG_FUNCTION_ARGS);

extern Datum cross_int_entropy_accum(PG_FUNCTION_ARGS);
extern Datum cross_text_entropy_accum(PG_FUNCTION_ARGS);
extern Datum cross_entropy_combine(PG_FUNCTION_ARGS);
extern Datum cross_entropy_final(PG_FUNCTION_ARGS);
extern Datum kld_final(PG_FUNCTION_ARGS);
extern Datum jsd_final(PG_FUNCTION_ARGS);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

四、修改部分配置文件

这里修改配置文件之后,数据库在编译时会自动更新bki文件(bki即为backend interface),在初始化的时候根据bki脚本来初始化数据库。要修改的文件有:

  1. {PGPATH}/src/include/catalog/pg_proc.adt

修改pg_proc.adt是因为pg_proc表存储关于函数(或过程)的信息,这是一个系统表,在数据库初始化的时候会建立,只有把一个函数定义写入这个表,数据库才会在启动的时候加入该函数。

具体修改如下:pg_proc.adt其实是一个大型的list,内部包含了很多dict,每个dict对应一个函数的实现,在列表末尾加入以下内容(其实和顺序无关):

# entropy
{ oid => '5106', descr => 'aggregate transition function',
  proname => 'int_entropy_accum', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal int8', prosrc => 'int_entropy_accum' },
{ oid => '5107', descr => 'aggregate transition function',
  proname => 'text_entropy_accum', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal text', prosrc => 'text_entropy_accum' },
{ oid => '5108', descr => 'aggregate combine function',
  proname => 'entropy_combine', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal internal', prosrc => 'entropy_combine' },
{ oid => '5109', descr => 'aggregate final function',
  proname => 'entropy_final', prorettype => 'float8',
  proargtypes => 'internal', prosrc => 'entropy_final' },
{ oid => '5110',descr => 'calculate int entropy',
  proname => 'entropy', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'int8',
  prosrc => 'aggregate_dummy' },
{ oid => '5111',descr => 'calculate text entropy',
  proname => 'entropy', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'text',
  prosrc => 'aggregate_dummy' },

# cross_entropy
{ oid => '5112', descr => 'aggregate transition function',
  proname => 'cross_int_entropy_accum', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal int8 int8', prosrc => 'cross_int_entropy_accum' },
{ oid => '5113', descr => 'aggregate transition function',
  proname => 'cross_text_entropy_accum', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal text text', prosrc => 'cross_text_entropy_accum' },
{ oid => '5114', descr => 'aggregate combine function',
  proname => 'cross_entropy_combine', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal internal', prosrc => 'cross_entropy_combine' },
{ oid => '5115', descr => 'aggregate final function',
  proname => 'cross_entropy_final', prorettype => 'float8',
  proargtypes => 'internal', prosrc => 'cross_entropy_final' },
{ oid => '5116', descr => 'calculate cross int entropy',
  proname => 'cross_entropy', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'int8 int8',
  prosrc => 'aggregate_dummy' },
{ oid => '5117', descr => 'calculate cross text entropy',
  proname => 'cross_entropy', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'text text',
  prosrc => 'aggregate_dummy' },

#kld
{ oid => '5118', descr => 'aggregate final function',
  proname => 'kld_final', prorettype => 'float8',
  proargtypes => 'internal', prosrc => 'kld_final' },
{ oid => '5119', descr => 'calculate kl divergence',
  proname => 'kld', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'int8 int8',
  prosrc => 'aggregate_dummy' },
{ oid => '5120', descr => 'calculate kl divergence',
  proname => 'kld', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'text text',
  prosrc => 'aggregate_dummy' },

#jsd
{ oid => '5121', descr => 'aggregate final function',
  proname => 'jsd_final', prorettype => 'float8',
  proargtypes => 'internal', prosrc => 'jsd_final' },
{ oid => '5122', descr => 'calculate js divergence',
  proname => 'jsd', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'int8 int8',
  prosrc => 'aggregate_dummy' },
{ oid => '5123', descr => 'calculate js divergence',
  proname => 'jsd', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'text text',
  prosrc => 'aggregate_dummy' }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

这里特别说明一下,internal代表所有非标准的自定义的类型,比如我们自己定义的struct或者class,oid是PostgreSQL数据库中的一个唯一标识码,只需要保证没有重复使用即可。要了解上面每一项的含义,可以看【文档】中对pg_proc的描述。

  1. {PGPATH}/src/include/catalog/pg_aggregate.adt

修改pg_aggregate.adt的原因同上,这个表存储了聚合函数,如果只是普通函数不用修改这里,但聚合函数需要修改。

结构和上面的pg_proc.adt很像,具体含义参见【文档】中对pg_aggregate的描述。我们在列表末尾添加如下内容:

# entropy
{ aggfnoid => 'entropy(int8)', aggtransfn => 'int_entropy_accum', 
aggfinalfn => 'entropy_final', aggtranstype => 'internal', aggcombinefn => 'entropy_combine',
aggmtransfn => 'int_entropy_accum', aggmfinalfn => 'entropy_final', aggmtranstype => 'internal'},
{ aggfnoid => 'entropy(text)', aggtransfn => 'text_entropy_accum', 
aggfinalfn => 'entropy_final', aggtranstype => 'internal', aggcombinefn => 'entropy_combine',
aggmtransfn => 'text_entropy_accum', aggmfinalfn => 'entropy_final', aggmtranstype => 'internal'},

# cross_entropy
{ aggfnoid => 'cross_entropy(int8,int8)', aggtransfn => 'cross_int_entropy_accum', 
aggfinalfn => 'cross_entropy_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_int_entropy_accum', aggmfinalfn => 'cross_entropy_final', aggmtranstype => 'internal'},
{ aggfnoid => 'cross_entropy(text,text)', aggtransfn => 'cross_text_entropy_accum', 
aggfinalfn => 'cross_entropy_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_text_entropy_accum', aggmfinalfn => 'cross_entropy_final', aggmtranstype => 'internal'},

# kld
{ aggfnoid => 'kld(int8,int8)', aggtransfn => 'cross_int_entropy_accum', 
aggfinalfn => 'kld_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_int_entropy_accum', aggmfinalfn => 'kld_final', aggmtranstype => 'internal'},
{ aggfnoid => 'kld(text,text)', aggtransfn => 'cross_text_entropy_accum', 
aggfinalfn => 'kld_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_text_entropy_accum', aggmfinalfn => 'kld_final', aggmtranstype => 'internal'},

#jsd
{ aggfnoid => 'jsd(int8,int8)', aggtransfn => 'cross_int_entropy_accum', 
aggfinalfn => 'jsd_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_int_entropy_accum', aggmfinalfn => 'jsd_final', aggmtranstype => 'internal'},
{ aggfnoid => 'jsd(text,text)', aggtransfn => 'cross_text_entropy_accum', 
aggfinalfn => 'jsd_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_text_entropy_accum', aggmfinalfn => 'jsd_final', aggmtranstype => 'internal'}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  1. {PGPATH}/src/backend/utils/adt/Makefile

Makefile是定义一个工程如何编译链接的文件。修改这个文件是因为要把新增的一个.c文件编译出来的.o文件加入到链接的列表之中,不然我们的修改不会生效。PostgreSQL的设计艺术之一就是,各种模块很多且之间互相具有比较高的独立性,我们如果修改了一个文件,可以单独重新编译.o文件,只用重新链接就可以生成数据库,而无需编译其他文件。

这个文件改动很少,只需要在OBJS的列表中加入entropy.o即可。建议按字母序添加到相应的位置,这样和PG原有风格保持一致。

  1. {PGPATH}/src/tools/pgindent/typedefs.list

这个文件定义了所有内建的数据类型的名称,我们需要把自己定义的几个类型加进去,这个目录下有pgindent等脚本会自动执行把这些类型名称写入内建类型。

这个文件改动同样很少,只需要写入两行,分别是我们自己新定义的两个结构体。建议按字母序添加到相应的位置,这样和PG原有风格保持一致。

ItemPs
TwoItemPs
  • 1
  • 2

五、重新编译运行

简单起见,可以直接全部重新编译一次,执行默认的编译命令即可,速度也是很快的。
过程是和第一次安装时候一样(见这篇博客),首先通过visual studio的交叉编译工具,进入{PGPATH}/src/tools/msvc下。
执行下面命令,E:\pgdb是我指定的安装目录。

build DEBUG
vcregress check
install E:\pgdb
  • 1
  • 2
  • 3

然后切换到安装目录的bin文件夹下,启动数据库:

initdb -D ../data
pg_ctl start -l logfile -D ../data
psql -d postgres
  • 1
  • 2
  • 3

六、测试

随便建立几个表:

CREATE TABLE public.u1 (
    n1 integer,
    n2 integer,
    n3 character varying(10),
    n4 character varying(20)
);
CREATE TABLE public.u2 (
    n1 integer,
    n2 integer,
);
CREATE TABLE public.u3 (
    n1 integer,
    n2 integer,
    n3 character varying(10),
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

随便插入一些数据:

insert into u1 values (1,2,'a','b'),(0,2,'s','b'),
	(2,1,'s','b'),(0,1,'b','s'),(2,0,'b','a');
insert into u3 values (0,1,'a'),(0,1,'a'),(1,1,'a'),
(1,0,'a'),(0,0,'a'),(0,1,'a'),(0,1,'a'),(0,1,'a'),
(0,1,'a'),(0,1,'b'),(0,0,'b'),(1,0,'b'),(0,0,'a'),
(0,0,'b'),(0,1,'a'),(0,1,'a'),(0,1,'a'),(0,1,'a'),
(0,1,'b');
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

试一下我们添加的几个函数是否成功:

select entropy(n1) from u3;
  • 1

在这里插入图片描述

select n3, cross_entropy(n1, n2) from u3 group by n3;
  • 1

在这里插入图片描述

select n3, kld(n1, n2), kld(n2, n1) from u3 group by n3;
select n3, jsd(n1, n2), jsd(n2, n1) from u3 group by n3;
  • 1
  • 2

在这里插入图片描述
如果看到这一步,就算是我们最终大功告成,可以开心地随意添加你想添加的函数进入PostgreSQL的内核中了。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/553660
推荐阅读
相关标签
  

闽ICP备14008679号