
Commit 964ee4b

Merge branch 'master' into sc-2813
2 parents 2542d01 + 3e29e37

224 files changed: +7626 / -3204 lines

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -265,6 +265,7 @@ exportMethods("%in%",
               "var_samp",
               "weekofyear",
               "when",
+              "window",
               "year")
 
 exportClasses("GroupedData")

R/pkg/R/functions.R

Lines changed: 63 additions & 0 deletions
@@ -2131,6 +2131,69 @@ setMethod("from_unixtime", signature(x = "Column"),
             column(jc)
           })
 
+#' window
+#'
+#' Bucketize rows into one or more time windows given a timestamp specifying column. Window
+#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+#' the order of months are not supported.
+#'
+#' The time column must be of TimestampType.
+#'
+#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
+#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
+#' If the `slideDuration` is not provided, the windows will be tumbling windows.
+#'
+#' The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start
+#' window intervals. For example, in order to have hourly tumbling windows that start 15 minutes
+#' past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.
+#'
+#' The output column will be a struct called 'window' by default with the nested columns 'start'
+#' and 'end'.
+#'
+#' @family datetime_funcs
+#' @rdname window
+#' @name window
+#' @export
+#' @examples
+#'\dontrun{
+#' # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10,
+#' # 09:00:25-09:01:25, 09:00:40-09:01:40, ...
+#' window(df$time, "1 minute", "15 seconds", "10 seconds")
+#'
+#' # One minute tumbling windows 15 seconds after the minute, e.g. 09:00:15-09:01:15,
+#' # 09:01:15-09:02:15...
+#' window(df$time, "1 minute", startTime = "15 seconds")
+#'
+#' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ...
+#' window(df$time, "30 seconds", "10 seconds")
+#'}
+setMethod("window", signature(x = "Column"),
+          function(x, windowDuration, slideDuration = NULL, startTime = NULL) {
+            stopifnot(is.character(windowDuration))
+            if (!is.null(slideDuration) && !is.null(startTime)) {
+              stopifnot(is.character(slideDuration) && is.character(startTime))
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration, slideDuration, startTime)
+            } else if (!is.null(slideDuration)) {
+              stopifnot(is.character(slideDuration))
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration, slideDuration)
+            } else if (!is.null(startTime)) {
+              stopifnot(is.character(startTime))
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration, windowDuration, startTime)
+            } else {
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration)
+            }
+            column(jc)
+          })
+
 #' locate
 #'
 #' Locate the position of the first occurrence of substr.
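
Not part of the commit, but for orientation: the typical way the new column is consumed is as a grouping key. A minimal sketch, mirroring the setup used in the new tests further down and assuming a SparkR session with `sqlContext` in scope; the sample data is illustrative.

df <- createDataFrame(sqlContext,
                      data.frame(t = c("2016-03-11 09:00:02", "2016-03-11 09:00:07"),
                                 v = c(1, 2)))
# Tumbling five-second windows; window() yields a struct column named
# 'window' with nested 'start' and 'end' fields, which groupBy() accepts.
counts <- count(groupBy(df, window(df$t, "5 seconds")))
head(counts)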

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -1152,6 +1152,10 @@ setGeneric("var_samp", function(x) { standardGeneric("var_samp") })
 #' @export
 setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })
 
+#' @rdname window
+#' @export
+setGeneric("window", function(x, ...) { standardGeneric("window") })
+
 #' @rdname year
 #' @export
 setGeneric("year", function(x) { standardGeneric("year") })

R/pkg/inst/tests/testthat/test_context.R

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
   maskedBySparkR <- masked[funcSparkROrEmpty]
   namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
                      "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
-                     "summary", "transform", "drop")
+                     "summary", "transform", "drop", "window")
   expect_equal(length(maskedBySparkR), length(namesOfMasked))
   expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
   # above are those reported as masked when `library(SparkR)`
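
The test update above exists because exporting `window` from SparkR masks `stats::window` once the package is attached. Base R's time-series version remains reachable with a namespace-qualified call, e.g. `stats::window(ldeaths, start = 1974)`.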

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 36 additions & 0 deletions
@@ -1204,6 +1204,42 @@ test_that("greatest() and least() on a DataFrame", {
   expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3))
 })
 
+test_that("time windowing (window()) with all inputs", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds")
+  local <- collect(df)$v
+  # Not checking time windows because of possible time zone issues. Just checking that the function
+  # works
+  expect_equal(local, c(1))
+})
+
+test_that("time windowing (window()) with slide duration", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds", "2 seconds")
+  local <- collect(df)$v
+  # Not checking time windows because of possible time zone issues. Just checking that the function
+  # works
+  expect_equal(local, c(1, 1))
+})
+
+test_that("time windowing (window()) with start time", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds", startTime = "2 seconds")
+  local <- collect(df)$v
+  # Not checking time windows because of possible time zone issues. Just checking that the function
+  # works
+  expect_equal(local, c(1))
+})
+
+test_that("time windowing (window()) with just window duration", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds")
+  local <- collect(df)$v
+  # Not checking time windows because of possible time zone issues. Just checking that the function
+  # works
+  expect_equal(local, c(1))
+})
+
 test_that("when(), otherwise() and ifelse() on a DataFrame", {
   l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
   df <- createDataFrame(sqlContext, l)
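
A note on the expected values, since the tests deliberately avoid asserting on the window bounds themselves: with a 5-second window sliding every 2 seconds, the event at 09:00:07 falls into the two overlapping windows [09:00:04, 09:00:09) and [09:00:06, 09:00:11), so the single input row is emitted once per containing window, hence `c(1, 1)`. The other three configurations are tumbling, so each row lands in exactly one window.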

bin/spark-class

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ fi
 
 if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
   echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
-  echo "You need to build Spark before running this program." 1>&2
+  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
   exit 1
 else
   LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
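
The reworded message names the Maven lifecycle phase that actually populates the jars directory; a typical invocation (illustrative, any build that runs the "package" phase works) is `build/mvn -DskipTests clean package`.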

build/mvn

Lines changed: 3 additions & 3 deletions
@@ -72,7 +72,7 @@ install_mvn() {
   local MVN_VERSION="3.3.9"
 
   install_app \
-    "http://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
+    "https://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
     "apache-maven-${MVN_VERSION}-bin.tar.gz" \
     "apache-maven-${MVN_VERSION}/bin/mvn"
 
@@ -84,7 +84,7 @@ install_zinc() {
   local zinc_path="zinc-0.3.9/bin/zinc"
   [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
   install_app \
-    "http://downloads.typesafe.com/zinc/0.3.9" \
+    "https://downloads.typesafe.com/zinc/0.3.9" \
     "zinc-0.3.9.tgz" \
     "${zinc_path}"
   ZINC_BIN="${_DIR}/${zinc_path}"
@@ -100,7 +100,7 @@ install_scala() {
   local scala_bin="${_DIR}/scala-${scala_version}/bin/scala"
 
   install_app \
-    "http://downloads.typesafe.com/scala/${scala_version}" \
+    "https://downloads.typesafe.com/scala/${scala_version}" \
     "scala-${scala_version}.tgz" \
     "scala-${scala_version}/bin/scala"
 

common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java

Lines changed: 29 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.spark.network.protocol;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import javax.annotation.Nullable;
 
@@ -43,6 +44,14 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
   private final long bodyLength;
   private long totalBytesTransferred;
 
+  /**
+   * When the write buffer size is larger than this limit, I/O will be done in chunks of this size.
+   * The size should not be too large as it will waste underlying memory copy. e.g. If network
+   * available buffer is smaller than this limit, the data cannot be sent within one single write
+   * operation while it still will make memory copy with this size.
+   */
+  private static final int NIO_BUFFER_LIMIT = 256 * 1024;
+
   /**
    * Construct a new MessageWithHeader.
    *
@@ -128,8 +137,27 @@ protected void deallocate() {
   }
 
   private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
-    int written = target.write(buf.nioBuffer());
+    ByteBuffer buffer = buf.nioBuffer();
+    int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+      target.write(buffer) : writeNioBuffer(target, buffer);
     buf.skipBytes(written);
     return written;
   }
+
+  private int writeNioBuffer(
+      WritableByteChannel writeCh,
+      ByteBuffer buf) throws IOException {
+    int originalLimit = buf.limit();
+    int ret = 0;
+
+    try {
+      int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+      buf.limit(buf.position() + ioSize);
+      ret = writeCh.write(buf);
+    } finally {
+      buf.limit(originalLimit);
+    }
+
+    return ret;
+  }
 }
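
For context on the 256 KB cap (the usual rationale for this pattern, not stated in the diff): when a heap buffer larger than the socket can accept is handed to a channel write, the JDK's write path first copies the entire remaining region into a temporary direct buffer, so an oversized buffer pays an oversized copy on every partial write. Capping each call at NIO_BUFFER_LIMIT bounds that per-call copy, while Netty keeps invoking transferTo() until copyByteBuf() has drained the buffer.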

common/network-yarn/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -36,7 +36,7 @@
     <!-- Make sure all Hadoop dependencies are provided to avoid repackaging. -->
     <hadoop.deps.scope>provided</hadoop.deps.scope>
     <shuffle.jar>${project.build.directory}/scala-${scala.binary.version}/spark-${project.version}-yarn-shuffle.jar</shuffle.jar>
-    <shade>org/spark-project/</shade>
+    <shade>org/spark_project/</shade>
   </properties>
 
   <dependencies>
@@ -91,7 +91,7 @@
       <relocations>
         <relocation>
           <pattern>com.fasterxml.jackson</pattern>
-          <shadedPattern>org.spark-project.com.fasterxml.jackson</shadedPattern>
+          <shadedPattern>${spark.shade.packageName}.com.fasterxml.jackson</shadedPattern>
           <includes>
             <include>com.fasterxml.jackson.**</include>
           </includes>
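
Two related fixes here: hyphens are not legal characters in Java package names, so the shaded prefix moves from org/spark-project/ to org/spark_project/, and the previously hard-coded Jackson relocation now reuses the ${spark.shade.packageName} property (presumably defined once in the parent POM) so the prefix lives in a single place. The log4j template change below tracks the same rename.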

conf/log4j.properties.template

Lines changed: 2 additions & 2 deletions
@@ -28,8 +28,8 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}:
 log4j.logger.org.apache.spark.repl.Main=WARN
 
 # Settings to quiet third party logs that are too verbose
-log4j.logger.org.spark-project.jetty=WARN
-log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.spark_project.jetty=WARN
+log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
 log4j.logger.org.apache.parquet=ERROR
