#!/usr/bin/env python3
import json
import time
import datetime as dt
import logging
import multiprocessing
import traceback

from flask import Flask, render_template, request, jsonify, g
from flask_executor import Executor

from server.dataset import DatasetRepository
from server.process_sheets.process_sheets_api import process_sheets_api
from server.group_data.group_data_api import group_data_api
from server.utilities.timing import time_function
from server.validate_members.validate_members_api import validate_members_api
from server.mapping.map_fields_api import map_fields_api
from server.dataset_updates.dataset_updates_api import dataset_updates_api
from server.dataset_ignores.dataset_ignores_api import dataset_ignores_api
from server.dataset_types.dataset_types_api import dataset_types_api
from server.dataset_duplicates.dataset_duplicates_api import dataset_duplicates_api
from server.dataset_reapply.dataset_reapply_api import dataset_reapply_api
from server.clean_data.clean_data_api import clean_data_api
from server.clean_data.define_checks_api import define_checks_api
from server.queries.queries_api import queries_api
from server.output.output_api import output_api
from server.models.dataset_calculations_api import dataset_calculations_api
from server.merged_output.merged_output_api import merged_output_api

from server.api.celery_api import celery_api
from server.api.check_formula_api import formula_check_api
from server.api.data_api import data_api, split_and_save_dataset
from server.api.metadata_api import metadata_api
from server.api.health_check_api import health_check_api
from server.api.lookup_tables_api import lookup_table_api
from server.errors import RulesEngineException
from server.utilities.mongo_json_encoder import MongoJSONEncoder
from server.utilities.logging import setup_server_logging
from server.utilities.user_context import current_user_context
from settings import settings
from data.enums import ErrorSeverities, AlertTypes

app = Flask(__name__)
app.json_encoder = MongoJSONEncoder
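# MongoJSONEncoder presumably extends Flask's JSON encoder so that Mongo-specific
# types (e.g. ObjectId) serialise cleanly in jsonify() responses; see
# server/utilities/mongo_json_encoder.py for the actual behaviour.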

if not settings.rabbit.enabled:
    # RabbitMQ is disabled, so register the in-memory background worker instead.
    # See https://flask-executor.readthedocs.io/en/latest/
    app.executor = Executor(app)
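    # Handlers can then queue work without blocking the request, e.g. (illustrative):
    #     future = app.executor.submit(split_and_save_dataset, data, dataset.id, dataset_repo, case_id)
    # flask-executor wraps concurrent.futures, so submit() returns a Future-style object.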

# Register all API blueprints
app.register_blueprint(health_check_api)
app.register_blueprint(process_sheets_api)
app.register_blueprint(group_data_api)
app.register_blueprint(map_fields_api)
app.register_blueprint(validate_members_api)
app.register_blueprint(define_checks_api)
app.register_blueprint(clean_data_api)
app.register_blueprint(data_api)
app.register_blueprint(metadata_api)
app.register_blueprint(celery_api)
app.register_blueprint(formula_check_api)
app.register_blueprint(output_api)
app.register_blueprint(merged_output_api)
app.register_blueprint(lookup_table_api)
app.register_blueprint(queries_api)
app.register_blueprint(dataset_updates_api)
app.register_blueprint(dataset_ignores_api)
app.register_blueprint(dataset_types_api)
app.register_blueprint(dataset_duplicates_api)
app.register_blueprint(dataset_reapply_api)
app.register_blueprint(dataset_calculations_api)

logger = logging.getLogger(__name__)


@app.route('/')
def index():
    return render_template('index.html', now=dt.datetime.utcnow().isoformat())


@app.before_request
def before_request():
    # Stamp the request start time; after_request uses it to compute the response time.
    g.start = time.time()


# TODO: this should probably be updated when we move to Flask 1.x, as per the comments in utilities/logging.py
@app.after_request
def after_request(response):
    logger.info('%s %s %s', request.method, request.full_path, response.status, extra={
        'meta': {
            'req': {
                'url': request.full_path,
                'method': request.method,
            },
            'res': {
                'statusCode': response.status_code,
            },
            # Wall-clock request duration in milliseconds, measured from before_request
            'responseTime': (time.time() - g.start) * 1000
        }
    })
    return response

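# A record from the handler above looks roughly like this (values are illustrative):
#   "GET /api/health? 200 OK"  with
#   meta={'req': {'url': '/api/health?', 'method': 'GET'},
#         'res': {'statusCode': 200}, 'responseTime': 12.3}
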

@app.errorhandler(Exception)
def handle_errors(exception):
    # No need to log the exception explicitly here: the Flask server logs it automatically.
    # Make sure this is still the case if we ever update to Flask 1.x.
    # logger.error(exception, exc_info=1)

    # A RulesEngineException is a controlled error raised with a specific message.
    # That message (str(exception)) should match a key in our client-side translations.
    return jsonify({
        **current_user_context().toJSON(),
        'message': None if isinstance(exception, RulesEngineException) else f"{type(exception).__name__}: {str(exception)}",
        'messageKey': str(exception) if isinstance(exception, RulesEngineException) else None,
        'messageParams': exception.message_params if isinstance(exception, RulesEngineException) else None,
        'severity': exception.severity if isinstance(exception, RulesEngineException) else ErrorSeverities.error,
        # Only expose stack trace details when explicitly enabled in settings
        'stack': traceback.format_exc() if settings.expose_error_details else None,
        'alertType': exception.alert_type if isinstance(exception, RulesEngineException) else AlertTypes.globalAlert,
        'caseId': exception.case_id if isinstance(exception, RulesEngineException) else None
    }), 500
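
# Illustrative payload for an uncontrolled error (field values are hypothetical):
#   {'message': "KeyError: 'sheet_name'", 'messageKey': None, 'messageParams': None,
#    'severity': ErrorSeverities.error, 'stack': None,
#    'alertType': AlertTypes.globalAlert, 'caseId': None}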


if __name__ == '__main__':
    # Do we want to start a celery worker in the same process, to make working in
    # development mode without docker easier? https://stackoverflow.com/a/33117923/1836935
    @app.route("/<string:case_id>/data/split/<string:step>", methods=['POST'])
    @time_function
    def split_field_by_val_groups(case_id, step):
        data = json.loads(request.data)

        dataset_repo = DatasetRepository(case_id, step)
        datasets = dataset_repo.load_all()

        # Earlier experiments ran the splits through multiprocessing.Pool.apply,
        # flask-executor's executor.submit and concurrent.futures.ProcessPoolExecutor;
        # all were dropped in favour of the plain multiprocessing.Process loop below.
        # Split each dataset in its own process, then wait for all of them to finish.
        processes = []
        for dataset in datasets:
            p = multiprocessing.Process(target=split_and_save_dataset, args=(data, dataset.id, dataset_repo, case_id))
            p.start()
            processes.append(p)

        for process in processes:
            process.join()
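
        # Note (assumption): with the default 'fork' start method on Linux the child
        # processes inherit `data` and `dataset_repo` directly; under 'spawn'
        # (Windows, and macOS since Python 3.8) those args must be picklable.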

        return 'ok', 200

    setup_server_logging(app)
    logger.info('listening on port 5000. Press Ctrl+C to stop.')
    app.run(host=settings.bind_address, port=5000, threaded=True)