hdx.scraper.configurable.scraper
ConfigurableScraper Objects
class ConfigurableScraper(BaseScraper)
Each configurable scraper is configured from dataset information that can come, for example, from a YAML file. When run, it works out headers and values. It also overrides add_sources, where sources are compiled and returned. If dealing with subnational data, adminlevel must be supplied.
Arguments:
name
str - Name of scraper
datasetinfo
Dict - Information about dataset
level
str - Can be national, subnational or single
countryiso3s
List[str] - List of ISO3 country codes to process
adminlevel
Optional[AdminLevel] - AdminLevel object from HDX Python Country. Defaults to None.
level_name
Optional[str] - Customised level name. Defaults to None (level).
source_configuration
Dict - Configuration for sources. Defaults to empty dict (use defaults).
today
datetime - Value to use for today. Defaults to now_utc().
errors_on_exit
Optional[ErrorsOnExit] - ErrorsOnExit object that logs errors on exit
**kwargs
- Variables to use when evaluating template arguments in urls
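As a hedged illustration, a datasetinfo entry for such a scraper might look like the following in YAML. The key names shown are illustrative assumptions for the sketch, not a definitive schema:

```yaml
# Hypothetical configuration for one configurable scraper;
# key names are illustrative, not the library's exact schema.
population:
  dataset: "example-population-dataset"   # HDX dataset to read from
  format: "csv"
  use_hxl: True
  admin:
    - "#country+code"                     # column identifying the admin unit
  input:
    - "#population"                       # input column(s) to read
  output:
    - "Population"                        # output column header(s)
  output_hxl:
    - "#population"                       # output HXL hashtag(s)
```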
get_subsets_from_datasetinfo
@staticmethod
def get_subsets_from_datasetinfo(datasetinfo: Dict) -> List[Dict]
Get subsets from dataset information
Arguments:
datasetinfo
Dict - Information about dataset
Returns:
List[Dict]
- List of subsets
get_iterator
def get_iterator() -> Tuple[List[str], Iterator[Dict]]
Get the iterator from the preconfigured reader for this scraper
Returns:
Tuple[List[str],Iterator[Dict]]
- Tuple (headers, iterator where each row is a dictionary)
add_sources
def add_sources() -> None
Add source for each HXL hashtag
Returns:
None
read_hxl
def read_hxl(iterator: Iterator[Dict]) -> Optional[Dict[str, str]]
Read HXL tags if use_hxl is True and return the mapping as a dictionary. If use_hxl is False, return None.
Arguments:
iterator
Iterator[Dict] - Iterator where each row is a dictionary
Returns:
Optional[Dict[str, str]]: Dictionary mapping from headers to HXL hashtags or None
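In an HXLated file, the row after the headers carries the hashtags, so the mapping can be sketched in plain Python. This is a minimal sketch of the idea, not the library's implementation; read_hxl_sketch and the sample rows are hypothetical:

```python
from typing import Dict, Iterator, Optional

def read_hxl_sketch(iterator: Iterator[Dict], use_hxl: bool = True) -> Optional[Dict[str, str]]:
    # If HXL is not in use, there is nothing to map
    if not use_hxl:
        return None
    # In an HXLated file, the first row after the headers maps each
    # header to its HXL hashtag, e.g. {"Country": "#country+name"}
    hxl_row = next(iterator)
    # Keep only columns that actually carry a hashtag
    return {h: tag for h, tag in hxl_row.items() if tag and tag.startswith("#")}

rows = iter([
    {"Country": "#country+name", "Population": "#population"},
    {"Country": "Afghanistan", "Population": "38928346"},
])
mapping = read_hxl_sketch(rows)
# mapping == {"Country": "#country+name", "Population": "#population"}
# and the iterator is left positioned at the first data row
```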
use_hxl
def use_hxl(headers, file_headers: List[str],
iterator: Iterator[Dict]) -> Optional[Dict]
If the configurable scraper configuration defines that HXL is used (use_hxl is True), then read the mapping from headers to HXL hashtags. Since each row is a dictionary from header to value, the HXL row will be a dictionary from header to HXL hashtag. Label country and adm1 columns as admin columns. If the input columns to use are not specified, use all that have HXL hashtags. If the output column headers or hashtags are not specified, use the ones from the original file.
Arguments:
file_headers
List[str] - List of all headers of input file
iterator
Iterator[Dict] - Iterator over the rows
Returns:
Optional[Dict]
- Dictionary that maps from header to HXL hashtag or None
run_scraper
def run_scraper(iterator: Iterator[Dict]) -> None
Run one configurable scraper given an iterator over the rows
Arguments:
iterator
Iterator[Dict] - Iterator over the rows
Returns:
None
run
def run() -> None
Runs one configurable scraper given dataset information
Returns:
None
hdx.scraper.configurable.rowparser
RowParser Objects
class RowParser()
RowParser class for parsing each row.
Arguments:
name
str - Name of scraper
countryiso3s
List[str] - List of ISO3 country codes to process
adminlevel
Optional[AdminLevel] - AdminLevel object from HDX Python Country library
level
str - Can be national, subnational or single
datelevel
str - Can be global, regional, national, subnational
today
datetime - Date today
datasetinfo
Dict - Dictionary of information about dataset
headers
List[str] - Row headers
header_to_hxltag
Optional[Dict[str, str]] - Mapping from headers to HXL hashtags or None
subsets
List[Dict] - List of subset definitions
maxdateonly
bool - Whether to only take the most recent date. Defaults to True.
read_external_filter
def read_external_filter(external_filter: Optional[Dict]) -> None
Read the filter list from an external url pointing to an HXLated file
Arguments:
external_filter
Optional[Dict] - External filter information in dictionary
Returns:
None
get_filter_str_for_eval
def get_filter_str_for_eval(filter: str) -> str
Replace filter string variables with columns in row of data
Arguments:
filter
str - Filter string
Returns:
str
- Filter string with variables replaced
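The substitution can be sketched like this. It is a hypothetical sketch; the library's actual variable syntax and substitution rules may differ:

```python
def filter_str_for_eval_sketch(filter_str: str, row: dict) -> str:
    # Substitute each column name with the repr of its value in the
    # current row, longest names first so overlapping names are safe
    for col in sorted(row, key=len, reverse=True):
        filter_str = filter_str.replace(col, repr(row[col]))
    return filter_str

row = {"Country": "AFG", "Population": 100}
expr = filter_str_for_eval_sketch("Country == 'AFG' and Population > 50", row)
# expr == "'AFG' == 'AFG' and 100 > 50", which eval() reports as True
```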
filter_sort_rows
def filter_sort_rows(iterator: Iterator[Dict]) -> Iterator[Dict]
Apply prefilter and sort the input data before processing. If date_col is specified along with any of sum or process, and sorting is not specified, then apply a sort by date to ensure correct results.
Arguments:
iterator
Iterator[Dict] - Input data
Returns:
Iterator[Dict]
- Input data with prefilter applied if specified and sorted if specified or deemed necessary
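The rationale for the implicit date sort can be sketched as follows. The helper below is hypothetical; the real method reads its prefilter, sort and date_col settings from the scraper configuration:

```python
from datetime import datetime

def filter_sort_rows_sketch(rows, date_col="Date", prefilter=None):
    # Apply an optional prefilter, then sort by the date column so
    # that downstream "take latest" or summing logic sees rows in
    # chronological order and produces correct results.
    if prefilter:
        rows = [r for r in rows if prefilter(r)]
    return sorted(rows, key=lambda r: datetime.strptime(r[date_col], "%Y-%m-%d"))

rows = [{"Date": "2023-02-01", "v": 2}, {"Date": "2023-01-01", "v": 1}]
out = filter_sort_rows_sketch(rows)
# out is now in date order: v == 1 first, then v == 2
```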
flatten
def flatten(row: Dict) -> Generator[Dict, None, None]
Flatten a wide spreadsheet format into a long one
Arguments:
row
Dict - Row to flatten
Returns:
Generator[Dict]
- Flattened row(s)
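Wide-to-long flattening can be sketched as below. The column convention ("Year"/"Value") is a hypothetical choice for the sketch; the real method drives this from configuration:

```python
from typing import Dict, Generator, List

def flatten_sketch(row: Dict, value_cols: List[str]) -> Generator[Dict, None, None]:
    # Turn one wide row {"Country": "AFG", "2020": 1, "2021": 2}
    # into one long row per value column
    fixed = {k: v for k, v in row.items() if k not in value_cols}
    for col in value_cols:
        out = dict(fixed)
        out["Year"] = col
        out["Value"] = row[col]
        yield out

rows = list(flatten_sketch({"Country": "AFG", "2020": 1, "2021": 2}, ["2020", "2021"]))
# rows == [{"Country": "AFG", "Year": "2020", "Value": 1},
#          {"Country": "AFG", "Year": "2021", "Value": 2}]
```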
get_maxdate
def get_maxdate() -> datetime
Get the most recent date of the rows so far
Returns:
datetime
- Most recent date in processed rows
filtered
def filtered(row: Dict) -> bool
Check if the row should be filtered out
Arguments:
row
Dict - Row to check for filters
Returns:
bool
- Whether row is filtered out or not
parse
def parse(row: Dict) -> Tuple[Optional[str], Optional[List[bool]]]
Parse row checking for valid admin information and if the row should be filtered out in each subset given its definition.
Arguments:
row
Dict - Row to parse
Returns:
Tuple[Optional[str], Optional[List[bool]]]: (admin name, should process subset list) or (None, None)
hdx.scraper.configurable.resource_downloader
ResourceDownloader Objects
class ResourceDownloader(BaseScraper)
Each resource downloader is configured from dataset information that can come, for example, from a YAML file. When run, it downloads the resource described in the dataset information from HDX and puts it in the given folder.
Arguments:
datasetinfo
Dict - Information about dataset
folder
str - Folder to which to download. Defaults to "".
run
def run() -> None
Runs one resource downloader given dataset information
Returns:
None
add_sources
def add_sources() -> None
Add source for resource download
Returns:
None
hdx.scraper.configurable.timeseries
TimeSeries Objects
class TimeSeries(BaseScraper)
Each time series scraper is configured from dataset information that can come, for example, from a YAML file. When run, it populates the given outputs with time series data. It also overrides add_sources, where sources are compiled and returned.
Arguments:
name
str - Name of scraper
datasetinfo
Dict - Information about dataset
outputs
Dict[str, BaseOutput] - Mapping from names to output objects
today
datetime - Value to use for today. Defaults to now_utc().
run
def run() -> None
Runs one time series scraper given dataset information, writing to the outputs specified in the constructor
Returns:
None
add_sources
def add_sources() -> None
Add source for each HXL hashtag
Returns:
None
hdx.scraper.configurable.aggregator
Aggregator Objects
class Aggregator(BaseScraper)
Each aggregator is configured from dataset information that can come, for example, from a YAML file. When run, it works out headers and aggregated values. The mapping from input admins to aggregated output admins, adm_aggregation, is of the form: {"AFG": ("ROAP",), "MMR": ("ROAP",)}. If the mapping is to the top level, then it is a list of input admins like: ("AFG", "MMR"). The input_values are a mapping from headers (if use_hxl is False) or HXL tags (if use_hxl is True) to column values expressed as a dictionary mapping from input admin to value. If any formulae require the result of other aggregators, these can be passed in using the aggregation_scrapers parameter.
Arguments:
name
str - Name of aggregator
datasetinfo
Dict - Information about dataset
adm_aggregation
Union[Dict, ListTuple] - Mapping from input admins to aggregated output admins
headers
Dict[str, Tuple] - Column headers and HXL hashtags
use_hxl
bool - Whether to map from headers or from HXL tags
source_configuration
Dict - Configuration for sources. Defaults to empty dict (use defaults).
aggregation_scrapers
List["Aggregator"] - Other aggregations needed. Defaults to [].
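The two documented shapes of adm_aggregation, and the kind of roll-up they drive, can be sketched in plain Python. The sum below is a naive illustration of the mapping, not the library's aggregation code:

```python
# Dict form: each input admin maps to a tuple of output admins
adm_aggregation = {"AFG": ("ROAP",), "MMR": ("ROAP",)}
# List form (aggregating to the top level): just the input admins
adm_aggregation_top = ("AFG", "MMR")

# input_values: header (or HXL tag) -> {input admin: value}
input_values = {"#population": {"AFG": 100, "MMR": 50}}

# A naive sum roll-up over the dict form
output_values = {}
for adm, value in input_values["#population"].items():
    for out_adm in adm_aggregation[adm]:
        output_values[out_adm] = output_values.get(out_adm, 0) + value
# output_values == {"ROAP": 150}
```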
get_scraper
@classmethod
def get_scraper(
cls,
use_hxl: bool,
header_or_hxltag: str,
datasetinfo: Dict,
input_level: str,
output_level: str,
adm_aggregation: Union[Dict, ListTuple],
input_headers: Tuple[ListTuple, ListTuple],
source_configuration: Dict = {},
aggregation_scrapers: List["Aggregator"] = []
) -> Optional["Aggregator"]
Gets one aggregator given dataset information and returns headers, values and sources. The mapping from input admins to aggregated output admins, adm_aggregation, is of the form: {"AFG": ("ROAP",), "MMR": ("ROAP",)}. If the mapping is to the top level, then it is a list of input admins like: ("AFG", "MMR"). The input_values are a mapping from headers or HXL tags to column values expressed as a dictionary mapping from input admin to value.
Arguments:
use_hxl
bool - Whether to map from headers or from HXL tags
header_or_hxltag
str - Column header or HXL hashtag depending on use_hxl
datasetinfo
Dict - Information about dataset
input_level
str - Input level to aggregate like national or subnational
output_level
str - Output level of aggregated data like regional
adm_aggregation
Union[Dict, ListTuple] - Mapping from input admins to aggregated output admins
input_headers
Tuple[ListTuple, ListTuple] - Column headers and HXL hashtags
runner
Runner - Runner object
source_configuration
Dict - Configuration for sources. Defaults to empty dict (use defaults).
aggregation_scrapers
List["Aggregator"] - Other aggregations needed. Defaults to [].
Returns:
Optional["Aggregator"]
- The aggregation scraper or None if it couldn't be created
get_float_or_int
@staticmethod
def get_float_or_int(valuestr: str) -> Union[float, int, None]
Convert value string to float, int or None
Arguments:
valuestr
str - Value string
Returns:
Union[float, int, None]: Converted value
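The conversion can be sketched as below. This is a sketch of the stated contract, not the library's exact code; in particular, how whole-number floats are handled is an assumption:

```python
from typing import Union

def get_float_or_int_sketch(valuestr: str) -> Union[float, int, None]:
    # Empty or missing input converts to None
    if not valuestr:
        return None
    # Prefer int where the string parses as one, else fall back to float
    try:
        return int(valuestr)
    except ValueError:
        return float(valuestr)

# get_float_or_int_sketch("3")   -> 3
# get_float_or_int_sketch("3.5") -> 3.5
# get_float_or_int_sketch("")    -> None
```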
get_numeric
@classmethod
def get_numeric(cls, valueinput: Any) -> Union[str, float, int]
Convert value input to float or int. Values in pipe separated strings are summed. If any values in a pipe separated string are empty, an empty string is returned.
Arguments:
valueinput
Any - Value string
Returns:
Union[str, float, int]: Converted value
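The pipe-separated summing described above can be sketched as follows; a minimal sketch of the documented behaviour, not the library's implementation:

```python
from typing import Any, Union

def get_numeric_sketch(valueinput: Any) -> Union[str, float, int]:
    # Values in a pipe-separated string are summed; any empty
    # component makes the whole result the empty string
    if isinstance(valueinput, str):
        values = valueinput.split("|")
        if any(v == "" for v in values):
            return ""
        valueinput = sum(float(v) for v in values)
    # Collapse whole floats to int for tidier output (an assumption)
    if isinstance(valueinput, float) and valueinput.is_integer():
        return int(valueinput)
    return valueinput

# get_numeric_sketch("1|2|3") -> 6
# get_numeric_sketch("1||3")  -> ""
```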
process
def process(output_level: str, output_values: Dict) -> None
Perform aggregation putting results in output_values
Arguments:
output_level
str - Output level of aggregated data like regional
output_values
Dict - Mapping from admin name to value
Returns:
None
run
def run() -> None
Runs one aggregator given dataset information
Returns:
None
add_sources
def add_sources() -> None
There is no need to add any sources since the disaggregated values should already have sources
Returns:
None