# Paste metadata: 3 years ago · May 31, 2022, 10:00 PM
import asyncio
import json
import logging
from contextlib import closing
from logging import getLogger
from os import walk

import aiohttp
import evdev

import defs
9
# Runtime settings, read once at import time from the project's defs module.
config = defs.params().parameters

# When True, events and payloads are logged as they are processed.
DEBUG: bool = False
13
# Key press codes sent by the remote control, mapped to the key names used
# throughout this script.
KEY_MAP = {
    # Media and volume buttons.
    165: "KEY_PREVIOUS_SONG",
    164: "KEY_PLAY_PAUSE",
    163: "KEY_NEXT_SONG",
    115: "KEY_VOLUMEUP",
    114: "KEY_VOLUMEDOWN",
    # Function buttons.
    582: "KEY_SEARCH",
    158: "KEY_RETURN",
    172: "KEY_HOME",
    113: "KEY_MUTE",
    240: "KEY_MOUSE",
    116: "KEY_POWER",
    # Direction pad.
    105: "KEY_LEFT",
    106: "KEY_RIGHT",
    103: "KEY_UP",
    108: "KEY_DOWN",
    28: "KEY_ENTER",
    109: "KEY_COMPOSE",
    14: "KEY_DEL",
    127: "KEY_LIGHT",
    # Digits: code 11 is "0", codes 2..10 are "1".."9".
    11: "REM_KEY_0",
    **{digit + 1: "REM_KEY_{}".format(digit) for digit in range(1, 10)},
    104: "KEY_BACKSPACE",
    272: "KEY_X",
    273: "KEY_Y",
}
49
# Translation from the key name reported by the remote to the key name that
# is actually sent. Most keys pass through unchanged; the entries marked
# "remapped" are redirected to get the functionality we want.
KEY_RE = {
    "KEY_PREVIOUS_SONG": "KEY_PREVIOUS_SONG",
    "KEY_PLAY_PAUSE": "KEY_PLAY_PAUSE",
    "KEY_NEXT_SONG": "KEY_NEXT_SONG",
    "KEY_VOLUMEUP": "KEY_VOLUMEUP",
    "KEY_VOLUMEDOWN": "KEY_VOLUMEDOWN",
    "KEY_SEARCH": "KEY_BACK",  # remapped
    "KEY_RETURN": "KEY_HOME",  # remapped
    "KEY_HOME": "KEY_COMPOSE",  # remapped
    "KEY_MUTE": "KEY_MUTE",
    "KEY_MOUSE": "KEY_F5",  # remapped
    "KEY_POWER": "KEY_POWER",
    "KEY_LEFT": "KEY_LEFT",
    "KEY_RIGHT": "KEY_RIGHT",
    "KEY_UP": "KEY_UP",
    "KEY_DOWN": "KEY_DOWN",
    "KEY_ENTER": "KEY_ENTER",
    "KEY_COMPOSE": "KEY_SETTINGS",  # remapped
    "KEY_BACKSPACE": "KEY_F5",  # remapped
    "KEY_LIGHT": "KEY_LIGHT",
    # Digit keys REM_KEY_0 .. REM_KEY_9 pass through unchanged.
    **{name: name for name in map("REM_KEY_{}".format, range(10))},
    "KEY_DEL": "KEY_DEL",
    "KEY_X": "KEY_ENTER",  # remapped
    "KEY_Y": "KEY_HOME",  # remapped
}
85
# Names for the keystate value of an evdev key event; the keystate is used
# as an index into this list.
KEY_STATE = ["up", "down", "hold"]
DEVICE_INFO_LOG = "Using device {} from {}{}."
DEVICE_INFO_NOT_FOUND = "Device {} not found."

# Since the remote does not send repeating keypresses for some keys, we
# implement our own repeating keys function for the keys listed here.
REPEATING_KEYS = config["repeating_keys"]
# Long-lived access token for HA.
API_KEY = config["ha_token"]
# Name of the event fired in HA.
EVENT_NAME = config["event_name"]
# API event endpoint for HA.
API_ENDPOINT = config["ha_url"] + EVENT_NAME
# Delay in seconds between repeated keypresses.
REPEAT_DELAY = config["repeat_delay"]
# Initial delay before repeating keys start to repeat, counted in
# REPEAT_DELAY intervals (initial delay is roughly REPEAT_DELAY x WAIT seconds).
WAIT = config["initial_delay"]

HEADERS = {
    "content-type": "application/json",
    "Authorization": "Bearer {}".format(API_KEY),
}
CMD = "cmd"
CMD_TYPE = "cmd_type"
CMD_TYPE_DOWN = "down"
EVENT = "event"
# NOTE(review): evdev nodes usually live under /dev/input/; presumably this
# path is where they are mounted for this script - confirm.
DEV_PATH = "/input/"
# Initial state for the key_repeater function.
state = "up"
106
107
def find_device(device_name):
    """Find the device file name for the input device called *device_name*.

    Looks at the files directly inside DEV_PATH whose name contains EVENT,
    opens each of them with evdev and compares the name the device reports.

    Args:
        device_name: Name of the device to look for, as reported by evdev.

    Returns:
        The file name (relative to DEV_PATH) of the first matching device,
        or an empty string when no device matches or DEV_PATH cannot be
        listed.
    """
    # os.walk yields nothing for a missing or unreadable directory; the
    # default keeps next() from raising StopIteration in that case.
    _, _, filenames = next(walk(DEV_PATH), (DEV_PATH, [], []))
    for input_device in filenames:
        if EVENT not in input_device:
            continue
        # Close every probed device right away instead of leaving its file
        # descriptor open until garbage collection; the caller re-opens the
        # matching device by path.
        with closing(evdev.InputDevice(DEV_PATH + input_device)) as device:
            if device.name == device_name:
                return input_device
    return ""
