当前位置:   article > 正文

FFT之串行+MPI并行实现(C语言版)_fft 多通道并行模式

fft 多通道并行模式


前言

FFT(Fast Fourier Transformation),能够在时间复杂度为O(nlogn)的时间内将多项式的系数表示法转换成点值表示法。


一、系数表示法和点值表示法

1.1、系数表示法

f ( x ) = ∑ i = 0 n − 1 a i x i f(x) = \displaystyle\sum_{i=0}^{n-1} a_ix_i f(x)=i=0n1aixi 可以用每一项的系数表示,即:

f ( x ) = ( a 0 , a 1 , . . . , a n − 1 ) f(x) = (a_0, a_1, ..., a_{n-1}) f(x)=(a0,a1,...,an1)

就是平常最常使用的方法

1.2、点值表示法

  把 n 个不同 x x x 代入 f ( x ) f(x) f(x),会得出 n 个不同的值,根据插值法原理可知这 n 个点可以唯一确定一个多项式,即 f ( x ) f(x) f(x),所以:

f ( x ) = ( ( x 0 , f ( x 0 ) , ( x 1 , f ( x 1 ) . . . ( x n − 1 , f ( x n − 1 ) ) f(x) = ((x_0, f(x_0), (x_1, f(x_1) ... (x_{n-1}, f(x_{n-1})) f(x)=((x0,f(x0),(x1,f(x1)...(xn1,f(xn1))

1.3、多项式相乘时的时间复杂度

当两个 n-1 次多项式 f ( x ) , g ( x ) f(x), g(x) f(x),g(x)相乘时:
   ⋅ · 系数表示法的复杂度: O ( n 2 ) O(n^2) O(n2);
   ⋅ · 点值表示法的复杂度: O ( n l o g n ) O(n logn) O(nlogn);
所以:将系数表示法转换成点值表示法,可以有效降低复杂度;

但是 不能直接代入 n 个任意不同的 x x x值,因为从系数表示法转换到点值表示法还是 O ( n 2 ) O(n^2) O(n2)而FFT就能够在 O ( n l o g n ) O(nlogn) O(nlogn)的条件下完成这项任务

二、FFT预备知识

2.1、复数

  在复数范围内令 ω n = 1 \omega^n = 1 ωn=1,可以得到 n 个不同的复数根{ ω n 0 , ω n 1 , . . . , ω n n − 1 \omega_n^0, \omega_n^1, ..., \omega_n^{n-1} ωn0,ωn1,...,ωnn1},且均匀分布在模长为1的圆上,
摘取自:https://blog.csdn.net/enjoy_pascal/article/details/81478582单位根的性质:

  1) ω n m = ω 2 n 2 m \omega_n^m = \omega_{2n}^{2m} ωnm=ω2n2m;

  2) ω n m = − ω n m + n 2 \omega_n^m = -\omega_{n}^{m+\frac{n}{2}} ωnm=ωnm+2n;

  3) ( ω n m ) 2 = ω n 2 m (\omega_n^m)^2 = \omega_{n}^{2m} (ωnm)2=ωn2m;

  4) ω n m = c o s m n 2 π + i s i n m n 2 π \omega_n^m = cos\frac{m}{n}2π+isin\frac{m}{n}2π ωnm=cosnm2π+isinnm2π;

2.2、蝶形变换

对于 A ( x ) = ∑ i = 0 n − 1 a i x i = a 0 ​ + a 1 ​ x 1 + a 2 ​ x 2 + . . . + a n − 1 ​ x n − 1 : A(x)=\displaystyle\sum_{i=0}^{n-1} a_ix_i = a_0​+a_1​x^1+a_2​x^2+...+a_{n−1}​x^{n−1}: A(x)=i=0n1aixi=a0+a1x1+a2x2+...+an1xn1:

A ( x ) A(x) A(x) 下标的奇偶性 A ( x ) A(x) A(x)分成两半,右边再提一个 x : x: x:

   A ( x ) = ( a 0 ​ + a 2 ​ x 2 + . . . + a n − 2 ​ x n − 2 ) + ( a 1 ​ x 1 + a 3 ​ x 3 + . . . + a n − 1 ​ x n − 1 ) A(x)=(a_0​+a_2​x^2+...+a_{n−2}​x^{n−2})+(a_1​x^1+a_3​x^3+...+a_{n−1}​x^{n−1}) A(x)=(a0+a2x2+...+an2xn2)+(a1x1+a3x3+...+an1xn1)

     = ( a 0 ​ + a 2 ​ x 2 + . . . + a n − 2 ​ x n − 2 ) + x ( a 1 ​ + a 3 ​ x 2 + . . . + a n − 2 ​ x n − 2 ) =(a_0​+a_2​x^2+...+a_{n−2}​x^{n−2})+x(a_1​+a_3​x^2+...+a_{n−2}​x^{n−2}) =(a0+a2x2+...+an2xn2)+x(a1+a3x2+...+an2xn2)

(注意:这里默认n是 2 k ( k ∈ N + ) 2^k(k \in N^+) 2k(kN+),否则可以先对n进行扩展)

A 0 ( x ) = a 0 ​ + a 2 ​ x 1 + . . . + a n − 2 ​ x n 2 − 1 A_0(x) = a_0​+a_2​x^1+...+a_{n−2}​x^{\frac{n}{2}−1} A0(x)=a0+a2x1+...+an2x2n1

A 1 ( x ) = a 1 ​ + a 3 ​ x 1 + . . . + a n − 2 ​ x n 2 − 1 A_1(x) = a_1​+a_3​x^1+...+a_{n−2}​x^{\frac{n}{2}−1} A1(x)=a1+a3x1+...+an2x2n1

所以有 A ( x ) = A 0 ( x 2 ) + x A 1 ( x 2 ) A(x) = A_0(x^2) + xA_1(x^2) A(x)=A0(x2)+xA1(x2)

A ( x ) A(x) A(x)代入 ω n m ( m < n 2 ) \omega_n^m(m < \frac{n}{2}) ωnm(m<2n)有:

   A ( ω n m ) = A 0 ​ ( ( ω n m ​ ) 2 ) + ω n m ​ A 1 ​ ( ( ω n m ​ ) 2 ) A(\omega_n^m)=A_0​((\omega_n^m​)^2)+\omega_n^m​A_1​((ω_n^m​)^2) A(ωnm)=A0((ωnm)2)+ωnmA1((ωnm)2)

     = A 0 ( ω n 2 m ) + ω n m A 1 ( ω n 2 m ) = A 0 ( ω n 2 m ) + ω n m A 1 ( ω n 2 m ) = A_0(\omega_n^{2m}) + \omega_n^mA_1(\omega_n^{2m} ) = A_0 (\omega_\frac{n}{2}^m) + \omega_n^mA_1(\omega_\frac{n}{2}^m) =A0(ωn2m)+ωnmA1(ωn2m)=A0(ω2nm)+ωnmA1(ω2nm);

再代入 ω n m + n 2 : \omega_n^{m+\frac{n}{2}}: ωnm+2n:

   A ( ω n m + n 2 ​ ) = A 0 ​ ( ω n 2 m + n ​ ) + ω n m + n 2 ​​ A 1 ( ω n 2 m + n ​ ) A(ω_n^{m+\frac{n}{2}​} )=A_0​(ω_n^{2m+n}​)+ω_n^{m+\frac{n}{2}}​​A_1(ω_n^{2m+n}​) A(ωnm+2n)=A0(ωn2m+n)+ωnm+2n​​A1(ωn2m+n)

     = A 0 ( ω n 2 m ω n n ) − ω n m A 1 ( ω n 2 m ω n n ) = A 0 ​ ( ω n 2 m ​​ ) − ω n m ​ A 1 ( ω n 2 m ​​ ) =A_0 (ω_n^{2m} ω_n^n ) − ω_n^mA_1 (ω_n^{2m} ω_n^n) =A_0​(ω_{n\over2}^m​​)−ω_n^m​A_1(ω_{n\over2}^m​​) =A0(ωn2mωnn)ωnmA1(ωn2mωnn)=A0(ω2nm​​)ωnmA1(ω2nm​​);

So: A ( ω n m ) = A 0 ( ω n 2 m ) + ω n m A 1 ( ω n 2 m ) A(\omega_n^m)= A_0 (\omega_\frac{n}{2}^m) + \omega_n^mA_1(\omega_\frac{n}{2}^m) A(ωnm)=A0(ω2nm)+ωnmA1(ω2nm);

A ( ω n m + n 2 ​ ) = A 0 ​ ( ω n 2 m ​​ ) − ω n m ​ A 1 ( ω n 2 m ​​ ) A(ω_n^{m+\frac{n}{2}​} ) =A_0​(ω_{n\over2}^m​​)−ω_n^m​A_1(ω_{n\over2}^m​​) A(ωnm+2n)=A0(ω2nm​​)ωnmA1(ω2nm​​);

即当知道 A 0 A_0 A0 A 1 A_1 A1时,可以同时知道 A ( ω n m ) 和 A ( ω n m + n 2 ​ ) A(\omega_n^m)和A(\omega_n^{m+\frac{n}{2}​}) A(ωnm)A(ωnm+2n)的值(满足以下蝶形计算形式);
A 0 A_0 A0 A 1 A_1 A1的问题规模都是 A ( x ) A(x) A(x)一半,所以FFT可以在 O ( n l o g n ) O(n logn) O(nlogn)的条件下完成。
摘取自:https://zhuanlan.zhihu.com/p/135259438
当n = 8时,蝶形图如下:
摘取自:https://zhuanlan.zhihu.com/p/135259438说明:最左边是输入系数, x ( i ) x(i) x(i)代表第 i + 1 i+1 i+1个系数,最右边是输出结果,即{ ω n 0 , ω n 1 , . . . , ω n n − 1 \omega_n^0, \omega_n^1, ..., \omega_n^{n-1} ωn0,ωn1,...,ωnn1}对应的函数值;
观察输入数据可知:在做蝶形计算之前需要先进行一定的排序(即后面要说的码位互换);

2.3、码位倒序

当n=8时,码位倒序图:
摘取自:https://zhuanlan.zhihu.com/p/135259438>

  观察可知:倒序数和计算的顺序数一致,所以要在输入系数后对其进行码位倒序。

三、FFT串行实现

3.1、串行代码:

/*
 * @author yimik
 * @date 2020/10/13
 * @encoding GBK
 * */
#define _USE_MATH_DEFINES
#include<iostream>
#include<stdlib.h>
#include<math.h>

using namespace std;

int Extend(int num_coef);
void ReverseOrder(int n, float* coefR, float* coefI);
void Input(int num_coef, int n, float* dataR, float* dataI);
void Calculate(int P, int n, int r, int B, float* dataR, float* dataI);
void Print(int n, float* dataR, float* dataI);

int main(int argc, char** argv) {
    //使用定点计数法,显示小数点后面位数精度为2
    cout.setf(ios_base::fixed, ios_base::floatfield);
	cout.setf(ios_base::showpoint);
	cout.precision(2);
	
    //系数输入个数
    int num_coef;
    cout << "the number of coefficients you want to input is: "; 
    cin >> num_coef;
    //系数数组, 进程数组
    float *dataR, *dataI;
    
    //扩展系数
    int n = Extend(num_coef);
    //分配空间
    dataR = new float[n];
    dataI = new float[n];
    
    //系数输入
    Input(num_coef, n, dataR, dataI);
    //码位倒序
    ReverseOrder(n, dataR, dataI);
    
    //蝶形级数
    int M = log(n) / log(2);
    //FFT蝶形级数L从1--M
    for(int L=1; L<=M;L++){
        /*第L级的运算*/  
        //间隔 B = 2^(L-1);
        int B = pow(2, L - 1);
        for(int j = 0; j <= B - 1; j++){
            /*同种蝶形运算*/  
            //增量k=2^(M-L)
            int k = pow(2,M-L);
            //计算旋转指数p,增量为k时,则P=j*k
            int P = j * k;
            for(int i = 0; i <= k - 1; i++){
            /*进行蝶形运算*/
            //数组下标定为r
            int r = j + 2 * B * i;
            Calculate(P, n, r, B, dataR, dataI);        }
        }
    }
    
    //输出
    Print(n, dataR, dataI);
}
/*
 *@function 将系数项数扩展至2^X
 *@param num_coef: 输入的系数项数
 *@return 扩展后的项数
 * */
int Extend(int num_coef) {
    int m = log(num_coef) / log(2);
    if(num_coef == pow(2, m))
        return(num_coef);
    else
        return(pow(2, m + 1));
}

/*
 * @function 码位交换
 * @param n: 系数个数
 * @param coefR: 实部数组
 * @param coefI: 虚部数组
 *  */
void ReverseOrder(int n, float* coefR, float* coefI) {
    //码位个数
    int m = log(n) / log(2);
    //码位高位、低位
    int head, rear;
    //低位、高位为1 的数
    int a1, an;
    //码位交换后的数,用于交换的temp
    int j;
    float tempR, tempI;
    //遍历系数 
    for (int i = 0; i < pow(2, m); i++) {
        j = 0;
        for (int k = 0; k < (m + 1) / 2; k++) {
            //第一位为1
            a1 = 1;
            //第M位为1
            an = pow(2, m - 1);
            //移位
            a1 = a1 << k;
            an = an >> k;
            //取高位、低位
            head = i & an;
            rear = i & a1;
            //交换码位模块
            if (0 != head)    j = j | a1;
            if (0 != rear)    j = j | an;
        }
        //数组元素交换
        if (i < j) {
            tempR = coefR[i];
	        tempI = coefI[i];
            coefR[i] = coefR[j];
	        coefI[i] = coefI[j];
            coefR[j] = tempR;	    
            coefI[j] = tempI;
        }
    }
}


/*
 * @function 输入系数数组的实部和虚部
 * @param n: 系数个数
 * @param dataR: 实部
 * @param dataI: 虚部
 * */
void Input(int num_coef, int n, float* dataR, float* dataI){
    cout << "R: ";
    int i;
    for (int i = 0; i < num_coef; i++) {
        cin >> dataR[i];
    }
    cout << "I: ";
    for (i = 0; i < num_coef; i++) {
        cin >> dataI[i];
    }
    if (n > num_coef)
        for(; i < n; i++){
            dataR[i] = 0;
            dataI[i] = 0;
        }
}

/* 
 * @function 进行碟形计算
 * @param P: 旋转因子
 * @param n: 系数个数
 * @param r: 数组下标
 * @param B: 间隔
 * @param dataR: 实部
 * @param dataI: 虚部
 * */
void Calculate(int P, int n, int r, int B, float* dataR, float* dataI){
    /*进行蝶形运算*/
    //t = W * A1, W = cos(2 * PI * P / n) - i * sin(2 * PI * P / n)
    float tR = dataR[r + B] * cos(2 * M_PI * P / n) + dataI[r + B] * sin(2 * M_PI * P / n);
    float tI = dataI[r + B] * cos(2 * M_PI * P / n) - dataR[r + B] * sin(2 * M_PI * P / n);
    //A(r) = A0 + W * A1
    //A(r + B) = A0 - W * A1
    dataR[r + B] = dataR[r] - tR;
    dataI[r + B] = dataI[r] - tI;
    dataR[r] = dataR[r] + tR;
    dataI[r] = dataI[r] + tI;
}

/* 
 * @function 输出数组(复数形式)
 * */
void Print(int n, float* dataR, float* dataI){
    cout << "result:" << endl;
    for (int i = 0; i < n; i++) {
        if(dataI[i] >= 0)
            cout << dataR[i] << "+" << dataI[i] << "i" << "\t";
        else
            cout << dataR[i] << dataI[i] << "i" << "\t";
    }
    cout << endl;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184

结果截图:
FFT串行结果
结果说明: f ( ω n i ) = r e s u l t [ i ] f(\omega_n^i)=result[i] f(ωni)=result[i]

四、FFT并行实现

4.1、并行分析

现在用np(进程数)=3, n(系数项数)=8的例子说明MPI并行是如何实现的。
当系数项数n=8,进程数np = 3时,进程资源划分如下:
资源划分
说明:
  1)在每个进程中用 m d a t a R [ ] 、 m d a t a I [ ] mdataR[]、mdataI[] mdataR[]mdataI[]来存储结果的实部和虚部;
  2)在每级蝶形计算前先把上一级的结果广播给每个进程(从其他进程里面获取数据可能发生死锁);
  3)未被分配的元素会由0号进程处理;

在进行第L级计算时,遍历 m d a d a R 、 m d a t a I mdadaR、mdataI mdadaRmdataI
  若是已分配的元素:
  1)若对应蝶形组的上半部分,且蝶形组的下半部分对应的元素也在此进程中,则直接更新两个元素:
https://editor.csdn.net/md?articleId=109055800
  2)若对应蝶形组的上半部分,且蝶形组的下半部分对应的元素是不在此进程中,对下半部分对应的元素进行分析:
    2.1)若是已分配的元素:更新当前元素,同时将下半部分对应的元素发送给指定进程,如图2.1:
    2.1)若是未分配的元素:更新当前元素,但是不发送下半部分对应的元素(发送给0号进程可能造成死锁),如图2.2:

