Revisão | 2aeecd2bac13f8a430891b9f42f01bedcc43d02c (tree) |
---|---|
Hora | 2024-09-18 04:08:05 |
Autor | Lorenzo Isella <lorenzo.isella@gmai...> |
Commiter | Lorenzo Isella |
A cleanup of the code to merge the various parquet files.
@@ -11,16 +11,16 @@ | ||
11 | 11 | |
12 | 12 | add_tctf <- 1 |
13 | 13 | |
14 | -read_all <- 0 | |
14 | +## read_all <- 1 | |
15 | 15 | |
16 | 16 | |
17 | 17 | |
18 | -tctf <- open_dataset("TCTF_cases.csv", format="csv") | |
18 | +tctf <- open_dataset("../input/csv_files/TCTF_cases.csv", format="csv") | |
19 | 19 | |
20 | 20 | |
21 | -if (read_all==1){ | |
21 | +## if (read_all==1){ | |
22 | 22 | |
23 | -df <- open_dataset("./input_all/", unify_schemas = TRUE) |> | |
23 | +df <- open_dataset("../input/parquet-files/", unify_schemas = TRUE) |> | |
24 | 24 | select(everything()) ### I need to select the columns in any case. |
25 | 25 | |
26 | 26 |
@@ -32,10 +32,13 @@ | ||
32 | 32 | |
33 | 33 | } |
34 | 34 | |
35 | + | |
36 | +remove_files_with_pattern("../output/*parquet") | |
37 | + | |
35 | 38 | write_dataset( |
36 | 39 | df, |
37 | 40 | format = "parquet", |
38 | - path = "./data_output_all/", | |
41 | + path = "../output/", | |
39 | 42 | basename_template="tam_finale_20-03-2024-{i}.parquet" , |
40 | 43 | max_rows_per_file = 5e5 |
41 | 44 | ) |
@@ -48,82 +51,89 @@ | ||
48 | 51 | |
49 | 52 | print("I start writing the csv files") |
50 | 53 | |
54 | +remove_files_with_pattern("../output/flat_files/*csv") | |
55 | + | |
51 | 56 | write_dataset( |
52 | 57 | df, |
53 | 58 | format = "csv", |
54 | - path = "./data_output_all/flat_files/", | |
59 | + path = "../output/flat_files/", | |
55 | 60 | basename_template="tam_finale_20-03-2024-{i}.csv" , |
56 | 61 | max_rows_per_file = 5e5, |
57 | 62 | max_open_files=5) |
58 | 63 | |
64 | +system("csvstack ../output/flat_files/*csv > ../output/flat_files/tam_complete.csv") | |
59 | 65 | |
60 | -system("gzip ./data_output_all/flat_files/*csv") | |
61 | 66 | |
67 | + | |
68 | +system("gzip -f -9 ../output/flat_files/tam_complete.csv") | |
69 | + | |
70 | +remove_files_with_pattern("../output/flat_files/*csv") | |
71 | + | |
62 | 72 | |
63 | 73 | } |
64 | 74 | |
65 | 75 | |
66 | 76 | |
67 | 77 | |
68 | -} else{ | |
69 | - | |
70 | -df <- open_dataset("./input_all/extra", unify_schemas = TRUE) |> | |
71 | - select(everything()) ### I need to select the columns in any case. | |
72 | - | |
73 | -if (add_tctf==1){ | |
74 | - | |
75 | - df <- df |> | |
76 | - left_join(y=tctf, by=c("case_reference"="sa_case_number")) |> | |
77 | - mutate(is_tctf_case=if_else(is.na(is_tctf_case), "No", is_tctf_case)) | |
78 | -} | |
78 | +## } else{ | |
79 | 79 | |
80 | -write_dataset( | |
81 | - df, | |
82 | - format = "parquet", | |
83 | - path = "./data_output_all/TAM_ES_RO_SI/parquet/", | |
84 | - basename_template="tam_tctf_20-03-2024-{i}.parquet" , | |
85 | - max_rows_per_file = 5e5 | |
86 | -) | |
80 | +## df <- open_dataset("../input_all/extra", unify_schemas = TRUE) |> | |
81 | +## select(everything()) ### I need to select the columns in any case. | |
87 | 82 | |
88 | -print("writing the TAM compressed csv file") | |
83 | +## if (add_tctf==1){ | |
89 | 84 | |
90 | -write_csv_arrow( | |
91 | - df,"./data_output_all/TAM_ES_RO_SI/flat_file/transparency_tctf-20-03-2024.csv.gz") | |
85 | +## df <- df |> | |
86 | +## left_join(y=tctf, by=c("case_reference"="sa_case_number")) |> | |
87 | +## mutate(is_tctf_case=if_else(is.na(is_tctf_case), "No", is_tctf_case)) | |
88 | +## } | |
89 | + | |
90 | +## write_dataset( | |
91 | +## df, | |
92 | +## format = "parquet", | |
93 | +## path = "../data_output_all/TAM_ES_RO_SI/parquet/", | |
94 | +## basename_template="tam_tctf_20-03-2024-{i}.parquet" , | |
95 | +## max_rows_per_file = 5e5 | |
96 | +## ) | |
97 | + | |
98 | +## print("writing the TAM compressed csv file") | |
99 | + | |
100 | +## write_csv_arrow( | |
101 | +## df,"../data_output_all/TAM_ES_RO_SI/flat_file/transparency_tctf-20-03-2024.csv.gz") | |
92 | 102 | |
93 | 103 | |
94 | 104 | |
95 | 105 | |
96 | -### now we do Poland | |
106 | +## ### now we do Poland | |
97 | 107 | |
98 | -df<- open_dataset("./input_all/poland/", unify_schemas = TRUE) |> | |
99 | - select(everything()) ### I need to select the columns in any case. | |
108 | +## df<- open_dataset("../input_all/poland/", unify_schemas = TRUE) |> | |
109 | +## select(everything()) ### I need to select the columns in any case. | |
100 | 110 | |
101 | 111 | |
102 | - if (add_tctf==1) { | |
103 | - | |
104 | - df <- df |> | |
105 | - left_join(y=tctf, by=c("case_reference"="sa_case_number")) |> | |
106 | - mutate(is_tctf_case=if_else(is.na(is_tctf_case), "No", is_tctf_case)) | |
107 | - | |
108 | -} | |
112 | +## if (add_tctf==1) { | |
109 | 113 | |
110 | -write_dataset( | |
111 | - df, | |
112 | - format = "parquet", | |
113 | - path = "./data_output_all/poland/arrow/", | |
114 | - basename_template="poland_tctf_20-03-2024-{i}.parquet" , | |
115 | - max_rows_per_file = 5e5 | |
116 | -) | |
114 | +## df <- df |> | |
115 | +## left_join(y=tctf, by=c("case_reference"="sa_case_number")) |> | |
116 | +## mutate(is_tctf_case=if_else(is.na(is_tctf_case), "No", is_tctf_case)) | |
117 | 117 | |
118 | -print("writing Poland compressed csv file") | |
118 | +## } | |
119 | 119 | |
120 | -write_csv_arrow( | |
121 | - df,"./data_output_all/poland/flat_file/poland_tctf-20-03-2024.csv.gz") | |
120 | +## write_dataset( | |
121 | +## df, | |
122 | +## format = "parquet", | |
123 | +## path = "../data_output_all/poland/arrow/", | |
124 | +## basename_template="poland_tctf_20-03-2024-{i}.parquet" , | |
125 | +## max_rows_per_file = 5e5 | |
126 | +## ) | |
127 | + | |
128 | +## print("writing Poland compressed csv file") | |
129 | + | |
130 | +## write_csv_arrow( | |
131 | +## df,"../data_output_all/poland/flat_file/poland_tctf-20-03-2024.csv.gz") | |
122 | 132 | |
123 | 133 | |
124 | 134 | |
125 | 135 | |
126 | -} | |
136 | +## } | |
127 | 137 | |
128 | 138 | |
129 | 139 |