· 6 years ago · Sep 09, 2019, 10:52 AM
"""
Command line utility to create/update views on a BigQuery table.
This utility pulls the JSON schema of a given table, iterates through its
fields, filters out the blacklisted fields, and creates a string with all the
required columns for a view.
Input as command line args:
- Arg 1: projectname.datasetname.tablename (fully qualified table name)
- Arg 2: blacklisted fields: a string with comma separated blacklisted fields
- Arg 3: bq command path
- Arg 4: view project name.view dataset name.view name
Each value can instead be supplied through the environment variables
TABLE_FQN, BLACKLIST_FIELDS, BQ_PATH and VIEW_FQN respectively; when set,
the environment variable takes precedence over the command line argument.
Output:
This tool filters out the blacklisted fields from the source table and creates a
view with the remaining fields in the destination project and destination
dataset with the given view name.
***This tool is idempotent: it creates a new view if no view exists
with the given view name.
If the view already exists, it is replaced (CREATE OR REPLACE VIEW) with the
latest column fields.
"""
19
20import json
21import sys
22import subprocess
23import os
24
25
def view_builder(col_str, table_fqn, view_fqn, bq_path):
    """Idempotently create or replace a BigQuery view over a table.

    Runs ``bq query`` with a standard-SQL ``CREATE OR REPLACE VIEW``
    statement that selects ``col_str`` from ``table_fqn``.

    Parameters:
        col_str: String of comma separated column expressions to select in
            the view (as produced by view_columns_builder).
        table_fqn: Fully qualified name of the source table,
            i.e. ProjectName.DatasetName.TableName
        view_fqn: Fully qualified name of the view,
            i.e. ProjectName.DatasetName.ViewName
        bq_path: Path to (or name of) the ``bq`` command line tool.

    Raises:
        ValueError: if view_fqn has fewer than three dot-separated parts.
        RuntimeError: if ``bq`` cannot be started or exits non-zero; for a
            non-zero exit the message is the command's captured output.
    """
    view_parts = view_fqn.split('.')
    if len(view_parts) < 3:
        raise ValueError(
            "view_fqn must be ProjectName.DatasetName.ViewName, got %r"
            % view_fqn)
    # The bq query job is run under the view's project.
    view_prj_name = view_parts[0]

    new_view_query = ("CREATE OR REPLACE VIEW `" + view_fqn + "` AS SELECT "
                      + col_str + " FROM `" + table_fqn + "`")
    # Arguments are passed as a list (no shell), so backticks, quotes, `$`
    # and `;` in the names or query reach bq verbatim instead of being
    # interpreted by a shell.
    make_view_command = [
        bq_path, "query", "--use_legacy_sql=false",
        "--project_id=" + view_prj_name,
        new_view_query,
    ]

    sys.stdout.write("Creating BQ View: " + " ".join(make_view_command) + "\n")
    try:
        # stderr is merged so that bq's error text ends up in err.output.
        subprocess.check_output(make_view_command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as err:
        raise RuntimeError(err.output)
    except OSError as err:
        # e.g. bq_path does not exist or is not executable.
        raise RuntimeError("Could not run %s: %s" % (bq_path, err))
59
60
def pull_table_schema(src_proj, src_ds, src_table, bq_path):
    """Fetch and parse the metadata of a BigQuery table via ``bq show``.

    Parameters:
        src_proj: Source project name.
        src_ds: Source dataset name.
        src_table: Source table name.
        bq_path: Path to (or name of) the ``bq`` command line tool.

    Returns:
        The object decoded from ``bq show --format=prettyjson`` output (for
        a table, a dict; view_columns_builder reads its
        ``["schema"]["fields"]`` entry). Nothing is written to disk.

    Raises:
        RuntimeError: if ``bq`` cannot be started or exits non-zero.
        ValueError: if the command output is not valid JSON.
    """
    # bq addresses the table as PROJECT:DATASET.TABLE.
    source_table_name = src_proj + ":" + src_ds + "." + src_table
    # Argument list instead of a shell string: names are passed verbatim and
    # cannot inject shell syntax.
    show_command = [bq_path, "show", "--format=prettyjson", source_table_name]
    try:
        schema_json = subprocess.check_output(show_command)
    except subprocess.CalledProcessError as err:
        raise RuntimeError(err.output)
    except OSError as err:
        # e.g. bq_path does not exist or is not executable.
        raise RuntimeError("Could not run %s: %s" % (bq_path, err))
    return json.loads(schema_json)
77
78
def _record_column_expr(parent_col, parent_col_mode, sub_fields, blacklist):
    """Return the SELECT expression rebuilding one RECORD column, or None.

    The struct is rebuilt from its non-blacklisted sub-fields and wrapped in
    ``IF(<col> is null, null, ...)`` so that a NULL struct stays NULL.
    Returns None when every sub-field is blacklisted, since there is nothing
    left to project.

    Raises:
        ValueError: if parent_col_mode is not NULLABLE, REQUIRED or REPEATED.
    """
    # NOTE: only one nesting level is filtered. A sub-field that is itself a
    # RECORD is selected whole, so blacklisted names nested deeper are NOT
    # removed.
    kept = [parent_col + "." + sub["name"]
            for sub in sub_fields if sub["name"] not in blacklist]
    if not kept:
        return None
    select_list = ",".join(kept)
    if parent_col_mode == "REPEATED":
        # Re-project each array element; the UNNEST alias reuses the column
        # name so "<col>.<field>" refers to the current element.
        body = ("ARRAY(SELECT AS STRUCT " + select_list + " FROM UNNEST("
                + parent_col + ") AS " + parent_col + ")")
    elif parent_col_mode in ("NULLABLE", "REQUIRED"):
        body = "STRUCT(" + select_list + ")"
    else:
        raise ValueError("Unsupported mode %r for RECORD column %s"
                         % (parent_col_mode, parent_col))
    return "IF(%s is null, null, %s) as %s" % (parent_col, body, parent_col)


def view_columns_builder(source_table_schema, blacklist_str):
    """Build the SELECT column list for a view, omitting blacklisted fields.

    Parameters:
        source_table_schema: dict with the source table metadata; the
            columns are read from ``source_table_schema["schema"]["fields"]``.
        blacklist_str: A string with comma separated blacklisted field names.
            Surrounding whitespace is ignored; None or "" means no blacklist.
            A name is matched against top-level columns (including whole
            RECORD/STRUCT columns) and against the direct sub-fields of
            RECORD/STRUCT columns.

    Output:
        A comma separated string of column expressions (whitelisted columns)
        which can be used in view creation. Plain columns appear by name.
        RECORD/STRUCT columns are rebuilt without their blacklisted
        sub-fields and dropped entirely when none remain.
    """
    blacklist = {name.strip() for name in (blacklist_str or "").split(",")
                 if name.strip()}

    column_exprs = []
    for p_columns in source_table_schema["schema"]["fields"]:
        parent_col = p_columns["name"]
        if parent_col in blacklist:
            continue
        # The schema may omit "mode"; BigQuery's default mode is NULLABLE.
        parent_col_mode = p_columns.get("mode", "NULLABLE")
        # STRUCT is accepted as a synonym of RECORD in BigQuery schemas.
        if p_columns["type"] in ("RECORD", "STRUCT"):
            expr = _record_column_expr(parent_col, parent_col_mode,
                                       p_columns["fields"], blacklist)
            if expr is not None:
                column_exprs.append(expr)
        else:  # Top level column is not nested.
            column_exprs.append(parent_col)
    return ",".join(column_exprs)
134
135
def main(argv=None):
    """Command line entry point: build the filtered view for a table.

    Each setting is read from its environment variable when that is set,
    otherwise from the positional command line argument:
        TABLE_FQN        / argv[1]: source table, ProjectName.DatasetName.TableName
        BLACKLIST_FIELDS / argv[2]: comma separated blacklisted fields (optional)
        BQ_PATH          / argv[3]: path to the bq command line tool
        VIEW_FQN         / argv[4]: destination view, ProjectName.DatasetName.ViewName

    Parameters:
        argv: Argument vector to read positional values from; defaults to
            sys.argv.

    Exits with status 1 when a required setting is missing or the source
    table name is not fully qualified.
    """
    if argv is None:
        argv = sys.argv

    def setting(env_name, index):
        # The environment variable wins; otherwise use the single positional
        # argument (a string), or None when it was not given.
        value = os.environ.get(env_name)
        if value is None and len(argv) > index:
            value = argv[index]
        return value

    bq_command_path = setting('BQ_PATH', 3)
    source_table_fqn = setting('TABLE_FQN', 1)
    blacklist_fields_string = setting('BLACKLIST_FIELDS', 2) or ""
    destination_view_fqn = setting('VIEW_FQN', 4)
    required_args = [bq_command_path, source_table_fqn, destination_view_fqn]
    if not all(required_args):
        sys.stderr.write(
            "required variable not set:\nbq_command_path: {}\n"
            "source_table_fqn: {}\ndestination_view_fqn: {}\n".format(
                bq_command_path, source_table_fqn, destination_view_fqn))
        sys.exit(1)

    table_list = source_table_fqn.split('.')
    if len(table_list) < 3:
        sys.stderr.write(
            "source table must be ProjectName.DatasetName.TableName, "
            "got: {}\n".format(source_table_fqn))
        sys.exit(1)
    source_project_name, source_dataset_name, source_table_name = table_list[:3]

    source_schema = pull_table_schema(source_project_name, source_dataset_name,
                                      source_table_name, bq_command_path)
    view_columns = view_columns_builder(source_schema, blacklist_fields_string)
    view_builder(view_columns, source_table_fqn, destination_view_fqn,
                 bq_command_path)


if __name__ == '__main__':
    main()