https://editor.csdn.net/md?articleId=109055800

图2.1

https://editor.csdn.net/md?articleId=109055800

图2.2

注意:这里的发送的数据指的是计算的结果,并非蝶形组左边的数据,还望不要被图片所误导,这样标注仅是为了说明进程间的发送情况。
  3)若对应蝶形组的下半部分,则接收其他进程发送过来的数据:
https://editor.csdn.net/md?articleId=109055800
  若是未分配的元素:
  1)若对应蝶形组的上半部分,则直接更新两个元素:
https://editor.csdn.net/md?articleId=109055800
  2)若对应蝶形组的下半部分,则更新此元素:
https://editor.csdn.net/md?articleId=109055800
当M级蝶形计算计算完毕,即可得到结果。

4.2、并行代码:

/*
 * @author yimik
 * @date 2020/10/9
 * @encoding GBK
 * */
#define _USE_MATH_DEFINES
#include<iostream>
#include<stdlib.h>
#include<math.h>
#include<mpi.h>

using namespace std;

/*输入格式:mpirun -n size ./myFFT num_coef
 *说明:size: 设置的进程数,可任取一个小于num_coef的数;
       num_coef: 输入的系数项数,可任意取值;
 * */
int Extend(int num_coef);
void ReverseOrder(int n, float* coefR, float* coefI);
void Input(int num_coef, int n, float* dataR, float* dataI);
void Calculate(int P, int n, int r, int B, float* dataR, float* dataI, float& tempR, float& tempI, float& mdataR, float& mdataI);
void Print(int n, float* dataR, float* dataI);

