Browse Source

Add total production to bulk processing

stephenjboyle 7 months ago
parent
commit
08615d2a92

+ 31 - 23
dodo.py

@@ -58,6 +58,12 @@ DATA_FILES = [
     ("prodcom_list", 'data/PRD_2016_20200617_185122.csv'),
 ]
 
+DATA_BULK = [
+    ("prodcom_bulk_sold", 'bulk_data/sold_production', 'outputs/sold_production'),
+    ("prodcom_bulk_total", 'bulk_data/total_production', 'outputs/total_production'),
+]
+
+
 def task_convert_data():
     """Reads CSV files, runs all the rules, and converts all of them into RDF."""
     for data_type, csv_file in DATA_FILES:
@@ -86,28 +92,30 @@ def task_convert_data():
 
 
 def task_convert_bulk():
-    bulk_data_path = Path("bulk_data")
-    bulk_data_files = bulk_data_path.glob("*.csv")
-    for csv_file in bulk_data_files:
-        target_file = Path("outputs") / (csv_file.stem + ".nt.gz")
-        yield {
-            'name': csv_file.stem,
-            'file_dep': [
-                csv_file,
-                f"scripts/load_data_prodcom_new.rdfox",
-                f"scripts/map_prodcom_new.dlog",
-            ],
-            'targets': [
-                target_file
-            ],
-            'actions': [
-                [
-                    "python",
-                    "scripts/convert_data.py",
-                    "prodcom_new",
+    for data_type, csv_dir, output_dir in DATA_BULK:
+        bulk_data_path = Path(csv_dir)
+        bulk_data_files = bulk_data_path.glob("*.csv")
+        for csv_file in bulk_data_files:
+            target_file = Path(output_dir) / (csv_file.stem + ".nt.gz")
+            yield {
+                'name': csv_file.stem,
+                'file_dep': [
                     csv_file,
-                    target_file,
-                ]
-            ],
-        }
+                    "scripts/load_data_prodcom_bulk.rdfox",
+                    f"scripts/map_{data_type}.dlog",
+                ],
+                'targets': [
+                    target_file
+                ],
+                'actions': [
+                    [
+                        "python",
+                        "scripts/convert_data.py",
+                        data_type,
+                        csv_file,
+                        target_file,
+                    ]
+                ],
+            }
+    
     

+ 5 - 3
scripts/convert_data.py

@@ -40,7 +40,10 @@ def convert_data_prodcom(data_type, data_csv, output_path):
     #
     # Assume that there is a file named e.g. "PRODCOM2016DATA_defs.dlog" with
     # definitions of the prefixes and time period and region to use.
-    load_script = CODE_DIR / f"load_data_{data_type}.rdfox"
+    if data_type in ("prodcom_bulk_sold", "prodcom_bulk_total"):
+        load_script = CODE_DIR / "load_data_prodcom_bulk.rdfox"
+    else:
+        load_script = CODE_DIR / f"load_data_{data_type}.rdfox"
     load_units = CODE_DIR / "load_data_units.rdfox"
     load_geonames = CODE_DIR / "load_data_geonames.rdfox"
     map_file = f"map_{data_type}.dlog"
@@ -53,9 +56,8 @@ def convert_data_prodcom(data_type, data_csv, output_path):
         "metrics_units.csv": CODE_DIR / "metrics_units.csv",
 #        "probs-prodcom.ttl" : ONTOLOGY_DIR / "probs-prodcom.ttl",
         map_file : CODE_DIR / map_file,
-#        defs_file : data_csv.parent / defs_file,
     }
-    if data_type in ("prodcom", "prodcom_new"):
+    if data_type in ("prodcom", "prodcom_bulk_sold", "prodcom_bulk_total"):
        load_data.append(load_units)
        input_files[units_file] = CODE_DIR / units_file
     if data_type in ("prodcom", "prodcom_list", "prodcom_correspondence"):

scripts/load_data_prodcom_new.rdfox → scripts/load_data_prodcom_bulk.rdfox


scripts/map_prodcom_new.dlog → scripts/map_prodcom_bulk_sold.dlog


+ 68 - 0
scripts/map_prodcom_bulk_total.dlog

