我需要帮助的问题是：如何让代码循环执行，以便从 CSV 文件中一列接一列地提取数据？在我的 Excel 文件中，r 和 m 各占一列（单元格数相等）。我希望代码按现在的方式完成一次计算，然后跳到 r 和 m 两个 CSV 文件的第二列并执行相同的计算——我需要能够对所有列重复此过程（两个文件各有大约 1300 列）。你能告诉我怎么做吗？
代码
import math
import numpy
"""
Note - for some of the metrics the absolute value is returned. This is because if the risk (loss) is higher we want to
discount the expected excess return from the portfolio by a higher amount. Therefore risk should be positive.
"""
def vol(returns):
    """Return the standard deviation of ``returns`` as computed by ``numpy.std``."""
    volatility = numpy.std(returns)
    return volatility
def beta(returns, market):
    """Return the beta of ``returns`` with respect to ``market``.

    Beta is the covariance between the two series divided by the variance
    of the market series, so a series measured against itself has beta 1.

    Args:
        returns: sequence of portfolio returns.
        market: sequence of market returns, same length as ``returns``.

    Returns:
        cov(returns, market) / var(market).
    """
    # A single covariance matrix supplies both the covariance and the market
    # variance, so numerator and denominator share the same normalisation.
    covariance = numpy.cov(
        numpy.asarray(returns, dtype=float),
        numpy.asarray(market, dtype=float),
    )
    return covariance[0][1] / covariance[1][1]
def lpm(returns, threshold, order):
    """Return the lower partial moment of ``returns``.

    The shortfall of each return below ``threshold`` (zero when the return
    is at or above it) is raised to ``order``; the result is the sum of those
    powers divided by the number of returns.
    """
    count = len(returns)
    # Float array holding the threshold once per observation
    floor = numpy.full(count, threshold, dtype=float)
    # Only returns below the threshold contribute; the rest are floored at 0
    shortfall = (floor - returns).clip(min=0)
    return numpy.sum(shortfall ** order) / count
def hpm(returns, threshold, order):
    """Return the higher partial moment of ``returns``.

    The excess of each return above ``threshold`` (zero when the return is
    at or below it) is raised to ``order``; the result is the sum of those
    powers divided by the number of returns.
    """
    count = len(returns)
    # Float array holding the threshold once per observation
    floor = numpy.full(count, threshold, dtype=float)
    # Only returns above the threshold contribute; the rest are floored at 0
    excess = (returns - floor).clip(min=0)
    return numpy.sum(excess ** order) / count
def var(returns, alpha):
    """Return the historical-simulation value at risk of ``returns``.

    The returns are sorted ascending and the element at position
    ``int(alpha * n)`` is picked; its absolute value is returned so that
    the risk figure is positive.
    """
    ordered = numpy.sort(returns)
    cutoff = int(alpha * len(ordered))
    loss_at_cutoff = ordered[cutoff]
    return abs(loss_at_cutoff)
def cvar(returns, alpha):
    """Return the conditional value at risk (expected shortfall) of ``returns``.

    The returns are sorted ascending and the mean of the worst
    ``int(alpha * n)`` observations is taken. When that count would be zero
    (fewer than ``1 / alpha`` observations) the single worst observation is
    used instead, so the function never divides by zero.

    Args:
        returns: sequence of returns.
        alpha: tail probability, e.g. 0.05.

    Returns:
        The absolute value of the tail mean, so the risk figure is positive.

    Raises:
        IndexError: if ``returns`` is empty.
    """
    sorted_returns = numpy.sort(returns)
    if len(sorted_returns) == 0:
        raise IndexError("cvar() requires at least one return")
    # Keep at least one observation in the tail to avoid a zero divisor
    tail_size = max(int(alpha * len(sorted_returns)), 1)
    # CVaR should be positive
    return abs(numpy.mean(sorted_returns[:tail_size]))
def prices(returns, base):
    """Convert a series of periodic returns into a price series.

    The series starts at ``base`` and each subsequent price is the previous
    price grown by the corresponding return, so returns compound over time.

    Args:
        returns: sequence of periodic returns (0.1 means +10%).
        base: starting price.

    Returns:
        numpy array of length ``len(returns) + 1`` whose first element is ``base``.
    """
    series = [base]
    for period_return in returns:
        # Compound from the latest price rather than from the base
        series.append(series[-1] * (1 + period_return))
    return numpy.array(series)
def dd(returns, tau):
    """Return the draw-down of ``returns`` for a holding period of ``tau`` steps.

    A price series is built with ``prices(returns, 100)``. Every pair of
    prices ``tau`` steps apart is compared, walking from the end of the
    series backwards, and the smallest relative change is kept. Its absolute
    value is returned; if no pair exists the result is ``inf``.
    """
    values = prices(returns, 100)
    worst_change = float('+inf')
    # 'start' is the earlier index of each pair; the later one is start + tau
    for start in range(len(values) - 1 - tau, -1, -1):
        change = (values[start + tau] / values[start]) - 1
        if change < worst_change:
            worst_change = change
    # Drawdown should be positive
    return abs(worst_change)
