# · 4 years ago · May 06, 2021, 10:58 AM
1import decimal
2from collections import defaultdict
3import datetime as dt
4from decimal import Decimal
5from itertools import chain
6from urllib.parse import urljoin
7
8from dateutil import parser
9import backoff
10import requests
11from requests.exceptions import RequestException
12
13from apps.grabbers.jobs.base import BaseGrabber
14from conf import settings
15
16
class AffiseManager:
    """Minimal client for the Affise HTTP API.

    Requests are authenticated with the ``API-Key`` header, and paginated
    responses are followed until the API no longer reports a ``next_page``.
    """

    # Sent as the ``status`` filter when fetching conversions.
    CONVERSION_STATUS_CONFIRMED = 1

    def __init__(self, api_url, api_key):
        """Store the API base URL and the key sent with every request."""
        self.api_url = api_url
        self.api_key = api_key

    @backoff.on_exception(backoff.constant, RequestException,
                          interval=settings.AFFISE_REQUEST_INTERVAL_SECONDS,
                          jitter=backoff.random_jitter,
                          max_tries=settings.AFFISE_REQUEST_MAX_TRIES)
    def make_affise_request(self, path, data_path, params=None):
        """Fetch every page of ``path`` and return the collected items.

        Args:
            path: Endpoint path, joined onto ``api_url`` with ``urljoin``.
            data_path: Top-level key of the JSON response holding the items.
            params: Optional query parameters. ``limit`` defaults to 500 when
                not supplied. The mapping is copied and never modified.

        Returns:
            A list with the items of all pages, in the order received.

        Raises:
            requests.exceptions.RequestException: When a request (or its
                status check) keeps failing after the configured retries.
        """
        # Work on a private copy: the retry decorator re-invokes this method
        # with the very same argument objects, so writing 'page' into the
        # caller's dict would make a retry resume at the failed page with an
        # empty ``items`` list and silently drop the pages fetched earlier.
        query = dict(params) if params else {}
        query.setdefault('limit', 500)

        url = urljoin(self.api_url, path)
        headers = {'API-Key': self.api_key}

        items = []
        while True:
            response = requests.get(url, params=query, headers=headers)
            response.raise_for_status()
            result = response.json()

            # Tolerate a missing (or null) data key: it contributes nothing.
            items.extend(result.get(data_path) or [])

            next_page = (result.get('pagination') or {}).get('next_page')
            if not next_page:
                return items
            query['page'] = next_page

    def get_conversion_stats(self, date_from, date_to):
        """Aggregate confirmed conversions between ``date_from`` and ``date_to``.

        Returns:
            A ``defaultdict`` keyed by the tuple
            ``(day, partner id, partner login, offer id, offer title, country,
            city, isp_code, browser, device_type, goal, sub1, sub2)`` whose
            values hold the summed ``'payouts'`` and the ``'count'`` of
            conversions sharing that key.
        """
        # NOTE(review): payouts are accumulated as floats; if exact money
        # arithmetic matters downstream, confirm this precision is acceptable.
        conversions = defaultdict(lambda: defaultdict(float))
        request_params = {
            'date_from': date_from,
            'date_to': date_to,
            'status': self.CONVERSION_STATUS_CONFIRMED,
            'timezone': 'UTC'
        }
        for conversion in self.make_affise_request('stats/conversions', 'conversions', request_params):
            partner = conversion['partner']
            offer = conversion['offer']
            key = (
                parser.parse(conversion['created_at']).strftime(settings.DATE_FORMAT),
                partner['id'],
                partner['login'],
                offer['id'],
                offer['title'],
                conversion['country'],
                conversion['city'],
                conversion['isp_code'],
                conversion['browser'],
                conversion['device_type'],
                conversion['goal'],
                conversion['sub1'],
                conversion['sub2'],
            )
            bucket = conversions[key]
            bucket['payouts'] += conversion['payouts']
            bucket['count'] += 1

        return conversions

    def get_offers(self):
        """Return all items of the ``offers`` endpoint."""
        return self.make_affise_request('offers', 'offers')

    def get_partners(self):
        """Return all items of the ``admin/partners`` endpoint."""
        return self.make_affise_request('admin/partners', 'partners')

    def get_stats_by_date(self, date_from, date_to):
        """Get summary stats from 'date_from' to 'date_to'.

        Uses `GET /3.0/stats/getbydate`__ endpoint of the Affise API.

        __ https://api.affise.com/docs3.0/#by-date
        """
        return self.make_affise_request(
            'stats/getbydate',
            'stats',
            {
                'filter[date_from]': date_from,
                'filter[date_to]': date_to,
                'timezone': 'UTC'
            }
        )
