· 5 years ago · Nov 30, 2020, 06:22 PM
1'''
2https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY_EXTENDED&symbol=IBM&interval=15min&slice=year1month2&apikey=XXXXXXXXXXXXX&datatype=csv
3
4AlphaVantage NOTE:
5To ensure optimal API response speed, the trailing 2 years of intraday data is
6evenly divided into 24 "slices" - year1month1, year1month2, year1month3, ...,
7year1month11, year1month12, year2month1, year2month2, year2month3, ...,
8year2month11, year2month12.
9Each slice is a 30-day window, with year1month1 being the most recent and
10year2month12 being the farthest from today.
11By default, slice=year1month1
12
Every time a proxy or apikey is used, save the current datetime with it.
We cycle through proxies and apikeys, checking before using them how much time
has passed since they were last used. When we reach an apikey that has to wait,
we delay until the proper amount of time has passed.
When we reach a proxy that has to wait, we add new proxies, making sure we
don't add ones that already exist.
19'''
20import os
21import math
22import os.path
23import sys
24import glob
25import asyncio
26import requests
27import urllib.request
28import pandas as pd
29
30from time import sleep
31from datetime import datetime
32from typing import List, Dict, Tuple
33from pathlib import Path
34from proxybroker import Broker
35from itertools import cycle
36
# AlphaVantage API keys rotated round-robin; with `num` keys and `delay`
# seconds between calls, each individual key is reused at most once per 60 s
# (presumably to respect the free-tier rate limit -- TODO confirm).
apikeys = ['XXXXXXXXXXXXX', 'XXXXXXXXXXXXX', 'XXXXXXXXXXXXX',
           'XXXXXXXXXXXXX', 'XXXXXXXXXXXXX']

api_dicts = []  # list of dictionaries to save the last time an apikey was used
num = len(apikeys)  # number of apikeys
delay = 60 / num  # the time to wait between API calls depends on the number of apikeys
api_idx = None  # index to select the next apikey
i = 0  # index to select the next proxy (shared round-robin cursor)

BASE_URL = 'https://www.alphavantage.co/'
# To download the data in a subdirectory where the script is located
modpath = os.path.dirname(os.path.abspath(sys.argv[0]))
49
50
def getProxies(n: int) -> List[str]:
    '''Return a list of n working proxies.

    Runs a proxybroker Broker that pushes found HTTPS proxies into an
    asyncio queue, and concurrently drains that queue until the broker
    signals the end of the search by putting None.
    '''
    async def drain(queue):
        # Collect "host:port" strings until the terminating None arrives.
        found = []
        candidate = await queue.get()
        while candidate is not None:
            found.append(f'{candidate.host}:{candidate.port}')
            candidate = await queue.get()
        return found

    queue = asyncio.Queue()
    search = Broker(queue).find(types=['HTTPS'], limit=n)
    loop = asyncio.get_event_loop()
    # gather() returns [find-result, drain-result]; only the proxy list matters.
    results = loop.run_until_complete(asyncio.gather(search, drain(queue)))
    return results[1]
67
68
def download_with_proxy(url: str) -> requests.models.Response:
    '''Try to download from the url with different proxies until it succeeds.

    The proxies are selected using a Round Robin method, and every time a
    proxy raises a ProxyError it is deleted from the list of proxies.

    Parameters
    ----------
    url : str
        The URL to download.

    Returns
    -------
    requests.models.Response
        The first response with HTTP status 200.
    '''
    global i
    proxies = []
    num = 20  # minimum size of the proxy pool
    while True:
        # Top up the pool, skipping proxies we already have.
        if len(proxies) < num:
            proxies.extend(x for x in getProxies(num) if x not in proxies)
        idx = i  # remember which slot we use so we can drop it on failure
        proxy = proxies[idx]
        i = (i + 1) % len(proxies)
        try:
            response = requests.get(url, proxies={'http': proxy, 'https': proxy})
            if response.status_code == 200:
                break
        except requests.exceptions.ProxyError:
            # BUG FIX: delete the proxy that actually failed (idx); the old
            # code deleted proxies[i], but i had already advanced past it.
            del proxies[idx]
            # Keep the cursor valid; guard against an emptied pool (it is
            # refilled at the top of the loop).
            i = i % len(proxies) if proxies else 0
    return response
