numpy(4)便捷函数

第 4 章 NumPy便捷函数

Posted by Hilda on February 7, 2025

前3章以及其他补充已经整理如下:

第1章NumPy入门

第2章NumPy基础

第3章常用函数

广播与广播机制


你可能已经发现,NumPy中包含大量的函数。其实很多函数的设计初衷都是为了让你能更方便地使用。了解这些函数,你可以大大提升自己的工作效率。这些函数包括数组元素的选取(例如,根据某个条件表达式)和多项式运算等。计算股票收益率相关性的例子将让你浅尝NumPy数据分析。

本章涵盖以下内容:

  • 数据选取;
  • 简单数据分析;
  • 收益率相关性;
  • 多项式;
  • 线性代数的计算函数。

在前一章中,我们只用到了一个数据文件(data.csv)。本章将有重要的改进——我们同时用到两个数据文件。让我们继续前进,携手NumPy一起探索数据吧。


4.1 相关性

不知你是否注意过这样的现象:某公司的股价被另外一家公司的股价紧紧跟随,并且它们通常是同领域的竞争对手。对于这种现象,理论上的解释是:因为这两家公司经营的业务类型相同,它们面临同样的挑战,需要相同的原料和资源,并且争夺同类型的客户。

你可能会想到很多这样的例子,但还想检验一下它们是否真的存在关联。一种方法就是看看两个公司股票收益率的相关性强相关性意味着它们之间存在一定的关联性。当然,这不是严格的证明,特别是当我们所用的数据不够充足时。

4.2 动手实践:股票相关性分析

在本节的教程中,我们将使用2个示例数据集提供收盘价数据,其中包含收盘价的最小值。

第一家公司是BHP Billiton(BHP),其主要业务是石油、金属和钻石的开采。第二家公司是Vale (VALE),也是一家金属开采业的公司。因此,这两家公司有部分业务是重合的,尽管不是100% 相同。按照如下步骤分析它们股票的相关性。

(1) 首先,从CSV文件(本章示例代码文件夹中)中读入两只股票的收盘价数据,并计算收益率。如果你不记得该怎样做,在前一章中有很多可以参阅的例子。

image-20250206191222023

回顾数据的每个字段:

股价数据存储在CSV文件中,第一列为股票代码以标识股票(苹果公司股票代码为AAPL),第二列为dd-mm-yyyy格式的日期,第三列为空,随后各列依次是开盘价、最高价、最低价和收盘价,最后一列为当日的成交量。

下面为一行数据:

1
AAPL,28-01-2011, ,344.17,344.4,333.53,336.1,21144800 

要的是收盘价,即第6列。

1
2
bhp = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(6, ))
vale = np.loadtxt("VALE.csv", delimiter=",", unpack=True, usecols=(6, ))

image-20250206192005211


然后需要计算收益率。这在3.12中详细解释过。

image-20250206192122006

1
2
bhp_returns = np.diff(bhp)/bhp[:-1]
vale_returns = np.diff(vale)/vale[:-1]

image-20250206192326943

(2) 协方差描述的是两个变量共同变化的趋势,其实就是归一化前的相关系数。使用cov函数计算股票收益率的协方差矩阵(并非必须这样做,但我们可以据此展示一些矩阵操作的方法)。

1
2
# 计算协方差
covariance = np.cov(bhp_returns, vale_returns)

得到的协方差矩阵如下:

image-20250206192434257

补充:协方差用来度量两个随机变量\(X\)和\(Y\)间的相似程度,记为\(Cov(X,Y)\),计算公式为:\(Cov(X,Y)=E(XY)-EX·EY\)。

(3) 使用diagonal函数查看对角线上的元素:

1
covariance.diagonal()

得到协方差矩阵的对角线元素如下:

image-20250206192547077

协方差矩阵中对角线上的元素并不相等,这与相关系数矩阵是不同的。


(4) 使用trace函数计算矩阵的迹,即对角线上元素之和:

1
covariance.trace()

image-20250206212358544

(5) 两个向量的相关系数被定义为协方差除以各自标准差的乘积。计算向量a和b的相关系数的公式如下。

补充:相关系数,也叫皮尔逊(Pearson)相关系数,用来度量两个随机变量\(X\)和\(Y\)间的相关程度,计算公式如下:

image-20250206212247081

1
covariance/(bhp_returns.std()*vale_returns.std())

image-20250206212526395

(6) 我们将用相关系数来度量这两只股票的相关程度。相关系数的取值范围在-1到1之间。

根据定义,一组数值与自身的相关系数等于1。这是严格线性关系的理想值,实际上如果得到稍小一些的值,我们仍然会很高兴。使用corrcoef函数计算相关系数(或者更精确地,相关系数矩阵):

corrcoef 的全称是 Correlation Coefficient,即 相关系数

1
np.corrcoef(bhp_returns, vale_returns)

image-20250206212739169

注:本书作者将该矩阵称为相关系数矩阵,译者保留不同观点。我们知道,相关系数矩阵的主对角线元素为随机变量与自身的相关系数,应该等于1。因此这一步得到的矩阵并非相关系数矩阵,而下一步中的才是。

对角线上的元素即BHP和VALE与自身的相关系数,因此均为1,很可能并非真的经过计算得出。相关系数矩阵是关于对角线对称的,因此另外两个元素的值相等,表示BHP与VA L E的相关系数等于VA L E和BHP的相关系数。看起来它们的相关程度似乎不是很强。


相关系数(correlation coefficient)是用来度量两个变量之间线性关系强度和方向的统计量。它的取值范围是从 -1 到 1,具体含义如下:

  • 1 表示完全正相关(即一个变量增加,另一个也完全按比例增加)。
  • -1 表示完全负相关(即一个变量增加,另一个按比例减少)。
  • 0 表示没有线性关系(即变量之间没有可预测的线性关系)。

当我们计算一组数据的相关系数时,这个系数帮助我们理解这两组数据是如何一起变化的。


在 Python 中,我们使用 np.corrcoef() 函数来计算相关系数,得到的结果是一个 相关系数矩阵。这个矩阵不仅包括两只股票之间的相关性,还包括它们与自身的相关性。

np.corrcoef() 函数会返回一个 2x2 的相关系数矩阵:

1
2
[[1.0, r(BHP, VALE)],
 [r(VALE, BHP), 1.0]]

对角线上的元素是 BHP 与 BHP 的相关系数(以及 VALE 与 VALE 的相关系数),因为任何数据与自己总是完全相关,所以这两个值是 1

