· 4 months ago · May 30, 2025, 04:25 PM
1import websocket
2import json
3import os
4import time
5from datetime import datetime, timezone
6
# --- ANSI Color Codes ---
# Escape sequences that switch the terminal's foreground color.
RESET = "\033[0m"    # back to the terminal's default color

# Price movement / status colors
GREEN = "\033[92m"   # price went up, success messages, [BUY]
RED = "\033[91m"     # price went down, error messages, [SELL]
YELLOW = "\033[93m"  # unchanged or first price, warnings, [CROSS]

# Message-type label colors
BLUE = "\033[94m"    # TRADE label
CYAN = "\033[96m"    # QUOTE label
14
# --- TROUBLESHOOTING NOTE ---
# An "insufficient subscription" error (Code: 409) from Alpaca means the
# subscription attached to your API key does not cover the data feed you
# connected to.
# Make sure SOCKET_URL below points at a feed your account can access
# (most likely IEX_SOCKET_URL when using a paper-trading account).
# --- END TROUBLESHOOTING NOTE ---
20
# --- Configuration ---
# Credentials come from the environment. When a variable is unset or empty,
# the placeholder string is used instead; on_open() and main() treat the
# placeholders as "not configured".
API_KEY = os.getenv("APCA_API_KEY_ID") or "YOUR_API_KEY"
API_SECRET = os.getenv("APCA_API_SECRET_KEY") or "YOUR_API_SECRET"

# Symbol whose trades and quotes are streamed.
TICKER_SYMBOL = "GME"

# Available Alpaca market-data stream endpoints.
IEX_SOCKET_URL = "wss://stream.data.alpaca.markets/v2/iex"
SIP_SOCKET_URL = "wss://stream.data.alpaca.markets/v2/sip"

# CHOOSE YOUR SOCKET URL HERE:
# For paper trading, use IEX_SOCKET_URL.
# SOCKET_URL = SIP_SOCKET_URL
SOCKET_URL = IEX_SOCKET_URL  # Set for IEX based on paper account
33
# --- Global Variables ---
# True once Alpaca has confirmed authentication; reset on every open/close.
authenticated = False

# Most recent trade, bid and ask prices. They are compared against each new
# price to pick its display color; 0.0 means "no price seen yet".
last_trade_price = last_bid_price = last_ask_price = 0.0
40
41
def get_price_color(current_price, last_price):
    """Return the ANSI color code that reflects how a price moved.

    Args:
        current_price: The newly received price, or None when the message
            carried no price.
        last_price: The previously seen price; 0.0 means no price has been
            seen yet.

    Returns:
        RESET when there is no current price, YELLOW for the first price or
        an unchanged price, GREEN for an increase and RED for a decrease.
    """
    # Check for a missing price first: otherwise a None arriving before any
    # price has been recorded would be colored like a valid first price.
    if current_price is None:
        return RESET
    if last_price == 0.0:  # First price update, nothing to compare against
        return YELLOW
    if current_price > last_price:
        return GREEN
    if current_price < last_price:
        return RED
    return YELLOW
54
def format_timestamp_iso(iso_timestamp_str):
    """Format an ISO 8601 timestamp string as ``HH:MM:SS.mmm UTC``.

    Used for both trade and quote timestamps.

    Args:
        iso_timestamp_str: Timestamp text such as
            ``2021-02-22T15:51:44.208123456Z``. A trailing ``Z``, an explicit
            UTC offset, or no zone at all (treated as UTC) are accepted, with
            any number of fractional-second digits.

    Returns:
        The time of day in UTC with millisecond precision, ``"N/A"`` when the
        input is empty or None, or ``"Invalid Timestamp"`` when it cannot be
        parsed (the parse error is also printed).
    """
    if not iso_timestamp_str:
        return "N/A"
    try:
        normalized = iso_timestamp_str
        # A trailing 'Z' is only understood by fromisoformat() on Python 3.11+.
        if normalized.endswith('Z'):
            normalized = normalized[:-1] + '+00:00'

        # Before Python 3.11, fromisoformat() accepts only 3 or 6 fractional
        # digits, so pad or truncate the fraction to exactly 6 (microseconds).
        head, dot, tail = normalized.partition('.')
        if dot:
            digit_count = len(tail) - len(tail.lstrip('0123456789'))
            if digit_count:
                fraction = tail[:digit_count][:6].ljust(6, '0')
                normalized = f"{head}.{fraction}{tail[digit_count:]}"

        dt_object = datetime.fromisoformat(normalized)
        if dt_object.tzinfo is None:
            dt_object = dt_object.replace(tzinfo=timezone.utc)
        else:
            # Convert to UTC so the " UTC" suffix is true for any offset.
            dt_object = dt_object.astimezone(timezone.utc)
        # %f yields microseconds; dropping the last 3 digits leaves milliseconds.
        return dt_object.strftime('%H:%M:%S.%f')[:-3] + " UTC"
    except ValueError as e:
        print(f"{RED}Error converting ISO timestamp '{iso_timestamp_str}': {e}{RESET}")
        return "Invalid Timestamp"