90
91
def list_to_list_of_dicts(lst: List, keyName) -> List[Dict]:
    '''Wrap every element of lst in a single-entry dict under keyName.

    >>> list_to_list_of_dicts(['a', 'b'], 'k')
    [{'k': 'a'}, {'k': 'b'}]
    '''
    # Fixed return annotation (a list of dicts, not a dict) and iterate the
    # list directly instead of indexing via range(len(...)).
    return [{keyName: value} for value in lst]
94
95
def get_idx_list_dicts(list_dicts: List[Dict], key, value) -> int:
    '''Return the index of the first dict in list_dicts whose `key` entry
    equals `value`, or None when no dict matches.
    '''
    # BUG FIX: the generator referenced the undefined name `item`; the loop
    # variable is `dct`, so every call raised NameError. Use .get() so dicts
    # missing the key are skipped instead of raising KeyError.
    return next((i for i, dct in enumerate(list_dicts) if dct.get(key) == value), None)
98
99
def get_dict(list_dicts: List[Dict], key, value) -> Dict:
    '''Return the first dict in list_dicts whose `key` entry equals `value`,
    or None when no dict matches.
    '''
    # BUG FIX: the original indexed `item[i][key]`; `item` is already the
    # dict, so the extra subscript raised on every call. Use .get() so dicts
    # missing the key are skipped instead of raising KeyError.
    return next((item for item in list_dicts if item.get(key) == value), None)
102
103
def append_dict(list_dicts: List[Dict], key, values: List):
    '''Append {key: value} to list_dicts for every value not already present.

    Mutates list_dicts in place; existing entries are never duplicated.
    '''
    # Build the set of already-present values once instead of re-scanning
    # list_dicts for every candidate (was O(n*m), and it relied on a sibling
    # lookup helper). Hashable values assumed -- proxies/apikeys are strings.
    present = {dct[key] for dct in list_dicts if key in dct}
    for value in values:
        if value not in present:
            list_dicts.append({key: value})
            present.add(value)
109
110
def get_proxy_without_delay(proxies, proxy_dicts) -> Tuple[str, int]:
    '''Round-robin over `proxies` until one is found that is either unused or
    was last used more than `delay` seconds ago (module-level global).

    Stamps the chosen proxy's bookkeeping dict with the current datetime and
    returns (proxy, index of its dict in proxy_dicts).
    '''
    global i  # round-robin cursor shared with the download functions

    while True:
        proxy = proxies[i]
        i = (i + 1) % len(proxies)
        # Locate the bookkeeping entry with ONE scan; the old code did two
        # separate sibling-helper lookups for the dict and then its index.
        dict_idx = next((j for j, dct in enumerate(proxy_dicts)
                         if dct.get('proxy') == proxy), None)
        proxy_dict = proxy_dicts[dict_idx]
        # Keep the proxy if it hasn't been used yet, or if the required
        # cool-down time has passed since its last use.
        if 'dt' not in proxy_dict or \
                (datetime.now() - proxy_dict['dt']).total_seconds() > delay:
            proxy_dict['dt'] = datetime.now()  # record the time of this use
            break
        sleep(1)  # this proxy is still cooling down; wait before trying the next
    return proxy, dict_idx
129
130
def download_with_proxy_wait(url: str) -> requests.models.Response:
    '''Download `url` through rotating proxies, waiting out each proxy's
    cool-down period before reusing it.

    Proxies that raise a ProxyError are removed from the pool, and the pool
    is topped up with fresh proxies whenever it shrinks below `num`.
    '''
    global i
    proxies = []
    proxy_dicts = []  # one {'proxy': ..., 'dt': ...} entry per proxy
    num = 20  # minimum number of proxies in the pool

    while True:
        if len(proxies) < num:
            # Top up the pool. proxies and proxy_dicts stay index-aligned
            # because new entries are appended to both in the same order.
            proxies.extend(x for x in getProxies(num) if x not in proxies)
            append_dict(proxy_dicts, 'proxy', proxies)

        proxy, dict_idx = get_proxy_without_delay(proxies, proxy_dicts)

        try:
            response = requests.get(url, proxies={'http': proxy, 'https': proxy})
            if response.status_code == 200:
                break
        except requests.exceptions.ProxyError:
            # BUG FIX: remove the proxy that FAILED. `i` was already advanced
            # past it inside get_proxy_without_delay, so `del proxies[i]`
            # removed the wrong proxy and de-synchronized the two lists;
            # dict_idx is the failed proxy's position in both aligned lists.
            del proxies[dict_idx]
            del proxy_dicts[dict_idx]
            i = i % len(proxies) if proxies else 0
    return response
