# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 sat-bucket developers
#
# This file is part of sat-bucket.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module implements tools for dataframe filtering."""
import datetime
import numpy as np
import polars as pl
import pyproj
from satbucket.checks import check_filepaths, check_start_end_time, get_current_utc_time
from satbucket.dataframe import (
df_add_column,
df_get_column,
df_select_valid_rows,
)
from satbucket.info import get_info_from_filepath
[docs]
def get_geodesic_distance_from_point(lons, lats, lon, lat):
    """Compute the geodesic distance of each coordinate pair from a reference point.

    Parameters
    ----------
    lons, lats : array-like
        Longitudes and latitudes (in degrees, since ``radians=False``) of the
        points to evaluate. Both are converted with ``numpy.asanyarray``.
    lon, lat : scalar
        Longitude and latitude (in degrees) of the reference point.

    Returns
    -------
    distance
        Third element returned by ``pyproj.Geod.inv`` on the WGS84 ellipsoid,
        i.e. the distance between each input point and the reference point.
    """
    point_lons = np.asanyarray(lons)
    point_lats = np.asanyarray(lats)
    # Replicate the reference point so that it matches the input shapes
    reference_lons = np.ones(point_lons.shape) * lon
    reference_lats = np.ones(point_lats.shape) * lat
    wgs84 = pyproj.Geod(ellps="WGS84")
    inverse_solution = wgs84.inv(
        point_lons,
        point_lats,
        reference_lons,
        reference_lats,
        radians=False,
    )
    # Geod.inv yields (forward azimuth, back azimuth, distance)
    return inverse_solution[2]
[docs]
def filter_around_point(df, lon, lat, distance):
    """Keep only the rows located within ``distance`` of a reference point.

    The geodesic distance of each row from (``lon``, ``lat``) is computed from
    the ``"lon"`` and ``"lat"`` columns, stored in a new ``"distance"`` column,
    and rows whose distance exceeds ``distance`` are discarded.

    Parameters
    ----------
    df : dataframe
        Dataframe exposing ``"lon"`` and ``"lat"`` columns through
        ``df_get_column``.
    lon, lat : scalar
        Coordinates of the reference point.
    distance : scalar
        Maximum accepted distance (inclusive), in the units returned by
        ``get_geodesic_distance_from_point``.

    Returns
    -------
    dataframe
        Filtered dataframe including the ``"distance"`` column.
    """
    # Approach inspired by:
    # https://stackoverflow.com/questions/76262681/i-need-to-create-a-column-with-the-distance-between-two-coordinates-in-polars
    point_distances = get_geodesic_distance_from_point(
        lons=df_get_column(df, column="lon"),
        lats=df_get_column(df, column="lat"),
        lon=lon,
        lat=lat,
    )
    # Rows at or below the threshold are retained
    is_close_enough = point_distances <= distance
    df_with_distance = df_add_column(df, column="distance", values=point_distances)
    return df_select_valid_rows(df_with_distance, valid_rows=is_close_enough)
[docs]
def filter_by_extent(df, extent, x="lon", y="lat"):
    """Keep only the rows whose coordinates fall inside a rectangular extent.

    Parameters
    ----------
    df : polars.DataFrame, polars.LazyFrame or pandas-like dataframe
        Dataframe to filter. Anything that is not a polars frame is handled
        with pandas-style boolean indexing (``df.loc``).
    extent : sequence
        Indexable as ``(x_min, x_max, y_min, y_max)``. Bounds are inclusive.
    x, y : str
        Names of the columns holding the x and y coordinates.

    Returns
    -------
    dataframe
        The filtered dataframe, of the same kind as the input.
    """
    x_min = extent[0]
    x_max = extent[1]
    y_min = extent[2]
    y_max = extent[3]

    # Polars (eager or lazy): delegate to the expression API
    if isinstance(df, (pl.DataFrame, pl.LazyFrame)):
        return df.filter(
            pl.col(x) >= x_min,
            pl.col(x) <= x_max,
            pl.col(y) >= y_min,
            pl.col(y) <= y_max,
        )

    # Pandas: build a boolean mask and select the matching rows
    inside_x = (df[x] >= x_min) & (df[x] <= x_max)
    inside_y = (df[y] >= y_min) & (df[y] <= y_max)
    return df.loc[inside_x & inside_y]
[docs]
def apply_spatial_filters(df, filters=None):
    """Apply the requested spatial filters to a dataframe.

    Parameters
    ----------
    df : dataframe
        Dataframe with ``"lon"`` and ``"lat"`` columns.
    filters : dict, optional
        Mapping of filter name to filter specification. Recognized keys:

        - ``"extent"``: value forwarded as ``extent`` to ``filter_by_extent``.
        - ``"point_radius"``: a ``(lon, lat, distance)`` triple forwarded to
          ``filter_around_point``.

        Other keys are ignored. With ``None`` (the default) no filtering
        is performed.

    Returns
    -------
    dataframe
        The dataframe after the extent filter (if any) followed by the
        point-radius filter (if any).
    """
    # Nothing requested: hand the dataframe back untouched
    if filters is None:
        return df

    if "extent" in filters:
        df = filter_by_extent(df, extent=filters["extent"], x="lon", y="lat")

    if "point_radius" in filters:
        center_lon, center_lat, radius = filters["point_radius"]
        df = filter_around_point(df, lon=center_lon, lat=center_lat, distance=radius)

    return df
