import torch
from termcolor import colored
import matplotlib.pyplot as plt
import numpy as np
from torchvision.transforms.functional import rotate
import geopy.distance
from copy import deepcopy
from shapely.geometry import Polygon
import os
import cv2
from pyraws.utils.constants import (
BAND_SPATIAL_RESOLUTION_DICT,
BAND_SPATIAL_RESOLUTION_DICT_ACROSS,
BANDS_RAW_SHAPE_DICT,
SWIR_BANDS,
)
from pyraws.utils.raw_utils import (
export_band_to_tif,
get_bands_shift,
get_granule_px_length,
swap_latlon,
)
from pyraws.utils.visualization_utils import equalize_tensor
from pyraws.utils.band_shape_utils import image_band_upsample
from pyraws.utils.date_utils import parse_string_date
[docs]class Raw_granule:
"""Initialize an raw granule.
Args:
granule_bands (list, optional): list of torch tensors representing each band in the format [H,W]. Defaults to None.
bands_names (list, optional): list of band names. Defaults to None.
granule_name (string, optionl): granule name. Defaults to None.
granule_polygon_coordinates (list, optional): list containing [lat, lon] for every polygon point. Defaults to None.
single_granule_along_track_size (float, optional): size of a single granule along track.
Useful to keep this info when the granule has parents.
Defaults to None.
cloud_percentage (list or float, optional): Cloud coverage percentage metadata. Defaults to None.
device (torch.device, optional): torch.device. Defaults to torch.device("cpu").
"""
# ---------------- PRIVATE VARIABLES ----------------
__bands_dict = None # Bands dictionary
__device = None # Device
__bands_names = None # Bands's names
__original = (
True # This parameter is used to keep track of the history of the granule.
)
# If it is True, it means the granule was created by reading an image, and no operation is applied.
__granule_parents = (
[]
) # It is used to keep track of the parents granules originating this granule.
__granule_polygon_coordinates = (
[]
) # Coordinates of the polygon interstecting granule area.
__single_granule_along_track_size = None # Along track size of a single granule.
__cloud_percentage_list = [] # Cloud coverage percentage
__cropped_pixels = [None, None] # Cropped pixels along-across
def __init__(
self,
granule_bands=None,
bands_names=None,
granule_name=None,
granule_polygon_coordinates=None,
single_granule_along_track_size=None,
cloud_percentage=None,
device=torch.device("cpu"),
):
if (granule_bands is not None) and (bands_names is not None):
self.__bands_dict = dict(zip(bands_names, granule_bands))
else:
self.__bands_dict = None
self.__device = device
self.__bands_names = bands_names
self.granule_name = granule_name
self.__cropped_pixels = [None, None]
self.__original = (
True # This parameter is used to keep track of the history of the granule.
)
# If it is True, it means the granule was created by reading an image and no operation is applied.
self.__granule_parents = (
[]
) # It is used to keep track of the parents granules originating this granule.
if granule_polygon_coordinates is None:
self.__granule_polygon_coordinates = (
[]
) # Coordinates of the polygon interstecting granule area.
else:
self.__granule_polygon_coordinates = granule_polygon_coordinates
self.__single_granule_along_track_size = single_granule_along_track_size
if cloud_percentage is None:
self.__cloud_percentage_list = []
elif isinstance(cloud_percentage, list):
self.__cloud_percentage_list = cloud_percentage
else:
self.__cloud_percentage_list.append(cloud_percentage)
# ----------------------------------PRIVATE METHODS ----------------------------------
def __set_cropped_pixels(self, cropped_pixels):
"""Set the new granule length.
Args:
cropped_pixels (list): the number of pixels cropped along-across track
"""
self.__cropped_pixels = cropped_pixels
def __set_single_granule_along_track_size(self, along_track_size):
"""Single track along track size.
Args:
along_track_size (float): single granule along track size
"""
self.__single_granule_along_track_size = along_track_size
def __set_originality(self, originality):
"""Set originality of granule.
Args:
originality (boolean): originality.
"""
self.__original = originality
def __set_parents(self, parents_list):
"""Set granule parents from parents list
Args:
parents_list (list): parents list.
"""
for granule_parent in parents_list:
self.__granule_parents.append(granule_parent)
def __set_cloud_percentage(self, cloud_percentage_list):
"""Set the cloud percentage.
Args:
cloud_percentage_list (list): cloud percentage list.
"""
self.__cloud_percentage_list = cloud_percentage_list
# ---------------------- PUBLIC METHODS -----------------------
[docs] def is_original(self):
"""It shows if the granule is original.
Returns:
boolean: originality.
"""
return self.__original
[docs] def get_parents(self):
"""Set granule parents from parents list
Returns:
list: parents list.
"""
parents_list = []
for granule_parent in self.__granule_parents:
parents_list.append(granule_parent)
return parents_list
[docs] def get_band(self, band_name):
"""Returns a specific band as a tensor.
Args:
band_name (string): band name
Returns:
torch.tensor: requested band.
"""
return self.__bands_dict[band_name]
[docs] def show_raw_band(self, band_name):
"""Show a granule band specified by band_name.
Args:
band_name (string): band_name
Raises:
ValueError: Impossible to show the requested band: band_name
ValueError: The requested granule is void.
"""
if self.__bands_dict is not None:
band = self.__bands_dict[band_name]
try:
if self.__device == torch.device("cuda"):
plt.imshow(band.detach().cpu().numpy())
else:
plt.imshow(band)
except: # noqa: E722
raise ValueError(
"Impossible to show the requested band "
+ colored(band_name, "red")
+ "."
)
else:
raise ValueError("The requested granule is void.")
[docs] def get_granule_coordinates(self, latlon_format=True):
"""Returns the granule's coordinates.
Args:
latlon_format (bool): format to use to get the granule's coordinates. Default to: Latitude, Longitude
Returns:
list: polygon coordinates.
"""
if latlon_format:
return self.__granule_polygon_coordinates
else:
return swap_latlon(self.__granule_polygon_coordinates)
[docs] def get_single_granule_along_track_size(self):
"""Retuns single granule along track size.
Returns:
float: single ganule along track size
"""
return self.__single_granule_along_track_size
[docs] def get_along_track_size(self):
"""Returns along track size.
Returns:
float: along track size.
"""
return geopy.distance.geodesic(
self.__granule_polygon_coordinates[0], self.__granule_polygon_coordinates[1]
).km
[docs] def get_across_track_size(self):
"""Returns across track size.
Returns:
float: across track size
"""
return geopy.distance.geodesic(
self.__granule_polygon_coordinates[1], self.__granule_polygon_coordinates[2]
).km
[docs] def get_cloud_percentage(self):
"""Returns cloud percentage.
Returns:
list: cloud percentage.
"""
return self.__cloud_percentage_list
[docs] def create_granule(
self,
bands_list,
granule_bands,
granule_name,
polygon_coordinates,
single_granule_along_track_size,
cloud_percentage,
):
"""Creates a granule from the list of bands and from a torch tensor.
Args:
bands_list (list): list of band names.
granule_bands (list): list of torch.tensors (bands) in format [H,W]
granule_name (str): granule name.
polygon_coordinates (list): list of [lan, lon] coordinates for every polygon point.
single_granule_along_track_size (float): along track size for a single granule.
cloud_percentage (float or list): cloud percentage.
"""
self.__bands_dict = dict(zip(bands_list, bands_list))
for band, n in zip(bands_list, range(len(bands_list))):
self.__bands_dict[band] = granule_bands[n].to(self.__device)
self.__bands_names = bands_list
self.granule_name = granule_name
self.__granule_polygon_coordinates = polygon_coordinates
self.__single_granule_along_track_size = single_granule_along_track_size
if isinstance(cloud_percentage, list):
self.__cloud_percentage_list = cloud_percentage
else:
self.__cloud_percentage_list.append(cloud_percentage)
[docs] def check_bands_size(self, requested_bands):
"""Checks if the requested bands have the same size
Args:
requested_bands (list): list of requested bands
Returns:
boolean: return True if all the requested bands have the same size. It returns False, otherwise.
"""
band_shape = self.__bands_dict[requested_bands[0]].shape
ok_flag = True
for band_name in requested_bands:
band = self.__bands_dict[band_name]
if band.shape != band_shape:
ok_flag = False
if ok_flag:
print(colored("Bands size is matching", "green"))
else:
print(colored("Bands size is not matching.", "red"))
return ok_flag
[docs] def as_tensor(self, requested_bands=None, downsampling=True):
"""Returns a tensor containing all the bands if all the requested bands have the same resolution.
Args:
requested_bands (list, optional): list of requested bands.
If None, all the bands are used. Defaults to None.
downsampling (boolean, optional): if True, bands are downsampled to the lowest resolution.
Otherwise, they are upsampled to the highest one.
Defaults to True.
Raises:
ValueError: The granule is empty
Returns:
torch.tensor: tensor containing the requested bands.
"""
if self.__bands_dict is None:
raise ValueError("The granule is empty.")
if requested_bands is None:
requested_bands = self.__bands_names
if downsampling:
bands_resolution_list = [
BAND_SPATIAL_RESOLUTION_DICT[b] for b in requested_bands
]
lowest_resolution = max(bands_resolution_list)
band_shape = [self.get_band(b) for b in requested_bands][
bands_resolution_list.index(lowest_resolution)
].shape
bands = torch.zeros(
[band_shape[0], band_shape[1], len(requested_bands)],
device=self.__device,
)
for band_name, n in zip(requested_bands, range(len(requested_bands))):
band = self.__bands_dict[band_name]
band_resolution = BAND_SPATIAL_RESOLUTION_DICT[band_name]
if band_resolution != lowest_resolution:
usf_v = int(lowest_resolution / band_resolution)
if lowest_resolution == 60:
usf_h = max(int(20 / (band_resolution)), 1)
else:
usf_h = int(lowest_resolution / band_resolution)
band = band[::usf_v, ::usf_h]
bands[:, :, n] = band
else:
resolution_min = min(
[BAND_SPATIAL_RESOLUTION_DICT[b] for b in requested_bands]
)
band_shape = [
max(self.get_band(b).shape[0] for b in requested_bands),
max(self.get_band(b).shape[1] for b in requested_bands),
]
bands = torch.zeros(
[band_shape[0], band_shape[1], len(requested_bands)],
device=self.__device,
)
for band_name, n in zip(requested_bands, range(len(requested_bands))):
band = self.__bands_dict[band_name]
if resolution_min != BAND_SPATIAL_RESOLUTION_DICT[band_name]:
if BAND_SPATIAL_RESOLUTION_DICT[band_name] == 60:
band_upsampled = image_band_upsample(
band[:, ::3], band_name, resolution_min, "bicubic"
)
else:
band_upsampled = image_band_upsample(
band, band_name, resolution_min, "bicubic"
)
if bands.shape[0] > band_upsampled.shape[0]:
bands[: band_upsampled.shape[0], :, n] = band_upsampled
elif bands.shape[0] < band_upsampled.shape[0]:
bands[:, :, n] = band_upsampled[: bands.shape[0], :]
else:
bands[:, :, n] = band_upsampled
else:
bands[:, :, n] = band
return bands
[docs] def stack_to_granule(self, granule_other, position):
"""It stacks the granule to another input granule to a specificied position.
For instance, if position == ""T"", the granule will be stacked at the top of the ""granule_other"".
Args:
granule_other (Raw_granule): other granule to which the input granule will be stacked to.
position (string): Stacking poistion. Only [""T"", ""B"", ""R"", ""L""] are supported.
Raises:
ValueError: The granule is void.
ValueError: Impossible to stack granules. Granules have different bands.
ValueError: Only the following stacking positions are supported: [""T"", ""B"", ""R"", ""L""].
Returns:
Raw_granule: stacked granule
"""
if self.__bands_dict is None:
raise ValueError("The granule is void.")
bands_names = self.__bands_names
if bands_names != granule_other.__bands_names:
raise ValueError(
"Impossible to stack granules. Granules have different bands."
)
if position not in ["T", "B", "R", "L"]:
raise ValueError(
"Only the following stacking positions are supported: ["
"T"
", "
"B"
", "
"R"
", "
"L"
"]."
)
granule_stacked_bands = []
for band_name in bands_names:
granule_1_band = self.__bands_dict[band_name]
granule_2_band = granule_other.__bands_dict[band_name]
if granule_1_band.device == torch.device("cuda"):
granule_2_band = granule_2_band.cuda()
else:
granule_2_band = granule_2_band.cpu()
if position == "T":
granule_band = torch.zeros(
[
granule_1_band.shape[0] + granule_2_band.shape[0],
granule_1_band.shape[1],
]
)
granule_band[: granule_1_band.shape[0]] = granule_1_band
granule_band[granule_1_band.shape[0] :] = granule_2_band
elif position == "B":
granule_band = torch.zeros(
[
granule_2_band.shape[0] + granule_1_band.shape[0],
granule_1_band.shape[1],
]
)
granule_band[: granule_2_band.shape[0]] = granule_2_band
granule_band[granule_2_band.shape[0] :] = granule_1_band
elif position == "L":
granule_band = torch.zeros(
[
granule_1_band.shape[0],
granule_1_band.shape[0] + granule_2_band.shape[0],
]
)
granule_band[:, : granule_1_band.shape[0]] = granule_1_band
granule_band[:, granule_1_band.shape[0] :] = granule_2_band
else:
granule_band = torch.zeros(
[
granule_1_band.shape[0],
granule_2_band.shape[0] + granule_1_band.shape[0],
]
)
granule_band[:, : granule_2_band.shape[0]] = granule_2_band
granule_band[:, granule_2_band.shape[0] :] = granule_1_band
granule_stacked_bands.append(granule_band)
granule_name = (
self.granule_name
+ "_STACKED_"
+ position
+ "_"
+ granule_other.granule_name
)
granule_other_coordinates = granule_other.get_granule_coordinates()
granule_this_coordinates = self.get_granule_coordinates()
granule_this_cloud_percentage = self.get_cloud_percentage()
granule_other_cloud_percentage = granule_other.get_cloud_percentage()
if position == "T":
granule_coordinates_polygon = [
granule_this_coordinates[0],
granule_other_coordinates[1],
granule_other_coordinates[2],
granule_this_coordinates[3],
]
stacked_granule_cloud_percentage = [
granule_this_cloud_percentage,
granule_other_cloud_percentage,
]
elif position == "B":
granule_coordinates_polygon = [
granule_other_coordinates[0],
granule_this_coordinates[1],
granule_this_coordinates[2],
granule_other_coordinates[3],
]
stacked_granule_cloud_percentage = [
granule_other_cloud_percentage,
granule_this_cloud_percentage,
]
elif position == "L":
granule_coordinates_polygon = [
granule_this_coordinates[0],
granule_this_coordinates[1],
granule_other_coordinates[2],
granule_other_coordinates[3],
]
stacked_granule_cloud_percentage = [
granule_this_cloud_percentage,
granule_other_cloud_percentage,
]
else:
granule_coordinates_polygon = [
granule_other_coordinates[0],
granule_other_coordinates[1],
granule_this_coordinates[2],
granule_this_coordinates[3],
]
stacked_granule_cloud_percentage = [
granule_other_cloud_percentage,
granule_this_cloud_percentage,
]
stacked_granule = Raw_granule(
granule_bands=granule_stacked_bands,
bands_names=bands_names,
granule_name=granule_name,
granule_polygon_coordinates=granule_coordinates_polygon,
cloud_percentage=stacked_granule_cloud_percentage,
device=self.__device,
)
stacked_granule.__set_originality(False)
stacked_granule.__set_single_granule_along_track_size(
self.__single_granule_along_track_size
)
if len(self.__granule_parents) == 0:
stacked_granule.__set_parents(
[self.granule_name, granule_other.granule_name]
)
else:
stacked_granule.__set_parents(
self.__granule_parents + [granule_other.granule_name]
)
return stacked_granule
[docs] def rotate_band(self, requested_band):
"""Rotates one band of 180 degrees.
Args:
requested_band (string): band name.
"""
band = self.get_band(requested_band)
self.__bands_dict[requested_band] = rotate(band.unsqueeze(2), 180).squeeze(2)
self.__set_originality(False)
# def __update_coordinates_first_band(band_name):
[docs] def get_detectors_number(self):
"""Returns the detector numbers list.
Returns:
list: List of detector numbers.
"""
granule_detector_info = self.get_granule_info()
return granule_detector_info[3]
[docs] def coarse_coregistration(
self,
rotate_swir_bands=True,
granule_filler_before=None,
granule_filler_after=None,
crop_empty_pixels=False,
downsampling=True,
bands_shifts=None,
verbose=False,
):
"""It implements the coarse coregistration of the bands by compensating
the along-track pixels shift with respect to the first band.
Args:
rotate_swir_bands (boolean, optional): if True, SWIR bands are rotated before applying coregistration.
Defaults to True.
granule_filler_before (Raw_granule, optional): if not None, bands of this tile will be used to fill
the missing elements during the coregistration as before granule.
Defaults to None.
granule_filler_after (Raw_granule, optional): if not None, bands of this tile will be used to fill the missing
elements during the coregistration as bottom granule.
Defaults to None.
crop_empty_pixels (boolean, optional): if True and no fillers are available or granule fillers are None,
the image will be crop at the bottom and the top of a number
of pixels equal to the maximum zeros pixels stacked from each band.
downsampling (boolean, optional): if True, higher resolution bands will be undersampled to match the bands
with the lowest resolution.
If False, lower resolution bands will be upsampled to match the bands
with the highest resolution. Defaults to True.
bands_shifts (list, optional): bands shift values compared to the first band.
If None, they will be read by the LUT file. Defaults to None.
verbose (boolean, optional): if True, verbose mode is used. Defaults to False.
Returns:
Raw_granule: granule with coarse-coregistered bands.
"""
def shift_band(band, shifts):
band_shifted = torch.zeros_like(band)
shifts = [int(shifts[0]), int(shifts[1])]
if (shifts[0] == 0) and (shifts[1] == 0):
band_shifted = band
elif (shifts[0] == 0) and (shifts[1] < 0):
band_shifted[:, : int(shifts[1])] = band[:, -int(shifts[1]) :]
elif (shifts[0] == 0) and (shifts[1] > 0):
band_shifted[:, int(shifts[1]) :] = band[:, : -int(shifts[1])]
elif (shifts[0] < 0) and (shifts[1] == 0):
band_shifted[: int(shifts[0]), :] = band[-int(shifts[0]) :, :]
elif (shifts[0] > 0) and (shifts[1] == 0):
band_shifted[int(shifts[0]) :, :] = band[: -int(shifts[0]), :]
elif (shifts[0] > 0) and (shifts[1] > 0):
band_shifted[int(shifts[0]) :, int(shifts[1]) :] = band[
: -int(shifts[0]), : -int(shifts[1])
]
elif (shifts[0] > 0) and (shifts[1] < 0):
band_shifted[int(shifts[0]) :, : int(shifts[1])] = band[
: -int(shifts[0]), -int(shifts[1]) :
]
elif (shifts[0] < 0) and (shifts[1] > 0):
band_shifted[: int(shifts[0]), int(shifts[1]) :] = band[
-int(shifts[0]) :, : -int(shifts[1])
]
else:
band_shifted[: int(shifts[0]), : int(shifts[1])] = band[
-int(shifts[0]) :, -int(shifts[1]) :
]
return band_shifted
def shifts2TBLT(along_track: int, cross_track: int):
"""Transforms along and cross track offsets into top, bottom, left and right offsets
Args:
along_track (int): The offset in the along track direction
cross_track (int): The offset in the cross track direction
Returns:
Shifts offset in top, bottom, left, right
"""
top, left, right, bottom = 0, 0, 0, 0
if along_track < 0:
bottom = abs(along_track)
elif along_track > 0:
top = abs(along_track)
if cross_track > 0:
left = abs(cross_track)
elif cross_track < 0:
right = abs(cross_track)
return top, bottom, left, right
def maxOffsetUpdate(
max_top: int,
max_top_band: str,
max_bottom: int,
max_bottom_band: str,
max_left: int,
max_left_band: str,
max_right: int,
max_right_band: str,
top: int,
bottom: int,
left: int,
right: int,
band: str,
):
"""Updates the maximum offsets in top, bottom, left, right directions
Args:
max_top (int): The maximum offset in the top direction
max_top_band (str): band having the maximum top offset
max_bottom (int): The maximum offset in the bottom direction
max_bottom_band (str): band having the maximum top offset
max_right (int): The maximum offset in the right direction
max_left_band (str): band having the maximum top offset
max_left (int): The maximum offset in the left direction
max_right_band (str): band having the maximum top offset
top (int): The current offset in the top direction
bottom (int): The current offset in the bottom direction
right (int): The current offset in the right direction
left (int): The current offset in the left direction
band (str): curent band name
Returns:
The updated maximum offsets in top, bottom, left, right directions,
updated max_top, max_bottom, max_right, max_left
"""
top_rescaled, bottom_rescaled = (
BAND_SPATIAL_RESOLUTION_DICT[band]
/ BAND_SPATIAL_RESOLUTION_DICT[max_top_band]
* top,
BAND_SPATIAL_RESOLUTION_DICT[band]
/ BAND_SPATIAL_RESOLUTION_DICT[max_bottom_band]
* bottom,
)
right_rescaled, left_rescaled = (
min(BAND_SPATIAL_RESOLUTION_DICT[band], 20)
/ min(BAND_SPATIAL_RESOLUTION_DICT[max_right_band], 20)
* right,
min(BAND_SPATIAL_RESOLUTION_DICT[band], 20)
/ min(BAND_SPATIAL_RESOLUTION_DICT[max_left_band], 20)
* left,
)
if top_rescaled > max_top:
max_top = top
max_top_band = band
if bottom_rescaled > max_bottom:
max_bottom = bottom
max_bottom_band = band
if right_rescaled > max_right:
max_right = right
max_right_band = band
if left_rescaled > max_left:
max_left = left
max_left_band = band
return (
max_top,
max_bottom,
max_left,
max_right,
max_top_band,
max_bottom_band,
max_left_band,
max_right_band,
)
granule_detector_number = self.get_detectors_number()[0]
satellite = self.get_granule_info()[0][:3]
if bands_shifts is None:
bands_shifts = get_bands_shift(
self.__bands_names,
satellite,
granule_detector_number,
downsampling=downsampling,
cfg_file_dict=None,
)
if verbose:
print("Bands shifts: ", colored(bands_shifts, "green"))
coregistered_bands = []
max_top, max_bottom, max_left, max_right = (
0,
0,
0,
0,
) # Maximum of offsets init.
max_top_band = self.__bands_names[0]
max_bottom_band = self.__bands_names[0]
max_left_band = self.__bands_names[0]
max_right_band = self.__bands_names[0]
# Assigning to None and changed later if needed.
filler_name = {"top": None, "bottom": None}
for n in range(len(self.__bands_names)):
band = self.__bands_dict[self.__bands_names[n]]
if (self.__bands_names[n] in SWIR_BANDS) and (rotate_swir_bands):
band = rotate(band.unsqueeze(2), 180).squeeze(2)
# No shift for the first band.
if n == 0:
coregistered_bands.append(band) # MASTER BAND
else:
along_track, cross_track = bands_shifts[n - 1]
top, bottom, left, right = shifts2TBLT(
along_track, cross_track
) # Shfits to top, bottom, right, left
(
max_top,
max_bottom,
max_left,
max_right,
max_top_band,
max_bottom_band,
max_left_band,
max_right_band,
) = maxOffsetUpdate(
max_top,
max_top_band,
max_bottom,
max_bottom_band,
max_left,
max_left_band,
max_right,
max_right_band,
top,
bottom,
left,
right,
self.__bands_names[n],
)
band_name = self.__bands_names[n]
band_shifted = shift_band(band, shifts=[along_track, cross_track])
# -------------------------- BEFORE CASE:
if granule_filler_before is not None:
band_filler_before = granule_filler_before.get_band(band_name)
if (self.__bands_names[n] in SWIR_BANDS) and (rotate_swir_bands):
band_filler_before = rotate(
band_filler_before.unsqueeze(2), 180
).squeeze(2)
band_filler_before_across_shifted = shift_band(
band_filler_before, shifts=[0, cross_track]
)
# Band filler is on top:
if top != 0:
band_shifted[: int(top), :] = band_filler_before_across_shifted[
-int(top) :, :
]
filler_name["top"] = granule_filler_before.get_granule_info()[0]
# --------------------- AFTER CASE:
if granule_filler_after is not None:
band_filler_after = granule_filler_after.get_band(band_name)
if (self.__bands_names[n] in SWIR_BANDS) and (rotate_swir_bands):
band_filler_after = rotate(
band_filler_after.unsqueeze(2), 180
).squeeze(2)
band_filler_after_across_shifted = shift_band(
band_filler_after, shifts=[0, cross_track]
)
# Band filler is on bottom:
if bottom != 0:
band_shifted[
-int(bottom) :, :
] = band_filler_after_across_shifted[: int(bottom), :]
filler_name["bottom"] = granule_filler_after.get_granule_info()[
0
]
coregistered_bands.append(band_shifted)
# Managing crop exmpty pixel case
if crop_empty_pixels:
for band, n in zip(coregistered_bands, range(len(coregistered_bands))):
band_name = self.__bands_names[n]
# Resizing max pixel shifts with respect to the resolution of the current band
max_top_resized = int(
max_top
* BAND_SPATIAL_RESOLUTION_DICT[max_top_band]
/ BAND_SPATIAL_RESOLUTION_DICT[band_name]
)
max_bottom_resized = int(
max_bottom
* BAND_SPATIAL_RESOLUTION_DICT[max_bottom_band]
/ BAND_SPATIAL_RESOLUTION_DICT[band_name]
)
max_right_resized = int(
max_right
* min(BAND_SPATIAL_RESOLUTION_DICT[max_right_band], 20)
/ min(BAND_SPATIAL_RESOLUTION_DICT[band_name], 20)
)
max_left_resized = int(
max_left
* min(BAND_SPATIAL_RESOLUTION_DICT[max_left_band], 20)
/ min(BAND_SPATIAL_RESOLUTION_DICT[band_name], 20)
)
# The band shall be cropped only if it has not been filled.
if (filler_name["bottom"] is None) and (max_bottom_resized != 0):
band_cropped = band[:-max_bottom_resized]
else:
band_cropped = band
# The band shall be cropped only if it has not been filled.
if (filler_name["top"] is None) and (max_top_resized != 0):
band_cropped = band_cropped[max_top_resized:]
if max_right_resized != 0:
band_cropped = band_cropped[:, max_left_resized:-max_right_resized]
else:
band_cropped = band_cropped[:, max_left_resized:]
coregistered_bands[n] = band_cropped
# Getting information to manage coordinates adjustments.
polygon_coordinates = np.array(deepcopy(self.get_granule_coordinates()))
n_stacked_granules = self.get_number_of_granules_along_track_line()
granule_length = get_granule_px_length(
n_stacked_granules, satellite, granule_detector_number
)
granule_length_across = int(BANDS_RAW_SHAPE_DICT["B02"][1])
# Adjusting coordinates after cropping on top and bottom
if max_top != 0 or max_bottom != 0:
# Reprojecting max cropped pixels with respect to band 2 to have on ground resolution.
max_top_resized = int(
max_top
* BAND_SPATIAL_RESOLUTION_DICT[max_top_band]
/ BAND_SPATIAL_RESOLUTION_DICT["B02"]
)
max_bottom_resized = int(
max_bottom
* BAND_SPATIAL_RESOLUTION_DICT[max_bottom_band]
/ BAND_SPATIAL_RESOLUTION_DICT["B02"]
)
band_shift_01_top = (
np.abs(max_top_resized)
/ granule_length
* (polygon_coordinates[1] - polygon_coordinates[0])
)
band_shift_32_top = (
np.abs(max_top_resized)
/ granule_length
* (polygon_coordinates[2] - polygon_coordinates[3])
)
band_shift_01_bottom = (
np.abs(max_bottom_resized)
/ granule_length
* (polygon_coordinates[1] - polygon_coordinates[0])
)
band_shift_32_bottom = (
np.abs(max_bottom_resized)
/ granule_length
* (polygon_coordinates[2] - polygon_coordinates[3])
)
# Adjusting coordinates only in absence of top filling.
if filler_name["top"] is None:
# Reprojecting top shift with respect to band 9
# (used inside the granule to manage the lenght of the
# pixel when bands coordinates are used.)
max_top_resized_09 = int(
max_top
* BAND_SPATIAL_RESOLUTION_DICT[max_top_band]
/ BAND_SPATIAL_RESOLUTION_DICT["B09"]
)
point_0_shifted = polygon_coordinates[0] + band_shift_01_top
point_3_shifted = polygon_coordinates[3] + band_shift_32_top
else:
# No pixels cropped.
max_top_resized_09 = 0
point_0_shifted = polygon_coordinates[0]
point_3_shifted = polygon_coordinates[3]
# Adjusting coordinates only in absence of bottom filling.
if filler_name["bottom"] is None:
# Reprojecting bottom shift with respect to band 9
# (used inside the granule to manage the lenght of the pixel
# when bands coordinates are used.)
max_bottom_resized_09 = int(
max_bottom
* BAND_SPATIAL_RESOLUTION_DICT[max_bottom_band]
/ BAND_SPATIAL_RESOLUTION_DICT["B09"]
)
point_1_shifted = polygon_coordinates[1] - band_shift_01_bottom
point_2_shifted = polygon_coordinates[2] - band_shift_32_bottom
else:
# No pixels cropped.
max_bottom_resized_09 = 0
point_1_shifted = polygon_coordinates[1]
point_2_shifted = polygon_coordinates[2]
coregistered_polygon_coordinates = [
point_0_shifted,
point_1_shifted,
point_2_shifted,
point_3_shifted,
]
polygon_coordinates = coregistered_polygon_coordinates
# Used later in left and right adjustment to take into account of cropped coordinates on top and right.
else: # No coordinates adjustment on top and bottom.
max_top_resized_09 = 0
max_bottom_resized_09 = 0
polygon_coordinates = np.array(deepcopy(self.get_granule_coordinates()))
# Same coordinates of original granule. No top or bottom coordinates adjustment.
# Adjusting coordinates after cropping on right and left
if max_left != 0 or max_right != 0:
max_left_resized = int(
max_left
* BAND_SPATIAL_RESOLUTION_DICT_ACROSS[max_left_band]
/ BAND_SPATIAL_RESOLUTION_DICT_ACROSS["B02"]
)
band_shift_30_left = (
np.abs(max_left_resized)
/ granule_length_across
* (polygon_coordinates[3] - polygon_coordinates[0])
)
band_shift_21_left = (
np.abs(max_left_resized)
/ granule_length_across
* (polygon_coordinates[2] - polygon_coordinates[1])
)
max_right_resized = int(
max_right
* BAND_SPATIAL_RESOLUTION_DICT_ACROSS[max_right_band]
/ BAND_SPATIAL_RESOLUTION_DICT_ACROSS["B02"]
)
band_shift_30_right = (
np.abs(max_right_resized)
/ granule_length_across
* (polygon_coordinates[3] - polygon_coordinates[0])
)
band_shift_21_right = (
np.abs(max_right_resized)
/ granule_length_across
* (polygon_coordinates[2] - polygon_coordinates[1])
)
point_0_shifted = polygon_coordinates[0] + band_shift_30_left
point_3_shifted = polygon_coordinates[3] + band_shift_30_right
point_1_shifted = polygon_coordinates[1] + band_shift_21_left
point_2_shifted = polygon_coordinates[2] + band_shift_21_right
coregistered_polygon_coordinates = [
point_0_shifted,
point_1_shifted,
point_2_shifted,
point_3_shifted,
]
# Reprojecting right shift with respect to band 9
# (used inside the granule to manage the lenght of the pixel
# across-track when bands coordinates are used.)
max_left_resized_09 = int(
max_left
* BAND_SPATIAL_RESOLUTION_DICT_ACROSS[max_left_band]
/ BAND_SPATIAL_RESOLUTION_DICT_ACROSS["B09"]
)
max_right_resized_09 = int(
max_right
* BAND_SPATIAL_RESOLUTION_DICT_ACROSS[max_right_band]
/ BAND_SPATIAL_RESOLUTION_DICT_ACROSS["B09"]
)
else:
# Set to polygon coordinates, which takes into account vertically cropped pixels if needed.
coregistered_polygon_coordinates = polygon_coordinates
# No pixels cropped.
max_left_resized_09 = 0
max_right_resized_09 = 0
else: # No cropped pixels
max_top_resized_09 = 0
max_bottom_resized_09 = 0
max_left_resized_09 = 0
max_right_resized_09 = 0
coregistered_polygon_coordinates = self.get_granule_coordinates()
# Adjusting granule name.
granule_name = self.granule_name
# Adding COMPLEMENTED_WITH_top_granule_number_bottom_granule_number
# depending on the case
if filler_name["top"] is not None or filler_name["bottom"] is not None:
granule_name = granule_name + "_COMPLEMENTED_WITH"
if filler_name["top"] is not None:
granule_name = granule_name + f"_top_{filler_name['top']}"
if filler_name["bottom"] is not None:
granule_name = granule_name + f"_bottom_{filler_name['bottom']}"
# In any case, adding _COREGISTERED mark
granule_name = granule_name + "_COREGISTERED"
# Creating a new granule
coregistered_granule = Raw_granule(
granule_bands=coregistered_bands,
bands_names=self.__bands_names,
granule_name=granule_name,
granule_polygon_coordinates=coregistered_polygon_coordinates,
cloud_percentage=self.get_cloud_percentage(),
device=self.__device,
)
# Adding cropped pixels information to manage a correct extraction of bands coordinateds for the new granule.
coregistered_granule.__set_cropped_pixels(
cropped_pixels=[
(max_bottom_resized_09 + max_top_resized_09),
(max_left_resized_09 + max_right_resized_09),
]
)
coregistered_granule.__set_originality(False)
# Adding parents.
coregistered_granule.__set_parents(self.__granule_parents)
# Adding information on being a single granule along track
coregistered_granule.__set_single_granule_along_track_size(
self.__single_granule_along_track_size
)
return coregistered_granule
[docs] def show_bands_superimposition(
self, requested_bands=None, downsampling=True, equalize=False, n_std=2
):
"""It shows the superimposition of bands in requested_bands
Args:
requested_bands (list, optional): requested bands list.
If None, all the bands are used.
Defaults to None.
downsampling (boolean, optional): if True, bands are downsampled to have the same resolution.
Default to False.
equalize (boolean, optional): if True, bands are equalized for a better plotting by saturating
the outliers of each band with an upper and lower valuer respectively
equal to pixel value mean *- n_std * histogram standard deviation.
Default to False.
n_std (integer, optional): number of times the value of the pixel values standard deviation used
for histogram cropping. Defaults to 2.
Raises:
ValueError: Impossible to superimpose more than 3 bands
"""
if requested_bands is None:
requested_bands = self.__bands_names
if len(requested_bands) > 3:
raise ValueError("Impossible to superimpose more than 3 bands.")
bands_superimposed = self.as_tensor(requested_bands, downsampling=downsampling)
if bands_superimposed.shape[2] == 2:
bands_superimposed_zeros = torch.zeros(
[bands_superimposed.shape[0], bands_superimposed.shape[1], 3]
)
bands_superimposed_zeros[:, :, :2] = bands_superimposed
bands_superimposed_zeros[:, :, 2] = (
bands_superimposed[:, :, 0] + bands_superimposed[:, :, 1]
) / 2
bands_superimposed = bands_superimposed_zeros
if equalize:
bands_superimposed_equalized = equalize_tensor(bands_superimposed, n_std)
bands_superimposed_equalized = (
bands_superimposed_equalized
/ (2**12 - 1)
* bands_superimposed.max()
/ bands_superimposed_equalized.max()
)
else:
bands_superimposed_equalized = bands_superimposed
bands_superimposed_equalized = (
bands_superimposed_equalized / bands_superimposed_equalized.max()
)
plt.imshow(
bands_superimposed_equalized.detach().cpu().numpy(),
)
[docs] def show_bands(
self,
requested_bands=None,
downsampling=False,
oversampling=False,
rotate_swir_bands=False,
):
"""It shows the requested bands.
Args:
requested_bands (list, optional): list of requested bands to show.
If None, all the bands are shown. Defaults to None.
downsampling (bool, optional): if True, bands are downsampled to have the same resolution.
Default to False.
oversampling (bool, optional): if True, bands are oversampled to have the same resolution.
Downsampling has priority over upampling. Default to False.
rotate_swir_bands (bool, optional): if True, SWIR bands are rotated. Default to False.
"""
if requested_bands is None:
requested_bands = self.__bands_names
n_bands = len(requested_bands)
n_rows = int(np.ceil(n_bands / 3))
if n_rows > 1:
fig, ax = plt.subplots(n_rows, 3)
plt.tight_layout()
else:
fig, ax = plt.subplots(n_rows, 3)
if downsampling:
n_printed = 0
granule_tensor = self.as_tensor(downsampling=True)
for n in range(len(self.__bands_names)):
if self.__bands_names[n] in requested_bands:
if n_rows > 1:
row = int(np.floor(n / 3))
column = int(n_printed - 3 * row)
ax_plot = ax[row, column]
else:
ax_plot = ax[n_printed]
band_name = self.__bands_names[n]
band = granule_tensor[:, :, n]
if rotate_swir_bands and (band_name in SWIR_BANDS):
band = rotate(band.unsqueeze(2), 180).squeeze(2)
if self.__device == torch.device("cuda"):
ax_plot.imshow(band.detach().cpu().numpy())
else:
ax_plot.imshow(band)
ax_plot.grid(False)
ax_plot.axis("off")
ax_plot.set_title(band_name, fontsize=30)
n_printed += 1
if n_rows > 1:
for n in range(n_printed, 3 * n_rows):
ax_plot = ax[n_rows - 1, n - n_printed + 1]
ax_plot.set_axis_off()
else:
for n in range(n_printed, 3 * n_rows):
ax_plot = ax[n]
ax_plot.set_axis_off()
elif (oversampling) and not (downsampling):
n_printed = 0
granule_tensor = self.as_tensor(downsampling=False)
for n in range(len(self.__bands_names)):
if self.__bands_names[n] in requested_bands:
if n_rows > 1:
row = int(np.floor(n / 3))
column = int(n_printed - 3 * row)
ax_plot = ax[row, column]
else:
ax_plot = ax[n_printed]
band_name = self.__bands_names[n]
band = granule_tensor[:, :, n]
if rotate_swir_bands and (band_name in SWIR_BANDS):
band = rotate(band.unsqueeze(2), 180).squeeze(2)
if self.__device == torch.device("cuda"):
ax_plot.imshow(band.detach().cpu().numpy())
else:
ax_plot.imshow(band)
ax_plot.grid(False)
ax_plot.axis("off")
ax_plot.set_title(band_name, fontsize=30)
n_printed += 1
if n_rows > 1:
for n in range(n_printed, 3 * n_rows):
ax_plot = ax[n_rows - 1, n - n_printed + 1]
ax_plot.set_axis_off()
else:
for n in range(n_printed, 3):
ax_plot = ax[n]
ax_plot.set_axis_off()
else:
for n in range(n_rows * 3):
if n_rows > 1:
row = int(np.floor(n / 3))
column = int(n - 3 * row)
ax_plot = ax[row, column]
else:
ax_plot = ax[n]
if n < n_bands:
band_name = requested_bands[n]
band = self.get_band(band_name)
if rotate_swir_bands and (band_name in SWIR_BANDS):
band = rotate(band.unsqueeze(2), 180).squeeze(2)
if self.__device == torch.device("cuda"):
ax_plot.imshow(band.detach().cpu().numpy())
else:
ax_plot.imshow(band)
ax_plot.grid(False)
ax_plot.axis("off")
ax_plot.title.set_text(band_name)
ax_plot.set_title(band_name, fontsize=30)
else:
ax_plot.set_axis_off()
[docs] def get_granule_info(self):
"""Returns name, sensing time, acquisition time, detector number information,
originality, parents list, polygon coordinates list, cloud percentages.
If the granule have parents, sensing time, acquisition time, detector number of parents,
and cloud percentages are returned.
Returns:
string: name
list of datetime: list of sensing time.
list of datetime: acquisition time.
list of int: detector numbers.
boolean: originality
list: parents_list.
list: polygon coordinates.
list: granules percentages.
"""
granule_parents = self.get_parents()
if len(granule_parents) == 0:
granule_name = self.granule_name
# Removing _COREGISTERED tag
if granule_name.find("_COREGISTERED") > 0:
granule_name = granule_name[: granule_name.find("_COREGISTERED")]
sensing_time = parse_string_date(granule_name[-26:-11])
acquisition_time = parse_string_date(granule_name[-43:-29])
detector_numer = int(granule_name[-9:-7])
return (
granule_name,
[sensing_time],
[acquisition_time],
[detector_numer],
self.is_original(),
granule_parents,
self.__granule_polygon_coordinates,
self.__cloud_percentage_list,
)
else:
sensing_time_list = []
acquisition_time_list = []
detector_numer_list = []
for granule_parent in granule_parents:
granule_name = granule_parent
sensing_time_list.append(parse_string_date(granule_name[-26:-11]))
acquisition_time_list.append(parse_string_date(granule_name[-43:-29]))
detector_numer_list.append(int(granule_name[-9:-7]))
return (
self.granule_name,
sensing_time_list,
acquisition_time_list,
detector_numer_list,
self.is_original(),
granule_parents,
self.__granule_polygon_coordinates,
self.__cloud_percentage_list,
)
[docs] def show_granule_info(self):
"""Print granule info."""
granule_info = self.get_granule_info()
print(colored("------------------Granule ----------------------------", "blue"))
print("Name: ", colored(granule_info[0], "red"))
print("Sensing time: ", colored(granule_info[1], "red"))
print("Creation time: ", colored(granule_info[2], "red"))
print("Detector number: ", colored(granule_info[3], "red"))
print("Originality: ", colored(granule_info[4], "red"))
print("Parents: ", colored(granule_info[5], "red"))
coordinates = granule_info[6]
print("Polygon coordinates: \n")
for m in range(len(coordinates)):
print(
colored("\tP_" + str(m), "blue")
+ " : "
+ colored(str(coordinates[m]) + "\n", "red")
)
print("Cloud coverage: ", colored(granule_info[7], "red"))
print("\n")
[docs] def get_number_of_granules_along_track_line(self):
"""Returns the maximum number of granules stacked along track among parents.
Returns:
int: Number of granules correctly stacked (same detector number and correct sensing time difference) along track.
"""
if not (len(self.__granule_parents)):
return 1
else:
parents = self.__granule_parents
granule_info = self.get_granule_info()
detector_number_list = granule_info[3]
sensing_time_list = granule_info[1]
max_along_track = 1
tested_parent_list = [0 for n in range(len(parents))]
for n in range(len(parents)):
stacked_to_n_list = [n]
if not (tested_parent_list[n]):
tested_parent_list[n] = 1
for m in range(n + 1, len(parents)):
if detector_number_list[n] == detector_number_list[m]:
if not (
(sensing_time_list[m] - sensing_time_list[n]).seconds % 3
) or not (
(sensing_time_list[n] - sensing_time_list[m]).seconds % 4
):
tested_parent_list[m] = 1
stacked_to_n_list.append(m)
max_along_track = max(max_along_track, len(stacked_to_n_list))
return max_along_track
[docs] def get_bands_coordinates(self, downsampling=True, latlon_format=True):
"""Returns the coordinates of the bands.
Args:
downsampling (bool, optional): if True, bands are downsampled to have the same resolution.
Default to False.
latlon_format (bool): format to use to get the granule's coordinates.
Default to: Latitude, Longitude.
Returns:
dict: band-names/band coordinates dict.
"""
detector_number = self.get_detectors_number()[0]
polygon_coordinates = np.array(deepcopy(self.get_granule_coordinates()))
band_names = self.__bands_names
band_shifted_list = []
n_stacked_granules = self.get_number_of_granules_along_track_line()
satellite = self.get_granule_info()[0][:3]
granule_length = get_granule_px_length(
n_stacked_granules, satellite, detector_number, self.__cropped_pixels[0]
)
for band_name, n in zip(band_names[:], range(len(band_names[:]))):
res_scale_factor = (
BAND_SPATIAL_RESOLUTION_DICT[band_name]
/ BAND_SPATIAL_RESOLUTION_DICT["B02"]
)
band = self.get_band(band_name)
p0_1_angle = (
res_scale_factor
* len(band)
/ granule_length
* (polygon_coordinates[1] - polygon_coordinates[0])
)
p3_2_angle = (
res_scale_factor
* len(band)
/ granule_length
* (polygon_coordinates[2] - polygon_coordinates[3])
)
if band_name == "B02":
point_1_shifted = polygon_coordinates[0] + p0_1_angle
point_2_shifted = polygon_coordinates[3] + p3_2_angle
shifted_points = [
polygon_coordinates[0],
point_1_shifted,
point_2_shifted,
polygon_coordinates[3],
]
elif band_name == "B09":
point_0_shifted = polygon_coordinates[1] - p0_1_angle
point_3_shifted = polygon_coordinates[2] - p3_2_angle
shifted_points = [
point_0_shifted,
polygon_coordinates[1],
polygon_coordinates[2],
point_3_shifted,
]
else:
band_shift = abs(
get_bands_shift(
["B02", band_name],
satellite,
detector_number,
downsampling=downsampling,
)[0][0]
)
band_shift_01 = (
np.abs(band_shift * res_scale_factor)
/ granule_length
* (polygon_coordinates[1] - polygon_coordinates[0])
)
band_shift_32 = (
np.abs(band_shift * res_scale_factor)
/ granule_length
* (polygon_coordinates[2] - polygon_coordinates[3])
)
if not (detector_number % 2):
point_0_shifted = polygon_coordinates[0] + band_shift_01
point_1_shifted = (
polygon_coordinates[0] + band_shift_01 + p0_1_angle
)
point_2_shifted = (
polygon_coordinates[3] + band_shift_32 + p3_2_angle
)
point_3_shifted = polygon_coordinates[3] + band_shift_32
else:
point_0_shifted = polygon_coordinates[1] - band_shift_01
point_1_shifted = (
polygon_coordinates[1] - band_shift_01 - p0_1_angle
)
point_2_shifted = (
polygon_coordinates[2] - band_shift_32 - p3_2_angle
)
point_3_shifted = polygon_coordinates[2] - band_shift_32
shifted_points = [
point_0_shifted,
point_1_shifted,
point_2_shifted,
point_3_shifted,
]
shifted_points_list = [x.tolist() for x in shifted_points]
polygon = Polygon([list(x) for x in shifted_points_list]).convex_hull
if latlon_format:
band_shifted_list.append(
[
(x, y)
for (x, y) in zip(
polygon.exterior.coords.xy[0], polygon.exterior.coords.xy[1]
)
]
)
else:
band_shifted_list.append(
[
(y, x)
for (x, y) in zip(
polygon.exterior.coords.xy[0], polygon.exterior.coords.xy[1]
)
]
)
return dict(zip(band_names, band_shifted_list))
[docs] def export_to_tif(self, save_path):
"""Export to TIF file.
Args:
save_path (str): save path.
downsampling (bool, optional): If True, bands are downsampled. Defaults to True.
"""
coords = self.get_bands_coordinates(downsampling=True, latlon_format=False)
for band_name in list(self.__bands_dict.keys()):
band = self.__bands_dict[band_name]
export_band_to_tif(
band, coords[band_name], os.path.join(save_path, band_name + ".tif")
)
[docs] def get_raw_bbox(self, l1c_tif, bbox, mode="standard"):
"""Changing bbox coordinates from green system (L1C) to raw.
Args:
l1c_tif: tensor of the l1c image
bbox: (list): [x_top_left, y_top_left, x_bottom_right, y_bottom_right]
Ret:
list: corrected bbox list
"""
def get_points_from_bbox(bbox):
p1 = bbox[:2]
p2 = bbox[2:]
bottleft = (p2[0], p1[1])
topright = (p1[0], p2[1])
return (p1, topright, p2, bottleft)
def get_srcPoints(l1c_tif):
for k in range(l1c_tif.shape[1]):
y = l1c_tif[:, k, 0]
dy = torch.where(y != 0)[0]
if len(dy) != 0:
dy = dy[0]
break
for m in range(l1c_tif.shape[0]):
x = l1c_tif[m, :, 0]
dx = torch.where(x != 0)[0]
if len(dx) != 0:
dx = dx[0]
break
for k in range(l1c_tif.shape[1] - 1, 0, -1):
y_end = l1c_tif[:, k, 0]
dy_end = torch.where(y_end != 0)[0]
if len(dy_end) != 0:
dy_end = dy_end[0]
break
for q in range(l1c_tif.shape[0] - 1, 0, -1):
x_end = l1c_tif[q, :, 0]
dx_end = torch.where(x_end != 0)[0]
if len(dx_end) != 0:
dx_end = dx_end[0]
break
if mode == "standard":
return [np.array([dy, k]), np.array([m, dx]), np.array([q, dx_end])]
else:
return [np.array([m, dx]), np.array([dy_end, k]), np.array([dy, k])]
def prep_point(point):
arr = np.array([point[0], point[1], 1])
return arr
raw_shape = self.as_tensor(downsampling=True).shape
r_S1, c_S1, _ = raw_shape
dstTri = np.array([(r_S1, 0), (0, 0), (r_S1, c_S1)]).astype(np.float32)
srcTri = np.array(get_srcPoints(l1c_tif)).astype(np.float32)
bbox_points = get_points_from_bbox(bbox)
M = cv2.getAffineTransform(srcTri, dstTri)
transformed_points = []
for x in bbox_points:
transformed_points.append(M @ prep_point(np.array(x)))
return transformed_points