非对角线元素是 BHP 与 VALE 的相关系数(或者 VALE 与 BHP 的相关系数)。由于相关系数矩阵是对称的,因此这两个值是相等的,表示两只股票之间的相关性。


(7) 另外一个要点是判断两只股票的价格走势是否同步。如果它们的差值偏离了平均差值2倍于标准差的距离,则认为这两只股票走势不同步。

若判断为不同步,我们可以进行股票交易,等待它们重新回到同步的状态。计算这两只股票收盘价的差值,以判断是否同步:

1
difference = bhp - vale

检查最后一次收盘价是否在同步状态,代码如下:

1
2
3
4
# 检查最后一次收盘价是否在同步状态
avg = np.mean(difference)
dev = np.std(difference)
np.abs(difference[-1] - avg) > 2 * dev

image-20250207133429062

遗憾的是,我们暂时不能进行交易,因为输出了false。


补充:配对交易(Pair Trading)的策略

配对交易是一种市场中性策略,通常选择两只股票,它们之间有某种长期稳定的关系,比如历史上价格走势比较相似。配对交易的核心思想是,当这两只股票的价格走势出现显著的 背离(即价格差异超过正常波动范围),我们可以进行交易,预计它们会重新回到正常的价格关系上。

简单来说:

  • 如果两只股票的价格差异超过了通常的波动范围(偏离了平均差值 2 倍标准差),这就意味着它们之间的关系出现了 短期的异常,因此可以通过做多(买入)低估的一只股票,做空(卖空)高估的一只股票来获利。
  • 当这两只股票的价格差异恢复到正常范围时,配对交易策略就能平仓获利。

“同步”指的是两只股票的价格差异(bhp - vale)在某个合理的范围内波动,这通常意味着这两只股票在经济上或市场情绪上受到相似的影响。也就是说,在正常情况下,BHP 和 VALE 的价格差异应该稳定在某个 平均值附近,并围绕这个值波动。

当价格差异 偏离(即“不同步”)时,可能意味着市场出现了暂时的非理性定价,可能因为某些突发新闻或市场情绪导致一只股票相对另一只股票涨跌过度。这时候,配对交易策略就会认为两只股票的价格走势暂时不再同步,可以采取相应的交易策略进行套利。

判断这两只股票是否“不同步”的方法是看它们的价格差异(difference = bhp - vale)是否 偏离了平均值超过了 2 倍标准差,这就是统计学中的一个常见判断标准。通常来说,价格差异在 平均值 ± 2 倍标准差 范围内是正常波动的,超出这个范围则表示存在异常。

代码中使用的具体步骤是:

  • avg = np.mean(difference) 计算价格差异的平均值。
  • dev = np.std(difference) 计算价格差异的标准差。
  • np.abs(difference[-1] - avg) > 2 * dev 判断当前价格差异与历史平均值的偏离是否超过了 2 倍标准差。如果是,那么我们认为两只股票的走势 不同步

如果两只股票的价格差异远超出正常波动范围(即 不同步),那么就有可能出现短期内的一种 价格修正。这时候,配对交易策略会认为价格差异有可能会回归到正常范围,因此可以进行套利交易。

例如,如果 bhp 的价格过高,vale 的价格过低,且它们的价格差异明显大于常规波动,那么通过做空 BHP 并做多 VALE,期望它们的价格差异缩小回常态,从中获得收益。


输出为 Out of sync False,表示当前这两只股票的价格差异没有超过 2 倍标准差的阈值,换句话说,它们的价格走势是 同步的,因此此时并没有交易机会。


(8) 绘图需要Matplotlib库,我们将在第9章中详细讲解。使用如下代码进行绘图:

1
2
3
4
5
6
7
from matplotlib.pyplot import plot  
from matplotlib.pyplot import show
# 绘图
t = np.arange(len(bhp_returns))
plot(t, bhp_returns, lw = 1.0)
plot(t, vale_returns, lw = 2.0)
show()

image-20250207133933025

刚才做了些什么 : 我们分析了两只股票BHP和VA L E收盘价的相关性。更准确地说,我们计算了其收益率的相关系数。这可以用corrcoef函数来计算。我们还了解了协方差矩阵的计算过程,并可以据此计算相关系数。我们也因此展示了diagonal函数和trace函数的用法,分别可以给出矩阵的对角线元素和矩阵的迹。


完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import numpy as np
from matplotlib.pyplot import plot  
from matplotlib.pyplot import show

# 读取收盘价这一列数据
bhp = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(6, ))
vale = np.loadtxt("VALE.csv", delimiter=",", unpack=True, usecols=(6, ))
# 计算利益率
bhp_returns = np.diff(bhp)/bhp[:-1]
vale_returns = np.diff(vale)/vale[:-1]
# 计算协方差
covariance = np.cov(bhp_returns, vale_returns)
# 计算差值,判断是否同步
difference = bhp - vale
# 检查最后一次收盘价是否在同步状态
avg = np.mean(difference)
dev = np.std(difference)
print(np.abs(difference[-1] - avg) > 2 * dev)

# 绘图
t = np.arange(len(bhp_returns))
plot(t, bhp_returns, lw = 1.0)
plot(t, vale_returns, lw = 2.0)
show()

突击测验:计算协方差 问题1 以下哪个函数返回的是两个数组的协方差?

(1) covariance (2) covar (3) cov (4) cvar

答案:(3),在 NumPy 中,返回两个数组的 协方差 的函数是 np.cov(),该函数计算并返回协方差矩阵。np.cov() 需要输入两个数据集,它会计算这些数据集之间的协方差。例如:

1
2
3
4
5
6
7
8
9
import numpy as np

# 示例数据
a = np.array([1, 2, 3])
b = np.array([4, 5, 6])

# 计算协方差
cov_matrix = np.cov(a, b)
print(cov_matrix)

image-20250207134343717

4.3 多项式

你喜欢微积分吗?我非常喜欢!在微积分里有泰勒展开的概念,也就是用一个无穷级数来表示一个可微的函数。实际上,任何可微的(从而也是连续的)函数都可以用一个N次多项式来估计,而比N次幂更高阶的部分为无穷小量可忽略不计。


4.4 动手实践:多项式拟合

NumPy中的ployfit函数可以用多项式去拟合一系列数据点,无论这些数据点是否来自连续函数都适用。


补充1:ployfit函数

numpy.polyfit 是 NumPy 中用于进行 多项式拟合 的函数,常用于根据数据点拟合一个多项式,以便对数据进行建模或预测。该函数使用最小二乘法来找到最佳拟合的多项式系数。

