Skip to content

Commit

Permalink
【Feature】两市 ETF 成分股抓取
Browse files Browse the repository at this point in the history
  • Loading branch information
buginux committed Jul 1, 2019
1 parent dc77ca2 commit 4625497
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 31 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ marshmallow-sqlalchemy
ccxt == 1.17.191
dash==0.43.0
dash-daq==0.1.0
simplejson==3.16.0
simplejson==3.16.0
html5lib == 1.0.1
1 change: 1 addition & 0 deletions zvt/domain/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class StockCategory(enum.Enum):
concept = 'concept'
area = 'area'
main = 'main'
etf = 'etf'


class ReportPeriod(enum.Enum):
Expand Down
3 changes: 3 additions & 0 deletions zvt/domain/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class Index(MetaBase):

stocks = relationship('StockIndex', back_populates="indices")

def __repr__(self):
    """Return the index rendered as ``[name - code]`` for readable debug output."""
    return '[{} - {}]'.format(self.name, self.code)


# 个股
class Stock(MetaBase):
Expand Down
191 changes: 162 additions & 29 deletions zvt/recorders/common/china_etf_list_spider.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,193 @@
# -*- coding: utf-8 -*-

import io
import re

import demjson
import requests
import pandas as pd

from zvt.api.technical import init_securities
from zvt.api.common import china_stock_code_to_id
from zvt.api.technical import init_securities, df_to_db
from zvt.domain import Provider, StockIndex, StockCategory
from zvt.recorders.consts import DEFAULT_SH_ETF_LIST_HEADER
from zvt.domain import Provider, Index
from zvt.recorders.recorder import Recorder


class ChinaETFListSpider(Recorder):
data_schema = Index
data_schema = StockIndex

# Constructor. The diff view shows two signature lines that differ only in the
# sleeping_time default (10 vs 2.0); they appear to be the pre-change and
# post-change versions of the same line.
def __init__(self, batch_size=10, force_update=False, sleeping_time=10, provider=Provider.EXCHANGE) -> None:
def __init__(self, batch_size=10, force_update=False, sleeping_time=2.0, provider=Provider.EXCHANGE) -> None:
# Store the data provider on the instance before delegating to the parent constructor.
self.provider = provider
# batch_size, force_update and sleeping_time are forwarded unchanged to the parent class.
super().__init__(batch_size, force_update, sleeping_time)

def run(self):
    """
    Fetch the Shanghai and Shenzhen ETF lists and their constituent stocks.

    For each exchange the ETF list is downloaded, stored through
    ``persist_etf_list`` and then handed to the matching
    ``download_*_etf_component`` method.

    :return: None
    """
    # Shanghai ETF list: JSON endpoint queried with the SSE request headers.
    url = 'http://query.sse.com.cn/commonQuery.do?sqlId=COMMON_SSE_ZQPZ_ETFLB_L_NEW'
    response = requests.get(url, headers=DEFAULT_SH_ETF_LIST_HEADER)
    response_dict = demjson.decode(response.text)

    df = pd.DataFrame(response_dict.get('result', []))
    self.persist_etf_list(df, exchange='sh')
    self.logger.info('沪市 ETF 列表抓取完成...')

    # Shanghai ETF constituents.
    self.download_sh_etf_component(df)
    self.logger.info('沪市 ETF 成分股抓取完成...')

    # Shenzhen ETF list: the endpoint is asked for an xlsx report, which is read
    # with every column kept as a string.
    url = 'http://www.szse.cn/api/report/ShowReport?SHOWTYPE=xlsx&CATALOGID=1945'
    response = requests.get(url)

    df = pd.read_excel(io.BytesIO(response.content), dtype=str)
    self.persist_etf_list(df, exchange='sz')
    self.logger.info('深市 ETF 列表抓取完成...')

    # Shenzhen ETF constituents. The completion message previously named the
    # Shanghai market here; it now reports the Shenzhen market.
    self.download_sz_etf_component(df)
    self.logger.info('深市 ETF 成分股抓取完成...')

