read_di_unfccc.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. __license__ = """
  2. Copyright 2020 Potsdam-Institut für Klimafolgenforschung e.V.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. """
  13. import itertools
  14. import logging
  15. import typing
  16. import pandas as pd
  17. import requests
  18. import treelib
  19. class UNFCCCApiReader:
  20. """Provides simplified unified access to the Flexible Query API of the UNFCCC data access for all parties.
  21. Essentially encapsulates https://di.unfccc.int/flex_non_annex1 and https://di.unfccc.int/flex_annex1 ."""
  22. def __init__(self, *, base_url="https://di.unfccc.int/api/"):
  23. self.annex_one_reader = UNFCCCSingleCategoryApiReader(
  24. party_category="annexOne", base_url=base_url
  25. )
  26. self.non_annex_one_reader = UNFCCCSingleCategoryApiReader(
  27. party_category="nonAnnexOne", base_url=base_url
  28. )
  29. self.parties = pd.concat(
  30. [self.annex_one_reader.parties, self.non_annex_one_reader.parties]
  31. ).sort_index()
  32. self.gases = pd.concat(
  33. [self.annex_one_reader.gases, self.non_annex_one_reader.gases]
  34. ).sort_index()
  35. self.gases = self.gases[
  36. ~self.gases.index.duplicated(keep="first")
  37. ] # drop duplicated gases
  38. def query(
  39. self,
  40. *,
  41. party_code: str,
  42. gases: typing.Union[typing.List[str], None] = None,
  43. progress: bool = False,
  44. ):
  45. """Query the UNFCCC for data.
  46. :param party_code: ISO codes of a party for which to query.
  47. For possible values, see .parties .
  48. :param gases: list of gases to query for. For possible values, see .gases .
  49. Default: query for all gases.
  50. :param progress: Display a progress bar. Requires tqdm.
  51. If you need more fine-grained control over which variables to query for, including restricting the query
  52. to specific measures, categories, or classifications or to query for multiple parties at once, please see the
  53. corresponding methods .annex_one_reader.query and .non_annex_one_reader.query .
  54. """
  55. if party_code in self.annex_one_reader.parties["code"].values:
  56. reader = self.annex_one_reader
  57. elif party_code in self.non_annex_one_reader.parties["code"].values:
  58. reader = self.non_annex_one_reader
  59. else:
  60. raise KeyError(party_code)
  61. return reader.query(party_codes=[party_code], gases=gases, progress=progress)
  62. class UNFCCCSingleCategoryApiReader:
  63. """Provides access to the Flexible Query API of the UNFCCC data access for a single category like nonAnnexOne.
  64. Essentially encapsulates https://di.unfccc.int/flex_non_annex1 or https://di.unfccc.int/flex_annex1 ."""
  65. def __init__(self, *, party_category: str, base_url="https://di.unfccc.int/api/"):
  66. """
  67. :param party_category: either 'nonAnnexOne' or 'annexOne'.
  68. :param base_url: URL where the API is accessible (default: https://di.unfccc.int/api/).
  69. """
  70. self.base_url = base_url
  71. parties_raw = self._get(f"parties/{party_category}")
  72. parties_entries = []
  73. for entry in parties_raw:
  74. if entry["categoryCode"] == party_category and entry["name"] != "Groups":
  75. parties_entries.append(entry["parties"])
  76. if not parties_entries:
  77. raise ValueError(
  78. f"Could not find parties for the party_category {party_category!r}."
  79. )
  80. self.parties = (
  81. pd.DataFrame(itertools.chain(*parties_entries))
  82. .set_index("id")
  83. .sort_index()
  84. .drop_duplicates()
  85. )
  86. self._parties_dict = dict(self.parties["code"])
  87. self.years = (
  88. pd.DataFrame(self._get("years/single")[party_category])
  89. .set_index("id")
  90. .sort_index()
  91. )
  92. self._years_dict = dict(self.years["name"])
  93. for i in self._years_dict:
  94. if self._years_dict[i].startswith("Last Inventory Year"):
  95. self._years_dict[i] = self._years_dict[i][-5:-1]
  96. # note that category names are not unique!
  97. category_hierarchy = self._get("dimension-instances/category")[party_category][
  98. 0
  99. ]
  100. self.category_tree = self._walk(category_hierarchy)
  101. self.classifications = (
  102. pd.DataFrame(
  103. self._get("dimension-instances/classification")[party_category]
  104. )
  105. .set_index("id")
  106. .sort_index()
  107. )
  108. self._classifications_dict = dict(self.classifications["name"])
  109. measure_hierarchy = self._get("dimension-instances/measure")[party_category]
  110. self.measure_tree = treelib.Tree()
  111. sr = self.measure_tree.create_node("__root__")
  112. for i in range(len(measure_hierarchy)):
  113. self._walk(measure_hierarchy[i], tree=self.measure_tree, parent=sr)
  114. self.gases = (
  115. pd.DataFrame(self._get("dimension-instances/gas")[party_category])
  116. .set_index("id")
  117. .sort_index()
  118. )
  119. self._gases_dict = dict(self.gases["name"])
  120. unit_info = self._get("conversion/fq")
  121. self.units = pd.DataFrame(unit_info["units"]).set_index("id").sort_index()
  122. self._units_dict = dict(self.units["name"])
  123. self.conversion_factors = pd.DataFrame(unit_info[party_category])
  124. # variable IDs are not unique, because category names are not unique
  125. # just give up and delete the duplicated ones
  126. variables_raw = self._get(f"variables/fq/{party_category}")
  127. self.variables = (
  128. pd.DataFrame(variables_raw).set_index("variableId").sort_index()
  129. )
  130. self.variables = self.variables[~self.variables.index.duplicated(keep="first")]
  131. self._variables_dict = {x["variableId"]: x for x in variables_raw}
  132. def _flexible_query(
  133. self,
  134. *,
  135. variable_ids: typing.List[int],
  136. party_ids: typing.List[int],
  137. year_ids: typing.List[int],
  138. ) -> typing.List[dict]:
  139. if len(variable_ids) > 3000:
  140. logging.warning(
  141. "Your query parameters lead to a lot of variables selected at once. "
  142. "If the query fails, try restricting your query more."
  143. )
  144. return self._post(
  145. "records/flexible-queries",
  146. json={
  147. "variableIds": variable_ids,
  148. "partyIds": party_ids,
  149. "yearIds": year_ids,
  150. },
  151. )
  152. def query(
  153. self,
  154. *,
  155. party_codes: typing.List[str],
  156. category_ids: typing.Union[None, typing.List[int]] = None,
  157. classifications: typing.Union[None, typing.List[str]] = None,
  158. measure_ids: typing.Union[None, typing.List[int]] = None,
  159. gases: typing.Union[None, typing.List[str]] = None,
  160. batch_size: int = 1000,
  161. progress: bool = False,
  162. ) -> pd.DataFrame:
  163. """Query the UNFCCC for data.
  164. :param party_codes: list of ISO codes of the parties to query.
  165. For possible values, see .parties .
  166. :param category_ids: list of category IDs to query. For possible values, see .show_category_hierarchy().
  167. Default: query for all categories.
  168. :param classifications: list of classifications to query. For possible values, see .classifications .
  169. Default: query for all classifications.
  170. :param measure_ids: list of measure IDs to query. For possible values, see .show_measure_hierarchy().
  171. Default: query for all measures.
  172. :param gases: list of gases to query. For possible values, see .gases .
  173. Default: query for all gases.
  174. :param batch_size: number of variables to query in a single API query in the same batch to avoid internal
  175. server errors. Larger queries are split automatically.
  176. :param progress: show a progress bar. Requires tqdm.
  177. """
  178. party_ids = []
  179. for code in party_codes:
  180. party_ids.append(self._name_id(self.parties, code, key="code"))
  181. # always query all years
  182. year_ids = list(self.years.index)
  183. variable_ids = self._select_variable_ids(
  184. classifications, category_ids, measure_ids, gases
  185. )
  186. i = 0
  187. raw_response = []
  188. if progress:
  189. import tqdm
  190. pbar = tqdm.tqdm(total=len(variable_ids))
  191. while i < len(variable_ids):
  192. batched_variable_ids = variable_ids[i : i + batch_size]
  193. i += batch_size
  194. batched_response = self._flexible_query(
  195. variable_ids=batched_variable_ids,
  196. party_ids=party_ids,
  197. year_ids=year_ids,
  198. )
  199. raw_response += batched_response
  200. if progress:
  201. pbar.update(len(batched_variable_ids))
  202. if progress:
  203. pbar.close()
  204. return self._parse_raw_answer(raw_response)
  205. def _parse_raw_answer(self, raw: typing.List[dict]) -> pd.DataFrame:
  206. data = []
  207. for dp in raw:
  208. variable = self._variables_dict[dp["variableId"]]
  209. try:
  210. category = self.category_tree[variable["categoryId"]].tag
  211. except treelib.tree.NodeIDAbsentError:
  212. category = f'unknown category nr. {variable["categoryId"]}'
  213. row = {
  214. "party": self._parties_dict[dp["partyId"]],
  215. "category": category,
  216. "classification": self._classifications_dict[
  217. variable["classificationId"]
  218. ],
  219. "measure": self.measure_tree[variable["measureId"]].tag,
  220. "gas": self._gases_dict[variable["gasId"]],
  221. "unit": self._units_dict[variable["unitId"]],
  222. "year": self._years_dict[dp["yearId"]],
  223. "numberValue": dp["numberValue"],
  224. "stringValue": dp["stringValue"],
  225. }
  226. data.append(row)
  227. df = pd.DataFrame(data)
  228. df.sort_values(
  229. ["party", "category", "classification", "measure", "gas", "unit", "year"],
  230. inplace=True,
  231. )
  232. df.drop_duplicates(inplace=True)
  233. df.reset_index(inplace=True, drop=True)
  234. return df
  235. def _select_variable_ids(
  236. self, classifications, category_ids, measure_ids, gases
  237. ) -> typing.List[int]:
  238. # select variables from classification
  239. if classifications is None:
  240. classification_mask = pd.Series(
  241. data=[True] * len(self.variables), index=self.variables.index
  242. )
  243. else:
  244. classification_mask = pd.Series(
  245. data=[False] * len(self.variables), index=self.variables.index
  246. )
  247. for classification in classifications:
  248. cid = self._name_id(self.classifications, classification)
  249. classification_mask[self.variables["classificationId"] == cid] = True
  250. # select variables from categories
  251. if category_ids is None:
  252. category_mask = pd.Series(
  253. data=[True] * len(self.variables), index=self.variables.index
  254. )
  255. else:
  256. category_mask = pd.Series(
  257. data=[False] * len(self.variables), index=self.variables.index
  258. )
  259. for cid in category_ids:
  260. category_mask[self.variables["categoryId"] == cid] = True
  261. # select variables from measures
  262. if measure_ids is None:
  263. measure_mask = pd.Series(
  264. data=[True] * len(self.variables), index=self.variables.index
  265. )
  266. else:
  267. measure_mask = pd.Series(
  268. data=[False] * len(self.variables), index=self.variables.index
  269. )
  270. for mid in measure_ids:
  271. measure_mask[self.variables["measureId"] == mid] = True
  272. # select variables from gases
  273. if gases is None:
  274. gas_mask = pd.Series(
  275. data=[True] * len(self.variables), index=self.variables.index
  276. )
  277. else:
  278. gas_mask = pd.Series(
  279. data=[False] * len(self.variables), index=self.variables.index
  280. )
  281. for gas in gases:
  282. gid = self._name_id(self.gases, gas)
  283. gas_mask[self.variables["gasId"] == gid] = True
  284. selected_variables = self.variables[
  285. classification_mask & category_mask & measure_mask & gas_mask
  286. ]
  287. return [int(x) for x in selected_variables.index]
  288. @staticmethod
  289. def _name_id(df, name, key="name"):
  290. try:
  291. return int(df[df[key] == name].index[0])
  292. except IndexError:
  293. raise KeyError(name)
  294. def show_category_hierarchy(self):
  295. return self.category_tree.show(idhidden=False)
  296. def show_measure_hierarchy(self):
  297. return self.measure_tree.show(idhidden=False)
  298. @classmethod
  299. def _walk(cls, node: dict, tree: treelib.Tree = None, parent=None) -> treelib.Tree:
  300. if tree is None:
  301. tree = treelib.Tree()
  302. tree.create_node(tag=node["name"], identifier=node["id"], parent=parent)
  303. if "children" in node:
  304. for child in node["children"]:
  305. cls._walk(child, tree=tree, parent=node["id"])
  306. return tree
  307. def _get(self, component: str) -> typing.Union[dict, list]:
  308. resp = requests.get(self.base_url + component)
  309. resp.raise_for_status()
  310. return resp.json()
  311. def _post(self, component: str, json: dict) -> typing.List[dict]:
  312. resp = requests.post(self.base_url + component, json=json)
  313. resp.raise_for_status()
  314. return resp.json()
  315. def _smoketest_non_annex_one():
  316. r = UNFCCCSingleCategoryApiReader(party_category="nonAnnexOne")
  317. ans = r.query(party_codes=["MMR"])
  318. def _smoketest_annex_one():
  319. r = UNFCCCSingleCategoryApiReader(party_category="annexOne")
  320. ans = r.query(party_codes=["DEU"], gases=["N₂O"])
  321. def _smoketest_unified():
  322. r = UNFCCCApiReader()
  323. ans = r.query(party_code="AFG")
  324. if __name__ == "__main__":
  325. _smoketest_annex_one()