int main(int argc, char** argv) {
    //使用定点计数法,显示小数点后面位数精度为2
    cout.setf(ios_base::fixed, ios_base::floatfield);
	cout.setf(ios_base::showpoint);
	cout.precision(2);
	
    //系数输入个数
    int num_coef = atoi(argv[1]);
    //系数数组, 进程数组
    float *dataR, *dataI, *mdataR, *mdataI;
    //中间数
    float tempR, tempI;
    
    int myrank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Status status;
    
    //扩展系数
    int n = Extend(num_coef);
    //每个进程分配的元素个数
    int msize = n / size;
    //分配空间
    mdataR = new float[msize];
    mdataI = new float[msize];
    dataR = new float[n];
    dataI = new float[n];
    
    //0号进程对系数数组进行输入
    if(0 == myrank){
        //系数输入
        Input(num_coef, n, dataR, dataI);
        //码位倒序
        ReverseOrder(n, dataR, dataI);
    }
    //等待系数数组输入完毕
    MPI_Barrier(MPI_COMM_WORLD);
    
    //蝶形级数
    int M = log(n) / log(2);
    //mdata(0) 在 data数组中对应的下标
    int start = myrank * msize;
    //FFT蝶形级数L从1--M
    for (int L = 1; L <= M; L++){
        MPI_Bcast(dataR, n, MPI_FLOAT, 0, MPI_COMM_WORLD);
        MPI_Bcast(dataI, n, MPI_FLOAT, 0, MPI_COMM_WORLD);
        
        /*第L级的运算*/
        //同一组蝶形计算的间隔 B = 2^(L-1);
        int B = (int)pow(2, L - 1);
        //旋转指数P的增量k=2^(M-L)
        int k = (int)pow(2, M - L);
        
        for (int j = 0; j < msize; j++){
            //数组下标定为r
            int r = start + j;
            
            //如果对应蝶形组的下方,即后半部分
            if ((r % (2 * B)) >= B){
                //如果不由本进程更新,则接收其他进程发送过来的结果
                if(j - B < 0) {
                    MPI_Recv(&mdataR[j], 1, MPI_FLOAT, (r - B) / msize, r, MPI_COMM_WORLD, &status);
                    MPI_Recv(&mdataI[j], 1, MPI_FLOAT, (r - B) / msize, r, MPI_COMM_WORLD, &status);
                    //cout << "Level" << L << ": proccess" << myrank << " Recv " << "proccess" << (r - B) / msize << ": " << r << "<-" << r - B << endl;
                }
                continue;
            }
            
            //计算旋转指数p
            int P = (r % B) * k;
            
            /*进行蝶形运算*/
            Calculate(P, n, r, B, dataR, dataI, tempR, tempI, mdataR[j], mdataI[j]);
            //如果data(r + B)不属于本进程mdata
            if(j + B + 1 > msize){
                //如果data(r + B) 已被分配所到进程中则将结果发送过去,否则不发送
                if(r + B < msize * size){
                    MPI_Send(&tempR, 1, MPI_FLOAT, (r + B) / msize, r + B, MPI_COMM_WORLD);
                    MPI_Send(&tempI, 1, MPI_FLOAT, (r + B) / msize, r + B, MPI_COMM_WORLD);
                    //cout << "Level" << L << ": proccess" << myrank << " Send " << "proccess" << (r + B) / msize << ": " << r << "->" << r - B << endl;
                }
            }
            //如果A(r + B)属于本进程mdata则直接进行更新
            else{
                mdataR[j + B] = tempR;
                mdataI[j + B] = tempI;
            }
        }
        
        /*处理未被分配的元素*/
        //未被分配元素开始的下标
        int remainStartID = msize * size;
        //在0号进程中直接处理未被分配的元素,当remainStartID == num_coef时说明全部被分配
        if((0 == myrank) && (remainStartID != n)){
            for(int j = 0; j < n - remainStartID; j++){
                int r = remainStartID + j;
                
                //计算旋转指数p
                int P = (r % B) * k;
                
                //如果位于蝶形组下方,即后半部分
                if ((r % (2 * B)) >= B) {
                    //如果不由处理未被分配元素的模块更新
                    if(j - B < 0) {
                        //t = W * A1, W = cos(2 * PI * P / n) - i * sin(2 * PI * P / n)
                        float tR = dataR[r] * cos(2 * M_PI * P / n) + dataI[r] * sin(2 * M_PI * P / n);
                        float tI = dataI[r] * cos(2 * M_PI * P / n) - dataR[r] * sin(2 * M_PI * P / n);
                        //A(r) = A0 + W * A1
                        //A(r + B) = A0 - W * A1
                        dataR[r] = dataR[r - B] - tR;
                        dataI[r] = dataI[r - B] - tI;
                    }
                    continue;
                }
                
                /*进行蝶形运算*/
                Calculate(P, n, r, B, dataR, dataI, dataR[r + B], dataI[r + B], dataR[r], dataI[r]);
            }
        }
        //等待此级蝶形变换全部计算完毕
        MPI_Barrier(MPI_COMM_WORLD);
        //从各个进程中收集结果,更新dataR, dataI
        MPI_Gather(mdataR, msize, MPI_FLOAT, dataR, msize, MPI_FLOAT, 0, MPI_COMM_WORLD);
        MPI_Gather(mdataI, msize, MPI_FLOAT, dataI, msize, MPI_FLOAT, 0, MPI_COMM_WORLD);
    }
    if(0 == myrank)
        Print(n, dataR, dataI);
    
    //delete[] dataR;
    //delete[] dataI;
    //delete[] mdataR;
    //delete[] mdataI;
    MPI_Finalize();

    return 0;
}

