赞
踩
FFT(Fast Fourier Transformation),能够在时间复杂度为O(nlogn)的时间内将多项式的系数表示法转换成点值表示法。
f
(
x
)
=
∑
i
=
0
n
−
1
a
i
x
i
f(x) = \displaystyle\sum_{i=0}^{n-1} a_ix_i
f(x)=i=0∑n−1aixi 可以用每一项的系数表示,即:
把 n 个不同的
x
x
x 代入
f
(
x
)
f(x)
f(x),会得出 n 个不同的值,根据插值法原理可知这 n 个点可以唯一确定一个多项式,即
f
(
x
)
f(x)
f(x),所以:
当两个 n-1 次多项式
f
(
x
)
,
g
(
x
)
f(x), g(x)
f(x),g(x)相乘时:
⋅
·
⋅系数表示法的复杂度:
O
(
n
2
)
O(n^2)
O(n2);
⋅
·
⋅点值表示法的复杂度:
O
(
n
l
o
g
n
)
O(n logn)
O(nlogn);
所以:将系数表示法转换成点值表示法,可以有效降低复杂度;
但是 不能直接代入 n 个任意不同的 x x x值,因为从系数表示法转换到点值表示法还是 O ( n 2 ) O(n^2) O(n2),而FFT就能够在 O ( n l o g n ) O(nlogn) O(nlogn)的条件下完成这项任务;
在复数范围内令
ω
n
=
1
\omega^n = 1
ωn=1,可以得到 n 个不同的复数根{
ω
n
0
,
ω
n
1
,
.
.
.
,
ω
n
n
−
1
\omega_n^0, \omega_n^1, ..., \omega_n^{n-1}
ωn0,ωn1,...,ωnn−1},且均匀分布在模长为1的圆上,
单位根的性质:
1) ω n m = ω 2 n 2 m \omega_n^m = \omega_{2n}^{2m} ωnm=ω2n2m;
2) ω n m = − ω n m + n 2 \omega_n^m = -\omega_{n}^{m+\frac{n}{2}} ωnm=−ωnm+2n;
3) ( ω n m ) 2 = ω n 2 m (\omega_n^m)^2 = \omega_{n}^{2m} (ωnm)2=ωn2m;
4) ω n m = c o s m n 2 π + i s i n m n 2 π \omega_n^m = cos\frac{m}{n}2π+isin\frac{m}{n}2π ωnm=cosnm2π+isinnm2π;
对于 A ( x ) = ∑ i = 0 n − 1 a i x i = a 0 + a 1 x 1 + a 2 x 2 + . . . + a n − 1 x n − 1 : A(x)=\displaystyle\sum_{i=0}^{n-1} a_ix_i = a_0+a_1x^1+a_2x^2+...+a_{n−1}x^{n−1}: A(x)=i=0∑n−1aixi=a0+a1x1+a2x2+...+an−1xn−1:
按 A ( x ) A(x) A(x) 下标的奇偶性把 A ( x ) A(x) A(x)分成两半,右边再提一个 x : x: x:
A ( x ) = ( a 0 + a 2 x 2 + . . . + a n − 2 x n − 2 ) + ( a 1 x 1 + a 3 x 3 + . . . + a n − 1 x n − 1 ) A(x)=(a_0+a_2x^2+...+a_{n−2}x^{n−2})+(a_1x^1+a_3x^3+...+a_{n−1}x^{n−1}) A(x)=(a0+a2x2+...+an−2xn−2)+(a1x1+a3x3+...+an−1xn−1)
= ( a 0 + a 2 x 2 + . . . + a n − 2 x n − 2 ) + x ( a 1 + a 3 x 2 + . . . + a n − 2 x n − 2 ) =(a_0+a_2x^2+...+a_{n−2}x^{n−2})+x(a_1+a_3x^2+...+a_{n−2}x^{n−2}) =(a0+a2x2+...+an−2xn−2)+x(a1+a3x2+...+an−2xn−2)
(注意:这里默认n是 2 k ( k ∈ N + ) 2^k(k \in N^+) 2k(k∈N+),否则可以先对n进行扩展)
令 A 0 ( x ) = a 0 + a 2 x 1 + . . . + a n − 2 x n 2 − 1 A_0(x) = a_0+a_2x^1+...+a_{n−2}x^{\frac{n}{2}−1} A0(x)=a0+a2x1+...+an−2x2n−1;
A 1 ( x ) = a 1 + a 3 x 1 + . . . + a n − 2 x n 2 − 1 A_1(x) = a_1+a_3x^1+...+a_{n−2}x^{\frac{n}{2}−1} A1(x)=a1+a3x1+...+an−2x2n−1;
所以有 A ( x ) = A 0 ( x 2 ) + x A 1 ( x 2 ) A(x) = A_0(x^2) + xA_1(x^2) A(x)=A0(x2)+xA1(x2)
对 A ( x ) A(x) A(x)代入 ω n m ( m < n 2 ) \omega_n^m(m < \frac{n}{2}) ωnm(m<2n)有:
A ( ω n m ) = A 0 ( ( ω n m ) 2 ) + ω n m A 1 ( ( ω n m ) 2 ) A(\omega_n^m)=A_0((\omega_n^m)^2)+\omega_n^mA_1((ω_n^m)^2) A(ωnm)=A0((ωnm)2)+ωnmA1((ωnm)2)
= A 0 ( ω n 2 m ) + ω n m A 1 ( ω n 2 m ) = A 0 ( ω n 2 m ) + ω n m A 1 ( ω n 2 m ) = A_0(\omega_n^{2m}) + \omega_n^mA_1(\omega_n^{2m} ) = A_0 (\omega_\frac{n}{2}^m) + \omega_n^mA_1(\omega_\frac{n}{2}^m) =A0(ωn2m)+ωnmA1(ωn2m)=A0(ω2nm)+ωnmA1(ω2nm);
再代入 ω n m + n 2 : \omega_n^{m+\frac{n}{2}}: ωnm+2n:
A ( ω n m + n 2 ) = A 0 ( ω n 2 m + n ) + ω n m + n 2 A 1 ( ω n 2 m + n ) A(ω_n^{m+\frac{n}{2}} )=A_0(ω_n^{2m+n})+ω_n^{m+\frac{n}{2}}A_1(ω_n^{2m+n}) A(ωnm+2n)=A0(ωn2m+n)+ωnm+2nA1(ωn2m+n)
= A 0 ( ω n 2 m ω n n ) − ω n m A 1 ( ω n 2 m ω n n ) = A 0 ( ω n 2 m ) − ω n m A 1 ( ω n 2 m ) =A_0 (ω_n^{2m} ω_n^n ) − ω_n^mA_1 (ω_n^{2m} ω_n^n) =A_0(ω_{n\over2}^m)−ω_n^mA_1(ω_{n\over2}^m) =A0(ωn2mωnn)−ωnmA1(ωn2mωnn)=A0(ω2nm)−ωnmA1(ω2nm);
So: A ( ω n m ) = A 0 ( ω n 2 m ) + ω n m A 1 ( ω n 2 m ) A(\omega_n^m)= A_0 (\omega_\frac{n}{2}^m) + \omega_n^mA_1(\omega_\frac{n}{2}^m) A(ωnm)=A0(ω2nm)+ωnmA1(ω2nm);
A ( ω n m + n 2 ) = A 0 ( ω n 2 m ) − ω n m A 1 ( ω n 2 m ) A(ω_n^{m+\frac{n}{2}} ) =A_0(ω_{n\over2}^m)−ω_n^mA_1(ω_{n\over2}^m) A(ωnm+2n)=A0(ω2nm)−ωnmA1(ω2nm);
即当知道
A
0
A_0
A0和
A
1
A_1
A1时,可以同时知道
A
(
ω
n
m
)
和
A
(
ω
n
m
+
n
2
)
A(\omega_n^m)和A(\omega_n^{m+\frac{n}{2}})
A(ωnm)和A(ωnm+2n)的值(满足以下蝶形计算形式);
而
A
0
A_0
A0和
A
1
A_1
A1的问题规模都是
A
(
x
)
A(x)
A(x)的一半,所以FFT可以在
O
(
n
l
o
g
n
)
O(n logn)
O(nlogn)的条件下完成。
当n = 8时,蝶形图如下:
说明:最左边是输入系数,
x
(
i
)
x(i)
x(i)代表第
i
+
1
i+1
i+1个系数,最右边是输出结果,即{
ω
n
0
,
ω
n
1
,
.
.
.
,
ω
n
n
−
1
\omega_n^0, \omega_n^1, ..., \omega_n^{n-1}
ωn0,ωn1,...,ωnn−1}对应的函数值;
观察输入数据可知:在做蝶形计算之前需要先进行一定的排序(即后面要说的码位互换);
当n=8时,码位倒序图:
观察可知:倒序数和计算的顺序数一致,所以要在输入系数后对其进行码位倒序。
/* * @author yimik * @date 2020/10/13 * @encoding GBK * */ #define _USE_MATH_DEFINES #include<iostream> #include<stdlib.h> #include<math.h> using namespace std; int Extend(int num_coef); void ReverseOrder(int n, float* coefR, float* coefI); void Input(int num_coef, int n, float* dataR, float* dataI); void Calculate(int P, int n, int r, int B, float* dataR, float* dataI); void Print(int n, float* dataR, float* dataI); int main(int argc, char** argv) { //使用定点计数法,显示小数点后面位数精度为2 cout.setf(ios_base::fixed, ios_base::floatfield); cout.setf(ios_base::showpoint); cout.precision(2); //系数输入个数 int num_coef; cout << "the number of coefficients you want to input is: "; cin >> num_coef; //系数数组, 进程数组 float *dataR, *dataI; //扩展系数 int n = Extend(num_coef); //分配空间 dataR = new float[n]; dataI = new float[n]; //系数输入 Input(num_coef, n, dataR, dataI); //码位倒序 ReverseOrder(n, dataR, dataI); //蝶形级数 int M = log(n) / log(2); //FFT蝶形级数L从1--M for(int L=1; L<=M;L++){ /*第L级的运算*/ //间隔 B = 2^(L-1); int B = pow(2, L - 1); for(int j = 0; j <= B - 1; j++){ /*同种蝶形运算*/ //增量k=2^(M-L) int k = pow(2,M-L); //计算旋转指数p,增量为k时,则P=j*k int P = j * k; for(int i = 0; i <= k - 1; i++){ /*进行蝶形运算*/ //数组下标定为r int r = j + 2 * B * i; Calculate(P, n, r, B, dataR, dataI); } } } //输出 Print(n, dataR, dataI); } /* *@function 将系数项数扩展至2^X *@param num_coef: 输入的系数项数 *@return 扩展后的项数 * */ int Extend(int num_coef) { int m = log(num_coef) / log(2); if(num_coef == pow(2, m)) return(num_coef); else return(pow(2, m + 1)); } /* * @function 码位交换 * @param n: 系数个数 * @param coefR: 实部数组 * @param coefI: 虚部数组 * */ void ReverseOrder(int n, float* coefR, float* coefI) { //码位个数 int m = log(n) / log(2); //码位高位、低位 int head, rear; //低位、高位为1 的数 int a1, an; //码位交换后的数,用于交换的temp int j; float tempR, tempI; //遍历系数 for (int i = 0; i < pow(2, m); i++) { j = 0; for (int k = 0; k < (m + 1) / 2; k++) { //第一位为1 a1 = 1; //第M位为1 an = pow(2, m - 1); //移位 a1 = a1 << k; an = an >> k; //取高位、低位 head = i & an; rear = i & a1; //交换码位模块 if (0 != head) j = j | a1; if (0 != rear) j = j | an; } //数组元素交换 if (i < j) { tempR = coefR[i]; tempI = coefI[i]; coefR[i] = coefR[j]; coefI[i] = coefI[j]; coefR[j] = tempR; coefI[j] = tempI; } } } /* * @function 输入系数数组的实部和虚部 * @param n: 系数个数 * @param dataR: 实部 * @param dataI: 虚部 * */ void Input(int num_coef, int n, float* dataR, float* dataI){ cout << "R: "; int i; for (int i = 0; i < num_coef; i++) { cin >> dataR[i]; } cout << "I: "; for (i = 0; i < num_coef; i++) { cin >> dataI[i]; } if (n > num_coef) for(; i < n; i++){ dataR[i] = 0; dataI[i] = 0; } } /* * @function 进行碟形计算 * @param P: 旋转因子 * @param n: 系数个数 * @param r: 数组下标 * @param B: 间隔 * @param dataR: 实部 * @param dataI: 虚部 * */ void Calculate(int P, int n, int r, int B, float* dataR, float* dataI){ /*进行蝶形运算*/ //t = W * A1, W = cos(2 * PI * P / n) - i * sin(2 * PI * P / n) float tR = dataR[r + B] * cos(2 * M_PI * P / n) + dataI[r + B] * sin(2 * M_PI * P / n); float tI = dataI[r + B] * cos(2 * M_PI * P / n) - dataR[r + B] * sin(2 * M_PI * P / n); //A(r) = A0 + W * A1 //A(r + B) = A0 - W * A1 dataR[r + B] = dataR[r] - tR; dataI[r + B] = dataI[r] - tI; dataR[r] = dataR[r] + tR; dataI[r] = dataI[r] + tI; } /* * @function 输出数组(复数形式) * */ void Print(int n, float* dataR, float* dataI){ cout << "result:" << endl; for (int i = 0; i < n; i++) { if(dataI[i] >= 0) cout << dataR[i] << "+" << dataI[i] << "i" << "\t"; else cout << dataR[i] << dataI[i] << "i" << "\t"; } cout << endl; }
结果截图:
结果说明:
f
(
ω
n
i
)
=
r
e
s
u
l
t
[
i
]
f(\omega_n^i)=result[i]
f(ωni)=result[i]。
现在用np(进程数)=3, n(系数项数)=8的例子说明MPI并行是如何实现的。
当系数项数n=8,进程数np = 3时,进程资源划分如下:
说明:
1)在每个进程中用
m
d
a
t
a
R
[
]
、
m
d
a
t
a
I
[
]
mdataR[]、mdataI[]
mdataR[]、mdataI[]来存储结果的实部和虚部;
2)在每级蝶形计算前先把上一级的结果广播给每个进程(从其他进程里面获取数据可能发生死锁);
3)未被分配的元素会由0号进程处理;
在进行第L级计算时,遍历
m
d
a
d
a
R
、
m
d
a
t
a
I
mdadaR、mdataI
mdadaR、mdataI:
若是已分配的元素:
1)若对应蝶形组的上半部分,且蝶形组的下半部分对应的元素也在此进程中,则直接更新两个元素:
2)若对应蝶形组的上半部分,且蝶形组的下半部分对应的元素是不在此进程中,对下半部分对应的元素进行分析:
2.1)若是已分配的元素:更新当前元素,同时将下半部分对应的元素发送给指定进程,如图2.1:
2.1)若是未分配的元素:更新当前元素,但是不发送下半部分对应的元素(发送给0号进程可能造成死锁),如图2.2:
注意:这里的发送的数据指的是计算的结果,并非蝶形组左边的数据,还望不要被图片所误导,这样标注仅是为了说明进程间的发送情况。
3)若对应蝶形组的下半部分,则接收其他进程发送过来的数据:
若是未分配的元素:
1)若对应蝶形组的上半部分,则直接更新两个元素:
2)若对应蝶形组的下半部分,则更新此元素:
当M级蝶形计算计算完毕,即可得到结果。
/* * @author yimik * @date 2020/10/9 * @encoding GBK * */ #define _USE_MATH_DEFINES #include<iostream> #include<stdlib.h> #include<math.h> #include<mpi.h> using namespace std; /*输入格式:mpirun -n size ./myFFT num_coef *说明:size: 设置的进程数,可任取一个小于num_coef的数; num_coef: 输入的系数项数,可任意取值; * */ int Extend(int num_coef); void ReverseOrder(int n, float* coefR, float* coefI); void Input(int num_coef, int n, float* dataR, float* dataI); void Calculate(int P, int n, int r, int B, float* dataR, float* dataI, float& tempR, float& tempI, float& mdataR, float& mdataI); void Print(int n, float* dataR, float* dataI); int main(int argc, char** argv) { //使用定点计数法,显示小数点后面位数精度为2 cout.setf(ios_base::fixed, ios_base::floatfield); cout.setf(ios_base::showpoint); cout.precision(2); //系数输入个数 int num_coef = atoi(argv[1]); //系数数组, 进程数组 float *dataR, *dataI, *mdataR, *mdataI; //中间数 float tempR, tempI; int myrank, size; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &myrank); MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Status status; //扩展系数 int n = Extend(num_coef); //每个进程分配的元素个数 int msize = n / size; //分配空间 mdataR = new float[msize]; mdataI = new float[msize]; dataR = new float[n]; dataI = new float[n]; //0号进程对系数数组进行输入 if(0 == myrank){ //系数输入 Input(num_coef, n, dataR, dataI); //码位倒序 ReverseOrder(n, dataR, dataI); } //等待系数数组输入完毕 MPI_Barrier(MPI_COMM_WORLD); //蝶形级数 int M = log(n) / log(2); //mdata(0) 在 data数组中对应的下标 int start = myrank * msize; //FFT蝶形级数L从1--M for (int L = 1; L <= M; L++){ MPI_Bcast(dataR, n, MPI_FLOAT, 0, MPI_COMM_WORLD); MPI_Bcast(dataI, n, MPI_FLOAT, 0, MPI_COMM_WORLD); /*第L级的运算*/ //同一组蝶形计算的间隔 B = 2^(L-1); int B = (int)pow(2, L - 1); //旋转指数P的增量k=2^(M-L) int k = (int)pow(2, M - L); for (int j = 0; j < msize; j++){ //数组下标定为r int r = start + j; //如果对应蝶形组的下方,即后半部分 if ((r % (2 * B)) >= B){ //如果不由本进程更新,则接收其他进程发送过来的结果 if(j - B < 0) { MPI_Recv(&mdataR[j], 1, MPI_FLOAT, (r - B) / msize, r, MPI_COMM_WORLD, &status); MPI_Recv(&mdataI[j], 1, MPI_FLOAT, (r - B) / msize, r, MPI_COMM_WORLD, &status); //cout << "Level" << L << ": proccess" << myrank << " Recv " << "proccess" << (r - B) / msize << ": " << r << "<-" << r - B << endl; } continue; } //计算旋转指数p int P = (r % B) * k; /*进行蝶形运算*/ Calculate(P, n, r, B, dataR, dataI, tempR, tempI, mdataR[j], mdataI[j]); //如果data(r + B)不属于本进程mdata if(j + B + 1 > msize){ //如果data(r + B) 已被分配所到进程中则将结果发送过去,否则不发送 if(r + B < msize * size){ MPI_Send(&tempR, 1, MPI_FLOAT, (r + B) / msize, r + B, MPI_COMM_WORLD); MPI_Send(&tempI, 1, MPI_FLOAT, (r + B) / msize, r + B, MPI_COMM_WORLD); //cout << "Level" << L << ": proccess" << myrank << " Send " << "proccess" << (r + B) / msize << ": " << r << "->" << r - B << endl; } } //如果A(r + B)属于本进程mdata则直接进行更新 else{ mdataR[j + B] = tempR; mdataI[j + B] = tempI; } } /*处理未被分配的元素*/ //未被分配元素开始的下标 int remainStartID = msize * size; //在0号进程中直接处理未被分配的元素,当remainStartID == num_coef时说明全部被分配 if((0 == myrank) && (remainStartID != n)){ for(int j = 0; j < n - remainStartID; j++){ int r = remainStartID + j; //计算旋转指数p int P = (r % B) * k; //如果位于蝶形组下方,即后半部分 if ((r % (2 * B)) >= B) { //如果不由处理未被分配元素的模块更新 if(j - B < 0) { //t = W * A1, W = cos(2 * PI * P / n) - i * sin(2 * PI * P / n) float tR = dataR[r] * cos(2 * M_PI * P / n) + dataI[r] * sin(2 * M_PI * P / n); float tI = dataI[r] * cos(2 * M_PI * P / n) - dataR[r] * sin(2 * M_PI * P / n); //A(r) = A0 + W * A1 //A(r + B) = A0 - W * A1 dataR[r] = dataR[r - B] - tR; dataI[r] = dataI[r - B] - tI; } continue; } /*进行蝶形运算*/ Calculate(P, n, r, B, dataR, dataI, dataR[r + B], dataI[r + B], dataR[r], dataI[r]); } } //等待此级蝶形变换全部计算完毕 MPI_Barrier(MPI_COMM_WORLD); //从各个进程中收集结果,更新dataR, dataI MPI_Gather(mdataR, msize, MPI_FLOAT, dataR, msize, MPI_FLOAT, 0, MPI_COMM_WORLD); MPI_Gather(mdataI, msize, MPI_FLOAT, dataI, msize, MPI_FLOAT, 0, MPI_COMM_WORLD); } if(0 == myrank) Print(n, dataR, dataI); //delete[] dataR; //delete[] dataI; //delete[] mdataR; //delete[] mdataI; MPI_Finalize(); return 0; } /* *@function 将系数项数扩展至2^X *@param num_coef: 输入的系数项数 *@return 扩展后的项数 * */ int Extend(int num_coef) { int m = log(num_coef) / log(2); if(num_coef == pow(2, m)) return(num_coef); else return(pow(2, m + 1)); } /* * @function 码位交换 * @param n: 系数个数 * @param coefR: 实部数组 * @param coefI: 虚部数组 * */ void ReverseOrder(int n, float* coefR, float* coefI) { //码位个数 int m = log(n) / log(2); //码位高位、低位 int head, rear; //低位、高位为1 的数 int a1, an; //码位交换后的数,用于交换的temp int j; float tempR, tempI; //遍历系数 for (int i = 0; i < pow(2, m); i++) { j = 0; for (int k = 0; k < (m + 1) / 2; k++) { //第一位为1 a1 = 1; //第M位为1 an = pow(2, m - 1); //移位 a1 = a1 << k; an = an >> k; //取高位、低位 head = i & an; rear = i & a1; //交换码位模块 if (0 != head) j = j | a1; if (0 != rear) j = j | an; } //数组元素交换 if (i < j) { tempR = coefR[i]; tempI = coefI[i]; coefR[i] = coefR[j]; coefI[i] = coefI[j]; coefR[j] = tempR; coefI[j] = tempI; } } } /* * @function 输入系数数组的实部和虚部 * @param n: 系数个数 * @param dataR: 实部 * @param dataI: 虚部 * */ void Input(int num_coef, int n, float* dataR, float* dataI){ cout << "R: "; int i; for (int i = 0; i < num_coef; i++) { cin >> dataR[i]; } cout << "I: "; for (i = 0; i < num_coef; i++) { cin >> dataI[i]; } if (n > num_coef) for(; i < n; i++){ dataR[i] = 0; dataI[i] = 0; } } /* * @function 进行碟形计算 * @param P: 旋转因子 * @param n: 系数个数 * @param r: 数组下标 * @param B: 间隔 * @param dataR、mdataR、tempR: 实部 * @param dataI、mdataI、tempI: 虚部 * @param j: 进程中结果数组的下标 * */ void Calculate(int P, int n, int r, int B, float* dataR, float* dataI, float& tempR, float& tempI, float& mdataR, float& mdataI){ /*进行蝶形运算*/ //t = W * A1, W = cos(2 * PI * P / n) - i * sin(2 * PI * P / n) float tR = dataR[r + B] * cos(2 * M_PI * P / n) + dataI[r + B] * sin(2 * M_PI * P / n); float tI = dataI[r + B] * cos(2 * M_PI * P / n) - dataR[r + B] * sin(2 * M_PI * P / n); //A(r) = A0 + W * A1 //A(r + B) = A0 - W * A1 tempR = dataR[r] - tR; tempI = dataI[r] - tI; mdataR = dataR[r] + tR; mdataI = dataI[r] + tI; } /* * @function 输出数组(复数形式) * */ void Print(int n, float* dataR, float* dataI){ cout << "result:" << endl; for (int i = 0; i < n; i++) { if(dataI[i] >= 0) cout << dataR[i] << "+" << dataI[i] << "i" << "\t"; else cout << dataR[i] << dataI[i] << "i" << "\t"; } cout << endl; }
结果截图:
[1]路人黑的纸巾,十分简明易懂的FFT(快速傅里叶变换),https://blog.csdn.net/enjoy_pascal/article/details/81478582,2018-08-07/2020-10-14.
[2]GoKing,C语言系列之FFT算法实现,https://zhuanlan.zhihu.com/p/135259438,~/2020-10-14.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。