def persist_etf_list(self, df: pd.DataFrame, exchange: str):
    """
    Normalise an exchange's ETF list and store it as index securities.

    The exchange-specific code/name columns are renamed to ``code`` and
    ``name``; ``id``, ``exchange``, ``type`` and ``category`` columns are
    added; incomplete and duplicate rows are dropped; the result is passed
    to ``init_securities``.

    :param df: raw ETF list for one exchange; ``None`` is ignored
    :param exchange: ``'sh'`` or ``'sz'``
    :return: None
    :raises ValueError: if ``exchange`` is not a supported exchange
    """
    if df is None:
        return

    # Source column names differ per exchange; map both onto code/name.
    source_columns = {
        'sh': {'FUND_ID': 'code', 'FUND_NAME': 'name'},
        'sz': {'证券代码': 'code', '证券简称': 'name'},
    }
    if exchange not in source_columns:
        # Fail with a clear message instead of a column-length mismatch later on.
        raise ValueError(f'unsupported exchange: {exchange!r}')

    column_map = source_columns[exchange]
    # rename() returns a new frame, so the caller's data is never modified and
    # no column is assigned on a slice of it.
    etf_df = df[list(column_map)].rename(columns=column_map)
    # Only code/name can be missing; the columns added below are never null.
    etf_df = etf_df.dropna(axis=0, how='any')
    etf_df = etf_df.assign(
        id=etf_df['code'].apply(lambda code: f'index_{exchange}_{code}'),
        exchange=exchange,
        type='index',
        category=StockCategory.etf.value,
    )
    etf_df = etf_df.drop_duplicates(subset='id', keep='last')

    init_securities(etf_df, security_type='index', provider=self.provider)

def download_sh_etf_component(self, df: pd.DataFrame):
    """
    Download and store the constituent stocks of Shanghai-listed ETFs.

    ETF_CLASS values: 1 = single-market ETF, 2 = cross-market ETF,
    3 = cross-border ETF, 5 = bond ETF, 6 = gold ETF. Only classes 1 and 2
    are queried for constituents.

    :param df: ETF list data with FUND_ID, FUND_NAME and ETF_CLASS columns
    :return: None
    """
    query_url = 'http://query.sse.com.cn/infodisplay/queryConstituentStockInfo.do?' \
                'isPagination=false&type={}&etfClass={}'

    etf_df = df[df['ETF_CLASS'].isin(['1', '2'])]
    etf_df = self.populate_sh_etf_type(etf_df)

    for _, etf in etf_df.iterrows():
        etf_code = etf['FUND_ID']

        # Without a type the query URL cannot be built; skip before any request is made.
        if pd.isna(etf['ETF_TYPE']):
            self.logger.info(f'{etf["FUND_NAME"]} - {etf_code} 缺少 ETF 类型,跳过...')
            continue

        url = query_url.format(etf['ETF_TYPE'], etf['ETF_CLASS'])
        response = requests.get(url, headers=DEFAULT_SH_ETF_LIST_HEADER)
        response_dict = demjson.decode(response.text)
        response_df = pd.DataFrame(response_dict.get('result', []))

        # An empty result yields a frame without columns; skip this ETF
        # instead of failing the whole crawl with a KeyError.
        if 'instrumentId' not in response_df.columns:
            self.logger.info(f'{etf["FUND_NAME"]} - {etf_code} 未返回成分股数据,跳过...')
            self.sleep()
            continue

        index_id = f'index_sh_{etf_code}'
        # Convert each stock code once and build a fresh frame, rather than
        # assigning columns on a slice of the response.
        stock_ids = response_df['instrumentId'].apply(china_stock_code_to_id)
        component_df = pd.DataFrame({
            'id': stock_ids.apply(lambda stock_id: f'{index_id}_{stock_id}'),
            'stock_id': stock_ids,
            'index_id': index_id,
        })

        df_to_db(data_schema=self.data_schema, df=component_df, provider=self.provider)
        self.logger.info(f'{etf["FUND_NAME"]} - {etf_code} 成分股抓取完成...')

        self.sleep()