106
107
class AffiseGrabber(BaseGrabber):
    """Grabber that converts Affise conversion statistics into stat rows.

    Relies on attributes and helpers that are not defined in this class
    (``levels``, ``notifications``, ``processed_days``, ``start_date``,
    ``end_date``, ``calculate_cost``, ``build_error_notification``) --
    presumably provided by ``BaseGrabber``; verify against the base class.
    """

    job_name = 'affise_grabber'
    partner = 'affise'
    stats_currency = 'USD'
    results_kafka_topic = settings.KAFKA_TE_PARTNER_EVENTS_RAW_TOPIC
    results_kafka_schema_subject = settings.KAFKA_TE_PARTNER_EVENTS_RAW_SCHEMA_SUBJECT
    # Weight assumed for a targeting URL entry that has no 'weight' key.
    DEFAULT_TARGETING_URL_WEIGHT = 100

    def __init__(self, *args, **kwargs):
        """Initialise the base grabber and the Affise-specific state."""
        super().__init__(*args, **kwargs)
        self.days = []
        self.manager = AffiseManager(settings.AFFISE_API_URL, settings.AFFISE_API_KEY)
        self.offers = {}
        self.conversion_stats_report = {}

    def build_partner_dictionary(self):
        """Return offer entries followed by affiliate entries.

        Each entry is a dict with ``type``, ``id`` (as a string) and ``name``
        (empty string when the source field is missing or falsy).
        """
        offers = [
            {'type': 'offer', 'id': str(item['id']), 'name': item.get('title') or ''}
            for item in self.manager.get_offers()
        ]
        affiliates = [
            {'type': 'affiliate', 'id': str(item['id']), 'name': item.get('login') or ''}
            for item in self.manager.get_partners()
        ]
        return offers + affiliates

    @staticmethod
    def render_incorrect_url_template(info):
        """Render a human-readable description of a row with a bad tracking URL.

        Missing or falsy fields of ``info`` are rendered as ``-`` (or
        ``is empty`` for the tracking URL itself).
        """
        # NOTE(review): the leading whitespace inside this literal is kept
        # exactly as found in the source this was recovered from; confirm it
        # matches the intended message layout.
        return f"""
Tracking url {info.get('tracking_url') or 'is empty'}:
 Offer - ID {info.get('offer_id') or '-'}; Name {info.get('offer') or '-'};
 Affiliate - ID {info.get('affiliate_id') or '-'}; Name {info.get('affiliate') or '-'};
 Country - {info.get('country') or '-'};
 Device - {info.get('device') or '-'};"""

    @staticmethod
    def format_tracking_url(template, affiliate_id, sub1, sub2):
        """Fill ``{pid}``, ``{sub1}`` and ``{sub2}`` in ``template``.

        Any other ``{placeholder}`` is left in the result unchanged.
        """
        class Default(dict):
            def __missing__(self, key):
                # Re-emit unknown placeholders verbatim instead of raising.
                return f'{{{key}}}'
        return template.format_map(Default(pid=affiliate_id, sub1=sub1, sub2=sub2))

    def fetch_reports(self, day):
        """Load the offers and the conversion stats of the single ``day``."""
        # The same formatted day is used as both range bounds.
        start = end = day.strftime(settings.DATE_FORMAT)

        self.offers = self.manager.get_offers()
        self.conversion_stats_report = self.manager.get_conversion_stats(
            start,
            end
        )

    def get_offer(self, offer_id):
        """Return the loaded offer whose ``id`` equals ``offer_id``, or None."""
        return next((offer for offer in self.offers if offer['id'] == offer_id), None)

    @staticmethod
    def is_targeting_exact_match(country, city, isp, browser, device, targeting, affiliate_id=None):
        """Tell whether ``targeting`` explicitly lists every given attribute.

        With ``affiliate_id`` the rule must list that affiliate; without it
        the rule must list no affiliates at all. The rule must also carry
        URLs. The result is truthy/falsy, not necessarily a ``bool``.
        """
        return (
            (affiliate_id in targeting['affiliate_id'] if affiliate_id is not None else targeting['affiliate_id'] == []) and
            country in targeting['country']['allow'] and
            city in targeting['city']['allow'] and
            isp in targeting['isp']['allow'] and
            browser in targeting['browser']['allow'] and
            device in targeting['device_type'] and
            targeting.get('urls')
        )

    @staticmethod
    def is_targeting_any_match(country, city, isp, browser, device, targeting, affiliate_id=None):
        """Like ``is_targeting_exact_match`` but an empty list matches anything.

        The affiliate condition is the same as in the exact variant. The
        result is truthy/falsy, not necessarily a ``bool``.
        """
        return (
            (affiliate_id in targeting['affiliate_id'] if affiliate_id is not None else targeting['affiliate_id'] == []) and
            (targeting['country']['allow'] == [] or country in targeting['country']['allow']) and
            (targeting['city']['allow'] == [] or city in targeting['city']['allow']) and
            (targeting['isp']['allow'] == [] or isp in targeting['isp']['allow']) and
            (targeting['browser']['allow'] == [] or browser in targeting['browser']['allow']) and
            (targeting['device_type'] == [] or device in targeting['device_type']) and
            targeting.get('urls')
        )

    def get_tracking_url_template_from_targeting(self, offer_id, affiliate_id, country, city, isp, browser, device):
        """Return the first targeting URL of the offer that fits, or None.

        Candidates are tried in this order: affiliate-specific exact match,
        affiliate-specific "any" match, generic exact match, generic "any"
        match. Within each group, rules with fewer unrestricted attributes
        come first and a rule's URLs are tried in ascending ``weight`` order.

        Returns None when the offer is unknown or has no targeting rules.
        """
        def sort_targeting(x):
            # Number of unrestricted (empty) attributes: lower = more specific.
            return sum([
                x['country']['allow'] == [],
                x['city']['allow'] == [],
                x['isp']['allow'] == [],
                x['browser']['allow'] == [],
                x['device_type'] == []
            ])

        def sort_weight(x):
            # NOTE(review): ascending sort puts the lowest weight first;
            # confirm that is the intended preference.
            return x.get('weight', self.DEFAULT_TARGETING_URL_WEIGHT)

        offer = self.get_offer(offer_id)
        if not offer:
            return None
        sorted_targeting = sorted(offer.get('targeting') or [], key=sort_targeting)

        def matching_urls(matcher, *affiliate):
            # Single place that passes the attributes to a matcher, so all
            # four candidate streams share one (correct) argument order.
            return (
                targeting_url['url']
                for targeting in sorted_targeting
                if matcher(country, city, isp, browser, device, targeting, *affiliate)
                for targeting_url in sorted(targeting['urls'], key=sort_weight)
            )

        # Generators are lazy: iteration stops at the first match.
        return next(chain(
            matching_urls(self.is_targeting_exact_match, affiliate_id),
            matching_urls(self.is_targeting_any_match, affiliate_id),
            matching_urls(self.is_targeting_exact_match),
            matching_urls(self.is_targeting_any_match),
        ), None)

    def get_tracking_url_template(self, offer_id, affiliate_id, country, city, isp, browser, device):
        """Return the tracking URL template for a conversion.

        Prefers a URL from the offer's targeting rules and falls back to the
        offer's own ``url``.

        Returns:
            The template string; ``''`` when the offer is unknown; ``None``
            when the offer has neither a matching targeting URL nor a ``url``.
        """
        offer = self.get_offer(offer_id)
        if not offer:
            return ''

        targeting_url = self.get_tracking_url_template_from_targeting(
            offer_id, affiliate_id, country, city, isp, browser, device)
        if targeting_url:
            return targeting_url

        return offer.get('url') or None

    def get_affise_stats(self, day):
        """Fetch the reports of ``day`` and group them by output key.

        Conversions without a tracking URL template are skipped and reported
        through ``self.notifications`` instead.

        Returns:
            A ``defaultdict(list)`` keyed by
            ``(day, offer_id, formatted tracking url, goal)``; each entry is
            a dict with ``original_cost`` and ``events_count``.
        """
        self.fetch_reports(day)

        affise_data = defaultdict(list)
        # ``stat_day`` is the formatted day taken from the report key; it is
        # named differently from the ``day`` argument to avoid shadowing it.
        for (stat_day, affiliate_id, affiliate_login, offer_id, offer_title, country,
             city, isp, browser, device, goal, sub1, sub2), stat in self.conversion_stats_report.items():
            tracking_url_template = self.get_tracking_url_template(
                offer_id, affiliate_id, country, city, isp, browser, device)
            if not tracking_url_template:
                self.notifications.append(
                    self.build_error_notification({
                        'offer_id': offer_id,
                        'offer': offer_title,
                        'affiliate_id': affiliate_id,
                        'affiliate': affiliate_login,
                        'country': country,
                        'device': device,
                    })
                )
                continue
            key = (
                stat_day,
                offer_id,
                self.format_tracking_url(tracking_url_template, affiliate_id, sub1, sub2),
                goal
            )
            affise_data[key].append(dict(
                original_cost=stat['payouts'],
                events_count=int(stat['count'])
            ))

        return affise_data

    def setup_dates(self):
        """Choose the days to grab: the start, middle and end of the range.

        A range of zero or one day yields just its one or two dates.
        """
        super().setup_dates()

        days_delta = (self.end_date - self.start_date).days
        if days_delta == 0:
            self.days = [self.start_date]
        elif days_delta == 1:
            self.days = [self.start_date, self.end_date]
        else:
            self.days = [
                self.start_date,
                self.start_date + dt.timedelta(days=days_delta // 2),
                self.end_date
            ]

    def get_stats(self):
        """Yield stat rows for every selected day, in chunks.

        For each day in ``self.days`` the rows are yielded as lists of at
        most ``settings.GRAB_STATS_CHUNK_SIZE`` items; the remainder of a day
        (possibly an empty list) is always yielded last. The day of the last
        row seen is added to ``self.processed_days``.
        """
        for grab_date in self.days:
            affise_data = self.get_affise_stats(grab_date)

            data = []
            level = self.levels[-1]
            day = None
            for (day, offer_id, tracking_url, goal), goal_data in affise_data.items():
                # NOTE(review): costs arrive as floats, so Decimal() captures
                # their exact binary value; confirm rounding happens later.
                original_cost = sum(Decimal(goal_entry['original_cost']) for goal_entry in goal_data)
                events_count = sum(int(goal_entry['events_count']) for goal_entry in goal_data)
                data.append(
                    dict(
                        day=day,
                        partner_name=self.partner,
                        partner_level_id=level.id,
                        partner_entity_id=offer_id,
                        partner_parent_entity_id=None,
                        tracking_url=tracking_url,
                        original_currency=self.stats_currency,
                        original_cost=original_cost,
                        cost=self.calculate_cost(original_cost),
                        event_type=goal,
                        events_count=events_count,
                        clicks=0,  # affise cannot split by events
                        impressions=0,  # affise cannot split by events
                    )
                )
                if len(data) == settings.GRAB_STATS_CHUNK_SIZE:
                    yield data
                    data = []
            if day:
                self.processed_days.add(day)
            yield data

    def get_cost_summary(self):
        """Yield the confirmed revenue per day between start and end date.

        Each yielded dict has ``day`` (ISO date string), ``original_cost``
        (``Decimal`` sum for that day) and ``original_currency``.
        """
        # NOTE(review): the dates are passed as-is here, whereas
        # ``fetch_reports`` formats them with ``settings.DATE_FORMAT``;
        # confirm the endpoint accepts this representation.
        data = self.manager.get_stats_by_date(
            self.start_date,
            self.end_date
        )

        # Aggregate reports by day.
        aggregation = {}

        for item in data:
            time_slice = item['slice']
            item_date = dt.date(
                time_slice['year'],
                time_slice['month'],
                time_slice['day']
            )
            item_cost = decimal.Decimal(item['actions']['confirmed']['revenue'])

            group = aggregation.setdefault(item_date, {
                'day': item_date.isoformat(),
                'original_cost': decimal.Decimal(0),
                'original_currency': self.stats_currency.upper(),
            })
            group['original_cost'] += item_cost

        yield from aggregation.values()
354