@@ -0,0 +1,68 @@
+# Basic info from "PRODQNT"
+
+:DirectObservation[?ID] ,
+[?ID, :objectDirectlyDefinedBy, ?Object] ,
+[?ID, :hasTimePeriod, ?TimePeriod] ,
+[?ID, :hasRole, :TotalProduction] ,
+[?ID, :partOfDataset, ufpc:PRODCOM] ,
+[?ID, :bound, :ExactBound]
+        :-
+        ufrd:PRODCOM_DATA_NEW(?Decl, ?Year, ?PRCCODE, ?Indicators, ?ObsValue, ?ObsFlag),
+        FILTER(?Indicators = "PRODQNT" || (?Indicators = "PQNTFLAG" && ?ObsValue = ":C")),
+        BIND(IRI(CONCAT(STR(ufpcd:), ?Year, "/Observation-", SHA256(CONCAT(?Decl, ?Year, ?PRCCODE, "PRODQNT")))) AS ?ID),
+        BIND(IRI(CONCAT(STR(ufpc2017:), "Object-", ?PRCCODE)) AS ?Object) ,
+        BIND(IRI(CONCAT(STR(:), "TimePeriod_YearOf", ?Year)) AS ?TimePeriod) .
+
+
+[?ID, :hasRegion, ?Region]
+       :-
+       ufrd:PRODCOM_DATA_NEW(?Decl, ?Year, ?PRCCODE, ?Indicators, ?ObsValue, ?ObsFlag),
+       ufrd:GEONAMES(?Decl, ?RegionID) ,
+       FILTER(?RegionID != "0"),
+       BIND(IRI(CONCAT(STR(ufpcd:), ?Year, "/Observation-", SHA256(CONCAT(?Decl, ?Year, ?PRCCODE, ?Indicators)))) AS ?ID),
+       BIND(IRI(CONCAT(STR(gnd:), STR(?RegionID))) AS ?Region) .
+
+[?ID, :hasRegion, ?Region]
+       :-
+       ufrd:PRODCOM_DATA_NEW(?Decl, ?Year, ?PRCCODE, ?Indicators, ?ObsValue, ?ObsFlag),
+       ufrd:GEONAMES(?Decl, ?RegionID) ,
+       FILTER(?RegionID = "0"),
+       BIND(IRI(CONCAT(STR(ufpcd:), ?Year, "/Observation-", SHA256(CONCAT(?Decl, ?Year, ?PRCCODE, ?Indicators)))) AS ?ID),
+       BIND(IRI(CONCAT(STR(ufpc:), "Region-", STR(?Decl))) AS ?Region) .
+
+
+# Additional info from other rows about units and flags
+
+ufu:NG(?ID, ufu:measurementUnit, ?Measurement)
+        :-
+        ufrd:PRODCOM_DATA_NEW(?Decl, ?Year, ?PRCCODE, ?Indicators, ?ObsValue, ?ObsFlag),
+        FILTER(?Indicators = "PRODQNT"),
+        BIND(IRI(CONCAT(STR(ufpcd:), ?Year, "/Observation-", SHA256(CONCAT(?Decl, ?Year, ?PRCCODE, ?Indicators)))) AS ?ID) ,
+        :DirectObservation[?ID] ,
+
+        # Don't use the measurement value when there is a "confidential" flag,
+        # this means that a zero value really means "missing"
+        NOT [?ID, :measurementFlag, ":C"],
+
+        BIND(xsd:decimal(?ObsValue) AS ?Measurement).
+
+
+ufu:NG(?ID, ufu:unit, ?UnitID)
+        :-
+        ufrd:PRODCOM_DATA_NEW(?Decl, ?Year, ?PRCCODE, ?Indicators, ?ObsValue, ?ObsFlag),
+        FILTER(?Indicators = "QNTUNIT"),
+        BIND(IRI(CONCAT(STR(ufpcd:), ?Year, "/Observation-", SHA256(CONCAT(?Decl, ?Year, ?PRCCODE, "PRODQNT")))) AS ?ID) ,
+        :DirectObservation[?ID] ,
+        BIND(IRI(CONCAT(STR(:), "Unit-", SHA256(?ObsValue))) AS ?UnitID) .
+
+
+[?ID, :measurementFlag, ?ObsValue]
+        :-
+        ufrd:PRODCOM_DATA_NEW(?Decl, ?Year, ?PRCCODE, ?Indicators, ?ObsValue, ?ObsFlag),
+        FILTER(?Indicators = "PQNTFLAG"),
+        BIND(IRI(CONCAT(STR(ufpcd:), ?Year, "/Observation-", SHA256(CONCAT(?Decl, ?Year, ?PRCCODE, "PRODQNT")))) AS ?ID) ,
+        :DirectObservation[?ID] .
+
+
+# EU prefix: "http://data.europa.eu/qw1/prodcom2021/"
+