70
71
def on_open(ws):
    """Reset connection state and send the authentication request.

    Registered as the WebSocketApp ``on_open`` callback. If either credential
    is empty or still a placeholder, an error is printed and the socket is
    closed without sending anything.

    Args:
        ws: The WebSocket application object the callback was invoked for.
    """
    global authenticated, last_trade_price, last_bid_price, last_ask_price

    # Every new connection starts unauthenticated and without price history.
    authenticated = False
    last_trade_price = last_bid_price = last_ask_price = 0.0
    print("WebSocket connection opened.")

    key_ready = bool(API_KEY) and API_KEY != "YOUR_API_KEY"
    secret_ready = bool(API_SECRET) and API_SECRET != "YOUR_API_SECRET"
    if not (key_ready and secret_ready):
        print(f"{RED}Error: API_KEY or API_SECRET not configured. Please set them.{RESET}")
        ws.close()
        return

    auth_payload = json.dumps({"action": "auth", "key": API_KEY, "secret": API_SECRET})
    ws.send(auth_payload)
    # Only the first four characters of the key ID are echoed.
    print(f"Sent authentication request for API Key ID starting with: {API_KEY[:4]}...")
87
def _format_size(size, width):
    """Left-align a trade/quote size in `width` columns, showing N/A when it is missing."""
    shown = "N/A" if size is None else size
    return f"{shown:<{width}}"


def _handle_trade(item, symbol):
    """Print one trade message and remember its price for the next comparison."""
    global last_trade_price
    price = item.get("p")
    size = item.get("s")
    readable_time = format_timestamp_iso(item.get("t"))

    price_color = get_price_color(price, last_trade_price)
    price_num = price if price is not None else 0.0

    # Infer trade side from the latest quote: at or above the ask counts as a
    # buy, at or below the bid as a sell, anything else stays [CROSS].
    trade_side_indicator = f"{YELLOW}[CROSS]{RESET}"  # Default
    if price is not None:
        if last_ask_price > 0 and price >= last_ask_price:
            trade_side_indicator = f"{GREEN}[BUY]{RESET}"
        elif last_bid_price > 0 and price <= last_bid_price:
            trade_side_indicator = f"{RED}[SELL]{RESET}"

    print(f"{BLUE}TRADE{RESET} [{symbol}]: {trade_side_indicator} Price: {price_color}${price_num:<7.2f}{RESET} "
          f"Size: {_format_size(size, 5)} Time: {readable_time}")

    if price is not None:
        last_trade_price = price


def _handle_quote(item, symbol):
    """Print one quote message and remember its bid/ask for later comparisons."""
    global last_bid_price, last_ask_price
    bid_price = item.get("bp")
    bid_size = item.get("bs")
    ask_price = item.get("ap")
    ask_size = item.get("as")
    readable_time = format_timestamp_iso(item.get("t"))

    bid_color = get_price_color(bid_price, last_bid_price)
    ask_color = get_price_color(ask_price, last_ask_price)

    bid_price_num = bid_price if bid_price is not None else 0.0
    ask_price_num = ask_price if ask_price is not None else 0.0

    print(f"{CYAN}QUOTE{RESET} [{symbol}]: Bid: {bid_color}${bid_price_num:<7.2f}{RESET} (Size: {_format_size(bid_size, 4)}) "
          f"Ask: {ask_color}${ask_price_num:<7.2f}{RESET} (Size: {_format_size(ask_size, 4)}) Time: {readable_time}")

    if bid_price is not None:
        last_bid_price = bid_price
    if ask_price is not None:
        last_ask_price = ask_price