def max_dd(returns):
    """Return the largest ``dd(returns, tau)`` over ``tau`` in ``range(len(returns))``.

    Starts from negative infinity, so an empty ``returns`` yields ``inf``
    after the final ``abs``.
    """
    largest = float('-inf')
    for tau in range(len(returns)):
        largest = max(largest, dd(returns, tau))
    # Max draw-down should be positive
    return abs(largest)
def average_dd(returns, periods):
    """Return the mean of ``periods`` draw-down values of ``returns``.

    ``dd(returns, tau)`` is evaluated for every ``tau`` in
    ``range(len(returns))``; the values are sorted ascending and the absolute
    values of the first ``periods`` entries are summed and divided by ``periods``.

    NOTE(review): because the list is sorted ascending, the entries averaged
    are the smallest ones - confirm this is the intended selection.
    """
    ranked = sorted(dd(returns, tau) for tau in range(len(returns)))
    # Accumulate left to right, starting from the first entry
    total = sum((abs(ranked[k]) for k in range(1, periods)), abs(ranked[0]))
    return total / periods
def average_dd_squared(returns, periods):
    """Return the mean of ``periods`` squared draw-down values of ``returns``.

    ``dd(returns, tau)`` squared is evaluated for every ``tau`` in
    ``range(len(returns))``; the values are sorted ascending and the absolute
    values of the first ``periods`` entries are summed and divided by ``periods``.

    NOTE(review): because the list is sorted ascending, the entries averaged
    are the smallest ones - confirm this is the intended selection.
    """
    ranked = sorted(math.pow(dd(returns, tau), 2.0) for tau in range(len(returns)))
    # Accumulate left to right, starting from the first entry
    total = sum((abs(ranked[k]) for k in range(1, periods)), abs(ranked[0]))
    return total / periods
def treynor_ratio(er, returns, market, rf):
    """Return the excess return ``er - rf`` divided by ``beta(returns, market)``."""
    excess = er - rf
    systematic_risk = beta(returns, market)
    return excess / systematic_risk
def sharpe_ratio(er, returns, rf):
    """Return the excess return ``er - rf`` divided by ``vol(returns)``."""
    excess = er - rf
    total_risk = vol(returns)
    return excess / total_risk
def information_ratio(returns, benchmark):
    """Return the mean of ``returns - benchmark`` divided by the ``vol`` of that difference."""
    active = returns - benchmark
    mean_active = numpy.mean(active)
    tracking_error = vol(active)
    return mean_active / tracking_error
def modigliani_ratio(er, returns, benchmark, rf):
    """Return ``(er - rf) * vol(returns - rf) / vol(benchmark - rf) + rf``.

    ``rf`` is expanded to a float array the length of ``returns`` before it
    is subtracted from both series.
    """
    risk_free = numpy.full(len(returns), rf, dtype=float)
    excess_returns = returns - risk_free
    excess_benchmark = benchmark - risk_free
    premium = er - rf
    risk_scale = vol(excess_returns) / vol(excess_benchmark)
    return premium * risk_scale + rf
def excess_var(er, returns, rf, alpha):
    """Return the excess return ``er - rf`` divided by ``var(returns, alpha)``."""
    excess = er - rf
    value_at_risk = var(returns, alpha)
    return excess / value_at_risk
def conditional_sharpe_ratio(er, returns, rf, alpha):
    """Return the excess return ``er - rf`` divided by ``cvar(returns, alpha)``."""
    excess = er - rf
    tail_risk = cvar(returns, alpha)
    return excess / tail_risk
def omega_ratio(er, returns, rf, target=0):
    """Return the excess return ``er - rf`` divided by ``lpm(returns, target, 1)``."""
    excess = er - rf
    downside = lpm(returns, target, 1)
    return excess / downside
def sortino_ratio(er, returns, rf, target=0):
    """Return the excess return ``er - rf`` divided by the square root of ``lpm(returns, target, 2)``."""
    excess = er - rf
    downside_deviation = math.sqrt(lpm(returns, target, 2))
    return excess / downside_deviation
def kappa_three_ratio(er, returns, rf, target=0):
    """Return the excess return ``er - rf`` divided by the cube root of ``lpm(returns, target, 3)``."""
    excess = er - rf
    third_moment = lpm(returns, target, 3)
    downside = math.pow(third_moment, 1.0 / 3.0)
    return excess / downside
def gain_loss_ratio(returns, target=0):
    """Return ``hpm(returns, target, 1)`` divided by ``lpm(returns, target, 1)``."""
    upside = hpm(returns, target, 1)
    downside = lpm(returns, target, 1)
    return upside / downside
def upside_potential_ratio(returns, target=0):
    """Return ``hpm(returns, target, 1)`` divided by the square root of ``lpm(returns, target, 2)``."""
    upside = hpm(returns, target, 1)
    downside_deviation = math.sqrt(lpm(returns, target, 2))
    return upside / downside_deviation