+ 38 - 20
scripts/split_by_country_year.py

@@ -17,26 +17,44 @@ def parse_arguments():
 
 
 def split_bulk_csv(csvfile, output_path):
-    for year in range(1995,2023):
-        output_name = Path(csvfile).stem 
-        with open(csvfile) as fin:    
-            csvin = csv.DictReader(fin)
-            # Category -> open file lookup
-            outputs = {}
-            for row in csvin:
-                if row['TIME_PERIOD'] == str(year):
-                    cat = row['decl']
-                    # if necessary open a new file and write the header
-                    if cat not in outputs:
-                        fout = open(output_path / '{}_{}-{}.csv'.format(output_name, cat, year), 'w')
-                        dw = csv.DictWriter(fout, fieldnames=csvin.fieldnames)
-                        dw.writeheader()
-                        outputs[cat] = fout, dw
-                    # write the row
-                    outputs[cat][1].writerow(row)
-            # close all the files
-            for fout, _ in outputs.values():
-                fout.close()
+    # Category -> open file lookup
+    outputs = {}
+    output_name = Path(csvfile).stem
+    last_decl = None
+    fixed_values = None
+    fixed_value_columns = ["DATAFLOW", "LAST UPDATE", "freq"]
+    with open(csvfile) as fin:
+        csvin = csv.DictReader(fin)
+        assert csvin.fieldnames is not None
+        # These are always the same so don't bother keeping
+        assert csvin.fieldnames[:3] == fixed_value_columns
+        var_columns = csvin.fieldnames[3:]
+        for row in csvin:
+            decl = row['decl']
+            year = row['TIME_PERIOD']
+
+            if fixed_values is None:
+                fixed_values = {x: row[x] for x in fixed_value_columns}
+            else:
+                if any(row[x] != fixed_values[x] for x in fixed_value_columns):
+                    raise ValueError("Unexpected 'fixed' value: %s" % (row,))
+
+            if decl != last_decl:
+                # close all the files to prevent too many open files
+                for fout, _ in outputs.values():
+                    fout.close()
+                # drop the closed handles so they are never written to again
+                outputs = {}
+                if last_decl is not None:
+                    print("done", last_decl)
+            last_decl = decl
+
+            # if necessary open a new file and write the header
+            if (decl, year) not in outputs:
+                fout = open(output_path / '{}_{}-{}.csv'.format(output_name, decl, year), 'w')
+                dw = csv.DictWriter(fout, fieldnames=var_columns)
+                dw.writeheader()
+                outputs[decl, year] = fout, dw
+
+            # write the row
+            outputs[decl, year][1].writerow({k: row[k] for k in var_columns})
+
+    # close the files still open for the final decl
+    for fout, _ in outputs.values():
+        fout.close()
 
 
 if __name__ == "__main__":

+ 51 - 5
tests/test_prodcom_bulk.py

@@ -34,7 +34,7 @@ def obj_id(obj_code):
 class TestProdcom2016Data:
     """Test PRODCOM2016 DATA conversion"""
 