/*
 *@function 将系数项数扩展至2^X
 *@param num_coef: 输入的系数项数
 *@return 扩展后的项数
 * */
int Extend(int num_coef) {
    int m = log(num_coef) / log(2);
    if(num_coef == pow(2, m))
        return(num_coef);
    else
        return(pow(2, m + 1));
}

/*
 * @function 码位交换
 * @param n: 系数个数
 * @param coefR: 实部数组
 * @param coefI: 虚部数组
 *  */
void ReverseOrder(int n, float* coefR, float* coefI) {
    //码位个数
    int m = log(n) / log(2);
    //码位高位、低位
    int head, rear;
    //低位、高位为1 的数
    int a1, an;
    //码位交换后的数,用于交换的temp
    int j;
    float tempR, tempI;
    //遍历系数 
    for (int i = 0; i < pow(2, m); i++) {
        j = 0;
        for (int k = 0; k < (m + 1) / 2; k++) {
            //第一位为1
            a1 = 1;
            //第M位为1
            an = pow(2, m - 1);
            //移位
            a1 = a1 << k;
            an = an >> k;
            //取高位、低位
            head = i & an;
            rear = i & a1;
            //交换码位模块
            if (0 != head)    j = j | a1;
            if (0 != rear)    j = j | an;
        }
        //数组元素交换
        if (i < j) {
            tempR = coefR[i];
	        tempI = coefI[i];
            coefR[i] = coefR[j];
	        coefI[i] = coefI[j];
            coefR[j] = tempR;	    
            coefI[j] = tempI;
        }
    }
}