118
async def key_repeater(cmd, cmd_type, session):
    """Send repeating keypresses to HA while a button is held down.

    Loops for as long as the module-level ``state`` equals CMD_TYPE_DOWN,
    sleeping REPEAT_DELAY seconds per iteration. Nothing is sent until the
    iteration counter exceeds WAIT; after that one event is posted per
    iteration.

    Args:
        cmd: Key name as found in KEY_MAP; it is translated through KEY_RE
            before being sent.
        cmd_type: Not used; the payload is built from the module-level
            ``state``. Kept so existing callers keep working.
        session: aiohttp client session used to post the events.
    """
    cnt = 0

    # Repeat keypress until the button is released.
    while state == CMD_TYPE_DOWN:
        # Wait a bit before starting repeating keypresses.
        if cnt > WAIT:
            # Prepare the payload for the HA event. The leading space in the
            # command type is kept exactly as the original code sends it.
            payload = {CMD: KEY_RE[cmd], CMD_TYPE: " " + state}
            # Fire the event in HA. This task is cancelled when the button
            # is released, so use the context manager to make sure the
            # response is released even when interrupted mid-read.
            async with session.post(
                API_ENDPOINT, data=json.dumps(payload), headers=HEADERS
            ) as resp:
                await resp.text()
        await asyncio.sleep(REPEAT_DELAY)
        cnt += 1
137
async def handle_events(device):
    """Listen to key events from *device* and forward them to HA.

    Grabs the device for exclusive access, then reads its events with the
    evdev async read loop. Every key event with a known key code is posted
    to API_ENDPOINT as a ``{CMD: ..., CMD_TYPE: ...}`` payload. A "down"
    event for a key listed in REPEATING_KEYS also starts a key_repeater
    task, which is cancelled on the next key event that is not such a
    "down" event.

    Args:
        device: An opened evdev.InputDevice to read from.
    """
    global state

    # Get exclusive access to the device.
    device.grab()
    log.info(device.name)
    repeat_task = None

    # The session is closed by the context manager when the loop ends.
    async with aiohttp.ClientSession() as session:
        async for event in device.async_read_loop():
            if DEBUG:
                log.info(event)

            # Only process keypress events.
            if event.type != evdev.ecodes.EV_KEY:
                continue

            # Get the event code and the event type.
            key_event = evdev.categorize(event)
            key_name = KEY_MAP.get(key_event.scancode)
            if key_name is None:
                # An unmapped button must not take down the whole handler.
                log.warning("Ignoring unmapped key code %s", key_event.scancode)
                continue
            cmd_type = KEY_STATE[key_event.keystate]

            # Prepare the event payload for HA. The leading space in the
            # command type is kept exactly as the original code sends it.
            payload = {CMD: KEY_RE.get(key_name, key_name), CMD_TYPE: " " + cmd_type}
            if DEBUG:
                log.info(payload)

            # Fire the event in HA.
            async with session.post(
                API_ENDPOINT, data=json.dumps(payload), headers=HEADERS
            ) as resp:
                await resp.text()
            if DEBUG:
                log.info(device.name)

            state = cmd_type
            # Check if a repeating key was pressed.
            if key_name in REPEATING_KEYS and cmd_type == CMD_TYPE_DOWN:
                # Never leave an earlier repeater running unreferenced.
                if repeat_task is not None:
                    repeat_task.cancel()
                # Start task to repeat keypress until the button is released.
                # ensure_future schedules on the running loop, so no
                # module-level loop object is required.
                repeat_task = asyncio.ensure_future(
                    key_repeater(key_name, state, session)
                )
            elif repeat_task is not None:
                # Cancel the repeating task when the button is released.
                repeat_task.cancel()
                repeat_task = None
188
if __name__ == "__main__":
    # Setup logging. `log` stays a module-level name because handle_events()
    # logs through it.
    log = getLogger(__name__)
    logging.getLogger('asyncio').setLevel(logging.DEBUG)
    logging.basicConfig(level=logging.DEBUG)

    # Create the event loop before scheduling anything on it. `loop` stays a
    # module-level name because code elsewhere in this file may refer to it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    devices = []

    # Find the device path for all devices and add to devices list.
    for name in config['devices']:
        path = find_device(name)
        # find_device() returns "" when nothing matched, so test the path
        # (not the configured name) before trying to open the device.
        if path:
            device = evdev.InputDevice(DEV_PATH + path)
            devices.append(device)
            log.info(DEVICE_INFO_LOG.format(name, DEV_PATH, path))
        else:
            log.info(DEVICE_INFO_NOT_FOUND.format(name))

    # Create handle_event tasks for all devices.
    for device in devices:
        loop.create_task(handle_events(device))

    loop.run_forever()