-    endpoint_data = ["outputs/DS-056120_006-2016.nt.gz"]
+    endpoint_data = ["outputs/sold_production/DS-056120_006-2016.nt.gz"]
 
     @pytest.mark.parametrize("code,metric,expected_value", [
         ("10113250", QUANTITYKIND.Mass, 2717950),
@@ -56,8 +56,6 @@ class TestProdcom2016Data:
     @pytest.mark.parametrize("code,metric", [
         ("14391090", QUANTITYKIND.AmountOfSubstance),  # No PRDQNT, :C for PQNTFLAG
         ("20141325", QUANTITYKIND.Mass),               # No PRDQNT, :C for PQNTFLAG
-        # This one has no measurement or data at all     No PRDQNT,  - for PQNTFLAG
-        ("20421975", QUANTITYKIND.Unknown),
     ])
     def test_missing_measurements(self, rdfox, code, metric):
         # these codes have no data for PRDQNT
@@ -71,8 +69,8 @@ class TestProdcom2016Data:
         assert_no_result(result)
 
     @pytest.mark.parametrize("code,metric", [
-        ("14391090", QUANTITYKIND.Mass),
-        ("20141325", QUANTITYKIND.AmountOfSubstance),
+        ("10841270", QUANTITYKIND.Volume),
+        ("20132455", QUANTITYKIND.Mass),
     ])
     def test_no_result_with_wrong_metric(self, rdfox, code, metric):
         # same as above, but with the wrong metric
@@ -86,3 +84,51 @@ class TestProdcom2016Data:
         assert_no_result(result)
 
 
+class TestProdcom2017Data:
+    """Test PRODCOM2017 DATA conversion.
+
+    Just include a few expected values, since other behaviour is tested for the 2016 data.
+    """
+
+    endpoint_data = ["outputs/sold_production/DS-056120_006-2017.nt.gz"]
+
+    @pytest.mark.parametrize("code,metric,expected_value", [
+        ("10721253", QUANTITYKIND.Mass, 244871494),
+        ("16212400", QUANTITYKIND.Volume, 0),
+        ("23111290", QUANTITYKIND.Area, 28514781),
+        ("25711430", QUANTITYKIND.AmountOfSubstance, 329685),
+    ])
+    def test_expected_measurements(self, rdfox, code, metric, expected_value):
+        result = rdfox.get_observations(
+            PROBS.TimePeriod_YearOf2017,
+            GEONAMES["2635167"],
+            metric,
+            PROBS.SoldProduction,
+            object_=PRODCOM2017["Object-" + code]
+        )
+        assert_exact_results(result, expected_value)
+
+
+class TestProdcom2018Data:
+    """Test PRODCOM2018 DATA conversion.
+
+    Just include a few expected values, since other behaviour is tested for the 2016 data.
+    """
+
+    endpoint_data = ["outputs/sold_production/DS-056120_006-2018.nt.gz"]
+
+    @pytest.mark.parametrize("code,metric,expected_value", [
+        ("10131120", QUANTITYKIND.Mass, 104282828),
+        ("11052000", QUANTITYKIND.Mass, 737526732),
+        ("13921530", QUANTITYKIND.Area, 1197738),
+        ("15112200", QUANTITYKIND.Area, 0),
+    ])
+    def test_expected_measurements(self, rdfox, code, metric, expected_value):
+        result = rdfox.get_observations(
+            PROBS.TimePeriod_YearOf2018,
+            GEONAMES["2635167"],
+            metric,
+            PROBS.SoldProduction,
+            object_=PRODCOM2017["Object-" + code]
+        )
+        assert_exact_results(result, expected_value)

+ 135 - 0
tests/test_prodcom_bulk_total.py

@@ -0,0 +1,135 @@
+#!/usr/bin/env python3
+import pytest
+from numpy import isnan
+from hashlib import sha256
+
+from rdflib import Namespace
+from probs_runner import PROBS, QUANTITYKIND
+from numpy.testing import assert_allclose
+
+PRODCOM2016 = Namespace("http://w3id.org/probs-lab/data/prodcom/2016/")
+PRODCOM2017 = Namespace("http://w3id.org/probs-lab/data/prodcom/2017/")
+GEONAMES = Namespace("https://sws.geonames.org/")
+
+def assert_exact_results(result, expected_value):
+    assert len(result) == 1
+    assert result[0].bound == PROBS.ExactBound
+    assert_allclose(result[0].measurement, expected_value, rtol=1e-3)
+
+
+def assert_no_measurement(result):
+    assert len(result) == 1
+    assert isnan(result[0].measurement)
+
+def assert_no_result(result):
+    assert len(result) == 0
+
+
+def obj_id(obj_code):
+    return ("Object-" + sha256(obj_code.encode('utf-8')).hexdigest())
+
+
+# Do most tests for 2016; then spot check a few values for the other years
+
+class TestProdcom2016Data:
+    """Test PRODCOM2016 DATA conversion"""
+
+    endpoint_data = ["outputs/total_production/DS-056121_006-2016.nt.gz"]
+
+    @pytest.mark.parametrize("code,metric,expected_value", [
+        ("13103100", QUANTITYKIND.Mass, 224299),
+        ("13911910", QUANTITYKIND.Mass, 12566190),
+        ("15115100", QUANTITYKIND.Mass, 0),
+        ("20132455", QUANTITYKIND.MassAmountOfSubstance, 1369740),
+        ("20141290", QUANTITYKIND.Mass, 208123),
+    ])
+    def test_expected_measurements(self, rdfox, code, metric, expected_value):
+        result = rdfox.get_observations(
+            PROBS.TimePeriod_YearOf2016,
+            GEONAMES["2635167"],
+            metric,
+            PROBS.TotalProduction,
+            object_=PRODCOM2017["Object-" + code]
+        )
+        assert_exact_results(result, expected_value)
+
+    @pytest.mark.parametrize("code,metric", [
+        ("13202042", QUANTITYKIND.Area),              # No PRDQNT, :C for PQNTFLAG 
+        ("20141325", QUANTITYKIND.Mass),              # No PRDQNT, :C for PQNTFLAG
+    ])
+    def test_missing_measurements(self, rdfox, code, metric):
+        # these codes have no data for PRDQNT
+        result = rdfox.get_observations(
+            PROBS.TimePeriod_YearOf2016,
+            GEONAMES["2635167"] ,
+            metric,
+            PROBS.TotalProduction,
+            object_=PRODCOM2017["Object-" + code],
+        )
+        assert_no_result(result)
+        print(PRODCOM2017[obj_id(code)])
+
+    @pytest.mark.parametrize("code,metric", [
+        ("13201230", QUANTITYKIND.Mass),
+        ("20144370", QUANTITYKIND.AmountOfSubstance),
+    ])
+    def test_no_result_with_wrong_metric(self, rdfox, code, metric):
+        # same as above, but with the wrong metric
+        result = rdfox.get_observations(
+            PROBS.TimePeriod_YearOf2016,
+            GEONAMES["2635167"] ,
+            metric,
+            PROBS.TotalProduction,
+            object_=PRODCOM2017["Object-" + code],
+        )
+        assert_no_result(result)
+
+
+class TestProdcom2017Data:
+    """Test PRODCOM2017 DATA conversion.
+
+    Just include a few expected values, since other behaviour is tested for the 2016 data.
+    """
+
+    endpoint_data = ["outputs/total_production/DS-056121_006-2017.nt.gz"]
+
+    @pytest.mark.parametrize("code,metric,expected_value", [
+        ("08111250", QUANTITYKIND.Mass, 604132000),
+        ("20111120", QUANTITYKIND.Volume, 387410000),
+        ("13201230", QUANTITYKIND.Area, 8038713),
+        ("20122415", QUANTITYKIND.MassAmountOfSubstance, 31291132),
+    ])
+    def test_expected_measurements(self, rdfox, code, metric, expected_value):
+        result = rdfox.get_observations(
+            PROBS.TimePeriod_YearOf2017,
+            GEONAMES["2635167"],
+            metric,
+            PROBS.TotalProduction,
+            object_=PRODCOM2017["Object-" + code]
+        )
+        assert_exact_results(result, expected_value)
+
+
+class TestProdcom2018Data:
+    """Test PRODCOM2018 DATA conversion.
+
+    Just include a few expected values, since other behaviour is tested for the 2016 data.
+    """
+
+    endpoint_data = ["outputs/total_production/DS-056121_006-2018.nt.gz"]
+
+    @pytest.mark.parametrize("code,metric,expected_value", [
+        ("08114000", QUANTITYKIND.Mass, 41455000),
+        ("13106135", QUANTITYKIND.Mass, 9193633),
+        ("13203210", QUANTITYKIND.Area, 4300488),
+        ("15112200", QUANTITYKIND.Area, 0),
+    ])
+    def test_expected_measurements(self, rdfox, code, metric, expected_value):
+        result = rdfox.get_observations(
+            PROBS.TimePeriod_YearOf2018,
+            GEONAMES["2635167"],
+            metric,
+            PROBS.TotalProduction,
+            object_=PRODCOM2017["Object-" + code]
+        )
+        assert_exact_results(result, expected_value)