numpy.polyfit 使用 最小二乘法(Least Squares Method)来拟合一个多项式。最小二乘法的基本思想是:通过最小化拟合的多项式与实际数据点之间的误差平方和(即残差平方和),来求得最优的多项式系数。

对于一个给定的多项式模型:\(P(x) = a_n x^n + a_{n-1}x^{n-1} + ... + a_1 x +a_0\),其中,n 是多项式的次数,\(a_n, a_{n-1}, ..., a_0\) 是多项式的系数。

目标是最小化以下残差平方和:\(S = \sum_{i=0}^{m-1}(y_i-p(x_i))^2\),

其中:

  • $y_i$是实际数据点的纵坐标值
  • \(P(x_i)\)是拟合多项式在\(x_i\)位置的预测值
  • \(m\)是数据点的数量

最小化这个平方和,通过线性代数的方法来求解系数\(a_n, a_{n-1}, ..., a_0\)


补充2:numpy.polyfit 函数的使用方法

函数签名:

1
numpy.polyfit(x, y, deg, full=False, w=None, cov=False)

参数:

  • x:输入数据的横坐标,必须是一个一维数组。
  • y:输入数据的纵坐标,必须是一个一维数组,与 x 数组的长度相同。
  • deg:多项式的 次数(degree)。比如,deg=1 表示线性拟合,deg=2 表示二次拟合,依此类推。
  • full(可选):如果设置为 True,返回更多的拟合信息,如残差、秩、矩阵等;默认为 False,只返回拟合系数。
  • w(可选):权重数组,用于加权最小二乘法拟合。如果给定,w 数组中的元素会对相应的数据点进行加权。
  • cov(可选):如果设置为 True,返回拟合系数的协方差矩阵。

返回值:

  • 返回值1:一个包含多项式系数的数组,这些系数是按从高到低的次序排列的。
  • 返回值2(如果 full=True):返回额外的拟合信息,包括残差、矩阵秩、奇异值等。
  • 返回值3(如果 cov=True):返回拟合系数的协方差矩阵。

举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import matplotlib.pyplot as plt
x = np.array([1, 2, 3, 4, 5])
y = np.array([2, 3, 5, 7, 11])

coeffs = np.polyfit(x, y, deg = 4, full=True, cov = True) # coeffs[0]是系数
# 生成拟合曲线
p = np.poly1d(coeffs[0])
y_fit = p(x)

# 绘图
plt.scatter(x, y, color="blue", label = "数据点")
plt.plot(x, y_fit, color="purple", label = "拟合线")
plt.legend()
show()

image-20250207141306098


(1) 我们继续使用BHP和VA L E的股票价格数据。用一个三次多项式去拟合两只股票收盘价的差价:

1
2
3
4
5
6
7
8
9
10
11
import sys
# 读取数据
bhp = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(6,))
vale = np.loadtxt("VALE.csv", delimiter=",", unpack=True, usecols=(6,))

# N = int(sys.argv[1])
N = 3 # 因为是要用三次多项式拟合
t = np.arange(len(bhp))
# 用一个三次多项式去拟合两只股票收盘价的差价
coeffs = np.polyfit(t, bhp - vale, N)
coeffs

拟合的结果为(在这个例子中是一个三次多项式):

image-20250207141739065

(2) 上面看到的那些数字就是多项式的系数。用我们刚刚得到的多项式对象以及polyval函数,就可以推断下一个值:

1
np.polyval(coeffs, t[-1] + 1)

image-20250207141854821


补充:polyval函数的使用

numpy.polyval 是 NumPy 中用于 计算多项式值 的函数。给定多项式的系数和输入值,polyval 可以返回多项式在这些输入值上的对应值。

函数签名:

1
numpy.polyval(p, x)

参数:

  • p:一维数组,表示多项式的系数,系数按降幂排列。例如,多项式 \(P(x) = 2x^2 + 3x + 1\),则 p = [2, 3, 1]
  • x:输入值(或值的数组)。可以是单个数值,也可以是一个数组,表示我们希望在这些 \(x\) 值上计算多项式的值。

返回值:

  • 返回一个数组,表示多项式在每个 \(x\) 上的值。如果 \(x\) 是一个数组,返回的也是一个数组;如果 \(x\) 是单个数值,返回的是一个单独的数值。

(3) 理想情况下,BHP和VA L E股票收盘价的差价越小越好。在极限情况下,差值可以在某个点为0。使用roots函数找出我们拟合的多项式函数什么时候到达0值:

1
np.roots(coeffs)

解出多项式的根为:

image-20250207142150682

(4) 我们在微积分课程中还学习过求极值的知识——极值可能是函数的最大值或最小值。 记住微积分中的结论,这些极值点位于函数的导数为0的位置。使用polyder函数对多项式函数求导:

1
der = np.polyder(coeffs)

多项式函数的导函数(仍然是一个多项式函数)的系数如下:

image-20250207142309897

你看到的这些数字即为导函数的系数。

(5) 求出导数函数的根,即找出原多项式函数的极值点:

1
np.roots(der)

得到的极值点为:

image-20250207142351923

我们来复核一下结果,使用polyval计算多项式函数的值:

1
vals = np.polyval(coeffs,t)

image-20250207142545419

(6) 现在,使用argmax和argmin找出最大值点和最小值点:

1
2
np.argmax(vals)
np.argmin(vals)

image-20250207142640070

与上一步中的结果不完全一致,不过回到第1步可以看到,t是用arange函数定义的。

(7) 绘制源数据和拟合函数如下:

1
2
3
plot(t, bhp - vale)
plot(t, vals)
show()

生成的折线图如下。

image-20250207142743343

显然,光滑曲线为拟合函数,而锯齿状的为源数据。拟合得不算很好,因此你可以尝试更高阶的多项式拟合。

刚才做了些什么 : 我们使用polyfit函数对数据进行了多项式拟合。我们学习使用polyval函数计算多项式的取值,使用roots函数求得多项式函数的根,以及polyder函数求解多项式函数的导函数

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import numpy as np
from matplotlib.pyplot import plot  
from matplotlib.pyplot import show
import sys

# 读取数据
bhp = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(6,))
vale = np.loadtxt("VALE.csv", delimiter=",", unpack=True, usecols=(6,))
# N = int(sys.argv[1])
N = 3 # 因为是要用三次多项式拟合
t = np.arange(len(bhp))
# 用一个三次多项式去拟合两只股票收盘价的差价
coeffs = np.polyfit(t, bhp - vale, N)

