-
Notifications
You must be signed in to change notification settings - Fork 152
Expand file tree
/
Copy pathq02_minimum_cost_supplier.py
More file actions
183 lines (157 loc) · 5.81 KB
/
q02_minimum_cost_supplier.py
File metadata and controls
183 lines (157 loc) · 5.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
TPC-H Problem Statement Query 2:
The Minimum Cost Supplier Query finds, in a given region, for each part of a certain type and size,
the supplier who can supply it at minimum cost. If several suppliers in that region offer the
desired part type and size at the same (minimum) cost, the query lists the parts from suppliers with
the 100 highest account balances. For each supplier, the query lists the supplier's account balance,
name and nation; the part's number and manufacturer; the supplier's address, phone number and
comment information.
The above problem statement text is copyrighted by the Transaction Processing Performance Council
as part of their TPC Benchmark H Specification revision 2.18.0.
Reference SQL (from TPC-H specification, used by the benchmark suite)::

    select
        s_acctbal,
        s_name,
        n_name,
        p_partkey,
        p_mfgr,
        s_address,
        s_phone,
        s_comment
    from
        part,
        supplier,
        partsupp,
        nation,
        region
    where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 15
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'EUROPE'
        and ps_supplycost = (
            select
                min(ps_supplycost)
            from
                partsupp,
                supplier,
                nation,
                region
            where
                p_partkey = ps_partkey
                and s_suppkey = ps_suppkey
                and s_nationkey = n_nationkey
                and n_regionkey = r_regionkey
                and r_name = 'EUROPE'
        )
    order by
        s_acctbal desc,
        n_name,
        s_name,
        p_partkey limit 100;
"""
import datafusion
from datafusion import SessionContext, col, lit
from datafusion import functions as F
from datafusion.expr import Window
from util import get_data_path
# Search parameters: the part size, the part-type suffix and the region to look in. These are
# the same literals that appear in the reference SQL in the module docstring.
SIZE_OF_INTEREST = 15
TYPE_OF_INTEREST = "BRASS"
REGION_OF_INTEREST = "EUROPE"

# Read every table from parquet, then project each one down to the columns this query
# actually touches (join keys, filter columns and output columns).
ctx = SessionContext()

df_part = ctx.read_parquet(get_data_path("part.parquet"))
df_part = df_part.select("p_partkey", "p_mfgr", "p_type", "p_size")

df_supplier = ctx.read_parquet(get_data_path("supplier.parquet"))
df_supplier = df_supplier.select(
    "s_acctbal", "s_name", "s_address", "s_phone", "s_comment", "s_nationkey", "s_suppkey"
)

df_partsupp = ctx.read_parquet(get_data_path("partsupp.parquet"))
df_partsupp = df_partsupp.select("ps_partkey", "ps_suppkey", "ps_supplycost")

df_nation = ctx.read_parquet(get_data_path("nation.parquet"))
df_nation = df_nation.select("n_nationkey", "n_regionkey", "n_name")

df_region = ctx.read_parquet(get_data_path("region.parquet"))
df_region = df_region.select("r_regionkey", "r_name")
# Keep only the region we were asked about.
df_region = df_region.filter(col("r_name") == lit(REGION_OF_INTEREST))

# Keep only parts of the requested size whose type ends with the requested suffix. This is
# the DataFrame form of ``p_type like '%BRASS'`` from the reference SQL, expressed with the
# dedicated ``ends_with`` string function instead of a substring search.
df_part = df_part.filter(
    F.ends_with(col("p_type"), lit(TYPE_OF_INTEREST)),
    col("p_size") == lit(SIZE_OF_INTEREST),
)

# Walk the hierarchy region -> nation -> supplier with inner joins, so that only nations in
# the chosen region, and only suppliers located in those nations, survive.
df_nation = df_nation.join(
    df_region, how="inner", left_on="n_regionkey", right_on="r_regionkey"
)
df_supplier = df_supplier.join(
    df_nation, how="inner", left_on="s_nationkey", right_on="n_nationkey"
)

# Restrict the part-supply offers to those made by the regional suppliers found above.
# The join against the parts of interest happens later, after the minimum cost per part
# has been computed.
df = df_partsupp.join(
    df_supplier, how="inner", left_on="ps_suppkey", right_on="s_suppkey"
)
# Find the cheapest offer for each part among the regional suppliers. One of several possible
# approaches is used here: a window function partitioned by part key attaches the minimum
# supply cost to every row as ``min_cost``, and rows whose own cost is not that minimum are
# then discarded.
#
# As of 5/6/2024 the default window frame ran from unbounded preceding to the current row.
# The minimum has to be taken over all rows, so an explicit frame with both bounds left as
# None is supplied instead of relying on the default.
window_frame = datafusion.WindowFrame("rows", None, None)
df = df.with_column(
    "min_cost",
    F.min(col("ps_supplycost")).over(
        Window(window_frame=window_frame, partition_by=[col("ps_partkey")])
    ),
)

# Keep only the offers that hit the per-part minimum ...
df = df.filter(col("ps_supplycost") == col("min_cost"))

# ... and then narrow to the parts that passed the size/type filter, which also brings in the
# part columns needed for the output.
df = df.join(df_part, how="inner", left_on="ps_partkey", right_on="p_partkey")
# Project to the output columns named in the problem statement, in the same order as the
# select list of the reference SQL.
df = df.select(
    "s_acctbal", "s_name", "n_name", "p_partkey",
    "p_mfgr", "s_address", "s_phone", "s_comment",
)

# Order by account balance (highest first), then nation name, supplier name and part key,
# and keep the first 100 rows.
df = df.sort(col("s_acctbal").sort(ascending=False), "n_name", "s_name", "p_partkey")
df = df.limit(100)

# Print the result
df.show()