· 11 months ago · Nov 11, 2024, 06:30 AM
1# backend.py
2
3import os
4import time
5import threading
6import logging
7from datetime import datetime
8
9from fastapi import FastAPI, Body
10from fastapi.responses import JSONResponse
11from fastapi.middleware.cors import CORSMiddleware
12from pydantic import BaseModel
13
14import RPi.GPIO as GPIO
15import smbus2 as smbus
16import board
17import busio
18import adafruit_tcs34725
19from simple_pid import PID
20import openpyxl
21from openpyxl import Workbook
22
23# ---------------------------- Configuration Constants ----------------------------
24
25# I2C setup for TCS34725 sensor
26i2c = busio.I2C(board.SCL, board.SDA)
27TCS34725_ADDRESS = 0x29
28
29# Register addresses for color data
30CDATAL = 0x14 # Clear (ambient light) channel
31
32# Define pin numbers for PIR, IR sensor, and LEDs
33PIR_LED_PIN = 18 # LED for PIR sensor (GPIO 18, Physical Pin 12)
34IR_LED_PIN = 22 # LED for IR sensor (GPIO 22, Physical Pin 15)
35PIR_PIN = 17 # PIR sensor pin (GPIO 17, Physical Pin 11)
36IR_PIN = 27 # IR sensor pin (GPIO 27, Physical Pin 13)
37TCS_LED_PIN = 26 # TCS sensor-controlled LED (GPIO 26, Physical Pin 37)
38RED_LED_PIN = 12 # Red LED for fault indication (GPIO 12, Physical Pin 32)
39
40# Define pin numbers and physical pins for additional LEDs
41ADDITIONAL_LED_PINS = {
42 "LED1": {"gpio": 5, "physical": 29}, # Additional LED 1 (GPIO 5, Physical Pin 29)
43 "LED2": {"gpio": 6, "detection_gpio": 21, "physical": 31}, # Additional LED 2 (GPIO 6, Physical Pin 31)
44 "LED3": {"gpio": 13, "physical": 33}, # Additional LED 3 (GPIO 13, Physical Pin 33)
45}
46
47# Fault simulation options
48FAULT_MODES = {
49 '1': 'Normal Operation',
50 '2': 'Simulate PIR Sensor Failure',
51 '3': 'Simulate IR Sensor Failure',
52 '4': 'Simulate TCS Sensor Failure',
53 '5': 'Simulate I2C Communication Failure',
54 '6': 'Simulate GPIO Output Failure',
55 '7': 'Simulate Power Issues',
56 '8': 'Simulate Delayed Response',
57 '9': 'Simulate Sensor Cross-Talk',
58 '10': 'Simulate LED1 Failure',
59 '11': 'Simulate LED2 Failure',
60 '12': 'Simulate LED3 Failure',
61 # Add more fault modes as needed
62}
63
64# Light intensity thresholds
65LOW_LIGHT_THRESHOLD = 1000 # Below this lux, LEDs should be on
66HIGH_LIGHT_THRESHOLD = 10000 # Above this lux, LEDs should be off
67
68# Time to keep the LEDs on after detecting motion or object (in seconds)
69LED_ON_TIME = 10
70
71# Excel setup
72EXCEL_FILE_PATH = "/home/iiitg/sensor_data.xlsx"
73
74# PID Controller Parameters
75INITIAL_TARGET_LUX = 500 # Initial desired lux level
76MAX_BRIGHTNESS = 100 # Max PWM duty cycle (0-100)
77PID_KP = 1.0 # Proportional gain
78PID_KI = 0.1 # Integral gain
79PID_KD = 0.05 # Derivative gain
80
81# LED Power Calculation (Assume LED draws 20mA at full brightness, voltage is 5V)
82LED_VOLTAGE = 5.0 # Assuming 5V supply for the LED
83LED_MAX_CURRENT = 0.02 # Max current of 20mA at 100% duty cycle
84
85# ---------------------------- Pydantic Models ----------------------------
86
87class FaultModeRequest(BaseModel):
88 mode: str
89
90class SetTargetLuxRequest(BaseModel):
91 target_lux: float
92
93class SetPIDRequest(BaseModel):
94 Kp: float = None
95 Ki: float = None
96 Kd: float = None
97
98class SetLEDRequest(BaseModel):
99 led: str
100 state: bool
101
102# ---------------------------- FastAPI Setup ----------------------------
103
104app = FastAPI()
105
106# Configure CORS to allow requests from your frontend
107app.add_middleware(
108 CORSMiddleware,
109 allow_origins=["http://localhost:3000"], # Update with your frontend's URL
110 allow_credentials=True,
111 allow_methods=["*"],
112 allow_headers=["*"],
113)
114
115# Configure logging
116logging.basicConfig(
117 level=logging.DEBUG,
118 format='%(asctime)s - %(levelname)s - %(message)s',
119 filename='backend.log',
120 filemode='a'
121)
122
123# ---------------------------- Shared Variables and Locks ----------------------------
124
125# Fault mode management
126fault_mode = '1' # Default to Normal Operation
127fault_mode_lock = threading.Lock()
128
129# Faults dictionary
130faults = {
131 "PIR_Sensor_Failure": False,
132 "IR_Sensor_Failure": False,
133 "TCS_Sensor_Failure": False,
134 "I2C_Communication_Failure": False,
135 "Sensor_CrossTalk": False,
136 "PIR_LED_Failure": False,
137 "IR_LED_Failure": False,
138 "TCS_LED_Failure": False,
139 "LED1_Failure": False,
140 "LED2_Failure": False,
141 "LED3_Failure": False,
142 "GPIO_Output_Failure": False,
143 "Power_Issues": False,
144 "Delayed_Response": False,
145}
146faults_lock = threading.Lock()
147
148# Manual override flags
149manual_override = {
150 'LED2': False # Only LED2 has manual override
151}
152
153# Dimming parameters
154DIM_STEP = 5 # Duty cycle increment/decrement step
155DIM_DELAY = 0.05 # Delay between dimming steps in seconds
156
157# Duty cycle trackers (excluding LED2 as it's not PWM controlled)
158current_duty = {
159 'PIR': 0,
160 'IR': 0,
161 'TCS': 0,
162 'LED1': 0,
163 'LED3': 0
164}
165
166# Fade control flags to prevent multiple fade threads (excluding LED2)
167fading = {
168 'PIR': False,
169 'IR': False,
170 'TCS': False,
171 'LED1': False,
172 'LED3': False
173}
174
175# LED2 Fault Flag
176led2_fault_flag = False
177led2_fault_lock = threading.Lock()
178
179# Global variable to store additional PWM instances
180additional_pwms = {}
181
182# ---------------------------- PID Controller Setup ----------------------------
183
184pid = PID(PID_KP, PID_KI, PID_KD, setpoint=INITIAL_TARGET_LUX)
185pid.output_limits = (0, MAX_BRIGHTNESS) # Restrict output to valid PWM range
186
187# ---------------------------- Excel Logging Setup ----------------------------
188
189# Ensure the directory exists
190directory = os.path.dirname(EXCEL_FILE_PATH)
191if not os.path.exists(directory):
192 os.makedirs(directory)
193
194# Try loading existing workbook or create a new one
195try:
196 workbook = openpyxl.load_workbook(EXCEL_FILE_PATH)
197 sheet = workbook.active
198except FileNotFoundError:
199 workbook = Workbook()
200 sheet = workbook.active
201 sheet.append(["Timestamp", "Lux", "Red", "Green", "Blue", "CCT (K)", "LED Duty Cycle (%)", "Power Consumption (W)"])
202
203# ---------------------------- Helper Functions ----------------------------
204
205def calculate_power_consumption(duty_cycle):
206 """Calculate power consumption based on duty cycle."""
207 current = LED_MAX_CURRENT * (duty_cycle / 100)
208 power = LED_VOLTAGE * current
209 return power
210
211def calculate_cct(r, g, b):
212 """Calculate Correlated Color Temperature (CCT) from RGB values using McCamy's formula."""
213 try:
214 # Normalize RGB values
215 r_norm = r / 65535
216 g_norm = g / 65535
217 b_norm = b / 65535
218
219 # Calculate the chromaticity coordinates
220 X = -0.14282 * r_norm + 1.54924 * g_norm + -0.95641 * b_norm
221 Y = -0.32466 * r_norm + 1.57837 * g_norm + -0.73191 * b_norm
222 Z = -0.68202 * r_norm + 0.77073 * g_norm + 0.56332 * b_norm
223
224 # Avoid division by zero
225 if (X + Y + Z) == 0:
226 return None
227
228 # Calculate chromaticity coordinates
229 xc = X / (X + Y + Z)
230 yc = Y / (X + Y + Z)
231
232 # Calculate n
233 n = (xc - 0.3320) / (0.1858 - yc)
234
235 # Calculate CCT using McCamy's formula
236 cct = -449 * (n * 3) + 3525 * (n * 2) - 6823.3 * n + 5520.33
237 return round(cct, 2)
238 except Exception as e:
239 # Handle unexpected errors
240 logging.error(f"Error calculating CCT: {e}")
241 return None
242
243# ---------------------------- GPIO and PWM Initialization ----------------------------
244
245def initialize_gpio():
246 global PIR_PWM, IR_PWM, TCS_PWM, additional_pwms, sensor
247
248 GPIO.setwarnings(False)
249 GPIO.setmode(GPIO.BCM)
250
251 # Set up sensor input pins with pull-down resistors
252 GPIO.setup(PIR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
253 GPIO.setup(IR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
254
255 # Set up LED output pins with initial LOW
256 GPIO.setup(PIR_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
257 GPIO.setup(IR_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
258 GPIO.setup(TCS_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
259 GPIO.setup(RED_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
260
261 # Set up power pins and detection pins for additional LEDs
262 for led_name, led_info in ADDITIONAL_LED_PINS.items():
263 gpio_pin = led_info["gpio"]
264 GPIO.setup(gpio_pin, GPIO.OUT, initial=GPIO.LOW)
265 if "detection_gpio" in led_info:
266 detection_pin = led_info["detection_gpio"]
267 GPIO.setup(detection_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
268
269 # Set up PWM for PIR and IR LEDs
270 PIR_PWM = GPIO.PWM(PIR_LED_PIN, 1000) # 1 kHz
271 IR_PWM = GPIO.PWM(IR_LED_PIN, 1000) # 1 kHz
272 PIR_PWM.start(0) # Start with LEDs off
273 IR_PWM.start(0)
274
275 # Set up PWM on the TCS LED pin with 1 kHz frequency
276 TCS_PWM = GPIO.PWM(TCS_LED_PIN, 1000)
277 TCS_PWM.start(0) # Start PWM with 0% duty cycle (LED off)
278
279 # Initialize the red LED state to off
280 GPIO.output(RED_LED_PIN, GPIO.LOW)
281
282 # Set up PWM for additional LEDs (excluding LED2)
283 additional_pwms = {}
284 for name, led_info in ADDITIONAL_LED_PINS.items():
285 if name != "LED2": # Exclude LED2 from PWM control
286 pwm_instance = GPIO.PWM(led_info["gpio"], 1000) # 1 kHz
287 pwm_instance.start(0)
288 additional_pwms[name] = pwm_instance
289
290 # Initialize TCS34725 sensor
291 initialize_tcs34725()
292
293 logging.info("GPIO and PWM initialized successfully.")
294
295def initialize_tcs34725():
296 global sensor
297
298 with fault_mode_lock:
299 current_mode = fault_mode
300
301 if current_mode == '4':
302 logging.warning("Simulating TCS sensor failure. Skipping initialization.")
303 print("Simulating TCS sensor failure. Skipping initialization.")
304 return
305
306 try:
307 sensor = adafruit_tcs34725.TCS34725(i2c)
308 sensor.integration_time = 700 # Maximum integration time (~700ms)
309 sensor.gain = 60 # 60x gain for increased sensitivity
310 logging.info("TCS34725 color sensor initialized with higher sensitivity settings.")
311 print("TCS34725 color sensor initialized with higher sensitivity settings.")
312 except Exception as e:
313 logging.error(f"Error initializing TCS34725: {e}")
314 print(f"Error initializing TCS34725: {e}")
315 with faults_lock:
316 faults["TCS_Sensor_Failure"] = True
317 faults["I2C_Communication_Failure"] = True
318
319def read_sensor_data():
320 with fault_mode_lock:
321 current_mode = fault_mode
322
323 if current_mode == '4':
324 # Simulating TCS sensor failure
325 logging.warning("Simulating TCS sensor failure. Returning fixed clear value.")
326 return {
327 "lux": 5000, # Fixed high lux value to simulate sensor failure
328 "r": 100,
329 "g": 100,
330 "b": 100,
331 "cct": 5000
332 }
333
334 if current_mode == '5':
335 # Simulating I2C communication failure
336 logging.error("Simulating I2C communication failure.")
337 raise IOError("I2C communication error")
338
339 try:
340 # Read sensor data
341 r, g, b, c = sensor.color_raw
342 lux = sensor.lux
343
344 # Calculate CCT
345 cct = calculate_cct(r, g, b)
346
347 with faults_lock:
348 faults["TCS_Sensor_Failure"] = False
349 faults["I2C_Communication_Failure"] = False
350
351 logging.debug(f"Sensor Data - Lux: {lux}, R: {r}, G: {g}, B: {b}, CCT: {cct}")
352 return {
353 "lux": lux,
354 "r": r,
355 "g": g,
356 "b": b,
357 "cct": cct
358 }
359 except Exception as e:
360 logging.error(f"Error reading TCS34725 data: {e}")
361 print(f"Error reading TCS34725 data: {e}")
362 with faults_lock:
363 faults["TCS_Sensor_Failure"] = True
364 faults["I2C_Communication_Failure"] = True
365 return {
366 "lux": HIGH_LIGHT_THRESHOLD, # Assume it's bright to turn off LEDs
367 "r": 0,
368 "g": 0,
369 "b": 0,
370 "cct": None
371 }
372
373def map_lux_to_duty_cycle(lux):
374 """Map the lux value to a PWM duty cycle percentage."""
375 if lux > HIGH_LIGHT_THRESHOLD:
376 return 0 # Day mode, LEDs off
377 elif lux < LOW_LIGHT_THRESHOLD:
378 target_lux = 500 # Night mode
379 else:
380 target_lux = 350 # Moderate light mode
381
382 pid.setpoint = target_lux
383 duty_cycle = pid(lux)
384 duty_cycle = max(0, min(100, duty_cycle)) # Clamp between 0 and 100
385 logging.debug(f"Mapped Lux {lux} to Duty Cycle {duty_cycle}% with Target Lux {pid.setpoint}")
386 return duty_cycle
387
388def calculate_power_consumption(duty_cycle):
389 """Calculate power consumption based on duty cycle."""
390 current = LED_MAX_CURRENT * (duty_cycle / 100)
391 power = LED_VOLTAGE * current
392 return power
393
394def control_led_brightness(duty_cycle, led_name):
395 """Control the brightness of a specific LED using PWM."""
396 if led_name in additional_pwms:
397 pwm_instance = additional_pwms[led_name]
398 elif led_name == 'TCS':
399 pwm_instance = TCS_PWM
400 elif led_name == 'PIR':
401 pwm_instance = PIR_PWM
402 elif led_name == 'IR':
403 pwm_instance = IR_PWM
404 else:
405 logging.error(f"Invalid LED name: {led_name}")
406 return
407
408 # Apply fading to prevent abrupt changes
409 threading.Thread(target=fade_to_duty_cycle, args=(pwm_instance, led_name, duty_cycle)).start()
410
411def fade_to_duty_cycle(pwm_instance, led_name, target_dc):
412 """Fade to a specific duty cycle smoothly."""
413 global current_duty, fading
414 with faults_lock:
415 if faults.get(f"{led_name}_Failure", False):
416 logging.error(f"Cannot change duty cycle of {led_name} LED due to a detected fault.")
417 return
418 if fading.get(led_name, False):
419 return # Prevent multiple fade threads
420 fading[led_name] = True
421 logging.debug(f"Starting fade to {target_dc}% duty cycle for {led_name}")
422
423 step = DIM_STEP if target_dc > current_duty.get(led_name, 0) else -DIM_STEP
424 while (step > 0 and current_duty.get(led_name, 0) < target_dc) or \
425 (step < 0 and current_duty.get(led_name, 0) > target_dc):
426 current = current_duty.get(led_name, 0)
427 new_dc = current + step
428 if step > 0:
429 new_dc = min(new_dc, target_dc)
430 else:
431 new_dc = max(new_dc, target_dc)
432 pwm_instance.ChangeDutyCycle(new_dc)
433 current_duty[led_name] = new_dc
434 time.sleep(DIM_DELAY)
435 fading[led_name] = False
436 logging.debug(f"{led_name} duty cycle set to {target_dc}%.")
437
438def log_sensor_data(timestamp, lux, r, g, b, cct, duty_cycle, power_consumed):
439 """Log sensor data and LED status to Excel."""
440 sheet.append([timestamp, lux, r, g, b, cct, duty_cycle, power_consumed])
441 workbook.save(EXCEL_FILE_PATH)
442 logging.debug(f"Logged data at {timestamp}")
443
444def handle_individual_led_faults(led_faults):
445 """Handles faults for individual additional LEDs."""
446 for led_name, is_faulty in led_faults.items():
447 if is_faulty:
448 if led_name in additional_pwms and current_duty[led_name] != 0:
449 fade_out(additional_pwms[led_name], led_name)
450 # Ensure the LED is off
451 if led_name in additional_pwms:
452 additional_pwms[led_name].ChangeDutyCycle(0)
453 current_duty[led_name] = 0
454 logging.error(f"{led_name} LED has a fault and has been turned off.")
455
456def fade_out(pwm_instance, led_name):
457 """Gradually decrease duty cycle to 0."""
458 global current_duty, fading
459 with faults_lock:
460 if faults.get(f"{led_name}_Failure", False):
461 logging.error(f"Cannot fade out {led_name} LED due to a detected fault.")
462 return
463 if fading.get(led_name, False):
464 return # Prevent multiple fade_out threads
465 fading[led_name] = True
466 logging.debug(f"Starting fade out for {led_name}")
467 while current_duty.get(led_name, 0) > 0:
468 current_duty[led_name] = max(current_duty.get(led_name, 0) - DIM_STEP, 0)
469 pwm_instance.ChangeDutyCycle(current_duty[led_name])
470 time.sleep(DIM_DELAY)
471 fading[led_name] = False
472 logging.debug(f"{led_name} faded out to 0% duty cycle.")
473
474def fade_in(pwm_instance, led_name, target_dc=100):
475 """Gradually increase duty cycle to target_dc."""
476 global current_duty, fading
477 with faults_lock:
478 if faults.get(f"{led_name}_Failure", False):
479 logging.error(f"Cannot fade in {led_name} LED due to a detected fault.")
480 return
481 if fading.get(led_name, False):
482 return # Prevent multiple fade_in threads
483 fading[led_name] = True
484 logging.debug(f"Starting fade in for {led_name} to {target_dc}% duty cycle.")
485 while current_duty.get(led_name, 0) < target_dc:
486 current_duty[led_name] = min(current_duty.get(led_name, 0) + DIM_STEP, target_dc)
487 pwm_instance.ChangeDutyCycle(current_duty[led_name])
488 time.sleep(DIM_DELAY)
489 fading[led_name] = False
490 logging.debug(f"{led_name} faded in to {target_dc}% duty cycle.")
491
492# ---------------------------- Control Loop ----------------------------
493
494def control_loop():
495 global current_duty_cycle
496 while True:
497 try:
498 sensor_data = read_sensor_data()
499 lux = sensor_data["lux"]
500 r = sensor_data["r"]
501 g = sensor_data["g"]
502 b = sensor_data["b"]
503 cct = sensor_data["cct"]
504
505 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
506
507 # Determine duty cycle based on lux
508 duty_cycle = map_lux_to_duty_cycle(lux)
509
510 # Calculate power consumption
511 power_consumed = calculate_power_consumption(duty_cycle)
512
513 # Log data to Excel
514 log_sensor_data(timestamp, lux, r, g, b, cct, duty_cycle, power_consumed)
515
516 # Adaptive control - adjust target lux based on ambient light conditions
517 if lux < 100:
518 pid.setpoint = 500 # Increase target lux in very low light
519 elif 100 <= lux < 300:
520 pid.setpoint = 450 # Moderate increase in target lux
521 elif 300 <= lux < LOW_LIGHT_THRESHOLD:
522 pid.setpoint = 350 # Lower intensity for medium light
523 else:
524 pid.setpoint = 0 # Day mode, LEDs off
525
526 # Update duty cycle after adjusting setpoint
527 duty_cycle = map_lux_to_duty_cycle(lux)
528
529 # Control LEDs based on duty cycle
530 if lux > HIGH_LIGHT_THRESHOLD:
531 # Day Mode: Ensure LEDs are off
532 for led_name in ['PIR', 'IR', 'TCS', 'LED1', 'LED3']:
533 if led_name in current_duty and current_duty[led_name] != 0:
534 control_led_brightness(0, led_name)
535 with faults_lock:
536 if not manual_override['LED2'] and not faults.get("LED2_Failure", False):
537 GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.LOW)
538 else:
539 # Night or Moderate Light Mode: Adjust LEDs based on sensor detections
540 # Read PIR and IR sensor states unless in a fault mode
541 with fault_mode_lock:
542 current_mode = fault_mode
543
544 if current_mode in ['2', '9']: # Simulate PIR Sensor Failure or Sensor Cross-Talk
545 pir_detected = False if current_mode == '2' else True
546 ir_detected = False if current_mode == '3' else True
547 else:
548 pir_detected = GPIO.input(PIR_PIN)
549 ir_detected = GPIO.input(IR_PIN)
550
551 if pir_detected or not ir_detected:
552 # Turn on LEDs based on duty cycle
553 for led_name in ['PIR', 'IR', 'TCS', 'LED1', 'LED3']:
554 if led_name in current_duty:
555 control_led_brightness(duty_cycle, led_name)
556 # Turn on LED2 if not in manual override and not faulty
557 with faults_lock:
558 if not manual_override['LED2'] and not faults.get("LED2_Failure", False):
559 GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.HIGH)
560 else:
561 # Turn off LEDs
562 for led_name in ['PIR', 'IR', 'TCS', 'LED1', 'LED3']:
563 if led_name in current_duty and current_duty[led_name] != 0:
564 control_led_brightness(0, led_name)
565 # Turn off LED2 if not in manual override and not faulty
566 with faults_lock:
567 if not manual_override['LED2'] and not faults.get("LED2_Failure", False):
568 GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.LOW)
569
570 # Handle individual LED faults
571 led_faults = {
572 'LED1': faults.get("LED1_Failure", False),
573 'LED2': faults.get("LED2_Failure", False),
574 'LED3': faults.get("LED3_Failure", False)
575 }
576 handle_individual_led_faults(led_faults)
577
578 # Sleep before next iteration
579 time.sleep(5) # Adjust as needed
580
581 except Exception as e:
582 logging.error(f"Error in control loop: {e}")
583 time.sleep(5) # Wait before retrying
584
585# ---------------------------- API Endpoints ----------------------------
586
587@app.get("/")
588def read_root():
589 return {"message": "Backend server is running."}
590
591@app.get("/status")
592def get_status():
593 with fault_mode_lock:
594 current_mode = fault_mode
595
596 # Read LED2's detection pin (GPIO 21) to determine its state
597 led2_state = False
598 try:
599 led2_state = GPIO.input(ADDITIONAL_LED_PINS["LED2"]["detection_gpio"]) == GPIO.HIGH
600 except Exception as e:
601 logging.error(f"Error reading LED2's detection pin: {e}")
602 with faults_lock:
603 faults["LED2_Failure"] = True
604
605 status = {
606 "fault_mode": FAULT_MODES.get(current_mode, "Unknown"),
607 "current_duty": current_duty,
608 "LED2_state": led2_state, # Include LED2's ON/OFF state
609 "faults": faults.copy()
610 }
611
612 return JSONResponse(status)
613
614@app.post("/set_fault_mode")
615def set_fault_mode(request: FaultModeRequest):
616 mode = request.mode
617 if mode not in FAULT_MODES:
618 logging.error(f"Invalid fault mode attempted: {mode}")
619 return JSONResponse(status_code=400, content={"error": "Invalid fault mode."})
620
621 with fault_mode_lock:
622 global fault_mode
623 fault_mode = mode
624
625 if mode == '1':
626 # Reset all fault states
627 with faults_lock:
628 for key in faults:
629 faults[key] = False
630 logging.info("Switched to Normal Operation. All faults cleared.")
631 print("Switched to Normal Operation. All faults cleared.")
632 else:
633 # Simulate faults based on the selected mode
634 with faults_lock:
635 # First, clear all faults
636 for key in faults:
637 faults[key] = False
638
639 # Then, set the specific fault
640 if mode == '2':
641 faults["PIR_Sensor_Failure"] = True
642 elif mode == '3':
643 faults["IR_Sensor_Failure"] = True
644 elif mode == '4':
645 faults["TCS_Sensor_Failure"] = True
646 elif mode == '5':
647 faults["I2C_Communication_Failure"] = True
648 elif mode == '6':
649 faults["GPIO_Output_Failure"] = True
650 elif mode == '7':
651 faults["Power_Issues"] = True
652 elif mode == '8':
653 faults["Delayed_Response"] = True
654 elif mode == '9':
655 faults["Sensor_CrossTalk"] = True
656 elif mode == '10':
657 faults["LED1_Failure"] = True
658 elif mode == '11':
659 faults["LED2_Failure"] = True
660 elif mode == '12':
661 faults["LED3_Failure"] = True
662 # Add more fault simulations as needed
663
664 logging.info(f"Simulated Fault Mode: {FAULT_MODES[mode]}")
665 print(f"Simulated Fault Mode: {FAULT_MODES[mode]}")
666
667 return {"message": FAULT_MODES[mode]}
668
669@app.post("/set_target_lux")
670def set_target_lux(request: SetTargetLuxRequest):
671 target_lux = request.target_lux
672 pid.setpoint = target_lux
673 logging.info(f"Target lux set to {target_lux}")
674 print(f"Target lux set to {target_lux}")
675 return {"status": "success", "target_lux": pid.setpoint}
676
677@app.post("/set_pid")
678def set_pid(request: SetPIDRequest):
679 global pid
680 if request.Kp is not None:
681 pid.Kp = request.Kp
682 logging.info(f"PID Kp set to {pid.Kp}")
683 if request.Ki is not None:
684 pid.Ki = request.Ki
685 logging.info(f"PID Ki set to {pid.Ki}")
686 if request.Kd is not None:
687 pid.Kd = request.Kd
688 logging.info(f"PID Kd set to {pid.Kd}")
689 return JSONResponse({
690 "status": "success",
691 "Kp": pid.Kp,
692 "Ki": pid.Ki,
693 "Kd": pid.Kd
694 })
695
696@app.post("/set_led")
697def set_led(request: SetLEDRequest):
698 led = request.led.upper()
699 state = request.state
700
701 if led not in ADDITIONAL_LED_PINS and led not in current_duty and led != "TCS":
702 logging.error(f"Invalid LED name attempted: {led}")
703 return JSONResponse(status_code=400, content={"error": "Invalid LED name."})
704
705 # Prevent controlling LEDs that are in fault mode
706 fault_prevent = False
707 with fault_mode_lock:
708 if led == "LED1" and fault_mode == '10':
709 fault_prevent = True
710 elif led == "LED2" and fault_mode == '11':
711 fault_prevent = True
712 elif led == "LED3" and fault_mode == '12':
713 fault_prevent = True
714 elif led == "PIR" and fault_mode == '2':
715 fault_prevent = True
716 elif led == "IR" and fault_mode == '3':
717 fault_prevent = True
718 elif led == "TCS" and fault_mode == '4':
719 fault_prevent = True
720 elif fault_mode in ['6', '7', '8', '9'] and led in ['LED1', 'LED2', 'LED3']:
721 fault_prevent = True
722
723 if fault_prevent:
724 logging.warning(f"Attempted to control {led} while in fault mode.")
725 return JSONResponse(status_code=400, content={"error": f"Cannot control {led} in current fault mode."})
726
727 if led == "LED2":
728 # Control LED2 directly via GPIO6
729 GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.HIGH if state else GPIO.LOW)
730 with faults_lock:
731 manual_override['LED2'] = True # Activate manual override
732 # Reset LED2 Fault Flag if manual control is restored
733 if faults.get("LED2_Failure", False):
734 faults["LED2_Failure"] = False
735 logging.info("Manual control restored for LED2. Fault flag cleared.")
736 print("Manual control restored for LED2. Fault flag cleared.")
737 logging.info(f"{led} LED set to {'on' if state else 'off'} via manual control.")
738 return {"message": f"{led} LED turned {'on' if state else 'off'} via manual control"}
739 else:
740 # For PWM-controlled LEDs
741 duty_cycle = 100 if state else 0
742 pwm_instance = None
743 if led in ['PIR', 'IR', 'TCS']:
744 pwm_instance = globals().get(f"{led}_PWM")
745 elif led in additional_pwms:
746 pwm_instance = additional_pwms.get(led)
747
748 if pwm_instance:
749 control_led_brightness(duty_cycle, led)
750 logging.info(f"{led} LED set to {'on' if state else 'off'}.")
751 return {"message": f"{led} LED turned {'on' if state else 'off'}."}
752
753 logging.error(f"Failed to set LED: {led}")
754 return JSONResponse(status_code=500, content={"error": "Failed to set LED."})
755
756# ---------------------------- Application Lifecycle Events ----------------------------
757
758@app.on_event("startup")
759def startup_event():
760 initialize_gpio()
761 # Start the control loop in a separate daemon thread
762 control_thread = threading.Thread(target=control_loop, daemon=True)
763 control_thread.start()
764 logging.info("Backend server started and control loop initiated.")
765 print("Backend server started and control loop initiated.")
766
767@app.on_event("shutdown")
768def shutdown_event():
769 # Stop PWM and clean up GPIO settings
770 PIR_PWM.stop()
771 IR_PWM.stop()
772 TCS_PWM.stop()
773 for pwm_instance in additional_pwms.values():
774 pwm_instance.stop()
775 # Turn off the red LED
776 GPIO.output(RED_LED_PIN, GPIO.LOW)
777 # Turn off LED2
778 GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.LOW)
779 GPIO.cleanup()
780 logging.info("Backend server shutdown and GPIO cleaned up.")
781 print("Backend server shutdown and GPIO cleaned up.")
782
783# ---------------------------- Run the FastAPI App ----------------------------
784
785# To run the app, use the following command:
786# uvicorn backend:app --host 0.0.0.0 --port 8000
787
788