print("下一个值", np.polyval(coeffs, t[-1] + 1))
print("np.roots(coeffs)", np.roots(coeffs))

der = np.polyder(coeffs)
print("der", der)
print("np.roots(der)", np.roots(der))

vals = np.polyval(coeffs,t)
print("vals", vals)

print(np.argmax(vals))
print(np.argmin(vals))

plot(t, bhp - vale)
plot(t, vals)
show()

勇敢出发:改进拟合函数

本节中的拟合函数有很多可以改进的地方。尝试使用三次方之外的不同指数,或者考虑在拟合前对数据进行平滑处理。使用移动平均线就是一种数据平滑的方法。计算简单移动平均线和指数移动平均线的示例可参阅前面的章节。


改进方向:通过不同的拟合函数(例如使用不同次数的多项式拟合)和在拟合前进行 数据平滑(如使用移动平均线)来改善拟合结果。

  1. 使用更高次的多项式进行拟合

当前代码使用了一个 三次多项式 来拟合两只股票收盘价的差值。虽然三次多项式能拟合一些复杂的模式,但在某些情况下,较高次的多项式可能会引起 过拟合,即在训练数据集上表现得非常好,但在新数据上表现较差。

  • 建议:可以尝试不同次数的多项式(例如一次、二次、四次等),并观察拟合效果。为了避免过拟合,可以使用 交叉验证AIC/BIC 等统计方法来选择最佳的拟合次数。
  1. 数据平滑:简单移动平均线(SMA)和指数移动平均线(EMA)

在拟合之前对数据进行平滑处理有助于减少短期波动的影响,得到更加平稳的拟合曲线。移动平均线是常见的数据平滑方法。你可以尝试使用以下几种方法:

  • 简单移动平均线(SMA):通过滑动窗口计算数据的均值来平滑数据。
  • 指数移动平均线(EMA):给近期的数据点更大的权重,适用于趋势变化较快的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import numpy as np
from matplotlib.pyplot import plot  
from matplotlib.pyplot import show
import sys

# 读取数据
bhp = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(6,))
vale = np.loadtxt("VALE.csv", delimiter=",", unpack=True, usecols=(6,))

# N = int(sys.argv[1])
N = 3  # 选择拟合的多项式次数,尝试不同次数,例如1, 2, 4等
t = np.arange(len(bhp))

# 计算两只股票收盘价的差值
diff = bhp - vale

# **数据平滑** - 使用简单移动平均(SMA)
def moving_average(data, window_size):
    return np.convolve(data, np.ones(window_size) / window_size, mode='valid')

# 使用简单移动平均进行平滑(例如:窗口大小为 10)
smoothed_diff_sma = moving_average(diff, 10)

# **数据平滑** - 使用指数移动平均(EMA)
def exponential_moving_average(data, span):
    return pd.Series(data).ewm(span=span, adjust=False).mean().to_numpy()

# 使用指数移动平均进行平滑(例如:span 为 10)
import pandas as pd
smoothed_diff_ema = exponential_moving_average(diff, 10)

# 用三次多项式拟合平滑后的数据
coeffs_sma = np.polyfit(t[:len(smoothed_diff_sma)], smoothed_diff_sma, N)
coeffs_ema = np.polyfit(t, smoothed_diff_ema, N)

# 输出拟合后的值和预测下一个值
print("SMA拟合下一个值:", np.polyval(coeffs_sma, t[-1] + 1))
print("EMA拟合下一个值:", np.polyval(coeffs_ema, t[-1] + 1))

# 计算导数
der_sma = np.polyder(coeffs_sma)
der_ema = np.polyder(coeffs_ema)
print("SMA拟合的导数:", der_sma)
print("EMA拟合的导数:", der_ema)

# 输出根
print("SMA拟合的根:", np.roots(coeffs_sma))
print("EMA拟合的根:", np.roots(coeffs_ema))

# 计算拟合值
vals_sma = np.polyval(coeffs_sma, t[:len(smoothed_diff_sma)])
vals_ema = np.polyval(coeffs_ema, t)

print("SMA拟合值:", vals_sma)
print("EMA拟合值:", vals_ema)

# 找到最大值和最小值的索引
print("SMA拟合值最大值索引:", np.argmax(vals_sma))
print("SMA拟合值最小值索引:", np.argmin(vals_sma))

print("EMA拟合值最大值索引:", np.argmax(vals_ema))
print("EMA拟合值最小值索引:", np.argmin(vals_ema))

# 绘图
plot(t, diff, label='原始差值', color="red")
plot(t[:len(smoothed_diff_sma)], smoothed_diff_sma, label='SMA平滑差值', color="yellow")
plot(t, smoothed_diff_ema, label='EMA平滑差值', color="green")
plot(t[:len(vals_sma)], vals_sma, label='SMA拟合结果', color="purple")
plot(t, vals_ema, label='EMA拟合结果', color = "black")

show()

image-20250207143650368

4.5 净额成交量

成交量(volume)是投资中一个非常重要的变量,它可以表示价格波动的大小。OBV(On-Balance Volume,净额成交量或叫能量潮指标)是最简单的股价指标之一,它可以由当日收盘价、前一天的收盘价以及当日成交量计算得出。这里我们以前一日为基期计算当日的OBV值(可以认为基期的OBV值为0)。若当日收盘价高于前一日收盘价,则本日OBV等于基期OBV加上当日成交量。若当日收盘价低于前一日收盘价,则本日OBV等于基期OBV减去当日成交量。若当日收盘价相比前一日没有变化,则当日成交量以0计算。


4.6 动手实践:计算 OBV

换言之,我们需要在成交量前面乘上一个由收盘价变化决定的正负号。在本节教程中,我们将学习该问题的两种解决方法,一种是使用NumPy中的sign函数,另一种是使用NumPy的piecewise函数


补充1:sign函数

numpy.sign 是 NumPy 库中的一个数学函数,用于返回输入数组中每个元素的 符号。它会根据每个元素的符号返回不同的值:如果元素为正数,则返回 1;如果为负数,则返回 -1;如果为零,则返回 0

函数签名:

1
numpy.sign(x)

参数:

  • x:输入的数值或数组,可以是任意形状的数组(例如标量、向量、矩阵等)。

返回值:

  • 返回一个数组或标量,表示输入数组中每个元素的符号。返回值的类型与输入类型一致
    • 对于正数元素,返回 1
    • 对于负数元素,返回 -1
    • 对于零,返回 0

