align groupby q9, followup of #40
jangorecki committed Jan 4, 2021
1 parent 59d5e41 commit 73647e5
Showing 8 changed files with 23 additions and 23 deletions.
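Context for the change: q9 computes the squared Pearson correlation of v1 and v2 per (id2, id4) group. Before this commit, several solutions first filtered out rows where v1 or v2 is missing; the aligned q9 passes the columns unfiltered and leaves missing-value handling to each engine's corr (pandas' corr uses pairwise-complete observations, and the corr aggregates in Spark SQL and, as far as I can tell, ClickHouse skip incomplete pairs; Julia's cor instead propagates missing — see the note after the juliadf diff below). A minimal pandas sketch — toy data, not part of the commit — of why the two forms agree per group, with the caveat that pre-filtering can additionally drop a group whose pairs are all incomplete:

import numpy as np
import pandas as pd

# hypothetical toy frame, just for illustration
df = pd.DataFrame({
    "id2": ["a", "a", "a", "b", "b", "b"],
    "id4": [1, 1, 1, 2, 2, 2],
    "v1":  [1.0, 2.0, 3.0, np.nan, 4.0, 5.0],
    "v2":  [2.0, 4.5, 6.0, 6.0, 8.0, np.nan],
})

def r2(g):
    # Series.corr drops incomplete (NaN) pairs inside the group, which is
    # what makes the explicit pre-filter redundant for the per-group value
    return pd.Series({"r2": g["v1"].corr(g["v2"]) ** 2})

old = (df[~df["v1"].isna() & ~df["v2"].isna()]
       .groupby(["id2", "id4"], as_index=False).apply(r2))   # old q9 form
new = df.groupby(["id2", "id4"], as_index=False).apply(r2)   # aligned q9 form

print(old)  # same r2 per group as `new`; a group vanishes from `old` only
print(new)  # when every one of its (v1, v2) pairs is incomplete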
14 changes: 7 additions & 7 deletions _benchplot/benchplot-dict.R
@@ -91,7 +91,7 @@ groupby.syntax.dict = {list(
"median v3 sd v3 by id4 id5" = "DF.groupby(['id4','id5'], as_index=False, sort=False, observed=True, dropna=False).agg({'v3': ['median','std']})",
"max v1 - min v2 by id3" = "DF.groupby('id3', as_index=False, sort=False, observed=True, dropna=False).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['id3','range_v1_v2']]",
"largest two v3 by id6" = "DF[~x['v3'].isna()][['id6','v3']].sort_values('v3', ascending=False).groupby('id6', as_index=False, sort=False, observed=True, dropna=False).head(2)",
"regression v1 v2 by id2 id4" = "DF[~x['v1'].isna() & ~x['v2'].isna()][['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, observed=True, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))",
"regression v1 v2 by id2 id4" = "DF[['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, observed=True, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))",
"sum v3 count by id1:id6" = "DF.groupby(['id1','id2','id3','id4','id5','id6'], as_index=False, sort=False, observed=True, dropna=False).agg({'v3':'sum', 'v1':'size'})"
)},
"pydatatable" = {c(
@@ -103,7 +103,7 @@ groupby.syntax.dict = {list(
"median v3 sd v3 by id4 id5" = "DT[:, {'median_v3': median(f.v3), 'sd_v3': sd(f.v3)}, by(f.id4, f.id5)]",
"max v1 - min v2 by id3" = "DT[:, {'range_v1_v2': max(f.v1)-min(f.v2)}, by(f.id3)]",
"largest two v3 by id6" = "DT[~isna(f.v3),:][:2, {'largest2_v3': f.v3}, by(f.id6), sort(-f.v3)]",
"regression v1 v2 by id2 id4" = "DT[~isna(f.v1) & ~isna(f.v2),:][:, {'r2': corr(f.v1, f.v2)**2}, by(f.id2, f.id4)]",
"regression v1 v2 by id2 id4" = "DT[:, {'r2': corr(f.v1, f.v2)**2}, by(f.id2, f.id4)]",
"sum v3 count by id1:id6" = "DT[:, {'v3': sum(f.v3), 'count': count()}, by(f.id1, f.id2, f.id3, f.id4, f.id5, f.id6)]"
)},
"dask" = {c(
@@ -115,7 +115,7 @@ groupby.syntax.dict = {list(
"median v3 sd v3 by id4 id5" = "", # DF.groupby(['id4','id5'], dropna=False).agg({'v3': ['median','std']}).compute()"
"max v1 - min v2 by id3" = "DF.groupby('id3', dropna=False).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']].compute()",
"largest two v3 by id6" = "DF[~x['v3'].isna()][['id6','v3']].groupby('id6', dropna=False).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']].compute()",
"regression v1 v2 by id2 id4" = "", # "DF[~x['v1'].isna() & ~x['v2'].isna()][['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute()"
"regression v1 v2 by id2 id4" = "", # "DF[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute()"
"sum v3 count by id1:id6" = "DF.groupby(['id1','id2','id3','id4','id5','id6'], dropna=False).agg({'v3':'sum', 'v1':'size'}).compute()"
)},
"spark" = {c(
@@ -127,7 +127,7 @@ groupby.syntax.dict = {list(
"median v3 sd v3 by id4 id5" = "", # "select id4, id5, median(v3) as median_v3, stddev(v3) as sd_v3 from x group by id4, id5"
"max v1 - min v2 by id3" = "select id3, max(v1)-min(v2) as range_v1_v2 from x group by id3",
"largest two v3 by id6" = "select id6, largest2_v3 from (select id6, v3 as largest2_v3, row_number() over (partition by id6 order by v3 desc) as order_v3 from x where v3 is not null) sub_query where order_v3 <= 2",
"regression v1 v2 by id2 id4" = "select id2, id4, pow(corr(v1, v2), 2) as r2 from x where v1 is not null and v2 is not null group by id2, id4",
"regression v1 v2 by id2 id4" = "select id2, id4, pow(corr(v1, v2), 2) as r2 from x group by id2, id4",
"sum v3 count by id1:id6" = "select id1, id2, id3, id4, id5, id6, sum(v3) as v3, count(*) as count from x group by id1, id2, id3, id4, id5, id6"
)},
"juliadf" = {c(
@@ -139,7 +139,7 @@ groupby.syntax.dict = {list(
"median v3 sd v3 by id4 id5" = "combine(groupby(x, [:id4, :id5]), :v3 => median∘skipmissing => :median_v3, :v3 => std∘skipmissing => :sd_v3)",
"max v1 - min v2 by id3" = "combine(groupby(x, :id3), [:v1, :v2] => ((v1, v2) -> maximum(skipmissing(v1))-minimum(skipmissing(v2))) => :range_v1_v2)",
"largest two v3 by id6" = "combine(groupby(dropmissing(x, :v3), :id6), :v3 => (x -> partialsort!(x, 1:min(2, length(x)), rev=true)) => :largest2_v3)",
"regression v1 v2 by id2 id4" = "combine(groupby(dropmissing(x, [:v1, :v2]), [:id2, :id4]), [:v1, :v2] => ((v1,v2) -> cor(v1, v2)^2) => :r2)",
"regression v1 v2 by id2 id4" = "combine(groupby(x, [:id2, :id4]), [:v1, :v2] => ((v1,v2) -> cor(v1, v2)^2) => :r2)",
"sum v3 count by id1:id6" = "combine(groupby(x, [:id1, :id2, :id3, :id4, :id5, :id6]), :v3 => sum∘skipmissing => :v3, :v3 => length => :count)"
)},
"cudf" = {c(
@@ -151,7 +151,7 @@ groupby.syntax.dict = {list(
"median v3 sd v3 by id4 id5" = "DF.groupby(['id4','id5'], as_index=False, sort=False, dropna=False).agg({'v3': ['median','std']})",
"max v1 - min v2 by id3" = "", # "DF.groupby('id3', as_index=False, sort=False, dropna=False).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']]"
"largest two v3 by id6" = "", # "DF[~x['v3'].isna()][['id6','v3']].sort_values('v3', ascending=False).groupby('id6', as_index=False, sort=False, dropna=False).head(2)"
"regression v1 v2 by id2 id4" = "", # "DF[~x['v1'].isna() & ~x['v2'].isna()][['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))"
"regression v1 v2 by id2 id4" = "", # "DF[['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))"
"sum v3 count by id1:id6" = "DF.groupby(['id1','id2','id3','id4','id5','id6'], as_index=False, sort=False, dropna=False).agg({'v3':'sum', 'v1':'size'})"
)},
"clickhouse" = {c(
@@ -163,7 +163,7 @@ groupby.syntax.dict = {list(
"median v3 sd v3 by id4 id5" = "SELECT id4, id5, medianExact(v3) AS median_v3, stddevPop(v3) AS sd_v3 FROM x GROUP BY id4, id5",
"max v1 - min v2 by id3" = "SELECT id3, max(v1) - min(v2) AS range_v1_v2 FROM x GROUP BY id3",
"largest two v3 by id6" = "SELECT id6, arrayJoin(arraySlice(arrayReverseSort(groupArray(v3)), 1, 2)) AS v3 FROM (SELECT id6, v3 FROM x WHERE v3 IS NOT NULL) AS subq GROUP BY id6",
"regression v1 v2 by id2 id4" = "SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM x WHERE v1 IS NOT NULL AND v2 IS NOT NULL GROUP BY id2, id4",
"regression v1 v2 by id2 id4" = "SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM x GROUP BY id2, id4",
"sum v3 count by id1:id6" = "SELECT id1, id2, id3, id4, id5, id6, sum(v3) AS v3, count() AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6"
)}
)}
8 changes: 4 additions & 4 deletions clickhouse/groupby-clickhouse.sql.in
@@ -162,19 +162,19 @@ DROP TABLE ans;
/* q9: question='regression v1 v2 by id2 id4' */

SET log_queries = 1;
-CREATE TABLE ans ENGINE = Memory AS SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM DATA_NAME WHERE v1 IS NOT NULL AND v2 IS NOT NULL GROUP BY id2, id4;
+CREATE TABLE ans ENGINE = Memory AS SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM DATA_NAME GROUP BY id2, id4;
SET log_queries = 0;
SYSTEM FLUSH LOGS;
SELECT 1 AS run, toUnixTimestamp(now()) AS timestamp, 'groupby' AS task, 'DATA_NAME' AS data_name, NULL AS in_rows, 'regression v1 v2 by id2 id4' AS question, result_rows AS out_rows, NULL AS out_cols, 'clickhouse' AS solution, version() AS version, NULL AS git, 'select group by' AS fun, query_duration_ms/1000 AS time_sec, memory_usage/1073741824 AS mem_gb, 1 AS cache, NULL AS chk, NULL AS chk_time_sec, 1 AS on_disk
-FROM system.query_log WHERE type=2 AND query='CREATE TABLE ans ENGINE = Memory AS SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM DATA_NAME WHERE v1 IS NOT NULL AND v2 IS NOT NULL GROUP BY id2, id4;\n' ORDER BY query_start_time DESC LIMIT 1 INTO OUTFILE 'clickhouse/log/groupby_DATA_NAME_q9_r1.csv' FORMAT CSVWithNames;
+FROM system.query_log WHERE type=2 AND query='CREATE TABLE ans ENGINE = Memory AS SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM DATA_NAME GROUP BY id2, id4;\n' ORDER BY query_start_time DESC LIMIT 1 INTO OUTFILE 'clickhouse/log/groupby_DATA_NAME_q9_r1.csv' FORMAT CSVWithNames;
DROP TABLE ans;

SET log_queries = 1;
-CREATE TABLE ans ENGINE = Memory AS SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM DATA_NAME WHERE v1 IS NOT NULL AND v2 IS NOT NULL GROUP BY id2, id4;
+CREATE TABLE ans ENGINE = Memory AS SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM DATA_NAME GROUP BY id2, id4;
SET log_queries = 0;
SYSTEM FLUSH LOGS;
SELECT 2 AS run, toUnixTimestamp(now()) AS timestamp, 'groupby' AS task, 'DATA_NAME' AS data_name, NULL AS in_rows, 'regression v1 v2 by id2 id4' AS question, result_rows AS out_rows, NULL AS out_cols, 'clickhouse' AS solution, version() AS version, NULL AS git, 'select group by' AS fun, query_duration_ms/1000 AS time_sec, memory_usage/1073741824 AS mem_gb, 1 AS cache, NULL AS chk, NULL AS chk_time_sec, 1 AS on_disk
-FROM system.query_log WHERE type=2 AND query='CREATE TABLE ans ENGINE = Memory AS SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM DATA_NAME WHERE v1 IS NOT NULL AND v2 IS NOT NULL GROUP BY id2, id4;\n' ORDER BY query_start_time DESC LIMIT 1 INTO OUTFILE 'clickhouse/log/groupby_DATA_NAME_q9_r2.csv' FORMAT CSVWithNames;
+FROM system.query_log WHERE type=2 AND query='CREATE TABLE ans ENGINE = Memory AS SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM DATA_NAME GROUP BY id2, id4;\n' ORDER BY query_start_time DESC LIMIT 1 INTO OUTFILE 'clickhouse/log/groupby_DATA_NAME_q9_r2.csv' FORMAT CSVWithNames;
SELECT * FROM ans LIMIT 3;
DROP TABLE ans;

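Aside on the measurement pattern used throughout this file: timings come from the server, not the client — log_queries is enabled around the statement, SYSTEM FLUSH LOGS persists the entry, and duration/memory are read back from system.query_log by matching the exact query text. A rough Python sketch of the same loop (assumptions: a local clickhouse-client binary, an existing table x, and that settings such as log_queries can be passed as client flags):

import subprocess

def ch(sql):
    # one statement per clickhouse-client invocation; --log_queries=1
    # asks the server to record the statement in system.query_log
    return subprocess.run(
        ["clickhouse-client", "--log_queries=1", "--query", sql],
        check=True, capture_output=True, text=True,
    ).stdout

q9 = "SELECT id2, id4, pow(corr(v1, v2), 2) AS r2 FROM x GROUP BY id2, id4"
ch(q9)
ch("SYSTEM FLUSH LOGS")
# type = 2 is QueryFinish; take the newest entry for this exact text --
# exact-text matching is as brittle here as in the script above
print(ch(
    "SELECT query_duration_ms/1000 AS time_sec, "
    "memory_usage/1073741824 AS mem_gb "
    f"FROM system.query_log WHERE type = 2 AND query = '{q9}' "
    "ORDER BY query_start_time DESC LIMIT 1"
))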
4 changes: 2 additions & 2 deletions cudf/groupby-cudf.py
@@ -247,7 +247,7 @@
#question = "regression v1 v2 by id2 id4" # q9 # not yet implemented: https://github.com/rapidsai/cudf/issues/1267
#gc.collect()
#t_start = timeit.default_timer()
-#ans = x[~x['v1'].isna() & ~x['v2'].isna()][['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))
+#ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
#m = memory_usage()
@@ -258,7 +258,7 @@
#del ans
#gc.collect()
#t_start = timeit.default_timer()
-#ans = x[~x['v1'].isna() & ~x['v2'].isna()][['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))
+#ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
#m = memory_usage()
4 changes: 2 additions & 2 deletions dask/groupby-dask2.py
@@ -278,7 +278,7 @@
#question = "regression v1 v2 by id2 id4" # q9 - https://github.com/dask/dask/issues/4828
#gc.collect()
#t_start = timeit.default_timer()
-#ans = x[~x['v1'].isna() & ~x['v2'].isna()][['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute()
+#ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute()
#ans.reset_index(inplace=True)
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
@@ -290,7 +290,7 @@
#del ans
#gc.collect()
#t_start = timeit.default_timer()
-#ans = x[~x['v1'].isna() & ~x['v2'].isna()][['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute()
+#ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute()
#ans.reset_index(inplace=True)
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
4 changes: 2 additions & 2 deletions juliadf/groupby-juliadf.jl
@@ -167,13 +167,13 @@ ANS = 0;

question = "regression v1 v2 by id2 id4"; # q9
GC.gc();
-t = @elapsed (ANS = combine(groupby(dropmissing(x, [:v1, :v2]), [:id2, :id4]), [:v1, :v2] => ((v1,v2) -> cor(v1, v2)^2) => :r2); println(size(ANS)); flush(stdout));
+t = @elapsed (ANS = combine(groupby(x, [:id2, :id4]), [:v1, :v2] => ((v1,v2) -> cor(v1, v2)^2) => :r2); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = sum(ANS.r2);
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
ANS = 0;
GC.gc();
-t = @elapsed (ANS = combine(groupby(dropmissing(x, [:v1, :v2]), [:id2, :id4]), [:v1, :v2] => ((v1,v2) -> cor(v1, v2)^2) => :r2); println(size(ANS)); flush(stdout));
+t = @elapsed (ANS = combine(groupby(x, [:id2, :id4]), [:v1, :v2] => ((v1,v2) -> cor(v1, v2)^2) => :r2); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = sum(ANS.r2);
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
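A caveat specific to this file: unlike pandas' pairwise-complete corr, Julia's cor propagates missing, so with dropmissing removed the juliadf q9 presumably yields a missing r2 for groups containing missing v1/v2 on the NA datasets. Python has the same contrast between Series.corr and np.corrcoef — a tiny illustration with made-up values:

import numpy as np
import pandas as pd

v1 = pd.Series([1.0, 2.0, np.nan, 4.0])
v2 = pd.Series([2.0, 4.0, 6.0, 8.0])

print(v1.corr(v2))                # 1.0 -- the NaN pair is dropped
print(np.corrcoef(v1, v2)[0, 1])  # nan -- NaN propagates, like Julia's cor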
4 changes: 2 additions & 2 deletions pandas/groupby-pandas.py
@@ -259,7 +259,7 @@
#corr().iloc[0::2][['v2']]**2 # on 1e8,k=1e2 slower, 76s vs 47s
gc.collect()
t_start = timeit.default_timer()
-ans = x[~x['v1'].isna() & ~x['v2'].isna()][['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, observed=True, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))
+ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, observed=True, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -270,7 +270,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
-ans = x[~x['v1'].isna() & ~x['v2'].isna()][['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, observed=True, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))
+ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, observed=True, dropna=False).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}))
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
4 changes: 2 additions & 2 deletions pydatatable/groupby-pydatatable.py
@@ -250,7 +250,7 @@
question = 'regression v1 v2 by id2 id4' # q9
gc.collect()
t_start = timeit.default_timer()
-ans = x[~isna(f.v1) & ~isna(f.v2),:][:, {'r2': corr(f.v1, f.v2)**2}, by(f.id2, f.id4)]
+ans = x[:, {'r2': corr(f.v1, f.v2)**2}, by(f.id2, f.id4)]
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -261,7 +261,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
-ans = x[~isna(f.v1) & ~isna(f.v2),:][:, {'r2': corr(f.v1, f.v2)**2}, by(f.id2, f.id4)]
+ans = x[:, {'r2': corr(f.v1, f.v2)**2}, by(f.id2, f.id4)]
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
4 changes: 2 additions & 2 deletions spark/groupby-spark.py
@@ -304,7 +304,7 @@
question = "regression v1 v2 by id2 id4" # q9
gc.collect()
t_start = timeit.default_timer()
ans = spark.sql("select id2, id4, pow(corr(v1, v2), 2) as r2 from x where v1 is not null and v2 is not null group by id2, id4").persist(pyspark.StorageLevel.MEMORY_ONLY)
ans = spark.sql("select id2, id4, pow(corr(v1, v2), 2) as r2 from x group by id2, id4").persist(pyspark.StorageLevel.MEMORY_ONLY)
print((ans.count(), len(ans.columns)), flush=True) # shape
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -318,7 +318,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = spark.sql("select id2, id4, pow(corr(v1, v2), 2) as r2 from x where v1 is not null and v2 is not null group by id2, id4").persist(pyspark.StorageLevel.MEMORY_ONLY)
ans = spark.sql("select id2, id4, pow(corr(v1, v2), 2) as r2 from x group by id2, id4").persist(pyspark.StorageLevel.MEMORY_ONLY)
print((ans.count(), len(ans.columns)), flush=True) # shape
t = timeit.default_timer() - t_start
m = memory_usage()