[docs]
def is_within_time_period(l_start_time, l_end_time, start_time, end_time):
    """Assess which files overlap the period between start and end time.

    Vectorized check: ``l_start_time`` and ``l_end_time`` hold the start and
    end time of each file, while ``start_time`` and ``end_time`` delimit the
    period of interest.

    Parameters
    ----------
    l_start_time, l_end_time : array-like
        Start and end time of each file (element-wise comparable with
        ``start_time`` and ``end_time``).
    start_time, end_time : scalar
        Bounds of the requested period.

    Returns
    -------
    numpy.ndarray
        Boolean selection, ``True`` where a file matches at least one of the
        three overlap cases described in the code comments.
    """
    overlap_cases = [
        # Case 1: the file starts at/before the period start and ends after it
        #   file:    |---------->
        #   period:       s.............e
        np.logical_and(l_start_time <= start_time, l_end_time > start_time),
        # Case 2: the file lies entirely inside the period (bounds included)
        #   file:           |------|
        #   period:       s.............e
        np.logical_and(l_start_time >= start_time, l_end_time <= end_time),
        # Case 3: the file starts before the period end and ends after it
        #   file:                   |---------->
        #   period:       s.............e
        np.logical_and(l_start_time < end_time, l_end_time > end_time),
    ]
    # A file is selected as soon as any of the cases holds
    return np.logical_or.reduce(overlap_cases)
[docs]
def is_granule_within_time(start_time, end_time, file_start_time, file_end_time):
    """Check if a granule overlaps the period between start_time and end_time.

    Scalar counterpart of ``is_within_time_period``: both functions apply the
    same three overlap cases with the same inclusive/exclusive bounds.

    Parameters
    ----------
    start_time, end_time : datetime.datetime
        Bounds of the requested period.
    file_start_time, file_end_time : datetime.datetime
        Start and end time of the granule.

    Returns
    -------
    bool
        ``True`` if the granule matches at least one of the overlap cases.
    """
    # Case 1: the granule starts at/before the period start and ends after it
    #   granule: |---------->
    #   period:       s.............e
    is_case1 = file_start_time <= start_time and file_end_time > start_time
    # Case 2: the granule lies entirely inside the period.
    # The upper bound is inclusive: with a strict comparison, a granule that
    # starts after start_time and ends exactly at end_time would match none
    # of the three cases and be wrongly discarded.
    #   granule:        |------|
    #   period:       s.............e
    is_case2 = file_start_time >= start_time and file_end_time <= end_time
    # Case 3: the granule starts before the period end and ends after it
    #   granule:                |---------->
    #   period:       s.............e
    is_case3 = file_start_time < end_time and file_end_time > end_time
    # The granule is selected as soon as any of the cases holds
    return is_case1 or is_case2 or is_case3
def _filter_filepath(filepath, filename_pattern, start_time=None, end_time=None):
    """Check whether a single filepath passes the filtering parameters.

    Parameters
    ----------
    filepath : str
        Filepath string.
    filename_pattern
        Filename pattern forwarded to ``get_info_from_filepath`` to extract
        the file information (including its time coverage).
    start_time : datetime.datetime, optional
        Start time of the period of interest.
        The default is ``None``.
    end_time : datetime.datetime, optional
        End time of the period of interest.
        The default is ``None``.

    Returns
    -------
    str or None
        The input ``filepath`` if it passes the filtering, otherwise ``None``.
        ``None`` is returned when ``get_info_from_filepath`` raises a
        ``ValueError``, or when both time bounds are given and the file does
        not overlap the requested period.
    """
    # A filepath whose information cannot be extracted is discarded
    try:
        file_info = get_info_from_filepath(filepath, filename_pattern)
    except ValueError:
        return None

    # The time filter applies only when both bounds are specified
    time_filter_requested = start_time is not None and end_time is not None
    if time_filter_requested:
        granule_start = file_info["start_time"]
        granule_end = file_info["end_time"]
        overlaps = is_granule_within_time(start_time, end_time, granule_start, granule_end)
        if not overlaps:
            return None

    return filepath
[docs]
def filter_filepaths(
    filepaths,
    filename_pattern,
    start_time=None,
    end_time=None,
):
    """Filter the satellite filepaths based on specific parameters.

    Filepaths for which the file information cannot be extracted are always
    discarded. A time filter is applied only if ``start_time`` and/or
    ``end_time`` is specified.

    Parameters
    ----------
    filepaths : list
        List of filepaths. ``None`` is accepted and yields an empty list.
    filename_pattern
        Filename pattern used to extract the time information of each file.
    start_time : datetime.datetime, optional
        Start time. If ``None`` while ``end_time`` is given, it defaults to
        1998-01-01 00:00:00.
        The default is ``None``.
    end_time : datetime.datetime, optional
        End time. If ``None`` while ``start_time`` is given, it defaults to
        the current UTC time.
        The default is ``None``.

    Returns
    -------
    filepaths : list
        The filepaths subset, in the original order.
        If there are no valid filepaths, an empty list is returned.
    """
    # Nothing to filter
    if filepaths is None:
        return []
    filepaths = check_filepaths(filepaths)
    if len(filepaths) == 0:
        return []

    # Fill in the missing bound when only one of the two is provided
    time_filter_requested = not (start_time is None and end_time is None)
    if time_filter_requested:
        if start_time is None:
            # Lower bound noted in the original code as the satellite mission start
            start_time = datetime.datetime(1998, 1, 1, 0, 0, 0)
        if end_time is None:
            end_time = get_current_utc_time()
        start_time, end_time = check_start_end_time(start_time, end_time)

    # Keep the filepaths which pass the filtering (rejected ones come back as None)
    selected_filepaths = []
    for candidate in filepaths:
        outcome = _filter_filepath(
            candidate,
            filename_pattern,
            start_time=start_time,
            end_time=end_time,
        )
        if outcome is not None:
            selected_filepaths.append(outcome)
    return selected_filepaths