补充2:piecewise函数

numpy.piecewise 是 NumPy 库中一个非常有用的函数,它允许你根据条件对数组中的元素进行分段函数计算。简单来说,piecewise 使得你可以指定多个条件,每个条件对应一个函数表达式,根据条件选择相应的操作。这对于处理不同区间的不同计算规则非常有用。

函数签名:

1
numpy.piecewise(x, condlist, funclist, *args, **kw)

参数:

  1. x:输入数组或数值,表示你希望对其进行操作的数据。
  2. condlist:一个列表或元组,包含多个条件表达式,每个条件对应一个区间或判断条件。每个条件都是一个布尔数组或者一个可以按元素逐个进行比较的表达式。x 中的每个元素都会依次与条件进行比较。
  3. funclist:一个列表或元组,包含与 condlist 中条件一一对应的函数或值。如果某个条件为 True,就会选择对应的函数或值。函数也可以是一个包含其他可选参数的函数。
  4. args, kw:传递给函数的额外参数。

返回值:

  • 返回一个新的数组,按照 condlist 中的条件对 x 中的元素应用对应的函数或值。

例如:

1
2
3
4
5
6
7
8
import numpy as np

x = np.array([2, -1, 0, 9, -3, 0])
conditions = [x < 0, x == 0, x > 0]
functions = [np.abs, lambda x: 0, lambda x: x**2]

arr = np.piecewise(x, conditions, functions)
arr

image-20250207144840761


(1) 把BHP数据分别加载到收盘价和成交量的数组中:

注意数据:回顾数据的每个字段

股价数据存储在CSV文件中,第一列为股票代码以标识股票(苹果公司股票代码为AAPL),第二列为dd-mm-yyyy格式的日期,第三列为空,随后各列依次是开盘价、最高价、最低价和收盘价,最后一列为当日的成交量。

下面为一行数据:

1
AAPL,28-01-2011, ,344.17,344.4,333.53,336.1,21144800 

所以,应该读取6,7两列。

1
2
# 读取数据
c, v = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(6,7))

image-20250207145048113

为了判断计算中成交量前的正负号,我们先使用diff函数计算收盘价的变化量。diff函数可以计算数组中两个连续元素的差值,并返回一个由这些差值组成的数组:

1
change = np.diff(c)

收盘价差值的计算结果如下:

image-20250207145220074

(2) NumPy中的sign函数可以返回数组中每个元素的正负符号,数组元素为负时返回-1,为正时返回1,否则返回0。对change数组使用sign函数:

1
signs = np.sign(change)

change数组中各元素的正负符号如下所示:

image-20250207145320725

另外,我们也可以使用piecewise函数来获取数组元素的正负。顾名思义,piecewise函数可以分段给定取值。使用合适的返回值和对应的条件调用该函数:

1
pieces = np.piecewise(change,[change < 0, change == 0, change > 0], [-1, 0, 1])

再次输出数组元素的正负,结果如下:

image-20250207145512620

检查两次的输出是否一致:

1
np.array_equal(signs, pieces)

image-20250207145602132

(3) OBV值的计算依赖于前一日的收盘价,所以在我们的例子中无法计算首日的OBV值:

1
v[1:]*signs

image-20250207145652551

刚才做了些什么 : 我们刚刚计算了OBV值,它依赖于收盘价的变化量。我们分别使用了NumPy中的sign函数和piecewise函数这两种不同的方法来判断收盘价变化量的正负。

完整代码:

1
2
3
4
5
6
# 读取数据
c, v = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(6,7))
change = np.diff(c)
signs = np.sign(change)
pieces = np.piecewise(change,[change < 0, change == 0, change > 0], [-1, 0, 1])
v[1:]*signs

4.7 交易过程模拟

你可能经常想尝试干一些事情,做一些实验,但又不希望造成任何不良后果。而NumPy就是用于实验的完美工具。我们将使用NumPy来模拟一个交易日,当然,这不会造成真正的资金损失。许多人喜欢抄底,也就是等股价下跌后才买入。类似的还有当股价比当日开盘价下跌一小部分(比如0.1%)时买入。

4.8 动手实践:避免使用循环

使用vectorize函数可以减少你的程序中使用循环的次数。我们将用它来计算单个交易日的利润。


补充:vectorize函数

numpy.vectorize 是 NumPy 中一个非常有用的函数,它允许你将普通的 Python 函数“矢量化”。简而言之,vectorize 可以将一个标量操作函数转化为一个可以作用于整个数组的函数,而不需要显式地使用 for 循环。它适用于你想对数组中的每个元素进行相同操作时。

函数签名:

1
numpy.vectorize(pyfunc, otypes=None, doc=None, excluded=None, cache=False, signature=None)

参数:

  1. pyfunc:你想要矢量化的 Python 函数。
  2. otypes:可选,指定输出的类型(如 floatint 等)。如果没有指定,NumPy 会自动推断。
  3. doc:可选,允许你指定文档字符串。
  4. excluded:可选,表示那些不需要矢量化的函数参数的索引列表。例如,如果你的函数需要一些标量输入和一些数组输入,你可以通过这个参数指定只对数组进行矢量化。
  5. cache:可选,是否启用缓存。默认为 False
  6. signature:可选,指定函数签名。通常你可以忽略这个参数。

返回值:

  • 返回一个新的函数,该函数可以应用于整个数组(或多个数组),并逐元素地进行处理。

vectorize 的核心思想是将一个接受单个标量值的函数转换为可以接受数组并返回数组的函数。这个转换是通过循环处理数组元素来实现的,而 vectorize 会帮你自动化这个过程。

例子:

1
2
3
4
5
6
def mySquare(x):
    return x**2

vectorized_square = np.vectorize(mySquare)
arr = np.array([1,3,6,10])
vectorized_square(arr)

image-20250207150303797


(1) 首先,读入数据:

回顾数据的每个字段

股价数据存储在CSV文件中,第一列为股票代码以标识股票(苹果公司股票代码为AAPL),第二列为dd-mm-yyyy格式的日期,第三列为空,随后各列依次是开盘价、最高价、最低价和收盘价,最后一列为当日的成交量。

下面为一行数据:

1
AAPL,28-01-2011, ,344.17,344.4,333.53,336.1,21144800 

1
2
# 读取数据
o, h, l, c = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(3, 4, 5, 6))

image-20250207150619408

(2) NumPy中的vectorize函数相当于Python中的map函数。调用vectorize函数并给定calc_profit函数作为参数,尽管我们还没有编写这个函数:

1
vectorizedCalcProfits = np.vectorize(calc_profit)

(3) 我们现在可以先把func当做函数来使用。对股价数组使用我们得到的func函数:

1
profits = vectorizedCalcProfits(o, h, l, c)

(4) calc_profit函数非常简单。首先,我们尝试以比开盘价稍低一点的价格买入股票。如果这个价格不在当日的股价范围内,则尝试买入失败,没有获利,也没有亏损,我们均返回0。否则,我们将以当日收盘价卖出,所获得的利润即买入和卖出的差价。事实上,计算相对利润更为直观:

1
2
3
4
5
6
7
8
9
def calc_profit(open_, high, low, close):
    # 以比开盘价稍低的价格买入
    # buy = open * int(sys[argv[1]])
    buy = open_ * 0.99998
    # daily range 
    if low < buy < high:
        return (close - buy)/buy
    else:
        return 0

image-20250207151620239

(5) 在所有交易日中有两个零利润日,即没有利润也没有损失。我们选择非零利润的交易日并计算平均值:

1
2
3
real_trades = profits[profits != 0]
print("Number of trades", len(real_trades), round(100.0 * len(real_trades)/len(c), 2),"%" )
print("Average profit/loss %", round(np.mean(real_trades) * 100, 2))

交易结果如下:

image-20250207152510300

(6) 乐观的人们对于正盈利的交易更感兴趣。选择正盈利的交易日并计算平均利润:

1
2
3
winning_trades = profits[profits > 0]
print("Number of winning trades", len(winning_trades), round(100.0 * len(winning_trades)/len(c), 2), "%" )
print("Average profit %", round(np.mean(winning_trades) * 100, 2) )

image-20250207152637600

(7) 悲观的人们对于负盈利的交易更感兴趣,选择负盈利的交易日并计算平均损失:

1
2
3
losing_trades = profits[profits < 0]
print("Number of losing trades", len(losing_trades), round(100.0 * len(losing_trades)/len(c), 2), "%" )
print("Average loss %", round(np.mean(losing_trades) * 100, 2) )

负盈利交易的分析结果如下:

image-20250207152741570

刚才做了些什么 : 我们矢量化了一个函数,这是一种可以避免使用循环的技巧。我们使用一个能返回当日相对利润的函数来模拟一个交易日,并分别打印出正盈利和负盈利交易的概况。

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def calc_profit(open_, high, low, close):
    # 以比开盘价稍低的价格买入
    # buy = open * int(sys[argv[1]])
    buy = open_ * 0.99998
    # daily range 
    if low < buy < high:
        return (close - buy)/buy
    else:
        return 0
    
# 读取数据
o, h, l, c = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(3, 4, 5, 6))

vectorizedCalcProfits = np.vectorize(calc_profit)
profits = vectorizedCalcProfits(o, h, l, c)
real_trades = profits[profits != 0]
print("Number of trades", len(real_trades), round(100.0 * len(real_trades)/len(c), 2),"%" )
print("Average profit/loss %", round(np.mean(real_trades) * 100, 2))

winning_trades = profits[profits > 0]
print("Number of winning trades", len(winning_trades), round(100.0 * len(winning_trades)/len(c), 2), "%" )
print("Average profit %", round(np.mean(winning_trades) * 100, 2) )

losing_trades = profits[profits < 0]
print("Number of losing trades", len(losing_trades), round(100.0 * len(losing_trades)/len(c), 2), "%" )
print("Average loss %", round(np.mean(losing_trades) * 100, 2) )

勇敢出发:分析连续盈利和亏损

尽管平均利润为正值,但我们仍需要了解这段过程中是否有长期连续亏损的状况出现。这一点很重要,因为如果出现了连续亏损,我们可能会面临资本耗尽的情形,那么计算出来的平均利润就不可信了。 请检查是否出现过这样的连续亏损。如果你乐意,也可以检查是否有长时间的连续盈利。


要求检查是否出现过 长期连续亏损长期连续盈利,这是一个在金融分析中常见的风险控制问题。尽管代码计算了每笔交易的利润,并给出了平均利润等统计信息,但我们需要进一步检查这些交易中是否存在长期连续亏损(或盈利)的情形。

我们可以通过以下方式来改进现有代码:

  1. 检查连续亏损(或盈利)是否超过某个阈值(例如,连续亏损超过 3 次)。
  2. 记录并输出连续亏损的最大长度和最大连续盈利的长度。

我们将增加对 profits 数组的遍历,计算连续亏损和连续盈利的最大长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import numpy as np

def calc_profit(open_, high, low, close):
    # 以比开盘价稍低的价格买入
    buy = open_ * 0.99998
    # daily range 
    if low < buy < high:
        return (close - buy) / buy
    else:
        return 0

# 读取数据
o, h, l, c = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(3, 4, 5, 6))

vectorizedCalcProfits = np.vectorize(calc_profit)
profits = vectorizedCalcProfits(o, h, l, c)
real_trades = profits[profits != 0]

# 打印基本统计
print("Number of trades", len(real_trades), round(100.0 * len(real_trades) / len(c), 2), "%")
print("Average profit/loss %", round(np.mean(real_trades) * 100, 2))

# 盈利交易统计
winning_trades = profits[profits > 0]
print("Number of winning trades", len(winning_trades), round(100.0 * len(winning_trades) / len(c), 2), "%")
print("Average profit %", round(np.mean(winning_trades) * 100, 2))

# 亏损交易统计
losing_trades = profits[profits < 0]
print("Number of losing trades", len(losing_trades), round(100.0 * len(losing_trades) / len(c), 2), "%")
print("Average loss %", round(np.mean(losing_trades) * 100, 2))

# 检查连续亏损和连续盈利的改进方法
def find_longest_streak(profits, condition_fn):
    max_streak = 0
    current_streak = 0
    
    # 遍历所有交易
    for i in range(len(profits)):
        if condition_fn(profits[i]):
            # 如果满足条件,当前连续交易计数加 1
            if i == 0 or np.sign(profits[i]) == np.sign(profits[i - 1]):
                current_streak += 1
            else:
                # 如果当前符号和前一个交易符号不相同,重置当前连续周期计数
                current_streak = 1
        else:
            # 如果当前不满足条件,重置连续计数
            current_streak = 0
        
        max_streak = max(max_streak, current_streak)

    return max_streak