153
154
def download_with_proxy_wait_2(url: str) -> requests.models.Response:
    '''Variant of download_with_proxy_wait with the proxy-selection loop
    inlined and a fixed 12-second per-proxy cool-down.
    '''
    global i
    proxies = []
    proxy_dicts = []  # one {'proxy': ..., 'dt': ...} entry per proxy
    num = 20  # minimum number of proxies in the pool
    delay = 12  # seconds we have to wait before a proxy may be reused

    while True:
        if len(proxies) < num:
            # proxies and proxy_dicts stay index-aligned (appended in order).
            proxies.extend(x for x in getProxies(num) if x not in proxies)
            append_dict(proxy_dicts, 'proxy', proxies)

        # Round-robin until we find a proxy that is unused or cooled down.
        while True:
            proxy = proxies[i]
            i = (i + 1) % len(proxies)
            # Single scan for the bookkeeping entry (the old code called the
            # two sibling lookup helpers back to back).
            dict_idx = next((j for j, dct in enumerate(proxy_dicts)
                             if dct.get('proxy') == proxy), None)
            proxy_dict = proxy_dicts[dict_idx]
            if 'dt' not in proxy_dict or \
                    (datetime.now() - proxy_dict['dt']).total_seconds() > delay:
                proxy_dict['dt'] = datetime.now()  # record the time of this use
                break
            sleep(1)  # every proxy is cooling down; wait before retrying

        try:
            response = requests.get(url, proxies={'http': proxy, 'https': proxy})
            if response.status_code == 200:
                break
        except requests.exceptions.ProxyError:
            # BUG FIX: delete the FAILED proxy (dict_idx) -- `i` already
            # points past it -- and keep both lists index-aligned.
            del proxies[dict_idx]
            del proxy_dicts[dict_idx]
            i = i % len(proxies) if proxies else 0
    return response
191
192
def get_apikey() -> str:
    '''Return the next apikey in round-robin order, sleeping first when the
    key was handed out less than `delay` seconds ago.

    Uses the module globals apikeys, api_dicts, api_idx and delay.
    '''
    global apikeys
    global api_dicts
    global api_idx
    global delay

    if api_idx is None:
        # First call: start at key 0 and create the bookkeeping entries.
        api_idx = 0
        present = {d.get('apikey') for d in api_dicts}
        api_dicts.extend({'apikey': k} for k in apikeys if k not in present)
    apikey = apikeys[api_idx]
    # BUG FIX: advance the APIKEY cursor; the old code advanced from the
    # proxy cursor `i`, so api_idx jumped around with proxy usage.
    api_idx = (api_idx + 1) % len(apikeys)
    api_dict = next(d for d in api_dicts if d.get('apikey') == apikey)

    if 'dt' in api_dict:
        # Wait out the remainder of the rate-limit window. Never call
        # sleep() with a negative value (it raises ValueError once more
        # than `delay` seconds have already passed).
        remaining = delay - (datetime.now() - api_dict['dt']).total_seconds()
        if remaining > 0:
            sleep(math.ceil(remaining))

    api_dict['dt'] = datetime.now()  # record the time this key was handed out

    return apikey
