· 6 years ago · Apr 10, 2020, 10:30 AM
1import alpaca_trade_api as tradeapi
2import requests
3import time
4from ta import macd
5import numpy as np
6from datetime import datetime, timedelta
7from pytz import timezone
8
9# Replace these with your API connection info from the dashboard
10base_url = 'Your API URL'
11api_key_id = 'Your API Key'
12api_secret = 'Your API Secret'
13
14api = tradeapi.REST(
15 base_url=base_url,
16 key_id=api_key_id,
17 secret_key=api_secret
18)
19
20session = requests.session()
21
22# We only consider stocks with per-share prices inside this range
23min_share_price = 2.0
24max_share_price = 13.0
25# Minimum previous-day dollar volume for a stock we might consider
26min_last_dv = 500000
27# Stop limit to default to
28default_stop = .95
29# How much of our portfolio to allocate to any one position
30risk = 0.001
31
32
33def get_1000m_history_data(symbols):
34 print('Getting historical data...')
35 minute_history = {}
36 c = 0
37 for symbol in symbols:
38 minute_history[symbol] = api.polygon.historic_agg(
39 size="minute", symbol=symbol, limit=1000
40 ).df
41 c += 1
42 print('{}/{}'.format(c, len(symbols)))
43 print('Success.')
44 return minute_history
45
46
47def get_tickers():
48 print('Getting current ticker data...')
49 tickers = api.polygon.all_tickers()
50 print('Success.')
51 assets = api.list_assets()
52 symbols = [asset.symbol for asset in assets if asset.tradable]
53 return [ticker for ticker in tickers if (
54 ticker.ticker in symbols and
55 ticker.lastTrade['p'] >= min_share_price and
56 ticker.lastTrade['p'] <= max_share_price and
57 ticker.prevDay['v'] * ticker.lastTrade['p'] > min_last_dv and
58 ticker.todaysChangePerc >= 3.5
59 )]
60
61
62def find_stop(current_value, minute_history, now):
63 series = minute_history['low'][-100:] \
64 .dropna().resample('5min').min()
65 series = series[now.floor('1D'):]
66 diff = np.diff(series.values)
67 low_index = np.where((diff[:-1] <= 0) & (diff[1:] > 0))[0] + 1
68 if len(low_index) > 0:
69 return series[low_index[-1]] - 0.01
70 return current_value * default_stop
71
72
73def run(tickers, market_open_dt, market_close_dt):
74 # Establish streaming connection
75 conn = tradeapi.StreamConn(base_url=base_url, key_id=api_key_id, secret_key=api_secret)
76
77 # Update initial state with information from tickers
78 volume_today = {}
79 prev_closes = {}
80 for ticker in tickers:
81 symbol = ticker.ticker
82 prev_closes[symbol] = ticker.prevDay['c']
83 volume_today[symbol] = ticker.day['v']
84
85 symbols = [ticker.ticker for ticker in tickers]
86 print('Tracking {} symbols.'.format(len(symbols)))
87 minute_history = get_1000m_history_data(symbols)
88
89 portfolio_value = float(api.get_account().portfolio_value)
90
91 open_orders = {}
92 positions = {}
93
94 # Cancel any existing open orders on watched symbols
95 existing_orders = api.list_orders(limit=500)
96 for order in existing_orders:
97 if order.symbol in symbols:
98 api.cancel_order(order.id)
99
100 stop_prices = {}
101 latest_cost_basis = {}
102
103 # Track any positions bought during previous executions
104 existing_positions = api.list_positions()
105 for position in existing_positions:
106 if position.symbol in symbols:
107 positions[position.symbol] = float(position.qty)
108 # Recalculate cost basis and stop price
109 latest_cost_basis[position.symbol] = float(position.cost_basis)
110 stop_prices[position.symbol] = (
111 float(position.cost_basis) * default_stop
112 )
113
114 # Keep track of what we're buying/selling
115 target_prices = {}
116 partial_fills = {}
117
118 # Use trade updates to keep track of our portfolio
119 @conn.on(r'trade_update')
120 async def handle_trade_update(conn, channel, data):
121 symbol = data.order['symbol']
122 last_order = open_orders.get(symbol)
123 if last_order is not None:
124 event = data.event
125 if event == 'partial_fill':
126 qty = int(data.order['filled_qty'])
127 if data.order['side'] == 'sell':
128 qty = qty * -1
129 positions[symbol] = (
130 positions.get(symbol, 0) - partial_fills.get(symbol, 0)
131 )
132 partial_fills[symbol] = qty
133 positions[symbol] += qty
134 open_orders[symbol] = data.order
135 elif event == 'fill':
136 qty = int(data.order['filled_qty'])
137 if data.order['side'] == 'sell':
138 qty = qty * -1
139 positions[symbol] = (
140 positions.get(symbol, 0) - partial_fills.get(symbol, 0)
141 )
142 partial_fills[symbol] = 0
143 positions[symbol] += qty
144 open_orders[symbol] = None
145 elif event == 'canceled' or event == 'rejected':
146 partial_fills[symbol] = 0
147 open_orders[symbol] = None
148
149 @conn.on(r'A$')
150 async def handle_second_bar(conn, channel, data):
151 symbol = data.symbol
152
153 # First, aggregate 1s bars for up-to-date MACD calculations
154 ts = data.start
155 ts -= timedelta(seconds=ts.second, microseconds=ts.microsecond)
156 try:
157 current = minute_history[data.symbol].loc[ts]
158 except KeyError:
159 current = None
160 new_data = []
161 if current is None:
162 new_data = [
163 data.open,
164 data.high,
165 data.low,
166 data.close,
167 data.volume
168 ]
169 else:
170 new_data = [
171 current.open,
172 data.high if data.high > current.high else current.high,
173 data.low if data.low < current.low else current.low,
174 data.close,
175 current.volume + data.volume
176 ]
177 minute_history[symbol].loc[ts] = new_data
178
179 # Next, check for existing orders for the stock
180 existing_order = open_orders.get(symbol)
181 if existing_order is not None:
182 # Make sure the order's not too old
183 submission_ts = existing_order.submitted_at.astimezone(
184 timezone('America/New_York')
185 )
186 order_lifetime = ts - submission_ts
187 if order_lifetime.seconds // 60 > 1:
188 # Cancel it so we can try again for a fill
189 api.cancel_order(existing_order.id)
190 return
191
192 # Now we check to see if it might be time to buy or sell
193 since_market_open = ts - market_open_dt
194 until_market_close = market_close_dt - ts
195 if (
196 since_market_open.seconds // 60 > 15 and
197 since_market_open.seconds // 60 < 60
198 ):
199 # Check for buy signals
200
201 # See if we've already bought in first
202 position = positions.get(symbol, 0)
203 if position > 0:
204 return
205
206 # See how high the price went during the first 15 minutes
207 lbound = market_open_dt
208 ubound = lbound + timedelta(minutes=15)
209 high_15m = 0
210 try:
211 high_15m = minute_history[symbol][lbound:ubound]['high'].max()
212 except Exception as e:
213 # Because we're aggregating on the fly, sometimes the datetime
214 # index can get messy until it's healed by the minute bars
215 return
216
217 # Get the change since yesterday's market close
218 daily_pct_change = (
219 (data.close - prev_closes[symbol]) / prev_closes[symbol]
220 )
221 if (
222 daily_pct_change > .04 and
223 data.close > high_15m and
224 volume_today[symbol] > 30000
225 ):
226 # check for a positive, increasing MACD
227 hist = macd(
228 minute_history[symbol]['close'].dropna(),
229 n_fast=12,
230 n_slow=26
231 )
232 if (
233 hist[-1] < 0 or
234 not (hist[-3] < hist[-2] < hist[-1])
235 ):
236 return
237 hist = macd(
238 minute_history[symbol]['close'].dropna(),
239 n_fast=40,
240 n_slow=60
241 )
242 if hist[-1] < 0 or np.diff(hist)[-1] < 0:
243 return
244
245 # Stock has passed all checks; figure out how much to buy
246 stop_price = find_stop(
247 data.close, minute_history[symbol], ts
248 )
249 stop_prices[symbol] = stop_price
250 target_prices[symbol] = data.close + (
251 (data.close - stop_price) * 3
252 )
253 shares_to_buy = portfolio_value * risk // (
254 data.close - stop_price
255 )
256 if shares_to_buy == 0:
257 shares_to_buy = 1
258 shares_to_buy -= positions.get(symbol, 0)
259 if shares_to_buy <= 0:
260 return
261
262 print('Submitting buy for {} shares of {} at {}'.format(
263 shares_to_buy, symbol, data.close
264 ))
265 try:
266 o = api.submit_order(
267 symbol=symbol, qty=str(shares_to_buy), side='buy',
268 type='limit', time_in_force='day',
269 limit_price=str(data.close)
270 )
271 open_orders[symbol] = o
272 latest_cost_basis[symbol] = data.close
273 except Exception as e:
274 print(e)
275 return
276 if(
277 since_market_open.seconds // 60 >= 24 and
278 until_market_close.seconds // 60 > 15
279 ):
280 # Check for liquidation signals
281
282 # We can't liquidate if there's no position
283 position = positions.get(symbol, 0)
284 if position == 0:
285 return
286
287 # Sell for a loss if it's fallen below our stop price
288 # Sell for a loss if it's below our cost basis and MACD < 0
289 # Sell for a profit if it's above our target price
290 hist = macd(
291 minute_history[symbol]['close'].dropna(),
292 n_fast=13,
293 n_slow=21
294 )
295 if (
296 data.close <= stop_prices[symbol] or
297 (data.close >= target_prices[symbol] and hist[-1] <= 0) or
298 (data.close <= latest_cost_basis[symbol] and hist[-1] <= 0)
299 ):
300 print('Submitting sell for {} shares of {} at {}'.format(
301 position, symbol, data.close
302 ))
303 try:
304 o = api.submit_order(
305 symbol=symbol, qty=str(position), side='sell',
306 type='limit', time_in_force='day',
307 limit_price=str(data.close)
308 )
309 open_orders[symbol] = o
310 latest_cost_basis[symbol] = data.close
311 except Exception as e:
312 print(e)
313 return
314 elif (
315 until_market_close.seconds // 60 <= 15
316 ):
317 # Liquidate remaining positions on watched symbols at market
318 try:
319 position = api.get_position(symbol)
320 except Exception as e:
321 # Exception here indicates that we have no position
322 return
323 print('Trading over, liquidating remaining position in {}'.format(
324 symbol)
325 )
326 api.submit_order(
327 symbol=symbol, qty=position.qty, side='sell',
328 type='market', time_in_force='day'
329 )
330 symbols.remove(symbol)
331 if len(symbols) <= 0:
332 conn.close()
333 conn.deregister([
334 'A.{}'.format(symbol),
335 'AM.{}'.format(symbol)
336 ])
337
338 # Replace aggregated 1s bars with incoming 1m bars
339 @conn.on(r'AM$')
340 async def handle_minute_bar(conn, channel, data):
341 ts = data.start
342 ts -= timedelta(microseconds=ts.microsecond)
343 minute_history[data.symbol].loc[ts] = [
344 data.open,
345 data.high,
346 data.low,
347 data.close,
348 data.volume
349 ]
350 volume_today[data.symbol] += data.volume
351
352 channels = ['trade_updates']
353 for symbol in symbols:
354 symbol_channels = ['A.{}'.format(symbol), 'AM.{}'.format(symbol)]
355 channels += symbol_channels
356 print('Watching {} symbols.'.format(len(symbols)))
357 run_ws(conn, channels)
358
359
360# Handle failed websocket connections by reconnecting
361def run_ws(conn, channels):
362 try:
363 conn.run(channels)
364 except Exception as e:
365 print(e)
366 conn.close()
367 run_ws(conn, channels)
368
369
370if __name__ == "__main__":
371 # Get when the market opens or opened today
372 nyc = timezone('America/New_York')
373 today = datetime.today().astimezone(nyc)
374 today_str = datetime.today().astimezone(nyc).strftime('%Y-%m-%d')
375 calendar = api.get_calendar(start=today_str, end=today_str)[0]
376 market_open = today.replace(
377 hour=calendar.open.hour,
378 minute=calendar.open.minute,
379 second=0
380 )
381 market_open = market_open.astimezone(nyc)
382 market_close = today.replace(
383 hour=calendar.close.hour,
384 minute=calendar.close.minute,
385 second=0
386 )
387 market_close = market_close.astimezone(nyc)
388
389 # Wait until just before we might want to trade
390 current_dt = datetime.today().astimezone(nyc)
391 since_market_open = current_dt - market_open
392 while since_market_open.seconds // 60 <= 14:
393 time.sleep(1)
394 since_market_open = current_dt - market_open
395
396 run(get_tickers(), market_open, market_close)