# 定义条件:盈利交易和亏损交易
winning_condition = lambda profit: profit > 0  # 盈利
losing_condition = lambda profit: profit < 0  # 亏损

# 计算最大连续盈利和最大连续亏损
longest_winning_streak = find_longest_streak(real_trades, winning_condition)
longest_losing_streak = find_longest_streak(real_trades, losing_condition)

# 打印连续盈利与连续亏损的最大长度
print("Longest winning streak:", longest_winning_streak)
print("Longest losing streak:", longest_losing_streak)

# 判断是否存在长期连续亏损的情况(例如:连续亏损超过 3 次)
threshold = 3
if longest_losing_streak >= threshold:
    print(f"Warning: There is a long losing streak of {longest_losing_streak} consecutive losing trades.")
else:
    print("No significant long losing streak found.")

# 判断是否存在长期连续盈利的情况(例如:连续盈利超过 3 次)
if longest_winning_streak >= threshold:
    print(f"Good: There is a long winning streak of {longest_winning_streak} consecutive winning trades.")
else:
    print("No significant long winning streak found.")

image-20250207154544730

补充:我们使用 np.sign(profits[i]) == np.sign(profits[i - 1]) 来检查连续性。如果符号相同,则 current_streak 增加。

如果符号不同,则重新开始计数。

4.9 数据平滑

噪声数据往往很难处理,因此我们通常需要对其进行平滑处理。除了用计算移动平均线的方法,我们还可以使用NumPy中的一个函数来平滑数据。 hanning函数是一个加权余弦的窗函数。在后面的章节中,我们还将更为详细地介绍其他窗函数。


补充:hanning函数的使用方法

numpy.hanning 是 NumPy 中的一个函数,用于生成汉宁窗(Hanning Window)。窗函数在信号处理、谱分析和滤波器设计中非常常见,通常用于平滑信号、减少频谱泄漏等。

汉宁窗是一种加权的窗函数,通过加权系数(在窗口的两端逐渐降低权重)来平滑数据。这有助于在进行傅里叶变换时减少信号的频谱泄漏。

数学表达式如下:\(w(n) = 0.5(1-cos(\frac{2 \pi n}{N - 1}))\),其中\(w(n)\)是汉宁窗的第 \(n\)个值,\(N\)是窗的大小。

函数签名:

1
numpy.hanning(M)

参数:

  • M:窗的长度,即生成的窗口的元素个数,必须是正整数。

返回值:

  • 返回一个包含 M 个元素的数组,表示汉宁窗。

看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import numpy as np
import matplotlib.pyplot as plt

# 设定窗函数的大小
M = 50

# 生成汉宁窗
hanning_window = np.hanning(M)

# 打印汉宁窗
print(hanning_window)

# 绘制窗函数图形
plt.plot(hanning_window)
plt.title("Hanning Window")
plt.xlabel("Sample Index")
# Amplitude(振幅)
plt.ylabel("Amplitude")
plt.grid(True)
plt.show()

image-20250207155348748


4.10 动手实践:使用 hanning 函数平滑数据

我们将使用hanning函数平滑股票收益率的数组,步骤如下。

(1) 调用hanning函数计算权重,生成一个长度为N的窗口(在这个示例中N取8):

1
2
3
4
5
6
import sys

# N = int(sys[argv[1]])
N = 8
weights = np.hanning(N)
print("weights", weights)

得到的权重如下:

image-20250207155530334

(2) 使用convolve函数计算BHP和VA L E的股票收益率,以归一化处理后的weights作为参数:

1
2
3
4
5
6
bhp = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(6, ))
vale = np.loadtxt("VALE.csv", delimiter=",", unpack=True, usecols=(6,))
bhp_returns = np.diff(bhp)/bhp[:-1]
vale_returns = np.diff(vale)/vale[:-1]
smooth_bhp = np.convolve(weights/np.sum(weights), bhp_returns[N-1:-N+1])
smooth_vale = np.convolve(weights/np.sum(weights), vale_returns[N-1:-N+1])

(3) 用Matplotlib绘图:

1
2
3
4
5
6
7
# 画图
t = np.arange(N-1, len(bhp_returns))
plot(t, bhp_returns[N-1:],lw = 1.0)
plot(t, smooth_bhp, lw = 2.0)
plot(t, vale_returns[N-1:], lw = 3.0)
plot(t, smooth_vale, lw = 4.0)
show()

image-20250207160218955

图中的细线为股票收益率,粗线为平滑处理后的结果。如你所见,图中的折线有交叉。这些交叉点很重要,因为它们可能就是股价趋势的转折点,至少可以表明BHP和VA L E之间的股价关系发生了变化。这些转折点可能会经常出现,我们可以利用它们预测未来的股价走势。

(4) 使用多项式拟合平滑后的数据:

1
2
3
4
5
# 拟合
# K = int(sys[argv[1]])
K = 10   # 表示拟合的多项式的阶数
poly_bhp = np.ployfit(t, smooth_bhp, K)
poly_vale = np.polyfit(t, smooth_vale, K)

(5) 现在,我们需要解出上面的两个多项式何时取值相等,即在哪些地方存在交叉点。这等价于先对两个多项式函数作差,然后对所得的多项式函数求根。使用polysub函数对多项式作差:

1
2
3
poly_sub = np.polysub(poly_bhp, poly_vale)
xpoints = np.roots(poly_sub)
print("xpoints:", xpoints)

image-20250207160929350

(6) 得到的结果为复数,这不利于我们后续处理,除非时间也有实部和虚部。因此,这里需要用isreal函数来判断数组元素是否为实数:

1
reals = np.isreal(xpoints)

image-20250207161032605

可以看到有一部分数据为实数,因此我们用select函数选出它们。select函数可以根据一组给定的条件,从一组元素中挑选出符合条件的元素并返回数组:

1
2
3
xpoints = np.select([reals], [xpoints])
xpoints = xpoints.real
print("xpoints", xpoints)

得到的实数交叉点如下所示:

image-20250207161223913

(7) 我们需要去掉其中为0的元素。trim_zeros函数可以去掉一维数组中开头和末尾为0的元素:

注,书写的有问题。np.trim_zeros() 不会去除数组中间的零,只会去除两端的0。如果需要去除数组中所有的零,可以使用布尔索引或者 np.nonzero() 等方法:

1
print("去0:", np.trim_zeros(xpoints))

image-20250207161539823

发现没有去“0”。

1
2
3
print("用nonzero去0:", np.nonzero(xpoints))# 显示索引
xpoints_without_zero = xpoints[np.nonzero(xpoints)]
print("最终去0结果:", xpoints_without_zero)