212
213
def download_previous_data(
    file: str,
    ticker: str,
    timeframe: str,
    _slice: str,
):
    '''Download one 30-day slice of intraday data for `ticker` and append it
    to `file`, creating the file on the first slice.

    Rows are reversed so the CSV ends up oldest-first; duplicates from
    overlapping slices are dropped before writing.
    '''
    apikey = get_apikey()
    url = f'{BASE_URL}query?function=TIME_SERIES_INTRADAY_EXTENDED&symbol={ticker}&interval={timeframe}&slice={_slice}&apikey={apikey}&datatype=csv'
    print(f'Downloading {_slice} of {timeframe} for {ticker}...')
    try:
        df = pd.read_csv(url).iloc[::-1]  # AlphaVantage serves newest-first; reverse
        # TODO: sleep while getting wrong data
        if os.path.exists(file):
            # BUG FIX: drop_duplicates(inplace=True) returns None, so the
            # chained .to_csv() raised AttributeError and the appended data
            # was never saved. Also use pd.concat since DataFrame.append is
            # deprecated.
            combined = pd.concat([pd.read_csv(file), df]).drop_duplicates()
            combined.to_csv(file, encoding='utf-8-sig')
        else:
            df.to_csv(file, mode='w', encoding='utf-8-sig')
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # still propagate; any download/parse failure is reported and skipped.
        print(f"Couldn't download data for {ticker}")
232
233
def get_tickers(filepath) -> Tuple[pd.DataFrame, List[str]]:
    '''Return the full listing DataFrame and the list of NYSE ticker symbols.

    Downloads the AlphaVantage listing CSV to `filepath` when the file does
    not exist yet; subsequent calls just read the cached file.

    Fixed return annotation: the function returns (df, tickers), not just
    the list. Also removed the dead `tickers = []` initializer.
    '''
    if not os.path.exists(filepath):
        url = 'https://www.alphavantage.co/query?function=LISTING_STATUS&apikey=XXXXXXXXXXXXX'
        print('Downloading ticker symbols:')
        urllib.request.urlretrieve(url, filepath)
        sleep(173)  # NOTE(review): presumably a rate-limit pause -- confirm

    df = pd.read_csv(filepath)
    tickers = df.loc[df['exchange'] == 'NYSE']['symbol'].tolist()

    return df, tickers
248
249
def create_download_folders(timeframes: List[str]):
    '''Ensure a download subdirectory exists next to the script for every
    timeframe, creating any missing parent directories as needed.
    '''
    for tf in timeframes:
        target = Path(f'{modpath}/{tf}')
        # Alternative location kept from the original for reference:
        # f'/media/user/Portable Drive/Trading/data/{tf}'
        target.mkdir(parents=True, exist_ok=True)
255
256
def get_data():
    '''Top-level driver: for every NYSE ticker in the AlphaVantage listing,
    download every available 30-day slice of intraday data for every
    timeframe, appending each slice to a per-ticker CSV file.
    '''
    filepath = f'{modpath}/list_stocks_alphavantage.csv'
    df, tickers = get_tickers(filepath)
    timeframes = ['1min', '5min', '15min', '30min', '60min']

    #filepath = f'{modpath}/my_traded_stocks.txt'
    #with open(filepath) as f:
        #tickers = f.read().replace('\n', '').split(',')

    # Make sure the download folders exists
    create_download_folders(timeframes)

    # For each ticker symbol download all data available for each timeframe
    # Each download iteration has to be in a 'try except' in case the ticker symbol isn't available on alphavantage

    # Slices ordered oldest-first (year2month12 is the farthest back,
    # year1month1 the most recent) so each CSV is appended chronologically.
    slices = ['year2month12', 'year2month11', 'year2month10',
              'year2month9', 'year2month8', 'year2month7',
              'year2month6', 'year2month5', 'year2month4',
              'year2month3', 'year2month2', 'year2month1',
              'year1month12', 'year1month11', 'year1month10',
              'year1month9', 'year1month8', 'year1month7',
              'year1month6', 'year1month5', 'year1month4',
              'year1month3', 'year1month2', 'year1month1']

    for _slice in slices:
        for ticker in tickers:
            # Skip tickers that do not appear anywhere in the listing frame.
            if ticker not in df.values:
                print(f'{ticker} not available. Skiping.')
                continue
            # Human-readable company name for progress output.
            name = df.loc[df['symbol'] == ticker, 'name'].iat[0]
            print(f'Downloading data for {ticker}: {name}...')
            for timeframe in timeframes:
                download_path = f'{modpath}/{timeframe}'
                filepath = f'{download_path}/{ticker}.csv'

                download_previous_data(filepath, ticker, timeframe, _slice)
293
294
if __name__ == '__main__':
    # Run the full download when executed as a script.
    get_data()