# Paste metadata: posted May 27, 2021, 02:40 PM (listed as "4 years ago").
import sqlalchemy as sa
from sqlalchemy.engine.default import DefaultDialect

from core.bigquery import BigQueryAsync
5
# Name prefix of the session tables; queries in this module append either a
# date suffix or a ``*`` wildcard to it.
SESSION_TABLENAME = 'ga_sessions_'

# Name of the table that stores one (wgid, anonymized_id) row per opted-out
# visitor.
OPTED_OUT_USERS = 'opted_out_users_spa'

# Column definitions handed to the BigQuery client when the opted-out users
# table has to be created. Both columns are mandatory strings.
OPTED_OUT_USERS_TABLE_SCHEMA = [
    {
        'name': 'wgid',
        'type': 'STRING',
        'mode': 'REQUIRED',
    },
    {
        'name': 'anonymized_id',
        'type': 'STRING',
        'mode': 'REQUIRED',
    },
]
20
21
class BigQueryManagerAsync:
    '''
    Async helper that builds and runs the opted-out-visitor queries.

    Every query is sent through ``self.client`` (a ``BigQueryAsync``
    instance). Some statements are assembled with SQLAlchemy Core and
    rendered to plain SQL text, others are written as SQL templates.
    The return value of each query method is whatever
    ``self.client.query`` returns.
    '''

    def __init__(self):
        '''Create the BigQuery client and a backtick-quoting SQL dialect.'''
        self.client = BigQueryAsync()
        # Identifiers are rendered between backticks instead of the double
        # quotes the default SQLAlchemy dialect would emit.
        self._dialect = DefaultDialect()
        self._dialect.identifier_preparer.initial_quote = '`'
        self._dialect.identifier_preparer.final_quote = '`'

    def __compile_query(self, query):
        '''
        Render a SQLAlchemy statement to SQL text with bound values inlined.

        NOTE(review): inlined literals follow ``DefaultDialect`` quoting
        rules; confirm they match BigQuery string escaping for the values
        passed in by callers.
        '''
        compiled = query.compile(
            compile_kwargs={'literal_binds': True},
            dialect=self._dialect,
        )
        return str(compiled)

    @staticmethod
    def _quote_string_literal(value):
        '''
        Return ``value`` as a single-quoted, backslash-escaped SQL string.

        Backslashes, single quotes, line feeds and carriage returns are
        escaped so the value cannot terminate the literal early or span
        several lines. Values without those characters render exactly as
        plain ``'value'``.
        '''
        escaped = (
            str(value)
            # Backslashes first, otherwise the escapes added below would be
            # escaped a second time.
            .replace('\\', '\\\\')
            .replace("'", "\\'")
            .replace('\n', '\\n')
            .replace('\r', '\\r')
        )
        return f"'{escaped}'"

    @staticmethod
    def _cleanup_case_branch(cleanup_fields):
        '''
        Build the CASE branch that nulls the given custom dimension indexes.

        Returns an empty string when there is nothing to clean up, because
        an empty ``in ()`` list is not valid SQL.
        '''
        indexes = ', '.join(map(str, cleanup_fields or ()))
        if not indexes:
            return ''
        return f'WHEN index in ({indexes}) THEN null'

    async def generate_access_token(self):
        '''Ask the underlying client to generate an access token.'''
        await self.client.generate_access_token()

    async def check_opted_out_visitor_table(self, project_id, dataset_id):
        '''Return the client's check result for the opted-out users table.'''
        return await self.client.check_table(project_id, dataset_id, OPTED_OUT_USERS)

    async def create_opted_out_visitor_table(self, project_id, dataset_id):
        '''Create the opted-out users table with its two-column schema.'''
        await self.client.create_table(
            project_id,
            dataset_id,
            OPTED_OUT_USERS,
            OPTED_OUT_USERS_TABLE_SCHEMA,
        )

    async def ensure_opted_out_visitors_table_exists(self, project_id, dataset_id):
        '''Create the opted-out users table unless the check finds it.'''
        table_exists = await self.check_opted_out_visitor_table(
            project_id,
            dataset_id,
        )
        if not table_exists:
            await self.create_opted_out_visitor_table(project_id, dataset_id)

    async def register_opted_out_visitor(
        self,
        project_id,
        dataset_id,
        wgid,
        anonymized_id
    ):
        '''
        Insert a (wgid, anonymized_id) row unless the wgid is already stored.

        Both values are rendered as escaped string literals, so quotes or
        backslashes inside them cannot change the statement.
        '''
        wgid_literal = self._quote_string_literal(wgid)
        anonymized_id_literal = self._quote_string_literal(anonymized_id)
        table = f'`{dataset_id}.{OPTED_OUT_USERS}`'
        return await self.client.query(
            project_id,
            f'''
            INSERT INTO {table} (wgid, anonymized_id)
            SELECT
                wgid, anonymized_id
            FROM
                (select {wgid_literal} as wgid, {anonymized_id_literal} as anonymized_id)
            WHERE
                NOT (EXISTS (
                    SELECT 1
                    FROM {table}
                    WHERE {table}.wgid = {wgid_literal}
                ))
            '''
        )

    async def get_registered_opted_out_visitors(self, project_id, dataset_id, lookup_field):
        '''
        Group the stored opted-out wgids by the session date they appear on.

        The generated statement has this shape:

            SELECT ga.date, ARRAY_AGG(DISTINCT oous.wgid) AS accounts
            FROM <dataset>.<opted-out table> AS oous
            LEFT JOIN (
                SELECT date, cd.index AS index, cd.value AS value
                FROM <dataset>.<session tables wildcard>,
                     unnest(customDimensions) AS cd
            ) AS ga
              ON oous.wgid = ga.value AND ga.index = <lookup_field>
            GROUP BY ga.date

        ``lookup_field`` is the custom dimension index whose value is
        compared with the stored wgid.
        '''
        opted_out_table = sa.table(
            f'{dataset_id}.{OPTED_OUT_USERS}',
            sa.literal_column('wgid'),
        ).alias('oous')

        # Wildcard over every session table in the dataset.
        session_table = sa.table(
            f'{dataset_id}.{SESSION_TABLENAME}*',
            sa.literal_column('date'),
            sa.literal_column('customDimensions'),
        )

        # One row per (session date, custom dimension) pair.
        sessions = sa.select(
            [
                session_table.c.date,
                sa.literal_column('cd.index').label('index'),
                sa.literal_column('cd.value').label('value'),
            ]
        ).select_from(session_table).select_from(
            sa.func.unnest(session_table.c.customDimensions).alias('cd')
        ).alias('ga')

        join_condition = sa.and_(
            opted_out_table.c.wgid == sessions.c.value,
            sessions.c.index == lookup_field,
        )
        query = sa.select(
            [
                sessions.c.date,
                sa.func.ARRAY_AGG(sa.distinct(opted_out_table.c.wgid)).label('accounts'),
            ],
        ).select_from(
            # Outer join: opted-out visitors without a matching session are
            # still returned, grouped under a NULL date.
            opted_out_table.join(sessions, join_condition, isouter=True)
        ).group_by(
            sessions.c.date
        )
        return await self.client.query(
            project_id,
            self.__compile_query(query),
        )

    async def anonymize_opted_out_visitors_sessions(self, project_id, dataset_id, date, lookup_field, cleanup_fields):
        '''
        Rewrite customDimensions of one session table for opted-out visitors.

        A session row is matched when its custom dimension at index
        ``lookup_field`` equals a stored wgid. For matched rows that
        dimension's value becomes the stored anonymized id, dimensions
        listed in ``cleanup_fields`` become null, and all others are kept.
        ``date`` is the suffix appended to the session table prefix.
        '''
        cleanup_branch = self._cleanup_case_branch(cleanup_fields)
        return await self.client.query(
            project_id,
            f'''
            update `{dataset_id}`.`{SESSION_TABLENAME}{date}` as ga
            set customDimensions=ARRAY(
                SELECT
                    (index,
                    CASE
                        WHEN index = {lookup_field} THEN
                            ac.anonymized_id
                        {cleanup_branch}
                        else
                            value
                    end
                    )
                FROM unnest(ga.customDimensions)
            )
            from `{dataset_id}.{OPTED_OUT_USERS}` as ac
            where ac.wgid=(select value from unnest(ga.customDimensions) where index={lookup_field});
            '''
        )

    async def clear_opted_out_visitors_anonymized(self, project_id, dataset_id, accounts):
        '''
        Delete the stored rows whose wgid is in ``accounts``.

        Returns ``None`` without sending a query when ``accounts`` is empty.
        '''
        if not accounts:
            return
        opted_out_table = sa.table(
            f'{dataset_id}.{OPTED_OUT_USERS}',
            sa.literal_column('wgid'),
        )
        query = sa.delete(opted_out_table).where(
            opted_out_table.c.wgid.in_(accounts)
        )
        return await self.client.query(
            project_id,
            self.__compile_query(query),
        )
185