image-20250207161805350

刚才做了些什么 : 我们使用hanning函数对股票收益率数组进行了平滑处理,使用polysub函数对两个多项式作差运算,以及使用isreal函数判断数组元素是否为实数,并用select函数选出了实数元素。最后,我们用trim_zeros函数去掉数组首尾的0元素。


完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import sys
import numpy as np
from matplotlib.pyplot import plot  
from matplotlib.pyplot import show

# N = int(sys[argv[1]])
N = 8
weights = np.hanning(N)
print("weights", weights)
bhp = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(6, ))
vale = np.loadtxt("VALE.csv", delimiter=",", unpack=True, usecols=(6,))
bhp_returns = np.diff(bhp)/bhp[:-1]
vale_returns = np.diff(vale)/vale[:-1]
smooth_bhp = np.convolve(weights/np.sum(weights), bhp_returns[N-1:-N+1])
smooth_vale = np.convolve(weights/np.sum(weights), vale_returns[N-1:-N+1])

# 画图
t = np.arange(N-1, len(bhp_returns))
plot(t, bhp_returns[N-1:],lw = 1.0)
plot(t, smooth_bhp, lw = 2.0)
plot(t, vale_returns[N-1:], lw = 3.0)
plot(t, smooth_vale, lw = 4.0)
show()

# 拟合
# K = int(sys[argv[1]])
K = 10  # 表示拟合的多项式的阶数
poly_bhp = np.polyfit(t, smooth_bhp, K)
poly_vale = np.polyfit(t, smooth_vale, K)

poly_sub = np.polysub(poly_bhp, poly_vale)
xpoints = np.roots(poly_sub)
print("xpoints:", xpoints)

reals = np.isreal(xpoints)
print(reals)

xpoints = np.select([reals], [xpoints])
xpoints = xpoints.real
print("xpoints", xpoints)


# print("去0:", np.trim_zeros(xpoints))
print("用nonzero去0:", np.nonzero(xpoints))# 显示索引
xpoints_without_zero = xpoints[np.nonzero(xpoints)]
print("最终去0结果:", xpoints_without_zero)

勇敢出发:尝试各种平滑函数

请尝试使用其他的平滑函数,如hamming、blackman、bartlett以及kaiser。它们的使用方法和hanning函数类似。


HammingBlackmanBartlettKaiser 都是常见的窗函数(Window Function),在信号处理、特别是频谱分析和滤波器设计中,通常用于减少信号在变换时的频谱泄漏(spectral leakage)。这些窗函数通过加权的方式使信号的两端趋近于零,从而减少频谱泄漏并提高频域分析的精度。

  1. Hamming 窗函数
  • Hamming 窗函数可以通过 np.hamming(N) 来获取。
  1. Blackman 窗函数
  • Blackman 窗函数可以通过 np.blackman(N) 来获取。
  1. Bartlett 窗函数
  • Bartlett 窗函数可以通过 np.bartlett(N) 来获取。
  1. Kaiser 窗函数
  • Kaiser 窗函数需要一个额外的参数 \(\beta\),它可以通过 np.kaiser(N, beta) 来获取。beta 是控制窗函数形状的参数,通常它越大,旁瓣抑制效果越强,但主瓣宽度也会变宽。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import sys
import numpy as np
from matplotlib.pyplot import plot  
from matplotlib.pyplot import show

# N = int(sys.argv[1])
N = 8  # 窗口长度

# 选择不同的窗函数
# 可以切换使用不同的窗函数,按需选择:
# weights = np.hamming(N)  # 这里使用 Hamming 窗
weights = np.blackman(N)  # 这里使用 Blackman 窗
# weights = np.bartlett(N)  # 这里使用 Bartlett 窗
# weights = np.kaiser(N, beta=14)  # 这里使用 Kaiser 窗,beta 可调节

print("weights", weights)

# 读取股票数据
bhp = np.loadtxt("BHP.csv", delimiter=",", unpack=True, usecols=(6, ))
vale = np.loadtxt("VALE.csv", delimiter=",", unpack=True, usecols=(6,))

# 计算收益率(差分计算)
bhp_returns = np.diff(bhp) / bhp[:-1]
vale_returns = np.diff(vale) / vale[:-1]

# 使用卷积平滑
smooth_bhp = np.convolve(weights / np.sum(weights), bhp_returns[N-1:-N+1])
smooth_vale = np.convolve(weights / np.sum(weights), vale_returns[N-1:-N+1])

# 绘制图形
t = np.arange(N-1, len(bhp_returns))
plot(t, bhp_returns[N-1:], lw=1.0, label="BHP Returns")
plot(t, smooth_bhp, lw=2.0, label="Smoothed BHP Returns")
plot(t, vale_returns[N-1:], lw=3.0, label="VALE Returns")
plot(t, smooth_vale, lw=4.0, label="Smoothed VALE Returns")
show()

# 拟合
K = 10  # 拟合多项式的阶数
poly_bhp = np.polyfit(t, smooth_bhp, K)
poly_vale = np.polyfit(t, smooth_vale, K)

# 计算两者的差值
poly_sub = np.polysub(poly_bhp, poly_vale)

# 求解根
xpoints = np.roots(poly_sub)
print("xpoints:", xpoints)

# 检查哪些根是实数
reals = np.isreal(xpoints)
print(reals)

# 选择实数根
xpoints = np.select([reals], [xpoints])
xpoints = xpoints.real
print("xpoints", xpoints)

# 使用 np.nonzero 去除零点
print("用nonzero去0:", np.nonzero(xpoints))  # 显示非零根的索引
xpoints_without_zero = xpoints[np.nonzero(xpoints)]
print("最终去0结果:", xpoints_without_zero)

Hamming 窗:

image-20250207162246059

Blackman 窗:

image-20250207162313837

Bartlett 窗:

image-20250207162344588

Kaiser 窗,beta 可调节:

image-20250207162411852


4.11 本章小结

在本章中,我们使用corrcoef函数计算了两只股票收益率的相关性。另外,我们还顺便学习了diagonal和trace函数的用法,分别可以给出矩阵的对角线元素和矩阵的迹。

我们使用polyfit函数拟合一系列数据点,用polyval函数计算多项式函数的取值,roots函数求解多项式的根,以及polyder函数求解多项式函数的导函数。

希望通过本章的内容,可以帮助读者提高工作效率,以便更好地学习下一章中矩阵和通用函数(ufuncs)的相关内容。