def download_sz_etf_component(self, df: pd.DataFrame):
    """
    Download and store the constituent stocks of Shenzhen-listed ETFs.

    Each ETF's tracked index code is parsed from the ``拟合指数`` column, the
    index constituents are read from the Sina constituent page, and one row
    per (ETF, stock) pair is written through ``df_to_db``.

    :param df: ETF list data with 证券代码, 证券简称 and 拟合指数 columns;
               the 拟合指数 column is rewritten in place with the parsed code
    :return: None
    """
    query_url = 'http://vip.stock.finance.sina.com.cn/corp/go.php/vII_NewestComponent/indexid/{}.phtml'

    self.parse_sz_etf_underlying_index(df)
    for _, etf in df.iterrows():
        underlying_index = etf['拟合指数']
        etf_code = etf['证券代码']

        # A missing cell may arrive as NaN rather than '', so check the type
        # before treating the value as an index code.
        if not isinstance(underlying_index, str) or not underlying_index:
            self.logger.info(f'{etf["证券简称"]} - {etf_code} 非 A 股市场指数,跳过...')
            continue

        url = query_url.format(underlying_index)
        response = requests.get(url)
        response.encoding = 'gbk'

        try:
            # Wrap the markup in a file-like object; passing literal HTML
            # strings to read_html is deprecated in recent pandas releases.
            dfs = pd.read_html(io.StringIO(response.text), header=1)
        except ValueError as error:
            self.logger.error(f'HTML parse error: {error}, response: {response.text}')
            continue

        # The constituent table is read from position 3 of the parsed tables.
        if len(dfs) < 4:
            continue

        response_df = dfs[3].dropna(axis=1, how='any')
        # dropna(axis=1) removes the code column when any cell in it is empty.
        if '品种代码' not in response_df.columns:
            self.logger.info(f'{etf["证券简称"]} - {etf_code} 未返回成分股数据,跳过...')
            continue

        index_id = f'index_sz_{etf_code}'
        # Codes are zero-padded to six digits; int() also accepts codes that
        # were parsed as numeric strings or whole floats.
        codes = response_df['品种代码'].apply(lambda code: f'{int(code):06d}')
        stock_ids = codes.apply(china_stock_code_to_id)
        component_df = pd.DataFrame({
            'id': stock_ids.apply(lambda stock_id: f'{index_id}_{stock_id}'),
            'stock_id': stock_ids,
            'index_id': index_id,
        })

        df_to_db(data_schema=self.data_schema, df=component_df, provider=self.provider)
        self.logger.info(f'{etf["证券简称"]} - {etf_code} 成分股抓取完成...')

        self.sleep()

@staticmethod
def populate_sh_etf_type(df: pd.DataFrame):
    """
    Add the ETF type of each Shanghai ETF to the list data.

    The type listing is requested for ETF classes 1 and 2, and each ETF's
    ``etftype`` is attached by matching ``fundid1`` against ``FUND_ID``.

    :param df: ETF list data with a FUND_ID column
    :return: a copy of the list data sorted by FUND_ID with a fresh index and
             an added ETF_TYPE column; ETFs with no matching type get a null
             ETF_TYPE
    """
    query_url = 'http://query.sse.com.cn/infodisplay/queryETFNewAllInfo.do?' \
                'isPagination=false&type={}&pageHelp.pageSize=25'

    type_frames = []
    for etf_class in [1, 2]:
        url = query_url.format(etf_class)
        response = requests.get(url, headers=DEFAULT_SH_ETF_LIST_HEADER)
        response_dict = demjson.decode(response.text)
        response_df = pd.DataFrame(response_dict.get('result', []))
        # An empty result has no columns; skip it rather than raise a KeyError.
        if {'fundid1', 'etftype'}.issubset(response_df.columns):
            type_frames.append(response_df[['fundid1', 'etftype']])

    result_df = df.sort_values(by='FUND_ID').reset_index(drop=True)

    if type_frames:
        type_df = pd.concat(type_frames, ignore_index=True)
        # Match on the fund code instead of on row position: positional
        # alignment mislabels every following ETF as soon as the two listings
        # differ by a single entry.
        # NOTE(review): assumes FUND_ID and fundid1 hold the same string codes - confirm.
        type_by_code = type_df.drop_duplicates(subset='fundid1', keep='last') \
            .set_index('fundid1')['etftype']
        result_df['ETF_TYPE'] = result_df['FUND_ID'].map(type_by_code)
    else:
        result_df['ETF_TYPE'] = None

    return result_df

if df is not None:
df['id'] = df['code'].apply(lambda x: f'index_{exchange}_{x}')
df['exchange'] = exchange
df['type'] = 'index'
df['category'] = 'etf'
@staticmethod
def parse_sz_etf_underlying_index(df: pd.DataFrame):
    """
    Parse the tracked index code of each Shenzhen ETF.

    The ``拟合指数`` column is rewritten in place: each cell is replaced by the
    first run of digits it contains, or by ``''`` when the cell is empty,
    missing, or contains no digits.

    :param df: ETF list data with a 拟合指数 column
    :return: the same DataFrame, with 拟合指数 replaced by the parsed index code
    """
    def parse_index(text):
        # Missing cells may be NaN (a float) rather than an empty string.
        if not isinstance(text, str):
            return ''

        match = re.search(r'(\d+)', text)
        return match.group(1) if match else ''

    df['拟合指数'] = df['拟合指数'].apply(parse_index)
    return df


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion zvt/recorders/common/china_stock_list_spider.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ def download_stock_list(self, response, exchange):


if __name__ == '__main__':
spider = ChinaStockListSpider(provider=Provider.EASTMONEY)
spider = ChinaStockListSpider(provider=Provider.EXCHANGE)
spider.run()

0 comments on commit 4625497

Please sign in to comment.