Skip to content

Commit 5654e0f

Browse files
committed
JAVA 17 BWARE COMMIT
debug no skip decodeRecode decoder passthrough?? except hash columns do not subtract one to String inconsistency debug error remove hash columns in decode fix decode ? parallel convert decoder dummy sparse faster number parsing handle new exception number format exception reduce error logging fix ? check fix handle hash for init metadata decode dummy parallel decode +k for parallel cleanup GetCategorical Map k added stack quicksort dimensions log the failing operation support solve more info Compressed Remove Empty decompress selection matrix fall back to uncompressed Compressed remove empty columns disable decompressing for not supported ops disable decompressing RMM for SDC disable the decompressing RMM
1 parent 0736c68 commit 5654e0f

File tree

85 files changed

+3207
-414
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

85 files changed

+3207
-414
lines changed

src/main/java/org/apache/sysds/common/Builtins.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ public enum Builtins {
154154
GARCH("garch", true),
155155
GAUSSIAN_CLASSIFIER("gaussianClassifier", true),
156156
GET_ACCURACY("getAccuracy", true),
157+
GET_CATEGORICAL_MASK("getCategoricalMask", false),
157158
GLM("glm", true),
158159
GLM_PREDICT("glmPredict", true),
159160
GLOVE("glove", true),

src/main/java/org/apache/sysds/common/Opcodes.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ public enum Opcodes {
197197
TRANSFORMMETA("transformmeta", InstructionType.ParameterizedBuiltin),
198198
TRANSFORMENCODE("transformencode", InstructionType.MultiReturnParameterizedBuiltin, InstructionType.MultiReturnBuiltin),
199199

200+
GET_CATEGORICAL_MASK("get_categorical_mask", InstructionType.Binary),
201+
200202
//Ternary instruction opcodes
201203
PM("+*", InstructionType.Ternary),
202204
MINUSMULT("-*", InstructionType.Ternary),

src/main/java/org/apache/sysds/common/Types.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,7 @@ public enum OpOp2 {
639639
MINUS_NZ(false), //sparse-safe minus: X-(mean*ppred(X,0,!=))
640640
LOG_NZ(false), //sparse-safe log; ppred(X,0,"!=")*log(X,0.5)
641641
MINUS1_MULT(false), //1-X*Y
642+
GET_CATEGORICAL_MASK(false), // get transformation mask
642643
QUANTIZE_COMPRESS(false), //quantization-fused compression
643644
UNION_DISTINCT(false);
644645

src/main/java/org/apache/sysds/hops/BinaryOp.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -763,8 +763,8 @@ protected ExecType optFindExecType(boolean transitive) {
763763

764764
checkAndSetForcedPlatform();
765765

766-
DataType dt1 = getInput().get(0).getDataType();
767-
DataType dt2 = getInput().get(1).getDataType();
766+
final DataType dt1 = getInput(0).getDataType();
767+
final DataType dt2 = getInput(1).getDataType();
768768

769769
if( _etypeForced != null ) {
770770
setExecType(_etypeForced);
@@ -812,18 +812,28 @@ else if ( dt1 == DataType.SCALAR && dt2 == DataType.MATRIX ) {
812812
checkAndSetInvalidCPDimsAndSize();
813813
}
814814

815-
//spark-specific decision refinement (execute unary scalar w/ spark input and
815+
// spark-specific decision refinement (execute unary scalar w/ spark input and
816816
// single parent also in spark because it's likely cheap and reduces intermediates)
817-
if(transitive && _etype == ExecType.CP && _etypeForced != ExecType.CP && _etypeForced != ExecType.FED &&
818-
getDataType().isMatrix() // output should be a matrix
819-
&& (dt1.isScalar() || dt2.isScalar()) // one side should be scalar
820-
&& supportsMatrixScalarOperations() // scalar operations
821-
&& !(getInput().get(dt1.isScalar() ? 1 : 0) instanceof DataOp) // input is not checkpoint
822-
&& getInput().get(dt1.isScalar() ? 1 : 0).getParent().size() == 1 // unary scalar is only parent
823-
&& !HopRewriteUtils.isSingleBlock(getInput().get(dt1.isScalar() ? 1 : 0)) // single block triggered exec
824-
&& getInput().get(dt1.isScalar() ? 1 : 0).optFindExecType() == ExecType.SPARK) {
825-
// pull unary scalar operation into spark
826-
_etype = ExecType.SPARK;
817+
if(transitive // we allow transitive Spark operations. continue sequences of spark operations
818+
&& _etype == ExecType.CP // The instruction is currently in CP
819+
&& _etypeForced != ExecType.CP // not forced CP
820+
&& _etypeForced != ExecType.FED // not federated
821+
&& (getDataType().isMatrix() || getDataType().isFrame()) // output should be a matrix or frame
822+
) {
823+
final boolean v1 = getInput(0).isScalarOrVectorBellowBlockSize();
824+
final boolean v2 = getInput(1).isScalarOrVectorBellowBlockSize();
825+
final boolean left = v1 == true; // left side is the vector or scalar
826+
final Hop sparkIn = getInput(left ? 1 : 0);
827+
if((v1 ^ v2) // XOR only one side is allowed to be a vector or a scalar.
828+
&& (supportsMatrixScalarOperations() || op == OpOp2.APPLY_SCHEMA) // supported operation
829+
&& sparkIn.getParent().size() == 1 // only one parent
830+
&& !HopRewriteUtils.isSingleBlock(sparkIn) // single block triggered exec
831+
&& sparkIn.optFindExecType() == ExecType.SPARK // input was spark op.
832+
&& !(sparkIn instanceof DataOp) // input is not checkpoint
833+
) {
834+
// pull operation into spark
835+
_etype = ExecType.SPARK;
836+
}
827837
}
828838

829839
if( OptimizerUtils.ALLOW_BINARY_UPDATE_IN_PLACE &&
@@ -853,7 +863,10 @@ else if( (op == OpOp2.CBIND && getDataType().isList())
853863
|| (op == OpOp2.RBIND && getDataType().isList())) {
854864
_etype = ExecType.CP;
855865
}
856-
866+
867+
if( op == OpOp2.GET_CATEGORICAL_MASK)
868+
_etype = ExecType.CP;
869+
857870
//mark for recompile (forever)
858871
setRequiresRecompileIfNecessary();
859872

src/main/java/org/apache/sysds/hops/Hop.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,6 +1045,12 @@ public final String toString() {
10451045
// ========================================================================================
10461046

10471047

1048+
protected boolean isScalarOrVectorBellowBlockSize(){
1049+
return getDataType().isScalar() || (dimsKnown() &&
1050+
(( _dc.getRows() == 1 && _dc.getCols() < ConfigurationManager.getBlocksize())
1051+
|| _dc.getCols() == 1 && _dc.getRows() < ConfigurationManager.getBlocksize()));
1052+
}
1053+
10481054
protected boolean isVector() {
10491055
return (dimsKnown() && (_dc.getRows() == 1 || _dc.getCols() == 1) );
10501056
}
@@ -1629,6 +1635,11 @@ protected void setMemoryAndComputeEstimates(Lop lop) {
16291635
lop.setComputeEstimate(ComputeCost.getHOPComputeCost(this));
16301636
}
16311637

1638+
protected boolean hasSparkOutput(){
1639+
return (this.optFindExecType() == ExecType.SPARK
1640+
|| (this instanceof DataOp && ((DataOp)this).hasOnlyRDD()));
1641+
}
1642+
16321643
/**
16331644
* Set parse information.
16341645
*

src/main/java/org/apache/sysds/hops/UnaryOp.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,11 @@ protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
366366
} else {
367367
sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz);
368368
}
369-
return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity, getDataType());
369+
370+
if(getDataType() == DataType.FRAME)
371+
return OptimizerUtils.estimateSizeExactFrame(dim1, dim2);
372+
else
373+
return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);
370374
}
371375

372376
@Override
@@ -463,6 +467,13 @@ public boolean isMetadataOperation() {
463467
|| _op == OpOp1.CAST_AS_LIST;
464468
}
465469

470+
private boolean isDisallowedSparkOps(){
471+
return isCumulativeUnaryOperation()
472+
|| isCastUnaryOperation()
473+
|| _op==OpOp1.MEDIAN
474+
|| _op==OpOp1.IQM;
475+
}
476+
466477
@Override
467478
protected ExecType optFindExecType(boolean transitive)
468479
{
@@ -493,19 +504,22 @@ else if ( getInput().get(0).areDimsBelowThreshold() || getInput().get(0).isVecto
493504
checkAndSetInvalidCPDimsAndSize();
494505
}
495506

507+
496508
//spark-specific decision refinement (execute unary w/ spark input and
497509
//single parent also in spark because it's likely cheap and reduces intermediates)
498-
if( _etype == ExecType.CP && _etypeForced != ExecType.CP
499-
&& getInput().get(0).optFindExecType() == ExecType.SPARK
500-
&& getDataType().isMatrix()
501-
&& !isCumulativeUnaryOperation() && !isCastUnaryOperation()
502-
&& _op!=OpOp1.MEDIAN && _op!=OpOp1.IQM
503-
&& !(getInput().get(0) instanceof DataOp) //input is not checkpoint
504-
&& getInput().get(0).getParent().size()==1 ) //unary is only parent
505-
{
510+
if(_etype == ExecType.CP // currently CP instruction
511+
&& _etype != ExecType.SPARK /// currently not SP.
512+
&& _etypeForced != ExecType.CP // not forced as CP instruction
513+
&& getInput(0).hasSparkOutput() // input is a spark instruction
514+
&& (getDataType().isMatrix() || getDataType().isFrame()) // output is a matrix or frame
515+
&& !isDisallowedSparkOps() // is invalid spark instruction
516+
// && !(getInput().get(0) instanceof DataOp) // input is not checkpoint
517+
// && getInput(0).getParent().size() <= 1// unary is only parent
518+
) {
506519
//pull unary operation into spark
507520
_etype = ExecType.SPARK;
508521
}
522+
509523

510524
//mark for recompile (forever)
511525
setRequiresRecompileIfNecessary();
@@ -520,7 +534,7 @@ && getInput().get(0).getParent().size()==1 ) //unary is only parent
520534
} else {
521535
setRequiresRecompileIfNecessary();
522536
}
523-
537+
524538
return _etype;
525539
}
526540

src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2018,6 +2018,15 @@ else if(this.getOpCode() == Builtins.MAX_POOL || this.getOpCode() == Builtins.AV
20182018
else
20192019
raiseValidateError("The compress or decompress instruction is not allowed in dml scripts");
20202020
break;
2021+
case GET_CATEGORICAL_MASK:
2022+
checkNumParameters(2);
2023+
checkFrameParam(getFirstExpr());
2024+
checkScalarParam(getSecondExpr());
2025+
output.setDataType(DataType.MATRIX);
2026+
output.setDimensions(1, -1);
2027+
output.setBlocksize( id.getBlocksize());
2028+
output.setValueType(ValueType.FP64);
2029+
break;
20212030
case QUANTIZE_COMPRESS:
20222031
if(OptimizerUtils.ALLOW_SCRIPT_LEVEL_QUANTIZE_COMPRESS_COMMAND) {
20232032
checkNumParameters(2);
@@ -2383,6 +2392,13 @@ protected void checkMatrixFrameParam(Expression e) { //always unconditional
23832392
raiseValidateError("Expecting matrix or frame parameter for function "+ getOpCode(), false, LanguageErrorCodes.UNSUPPORTED_PARAMETERS);
23842393
}
23852394
}
2395+
2396+
protected void checkFrameParam(Expression e) {
2397+
if(e.getOutput().getDataType() != DataType.FRAME) {
2398+
raiseValidateError("Expecting frame parameter for function " + getOpCode(), false,
2399+
LanguageErrorCodes.UNSUPPORTED_PARAMETERS);
2400+
}
2401+
}
23862402

23872403
protected void checkMatrixScalarParam(Expression e) { //always unconditional
23882404
if (e.getOutput().getDataType() != DataType.MATRIX && e.getOutput().getDataType() != DataType.SCALAR) {

src/main/java/org/apache/sysds/parser/DMLTranslator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2821,6 +2821,9 @@ else if ( in.length == 2 )
28212821
DataType.MATRIX, target.getValueType(), AggOp.COUNT_DISTINCT, Direction.Col, expr);
28222822
break;
28232823

2824+
case GET_CATEGORICAL_MASK:
2825+
currBuiltinOp = new BinaryOp(target.getName(), DataType.MATRIX, ValueType.FP64, OpOp2.GET_CATEGORICAL_MASK, expr, expr2);
2826+
break;
28242827
default:
28252828
throw new ParseException("Unsupported builtin function type: "+source.getOpCode());
28262829
}

src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@
5858
import org.apache.sysds.runtime.compress.lib.CLALibMMChain;
5959
import org.apache.sysds.runtime.compress.lib.CLALibMatrixMult;
6060
import org.apache.sysds.runtime.compress.lib.CLALibMerge;
61-
import org.apache.sysds.runtime.compress.lib.CLALibReplace;
61+
import org.apache.sysds.runtime.compress.lib.CLALibRemoveEmpty;
6262
import org.apache.sysds.runtime.compress.lib.CLALibReorg;
63+
import org.apache.sysds.runtime.compress.lib.CLALibReplace;
6364
import org.apache.sysds.runtime.compress.lib.CLALibReshape;
6465
import org.apache.sysds.runtime.compress.lib.CLALibRexpand;
6566
import org.apache.sysds.runtime.compress.lib.CLALibScalar;
6667
import org.apache.sysds.runtime.compress.lib.CLALibSlice;
68+
import org.apache.sysds.runtime.compress.lib.CLALibSort;
6769
import org.apache.sysds.runtime.compress.lib.CLALibSquash;
6870
import org.apache.sysds.runtime.compress.lib.CLALibTSMM;
6971
import org.apache.sysds.runtime.compress.lib.CLALibTernaryOp;
@@ -101,6 +103,7 @@
101103
import org.apache.sysds.runtime.util.IndexRange;
102104
import org.apache.sysds.utils.DMLCompressionStatistics;
103105
import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
106+
import org.apache.sysds.utils.stats.Timing;
104107

105108
public class CompressedMatrixBlock extends MatrixBlock {
106109
private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
@@ -475,16 +478,20 @@ public void readFields(DataInput in) throws IOException {
475478
}
476479

477480
public static CompressedMatrixBlock read(DataInput in) throws IOException {
481+
Timing t = new Timing();
478482
int rlen = in.readInt();
479483
int clen = in.readInt();
480484
long nonZeros = in.readLong();
481485
boolean overlappingColGroups = in.readBoolean();
482486
List<AColGroup> groups = ColGroupIO.readGroups(in, rlen);
483-
return new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups);
487+
CompressedMatrixBlock ret = new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups);
488+
LOG.debug("Compressed read serialization time: " + t.stop());
489+
return ret;
484490
}
485491

486492
@Override
487493
public void write(DataOutput out) throws IOException {
494+
Timing t = new Timing();
488495
final long estimateUncompressed = nonZeros > 0 ? MatrixBlock.estimateSizeOnDisk(rlen, clen,
489496
nonZeros) : Long.MAX_VALUE;
490497
final long estDisk = nonZeros > 0 ? getExactSizeOnDisk() : Long.MAX_VALUE;
@@ -512,6 +519,7 @@ public void write(DataOutput out) throws IOException {
512519
out.writeLong(nonZeros);
513520
out.writeBoolean(overlappingColGroups);
514521
ColGroupIO.writeGroups(out, _colGroups);
522+
LOG.debug("Compressed write serialization time: " + t.stop());
515523
}
516524

517525
/**
@@ -611,14 +619,6 @@ public MatrixBlock aggregateUnaryOperations(AggregateUnaryOperator op, MatrixVal
611619
public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k) {
612620
// check for transpose type
613621
if(tstype == MMTSJType.LEFT) {
614-
if(isEmpty())
615-
return new MatrixBlock(clen, clen, true);
616-
// create output matrix block
617-
if(out == null)
618-
out = new MatrixBlock(clen, clen, false);
619-
else
620-
out.reset(clen, clen, false);
621-
out.allocateDenseBlock();
622622
CLALibTSMM.leftMultByTransposeSelf(this, out, k);
623623
return out;
624624
}
@@ -846,9 +846,8 @@ public CM_COV_Object covOperations(COVOperator op, MatrixBlock that, MatrixBlock
846846
}
847847

848848
@Override
849-
public MatrixBlock sortOperations(MatrixValue weights, MatrixBlock result) {
850-
MatrixBlock right = getUncompressed(weights);
851-
return getUncompressed("sortOperations").sortOperations(right, result);
849+
public MatrixBlock sortOperations(MatrixValue weights, MatrixBlock result, int k) {
850+
return CLALibSort.sort(this, weights, result, k);
852851
}
853852

854853
@Override
@@ -871,9 +870,7 @@ public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, Matr
871870

872871
@Override
873872
public MatrixBlock removeEmptyOperations(MatrixBlock ret, boolean rows, boolean emptyReturn, MatrixBlock select) {
874-
printDecompressWarning("removeEmptyOperations");
875-
MatrixBlock tmp = getUncompressed();
876-
return tmp.removeEmptyOperations(ret, rows, emptyReturn, select);
873+
return CLALibRemoveEmpty.rmempty(this, ret, rows, emptyReturn, select);
877874
}
878875

879876
@Override
@@ -1202,8 +1199,8 @@ public void examSparsity(boolean allowCSR, int k) {
12021199
}
12031200

12041201
@Override
1205-
public void sparseToDense(int k) {
1206-
// do nothing
1202+
public MatrixBlock sparseToDense(int k) {
1203+
return this; // do nothing
12071204
}
12081205

12091206
@Override
@@ -1236,16 +1233,6 @@ public double interQuartileMean() {
12361233
return getUncompressed("interQuartileMean").interQuartileMean();
12371234
}
12381235

1239-
@Override
1240-
public MatrixBlock pickValues(MatrixValue quantiles, MatrixValue ret) {
1241-
return getUncompressed("pickValues").pickValues(quantiles, ret);
1242-
}
1243-
1244-
@Override
1245-
public double pickValue(double quantile, boolean average) {
1246-
return getUncompressed("pickValue").pickValue(quantile, average);
1247-
}
1248-
12491236
@Override
12501237
public double sumWeightForQuantile() {
12511238
return getUncompressed("sumWeightForQuantile").sumWeightForQuantile();

src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class CompressedMatrixBlockFactory {
6464

6565
private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName());
6666

67+
private static final Object asyncCompressLock = new Object();
68+
6769
/** Timing object to measure the time of each phase in the compression */
6870
private final Timing time = new Timing(true);
6971
/** Compression statistics gathered throughout the compression */
@@ -181,21 +183,23 @@ public static Future<Void> compressAsync(ExecutionContext ec, String varName) {
181183
}
182184

183185
public static Future<Void> compressAsync(ExecutionContext ec, String varName, InstructionTypeCounter ins) {
184-
LOG.debug("Compressing Async");
185186
final ExecutorService pool = CommonThreadPool.get(); // We have to guarantee that a thread pool is allocated.
186187
return CompletableFuture.runAsync(() -> {
187188
// method call or code to be async
188189
try {
189190
CacheableData<?> data = ec.getCacheableData(varName);
190-
if(data instanceof MatrixObject) {
191-
MatrixObject mo = (MatrixObject) data;
192-
MatrixBlock mb = mo.acquireReadAndRelease();
193-
MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mo.acquireReadAndRelease(), ins).getLeft();
194-
if(mbc instanceof CompressedMatrixBlock) {
195-
ExecutionContext.createCacheableData(mb);
196-
mo.acquireModify(mbc);
197-
mo.release();
198-
mbc.sum(); // calculate sum to forcefully materialize counts
191+
synchronized(asyncCompressLock){ // synchronize on the data object to not allow multiple compressions of the same matrix.
192+
if(data instanceof MatrixObject) {
193+
LOG.debug("Compressing Async");
194+
MatrixObject mo = (MatrixObject) data;
195+
MatrixBlock mb = mo.acquireReadAndRelease();
196+
MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mb, ins).getLeft();
197+
if(mbc instanceof CompressedMatrixBlock) {
198+
ExecutionContext.createCacheableData(mb);
199+
mo.acquireModify(mbc);
200+
mo.release();
201+
mbc.sum(); // calculate sum to forcefully materialize counts
202+
}
199203
}
200204
}
201205
}

0 commit comments

Comments (0)