· 3 years ago · Apr 09, 2022, 09:30 AM
1settings.py
2# настройки django==2.2
3
4from pathlib import Path
5from datetime import timedelta
6
7# Build paths inside the project like this: BASE_DIR / 'subdir'.
8BASE_DIR = Path(__file__).resolve().parent.parent
9
10# SECURITY WARNING: keep the secret key used in production secret!
11SECRET_KEY = 'django-insecure-e8w5p_c!5#p*c+sgp=w4#o%)o-8^j)%rucr#db)lf)i&b@1xyu'
12
13# SECURITY WARNING: don't run with debug turned on in production!
14DEBUG = True
15
16ALLOWED_HOSTS = ["127.0.0.1"]
17
18
19# Application definition
20
21INSTALLED_APPS = [
22 'mailing',
23 'rest_framework',
24 'django.contrib.admin',
25 'django.contrib.auth',
26 'django.contrib.contenttypes',
27 'django.contrib.sessions',
28 'django.contrib.messages',
29 'django.contrib.staticfiles',
30 'django_celery_results',
31]
32
33MIDDLEWARE = [
34 'django.middleware.security.SecurityMiddleware',
35 'django.contrib.sessions.middleware.SessionMiddleware',
36 'django.middleware.common.CommonMiddleware',
37 'django.middleware.csrf.CsrfViewMiddleware',
38 'django.contrib.auth.middleware.AuthenticationMiddleware',
39 'django.contrib.messages.middleware.MessageMiddleware',
40 'django.middleware.clickjacking.XFrameOptionsMiddleware',
41]
42
43ROOT_URLCONF = 'project.urls'
44
45TEMPLATES = [
46 {
47 'BACKEND': 'django.template.backends.django.DjangoTemplates',
48 'DIRS': [],
49 'APP_DIRS': True,
50 'OPTIONS': {
51 'context_processors': [
52 'django.template.context_processors.debug',
53 'django.template.context_processors.request',
54 'django.contrib.auth.context_processors.auth',
55 'django.contrib.messages.context_processors.messages',
56 ],
57 },
58 },
59]
60
61WSGI_APPLICATION = 'project.wsgi.application'
62
63DATABASES = {
64 'default': {
65 'ENGINE': 'django.db.backends.postgresql',
66 'NAME': 'dim',
67 'USER': 'dim',
68 'PASSWORD': '',
69 'HOST': 'localhost'
70 }
71}
72
73AUTH_PASSWORD_VALIDATORS = [
74 {
75 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
76 },
77 {
78 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
79 },
80 {
81 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
82 },
83 {
84 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
85 },
86]
87
88LANGUAGE_CODE = 'en-us'
89
90TIME_ZONE = 'UTC'
91
92USE_I18N = True
93
94USE_L10N = True
95
96USE_TZ = True
97
98STATIC_URL = '/static/'
99
100DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
101
102CELERY_RESULT_BACKEND = 'django-db'
103CELERY_CACHE_BACKEND = 'django-cache'
104CELERY_BROKER_URL = 'amqp://localhost'
105CELERY_ACCEPT_CONTENT = ['application/json']
106CELERY_TASK_SERIALIZER = 'json'
107CELERY_RESULT_SERIALIZER = 'json'
108CELERY_TIMEZONE = 'Europe/Moscow'
109CELERY_BEAT_SCHEDULE = {
110 'send_mailing_every_60_sec': {
111 'task': 'mailing.tasks.send',
112 'schedule': timedelta(seconds=60)
113 },
114}
115
116# настройки для сервера "Фабрика Решений"
117CELERY_SETTINGS_FBRQ = {
118 'url': 'https://probe.fbrq.cloud/v1/send/',
119 'headers': {
120 'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2Nzk3NTE5MjIsImlzcyI6ImZhYnJpcXVlIiwibmFtZSI6IkRpbU1hcmNoIn0.jf9IpbJqObb7ECKMa10TNEnXzVYJ1iy0BL2jWE_UHfo',
121 'Accept': 'application/json',
122 'Content-type': 'application/json',
123 },
124}
125
126
127
128
129
130
131
132
133
134
135views.py
136# вьюшка djangorestframework==3.13.1, которая из mailing.post дергает таск
137
138from django.utils import timezone
139from datetime import timedelta
140
141from celery.utils.log import get_logger
142from rest_framework.response import Response
143from rest_framework.views import APIView
144from rest_framework.generics import get_object_or_404
145
146from .models import Mailing, Client, Message
147from .serializers import ClientSerializer, MailingSerializer
148from .tasks import send
149
150
151logger = get_logger(__name__)
152
153
154class ClientView(APIView):
155
156 def get(self, request):
157 clients = Client.objects.all()
158 serializer = ClientSerializer(clients, many=True)
159
160 return Response({'client': serializer.data})
161
162 def post(self, request):
163 client = request.data.get('client')
164 client['code'] = int(client['number']) // 10_000_000 - 7_000
165 serializer = ClientSerializer(data=client)
166 if serializer.is_valid(raise_exception=True):
167 client = serializer.save()
168 return Response({'Success': 'Client id: {}, tel: {} is created.'.format(client.id, client.number)})
169
170 def put(self, request, pk):
171 client = get_object_or_404(Client, pk=pk)
172 data = request.data.get('client')
173 serializer = ClientSerializer(instance=client, data=data, partial=True)
174 if serializer.is_valid(raise_exception=True):
175 client = serializer.save()
176 return Response({'Success': 'Client id {}, tel: {} is updated.'.format(client.id, client.number)})
177
178 def delete(self, request, pk):
179 client = get_object_or_404(Client, pk=pk)
180 id = client.id
181 client.delete()
182 return Response({'Success': 'Client id: {}, tel: {} has been deleted.'.format(id, client.number)})
183
184
185class MailingView(APIView):
186
187 # просмотр модели: http://localhost:8000/api/mailing/
188 def get(self, request):
189 mailing = Mailing.objects.all()
190 serializer = MailingSerializer(mailing, many=True)
191 return Response({'mailing': serializer.data})
192
193 # создание модели: http://localhost:8000/api/mailing/
194 def post(self, request):
195 mailing = request.data.get('mailing')
196 serializer = MailingSerializer(data=mailing)
197 if serializer.is_valid(raise_exception=True):
198 # Проверочные валидаторы создания модели в ./models.py
199 mailing = serializer.save()
200
201 # Поиск клиентов по заданному фильтру рассылки
202 clients = Client.objects.filter(tag=mailing.filter)
203
204 # создаем сообщение по каждому клиенту. В дальнейшем, эти сообщения нужно отправить адресатам (Celery tasks)
205 for client in clients:
206 message = Message()
207 message.datetime_on = mailing.datetime_on
208 message.client = client
209 message.mailing = mailing
210 message.save()
211 # отправить рассылку по её id, если время отправки уже наступило, а время окончания еще не истекло
212 if mailing.datetime_on <= timezone.now() and mailing.datetime_off > timezone.now():
213 send.delay(mailing.id)
214
215 # вывести в клиент Rest API сообщение об успешном создании рассылки
216 return Response({'Success': 'Mailing id: {}, text: "{}" is created.'.format(mailing.id, mailing.text[:50])})
217
218 def put(self, request, pk):
219 mailing = get_object_or_404(Mailing, pk=pk)
220 data = request.data.get('mailing')
221 serializer = MailingSerializer(instance=mailing, data=data, partial=True)
222 if serializer.is_valid(raise_exception=True):
223 mailing = serializer.save()
224 return Response({'Success': 'Mailing id: {}, text: "{}" is updated.'.format(mailing.id, mailing.text[:50])})
225
226 def delete(self, request, pk):
227 mailing = get_object_or_404(Mailing, pk=pk)
228 id = mailing.id
229 mailing.delete()
230 return Response({'Success': 'Mailing id: {}, text: "{}" has been deleted.'.format(id, mailing.text[:50])})
231
232# end views.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252tasks.py
253# таски celery==5.2.3
254
255import requests
256import json
257from celery import Celery, shared_task, group, current_task
258from django.utils import timezone
259from datetime import timedelta
260
261from project.settings import CELERY_SETTINGS_FBRQ
262from .models import Message
263
264
265def message_update_state(pk, status):
266 """
267 Актуализируем статусы у модели "Сообщение".
268 Если Сообщение было взято в работу (в таск, статус "active") и таск отработал отлично, то меняем статус
269 на "исполнено" (finished)
270 Если Сообщение было взято в работу "active" и таск не смог передать сообщение на сервер, то меняем статус
271 на "новое" (new), позволяя снова пробовать отправить сообщение новым таском, который найдет его по этому статусу "new", как еще не отправленное сообщение)
272 """
273 # смотри .models.Message status field: new, active, finished
274 message = Message.objects.get(pk=pk)
275 message.status = status
276 message.save()
277
278
279app = Celery('tasks')
280
281# подгружаем настройки запроса, чтобы сервер https://probe.fbrq.cloud точно принял данные
282URL = CELERY_SETTINGS_FBRQ['url']
283HEADERS = CELERY_SETTINGS_FBRQ['headers']
284
285
286# экземпляр таска self.retry() некорректно работает в celery==5.2.0- 5.2.3
287# Поэтому views.django_rest_framework_func() --> tasks.task.retry(eta=start_time, expires=expire_time) не работает!
288# Пришлось делать через celery beat: views.django_rest_framework_func() --> tasks.task(run_every=timedelta(seconds=60))
289# см project.settings.py
290# загружаем из mailing.tasks, а не из корневого каталога celery.py, поэтому @app.on_after_configure.connect не работает!
291@app.on_after_finalize.connect
292def setup_periodic_tasks(sender, **kwargs):
293 sender.add_periodic_task(10.0, send.s(), name='mailing.tasks.send')
294
295
296@shared_task()
297def send_message(message_id, phone, text):
298 """
299 Основной таск для отправки данных на сервер https://probe.fbrq.cloud
300 """
301
302 meta = {message_id: text[:50]+'...'} # дополнительная информация к таске
303 message_update_state(pk=message_id, status='active') # меняем статус у модели "Сообщение" на "в работе"
304 current_task.update_state(state='PROGRESS', meta=meta) # меняем статус у таска на "в работе"
305
306 # спецификация url принимающего сервера:
307 url_fabrika = URL + str(message_id)
308
309 # спецификация json принимающего сервера
310 data = {
311 'id': message_id,
312 'phone': phone,
313 'text': text
314 }
315
316 # настройка, если принимающий сервер подтупливает, либо трафик идет через vpn
317 connect_timeout, read_timeout = 3, 3
318 # try:
319 response = requests.post(url=url_fabrika,
320 data=json.dumps(data),
321 headers=HEADERS,
322 verify=True,
323 timeout=(connect_timeout, read_timeout)
324 )
325 status = response.status_code
326
327 if status == 200:
328 # сервер вернул "хорошие" данные
329 message_update_state(pk=message_id, status='finished') # меняем статус у модели "Сообщение" на "исполнено"
330 current_task.update_state(state='SUCCESS', meta=meta) # меняем статус у таска на "исполнено"
331 return {'status': status, 'body': meta} # возвращаем ответ от сервера: статус, отправленное сообщение
332 else:
333 # сервер не смог принять данные
334 message_update_state(pk=message_id, status='new') # меняем статус у сообщения, снова возвращаем в работу
335 current_task.update_state(state='FAILURE', meta=meta) # меняем статус у таска на "неудачное исполнение"
336 # возвращаем общую информацию о неудачной попытке передачи сообщения
337 raise ConnectionError('Unable to connect to: ' + URL)
338
339
340@shared_task(name='mailing.tasks.send')
341def send(mailing_id=None):
342 """
343 Запускать с флагом --beat: (env) user:path$ celery -A project worker -l info --beat
344
345 Получаем управление из celery_beat.
346 Находим все сообщения (не отправленные, но которые нужно отправить), собираем их на отправку в tasks
347 """
348 # отправить все новые сообщения с подходящим временным диапазоном
349 messages = Message.objects.filter(status='new',
350 datetime_on__lte=timezone.now(),
351 mailing__datetime_off__gte=timezone.now(),
352 )
353
354 # приходит id рассылки mailing_id из .views.py
355 if mailing_id is not None:
356 messages = messages.filter(mailing=mailing_id)
357
358 tasks = group(send_message.s(m.id, m.client.number, m.mailing.text) for m in messages)
359 tasks.delay()
360
361 return tasks
362
363
364@shared_task
365def dummy():
366 """
367 Пустышка. Проверочный таск
368 """
369 return 'Its dummy task =)'
370