· 5 years ago · Jun 15, 2020, 12:18 PM
1import concurrent.futures
2from collections import defaultdict
3from typing import Tuple, List, DefaultDict
4
5import boto3
6import imageio
7import mercantile
8import numpy as np
9import requests
10from affine import Affine
11from rasterio import transform, MemoryFile
12from rasterio.coords import BoundingBox
13from rasterio.io import DatasetWriter
14from rasterio.windows import Window
15
16s3 = boto3.client("s3")
17
18
19def get_tiles_from_field_metadata(rgb_key: str, channel: str, zooms: int) -> List[List[int]]:
20 # Tiles per defined zoom level with mercantile API.
21 response = requests.get(f"https://jgau9m9qpf.execute-api.us-east-1.amazonaws.com/bondville"
22 f"/metadata/{rgb_key}/{channel}/")
23 west, south, east, north = response.json()['bounds']
24 tiles = list(mercantile.tiles(west, south, east, north, zooms))
25 final_tiles = [[tile.x, tile.y, tile.z] for tile in tiles]
26 return final_tiles
27
28
29def get_resulting_image_shape(rgb_key: str, channel: str, zooms: int,
30 tile_size: tuple = (256, 256)) -> Tuple[int, int]:
31 # Final resulting image shape (height, width).
32 tiles = get_tiles_from_field_metadata(rgb_key, channel, zooms)
33 combined_arrays_by_x = combine_image_data_by_x(tiles)
34
35 image_width, image_height = len(combined_arrays_by_x.keys()) * tile_size[0], None
36 for k in combined_arrays_by_x:
37 image_height = len(combined_arrays_by_x[k]) * tile_size[1]
38 break
39 return image_width, image_height
40
41
42def combine_image_data_by_x(tiles: List[List[int]]) -> DefaultDict[int, List[List[int]]]:
43 # Get dict of image array combined by x value.
44 initial_x = tiles[0][0]
45 combined_arrays_by_x = defaultdict(list)
46 for tile in tiles:
47 local_x = tile[0]
48 if local_x == initial_x:
49 xyz_array = [tile[0], tile[1], tile[2]]
50 combined_arrays_by_x[local_x].append(xyz_array)
51 else:
52 initial_x = local_x
53 xyz_array = [tile[0], tile[1], tile[2]]
54 combined_arrays_by_x[initial_x].append(xyz_array)
55 return combined_arrays_by_x
56
57
58def get_tile_bounds(tile: list) -> BoundingBox:
59 # Returns BoundingBox object for each given tile.
60 return BoundingBox(*mercantile.xy_bounds(tile))
61
62
63def transform_from_bounds(bounds, tile_size) -> Affine:
64 return transform.from_bounds(*bounds, *tile_size)
65
66
67def get_resulting_image_bounds(rgb_key: str, channel: str, zooms: int) -> BoundingBox:
68 # Final resulting image boundary (x_min, y_min, x_max, y_max).
69 tiles = get_tiles_from_field_metadata(rgb_key, channel, zooms)
70 x_min, y_min, x_max, y_max = [], [], [], []
71 for index, tile in enumerate(tiles):
72 bbox = get_tile_bounds(tiles[index])
73 x_min.append(bbox.left)
74 y_min.append(bbox.bottom)
75 x_max.append(bbox.right)
76 y_max.append(bbox.top)
77
78 return BoundingBox(min(x_min), min(y_min), max(x_max), max(y_max))
79
80
81def construct_url(key: str, x: int, y: int, z: int):
82 # Makes api calls to rgb tile endpoint.
83 return f"https://jgau9m9qpf.execute-api.us-east-1.amazonaws.com/bondville/" \
84 f"{key}/{x}/{y}/{z}.png?&r=red&g=green&b=blue" \
85 f"&r_range=%5B2048.0%2C8735.0%5D&g_range=%5B2048.0%2C8735.0%5D&b_range=%5B2048.0%2C8735.0%5D"
86
87
88def get_image_data_array(rgb_key: str, x: int, y: int, z: int):
89 # Construct tile data each time calling api with respective x, y, z coordinates.
90 image_data = imageio.imread(construct_url(rgb_key, x, y, z))
91 return image_data
92
93
94def write_image_for_one_tile(memory_file: DatasetWriter, image_data: np.ndarray, window: Window,
95 *args):
96 # Write image data per tile basis in memory.
97 memory_file.write(np.transpose(np.moveaxis(image_data, [0, 2, 1], [1, 2, 0])), window=window)
98
99
100def thread_function(memory_file: DatasetWriter, row_offset: int, column_offset: int, tile: list,
101 width: int, height: int, g_transform: Affine, no_data_v: int, key):
102 # Thread function in order to pass to ThreadPullExecutor.
103 image_data = get_image_data_array(f"rgb/{key}", tile[2], tile[0], tile[1])
104 tile_window = Window(column_offset, row_offset, 256, 256)
105 write_image_for_one_tile(memory_file, image_data, tile_window,
106 width, height, g_transform, no_data_v)
107
108
109def create_and_write_raster_to_s3(rgb_key: str, channel: str, zoom: int,
110 s3_bucket: str, s3_bucket_key: str):
111 # Create raster in memory by appending tiles window by window taking to account row and column
112 # offsets, afterwards uploading to specified s3 bucket.
113 resulting_tiles = get_tiles_from_field_metadata(rgb_key, channel, zoom)
114 res_image_width, res_image_height = get_resulting_image_shape(rgb_key, channel, zoom)
115 res_bounds = get_resulting_image_bounds(rgb_key, channel, zoom)
116 geo_transform = transform_from_bounds(res_bounds, (res_image_width, res_image_height))
117 tiles_map = combine_image_data_by_x(resulting_tiles)
118
119 # Please refer here to come with solution to "error setting certificate verify locations".
120 # https://github.com/mapbox/rasterio/issues/1289
121
122 no_data_value = 0
123 dst_profile = {
124 'driver': "GTiff",
125 'count': 3,
126 'height': res_image_height,
127 'width': res_image_width,
128 'dtype': "uint8",
129 'nodata': no_data_value,
130 'crs': "epsg:3857",
131 'transform': geo_transform,
132 }
133
134 mem_file = MemoryFile()
135 with mem_file.open(**dst_profile) as f:
136 col_off = -256
137 for key, x_tiles in tiles_map.items():
138 col_off += 256
139 row_off = -256
140 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
141 futures = []
142 for x_tile in x_tiles:
143 row_off += 256
144 futures.append(executor.submit(thread_function, f, row_off, col_off,
145 x_tile, res_image_width, res_image_height,
146 geo_transform, no_data_value, rgb_key))
147 concurrent.futures.wait(futures)
148 mem_file.seek(0)
149 s3.put_object(Bucket=s3_bucket, Key=s3_bucket_key, Body=mem_file.read())
150
151
152create_and_write_raster_to_s3("flight/9BINMT6XB", 'red', 18, "intelinair-infra", 'file.tif')