/*
 * @function 输入系数数组的实部和虚部
 * @param n: 系数个数
 * @param dataR: 实部
 * @param dataI: 虚部
 * */
void Input(int num_coef, int n, float* dataR, float* dataI){
    cout << "R: ";
    int i;
    for (int i = 0; i < num_coef; i++) {
        cin >> dataR[i];
    }
    cout << "I: ";
    for (i = 0; i < num_coef; i++) {
        cin >> dataI[i];
    }
    if (n > num_coef)
        for(; i < n; i++){
            dataR[i] = 0;
            dataI[i] = 0;
        }
}

/* 
 * @function 进行碟形计算
 * @param P: 旋转因子
 * @param n: 系数个数
 * @param r: 数组下标
 * @param B: 间隔
 * @param dataR、mdataR、tempR: 实部
 * @param dataI、mdataI、tempI: 虚部
 * @param j: 进程中结果数组的下标
 * */
void Calculate(int P, int n, int r, int B, float* dataR, float* dataI, float& tempR, float& tempI, float& mdataR, float& mdataI){
    /*进行蝶形运算*/
    //t = W * A1, W = cos(2 * PI * P / n) - i * sin(2 * PI * P / n)
    float tR = dataR[r + B] * cos(2 * M_PI * P / n) + dataI[r + B] * sin(2 * M_PI * P / n);
    float tI = dataI[r + B] * cos(2 * M_PI * P / n) - dataR[r + B] * sin(2 * M_PI * P / n);
    //A(r) = A0 + W * A1
    //A(r + B) = A0 - W * A1
    tempR = dataR[r] - tR;
    tempI = dataI[r] - tI;
    mdataR = dataR[r] + tR;
    mdataI = dataI[r] + tI;
}

/* 
 * @function 输出数组(复数形式)
 * */
void Print(int n, float* dataR, float* dataI){
    cout << "result:" << endl;
    for (int i = 0; i < n; i++) {
        if(dataI[i] >= 0)
            cout << dataR[i] << "+" << dataI[i] << "i" << "\t";
        else
            cout << dataR[i] << dataI[i] << "i" << "\t";
    }
    cout << endl;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280

结果截图:
FFT并行结果

参考文献

[1]路人黑的纸巾,十分简明易懂的FFT(快速傅里叶变换),https://blog.csdn.net/enjoy_pascal/article/details/81478582,2018-08-07/2020-10-14.
[2]GoKing,C语言系列之FFT算法实现,https://zhuanlan.zhihu.com/p/135259438,~/2020-10-14.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/468695
推荐阅读
相关标签
  

闽ICP备14008679号