6 changes: 3 additions & 3 deletions RecommenderSystems/dlrm/tools/criteo1t_parquet.py
@@ -36,13 +36,13 @@ def make_dlrm_parquet(
     dense_cols = [make_dense(Ii).alias(Ii) for i, Ii in enumerate(dense_names)]

     if mod_idx <= 0:
-        sparse_cols = [xxhash64(Ci, lit(i - 1)).alias(Ci) for i, Ci in enumerate(sparse_names)]
+        sparse_cols = [xxhash64(Ci, lit(i)).alias(Ci) for i, Ci in enumerate(sparse_names)]
     else:
         make_sparse = udf(
-            lambda s, i: mod_idx * i if s is None else int(s, 16) % mod_idx + mod_idx * i,
+            lambda s, i: mod_idx * (i + 1) if s is None else int(s, 16) % mod_idx + mod_idx * i,
             LongType(),
         )
-        sparse_cols = [make_sparse(Ci, lit(i - 1)).alias(Ci) for i, Ci in enumerate(sparse_names)]
+        sparse_cols = [make_sparse(Ci, lit(i)).alias(Ci) for i, Ci in enumerate(sparse_names)]

     cols = [label_col] + dense_cols + sparse_cols

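In both branches the column offset passed to lit() moves from i - 1 to i, so the first categorical column C1 now starts at offset 0, and the NULL fallback in the modulo branch moves from mod_idx * i to mod_idx * (i + 1). A minimal, Spark-free sketch of the resulting per-column mapping (illustrative only, not part of the PR; sparse_index is a hypothetical helper and 40000000 is the mod_idx value hard-coded in the Scala script below):

# Sketch of the mod_idx > 0 mapping applied to one value of column Ci (i starting at 0).
def sparse_index(s, i, mod_idx=40000000):
    if s is None:
        return mod_idx * (i + 1)               # NULL categorical value
    return int(s, 16) % mod_idx + mod_idx * i  # hex string bucketed into column i's range

# Each column gets its own id range [i * mod_idx, (i + 1) * mod_idx):
print(sparse_index("75ac2fe6", 0))  # some id in [0, 40000000)
print(sparse_index(None, 0))        # 40000000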
19 changes: 11 additions & 8 deletions RecommenderSystems/dlrm/tools/criteo1t_parquet.scala
@@ -5,25 +5,28 @@ val dense_names = (1 to 13).map{id=>s"I$id"}
val integer_names = Seq("label") ++ dense_names
val col_names = integer_names ++ categorical_names

val mod_idx = 40000000L
val src_dir = "/path/to/unziped/criteo1t"
val dst_dir = "/path/to/output"
val tmp_dir = "/path/to/tmp_spark"

// split day_23() to test.csv and val.csv
// # head -n 89137319 src/day_23 > dst/test.csv
// # tail -n +89137320 src/day_23 > dst/val.csv
// total 178274637, test 89137319, val 89137318

val day_23 = s"${src_dir}/day_23"
val test_csv = s"${tmp_dir}/test.csv"
val val_csv = s"${tmp_dir}/val.csv"

val make_label = udf((str:String) => str.toFloat)
val make_dense = udf((str:String) => if (str == null) 1 else str.toFloat + 1)
val make_sparse = udf((str:String, i:Long) => (if (str == null) (i+1) * 40000000L else Math.floorMod(Integer.parseUnsignedInt(str, 16).toLong, 40000000L)) + i * 40000000L)
val label_cols = Seq(make_label($"label").as("label"))

val make_dense = udf((str:String) => if (str == null) 1 else str.toFloat + 1)
val dense_cols = 1.to(13).map{i=>make_dense(col(s"I$i")).as(s"I${i}")}
val sparse_cols = 1.to(26).map{i=>make_sparse(col(s"C$i"), lit(i-1)).as(s"C${i}")}

var sparse_cols = if (mod_idx > 0){
def make_sparse = udf((str:String, i:Long, mod:Long) => (if (str == null) (i+1) * mod else Math.floorMod(Integer.parseUnsignedInt(str, 16).toLong, mod)) + i * mod)
1.to(26).map{i=>make_sparse(col(s"C$i"), lit(i-1), lit(mod_idx)).as(s"C${i}")}
} else {
1.to(26).map{i=>xxhash64(lit(i), col(s"C$i")).as(s"C${i}")}
}

val cols = label_cols ++ dense_cols ++ sparse_cols

spark.read.option("delimiter", "\t").csv(test_csv).toDF(col_names: _*).select(cols:_*).repartition(256).write.parquet(s"${dst_dir}/test")
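The Scala script gains the same two paths as the Python tool: when mod_idx > 0 each hex value is bucketed by modulo into a per-column id range, otherwise each column is hashed with xxhash64 together with the column index, so identical strings in different columns map to different ids. A small PySpark sketch of that salting effect (illustrative only; the session setup, example values, and output column names are made up for this sketch, not taken from the PR):

# Illustrative only: the per-column literal acts as a salt for xxhash64.
from pyspark.sql import SparkSession
from pyspark.sql.functions import xxhash64, lit, col

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("05db9164", "05db9164")], ["C1", "C2"])
df.select(
    xxhash64(col("C1"), lit(0)).alias("C1_id"),  # salt 0 for the first column
    xxhash64(col("C2"), lit(1)).alias("C2_id"),  # salt 1 for the second column
).show()  # the two ids differ even though the raw strings are equal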
5 changes: 5 additions & 0 deletions RecommenderSystems/dlrm/tools/launch_spark.sh
@@ -0,0 +1,5 @@
export SPARK_LOCAL_DIRS=/tmp/tmp_spark
spark-shell \
--master "local[*]" \
--conf spark.driver.maxResultSize=0 \
--driver-memory 360G
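launch_spark.sh starts a single-process Spark shell for the conversion: SPARK_LOCAL_DIRS points Spark's scratch and shuffle space at /tmp/tmp_spark, --master "local[*]" uses all local cores, spark.driver.maxResultSize=0 removes the cap on data collected back to the driver, and --driver-memory 360G sizes the driver heap, which also hosts the executors in local mode. The criteo1t_parquet.scala script above can then be run from this shell, for example by pasting it in or loading it with the REPL's :load command.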
7 changes: 7 additions & 0 deletions RecommenderSystems/dlrm/tools/split_day_23.sh
@@ -0,0 +1,7 @@
# split day_23 to test.csv and val.csv
src_dir="/path/to/unziped/criteo1t"
tmp_dir="/path/to/tmp_spark"

# total 178274637, test 89137319, val 89137318
head -n 89137319 $src_dir/day_23 > $tmp_dir/test.csv
tail -n +89137320 $src_dir/day_23 > $tmp_dir/val.csv
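The two commands split day_23 without overlap: head -n 89137319 writes lines 1 through 89,137,319 to test.csv, and tail -n +89137320 starts printing at line 89,137,320, so together the outputs cover all 178,274,637 lines (89,137,319 + 89,137,318). A quick wc -l on the two files confirms the counts before running the Spark conversion.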