def calmar_ratio(er, returns, rf):
    """Return the excess return ``er - rf`` divided by ``max_dd(returns)``."""
    excess = er - rf
    worst_drawdown = max_dd(returns)
    return excess / worst_drawdown
def sterling_ration(er, returns, rf, periods):
    """Return the excess return ``er - rf`` divided by ``average_dd(returns, periods)``."""
    excess = er - rf
    mean_drawdown = average_dd(returns, periods)
    return excess / mean_drawdown
def burke_ratio(er, returns, rf, periods):
    """Return the excess return ``er - rf`` divided by the square root of ``average_dd_squared(returns, periods)``."""
    excess = er - rf
    drawdown_risk = math.sqrt(average_dd_squared(returns, periods))
    return excess / drawdown_risk
def test_risk_metrics(r, m):
    """Print each basic risk measure for portfolio returns ``r`` against market returns ``m``."""
    # Each value is computed right before it is printed, so a failure in one
    # metric leaves the lines already printed for the earlier ones intact.
    report = (
        ("vol =", lambda: vol(r)),
        ("beta =", lambda: beta(r, m)),
        ("hpm(0.0)_1 =", lambda: hpm(r, 0.0, 1)),
        ("lpm(0.0)_1 =", lambda: lpm(r, 0.0, 1)),
        ("VaR(0.05) =", lambda: var(r, 0.05)),
        ("CVaR(0.05) =", lambda: cvar(r, 0.05)),
        ("Drawdown(5) =", lambda: dd(r, 5)),
        ("Max Drawdown =", lambda: max_dd(r)),
    )
    for label, compute in report:
        print(label, compute())
def test_risk_adjusted_metrics(r, m):
    """Print every risk-adjusted return ratio for portfolio ``r`` against market ``m``.

    The expected return is the mean of ``r`` and the risk-free rate is fixed
    at 0.06. Nothing is returned; each ratio is written to stdout.

    Args:
        r: numpy array of portfolio returns.
        m: numpy array of market returns, same length as ``r``.
    """
    # Expected return
    e = numpy.mean(r)
    # Risk free rate
    f = 0.06
    # Risk-adjusted return based on Volatility
    print("Treynor Ratio =", treynor_ratio(e, r, m, f))
    print("Sharpe Ratio =", sharpe_ratio(e, r, f))
    print("Information Ratio =", information_ratio(r, m))
    # Risk-adjusted return based on Value at Risk
    print("Excess VaR =", excess_var(e, r, f, 0.05))
    print("Conditional Sharpe Ratio =", conditional_sharpe_ratio(e, r, f, 0.05))
    # Risk-adjusted return based on Lower Partial Moments
    print("Omega Ratio =", omega_ratio(e, r, f))
    print("Sortino Ratio =", sortino_ratio(e, r, f))
    print("Kappa 3 Ratio =", kappa_three_ratio(e, r, f))
    print("Gain Loss Ratio =", gain_loss_ratio(r))
    print("Upside Potential Ratio =", upside_potential_ratio(r))
    # Risk-adjusted return based on Drawdown risk
    print("Calmar Ratio =", calmar_ratio(e, r, f))
    print("Sterling Ratio =", sterling_ration(e, r, f, 5))
    print("Burke Ratio =", burke_ratio(e, r, f, 5))
if __name__ == "__main__":
    import csv

    def _load_first_column(path):
        """Return the first column of the CSV file at ``path`` as a float array."""
        # newline='' lets the csv module handle line endings itself
        with open(path, newline='') as csvfile:
            # csv.reader yields an empty list for a blank line; skip those
            # rows instead of failing on row[0]
            return numpy.array([float(row[0]) for row in csv.reader(csvfile) if row])

    # load r
    r = _load_first_column(r'C:\Users\Lenovo\Documents\r.csv')  # change your filename here
    # load m
    m = _load_first_column(r'C:\Users\Lenovo\Documents\m.csv')  # change your filename here
    test_risk_metrics(r, m)
    test_risk_adjusted_metrics(r, m)
既然您提到每一列可能有不同的长度，我建议您逐行而不是逐列读取 r 和 m 文件。原因是：遍历长度不同的列会有问题；更重要的是，逐列读取意味着必须先把整个 CSV 加载到内存中，然后再对这些列进行迭代。逐行读取时占用的内存更少，而且不必担心每行元素个数不同。由于是逐行读取，我们不再需要依赖 csv 包，可以直接把文件当作文本文件打开，并用空格、逗号或其他任何您认为合适的分隔符来拆分数值。在本例中，我将使用逗号分隔这些值。
假设我们的 r_values 文件如下，文件中的每一行表示一个值数组，这些值将提供给函数：
我们的 m_values 文件是：
现在在 `__name__ == '__main__'` 块中，我们加载文件，遍历各行，同时将它们传递到 test_risk_metrics 和 test_risk_adjusted_metrics 函数中：
最后，输出如下：
请注意，我不确定这是否正是您要找的，但希望它能解决您的问题。
以上假设 r 和 m 的列数相同。
相关问题 更多 >
编程相关推荐