def on_message(ws, message):
    """Decode one WebSocket frame and act on every message it contains.

    Registered as the WebSocketApp ``on_message`` callback. Each decoded item
    is dispatched on its ``"T"`` field:

    * ``success`` / ``authenticated``: mark the session authenticated and
      subscribe to trades and quotes for TICKER_SYMBOL.
    * ``error``: print the error (with an extra hint for code 409) and close
      the socket.
    * ``subscription``: print the active subscriptions.
    * ``t`` / ``q`` (only once authenticated, and only for TICKER_SYMBOL):
      print the trade or quote and update the remembered prices.

    Anything else except the initial ``connected`` acknowledgement is printed
    verbatim. Missing sizes are printed as ``N/A`` instead of raising.

    Args:
        ws: The WebSocket application object, used to send and close.
        message: The raw JSON text of the frame.
    """
    global authenticated
    try:
        data_list = json.loads(message)
        # Messages are expected as a JSON array; wrap a lone object so it is
        # processed as one message instead of being iterated key by key.
        if isinstance(data_list, dict):
            data_list = [data_list]

        for item in data_list:
            message_type = item.get("T")
            is_connected_ack = message_type == "success" and item.get("msg") == "connected"

            if message_type == "success" and item.get("msg") == "authenticated":
                authenticated = True
                print(f"{GREEN}Successfully authenticated!{RESET}")
                subscribe_message = {
                    "action": "subscribe",
                    "trades": [TICKER_SYMBOL],
                    "quotes": [TICKER_SYMBOL],
                }
                ws.send(json.dumps(subscribe_message))
                print(f"Subscribing to trades and quotes for {TICKER_SYMBOL}...")

            elif message_type == "error":
                error_msg = item.get('msg')
                error_code = item.get('code')
                print(f"{RED}API Error: {error_msg} (Code: {error_code}){RESET}")
                if error_code == 409:
                    print(f"{RED}This 'insufficient subscription' error means your Alpaca account/API key "
                          f"doesn't have access to this data feed. Check SOCKET_URL and your Alpaca plan.{RESET}")
                # NOTE(review): the socket is closed after any API error, not
                # only 409 - confirm this matches the intended behavior.
                ws.close()

            elif message_type == "subscription":
                print(f"Subscription update: Trades: {item.get('trades', [])}, "
                      f"Quotes: {item.get('quotes', [])}, Bars: {item.get('bars', [])}")

            elif authenticated:
                symbol = item.get("S")
                if symbol != TICKER_SYMBOL:
                    continue

                if message_type == "t":  # Trade message
                    _handle_trade(item, symbol)
                elif message_type == "q":  # Quote message
                    _handle_quote(item, symbol)
                elif not is_connected_ack:
                    print(f"Other data: {item}")

            elif not is_connected_ack:
                print(f"System message: {item}")

    except json.JSONDecodeError:
        print(f"{RED}Failed to decode JSON: {message}{RESET}")
    except Exception as e:
        # Callback boundary: report and keep the stream alive rather than
        # letting one malformed message end the connection.
        print(f"{RED}An error occurred in on_message: {e!r}{RESET}")
180
def on_error(ws, error):
    """Print a WebSocket-level error in red.

    Args:
        ws: The WebSocket application object (unused).
        error: The error reported by the WebSocket library.
    """
    report = f"{RED}WebSocket error: {error}{RESET}"
    print(report)
183
def on_close(ws, close_status_code, close_msg):
    """Clear the authenticated flag and report why the socket closed.

    Args:
        ws: The WebSocket application object (unused).
        close_status_code: Close code passed to the callback.
        close_msg: Close message passed to the callback; shown as N/A when
            empty or None.
    """
    global authenticated
    authenticated = False

    reason = close_msg or 'N/A'
    print(f"WebSocket connection closed. Code: {close_status_code}, Message: {reason}")
188
def main():
    """Create the WebSocket client for SOCKET_URL and run it until it stops.

    Prints a warning when the credentials are still placeholders, wires the
    module's callbacks into a WebSocketApp and blocks in run_forever(). On the
    way out the socket is closed if it is still connected.
    """
    print("Starting Alpaca WebSocket client...")
    print(f"Connecting to: {SOCKET_URL} for ticker: {TICKER_SYMBOL}")

    placeholder_checks = (API_KEY == "YOUR_API_KEY", API_SECRET == "YOUR_API_SECRET")
    if any(placeholder_checks):
        print(f"{YELLOW}WARNING: API_KEY or API_SECRET is set to placeholder values. Please configure them.{RESET}")

    callbacks = {
        "on_open": on_open,
        "on_message": on_message,
        "on_error": on_error,
        "on_close": on_close,
    }
    ws_app = websocket.WebSocketApp(SOCKET_URL, **callbacks)
    try:
        ws_app.run_forever()
    except KeyboardInterrupt:
        print(f"\n{YELLOW}WebSocket client stopped by user.{RESET}")
    except Exception as e:
        print(f"{RED}An unexpected error occurred while running WebSocketApp: {e!r}{RESET}")
    finally:
        # Close only when an underlying socket exists and is still connected.
        sock = getattr(ws_app, 'sock', None)
        if sock and sock.connected:
            ws_app.close()
        print("WebSocket client shutdown complete.")


if __name__ == "__main__":
    main()
214