Conversation

@javierluraschi
Contributor

javierluraschi commented Oct 10, 2018

https://issues.apache.org/jira/browse/ARROW-3479 enabled writing a record batch to a stream:

library(arrow)

# create a record batch and serialize it to a stream of bytes
record <- arrow::record_batch(data.frame(a = c(1, 2, 3)))
stream <- record$to_stream()

This PR implements the reverse operation by running:

read_record_batch_stream(stream)
[[1]]
# A tibble: 3 x 1
      a
  <dbl>
1     1
2     2
3     3

Notice that read_record_batch_stream() returns a list of data frames, since the original data is batched. I considered being more efficient and returning a single data frame; however, this is not trivial, since we would have to refactor our *Array_to_Vector Rcpp functions to support assembling chunks efficiently. Instead, for my use case, I would much rather handle this in Spark and ensure Spark returns a single batch instead of multiple ones. Additionally, the problem of binding a list of data frames into a single data frame is well studied in R: as a user I would call dplyr::bind_rows(), which has already been optimized (see the sketch below). This is not to say that we should not consider optimizing this further, but at least for my use case this is good enough to get me going. There is also a case to be made for supporting a proper streaming interface with a callback function, etc. Then again, this interface is good enough for me at the moment.
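For example, a minimal sketch of that pattern, assuming the stream and batched result shown above:

library(arrow)
library(dplyr)

batches <- read_record_batch_stream(stream)  # list of tibbles, one per batch
combined <- bind_rows(batches)               # collapse into a single data frame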

Also, this PR makes read_record_batch() public to have parity with read_record_batch_stream(); the former operates over a file while the latter operates over a stream (illustrated below). @romainfrancois, I can revert this change if we did not intend to make read_record_batch() public, but it made sense to me to expose both while I was looking at this change.
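Purely for illustration — the file name and exact arguments below are assumptions, not this PR's documented API:

batch   <- read_record_batch("batch.arrow")   # hypothetical: one record batch read from a file
batches <- read_record_batch_stream(stream)   # all record batches read from a stream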

Regarding performance in sparklyr, data collection is something we already spent significant time optimizing to collect in columnar format, but there is still room for improvement.

Without this PR:

library(sparklyr)
library(arrow)
library(dplyr)

sc <- spark_connect(master = "local")

system.time({
    data <- sdf_len(sc, 10^6) %>% transmute(x = rand()) %>% collect()
})
   user  system elapsed 
  0.054   0.014   0.591

With this PR:

library(arrow)
system.time({
    data <- sdf_len(sc, 10^6) %>% transmute(x = rand()) %>% collect()
})
   user  system elapsed 
  0.057   0.012   0.360 

But more importantly, this operation:

system.time({
    data <- sdf_len(sc, 10^7) %>% transmute(x = rand()) %>% collect()
})

fails with java.lang.OutOfMemoryError: Java heap space without Arrow, while with this PR and Arrow it completes:

library(arrow)
system.time({
    data <- sdf_len(sc, 10^7) %>% transmute(x = rand()) %>% collect()
})
   user  system elapsed 
  0.490   0.189   3.671 

Which, at least on my system, enables collecting 10X more data than previously supported. By default, sparklyr connects to Spark in local mode with about 500 MB of memory reserved in the JVM, which allows collecting about 100 MB. Allocating additional memory to Spark's JVM should allow collecting even bigger datasets (see the sketch below). The exception is still thrown when datasets of more than about 20% of the allocated memory are collected, but this is a Java exception that I need to troubleshoot in Spark's R backend first, not yet in R's Arrow bindings.
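As a rough sketch of what I mean by allocating additional memory — the 2g value is arbitrary, and sparklyr.shell.driver-memory is the config key I believe applies in local mode:

library(sparklyr)

config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "2g"   # give the local JVM a bigger heap
sc <- spark_connect(master = "local", config = config)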

@romainfrancois
Contributor

I think it's unfortunate you did this before #2714 is merged.

I'll take the idea of being able to read from a raw vector (I'm not sure it's a good idea to call these streams, as streams are different things, e.g. InputStream ...), so I'll update #2714 with a read_record_batch.raw method so that you can do:

vec <- .... # raw vector with the batch bytes
read_record_batch(vec)

Also, having read_record_batch_stream() return a list of data frames does not play well with the rest. I'd rather implement read_table.raw.

An arrow::Table is just that, a set of record batches sharing the same schema, and a Table can be converted to a tibble with the canonical as_tibble method.
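A minimal sketch of that interface, assuming the read_table.raw method proposed above (not yet merged, so these names may change):

library(arrow)

tbl <- read_table(vec)   # vec: raw vector holding the record batch bytes, as above
df  <- as_tibble(tbl)    # collapse the Table into a single tibble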

break;
}

batches.push_back(RecordBatch__to_dataframe(chunk));
Contributor

😬 you don't really want to use List::push_back

Contributor Author

Looks like Rcpp::List is implemented as an Rcpp::Vector. However, when I look at the implementation of push_back() (Vector.h#L622-L646), it looks like the vector contains SEXP elements. From what I understand, the SEXP wraps the data frame List for each entry in the Vector, which would cause the SEXP to be copied many times, but not the underlying data frame, correct?

The contents of a batch are usually pretty big, so there should be only very few batches coming from Spark; I'm not sure how other clients would use batches, though. So as long as the SEXP does not trigger a copy of the entire data frame for every call to push_back(), which I believe it does not, we would only incur the cost of copying a few SEXPs, which shouldn't be terrible.

That said, if there are use cases where one would create a lot of small record batches, I agree that using push_back() here could be an issue. Is there a more efficient way to append an element to a list in Rcpp? I also wonder why Rcpp relies on this loop:

for( ; it < position; ++it, ++target_it){
  *target_it = *it ;
}

instead of using std::copy() on the SEXPs; there is probably a good reason I'm missing.

Did I miss something else to consider?

@wesm changed the title from "[ARROW-3484] [R] Support to read record_batch from stream" to "ARROW-3484: [R] Support to read record_batch from stream" on Oct 11, 2018
@javierluraschi
Contributor Author

Not unfortunate at all, I've been learning a lot, so I'm happy to close this one if needed... However, related to this comment: https://github.com/apache/arrow/pull/2714/files#r224516362, while I could consider using read_record_batch.raw(), it seems like it only reads one record batch... but I need a function that reads many record batches from a raw(). So it feels to me like we still need this function to read the whole stream of record batches, correct? Am I missing something here?

@romainfrancois
Contributor

Nope, I am.

@romainfrancois
Contributor

An Rcpp::List is indeed a vector of R objects, aka SEXPs. They don't grow, so when you push_back, what happens is that a new list with one more element is created and the data is copied. This usually means bad performance. It's usually better to use a structure that knows how to grow, such as a std::vector.

And in any case, dealing with multiple record batches of the same format is the job of an arrow::Table.

@romainfrancois
Contributor

I need to revise what read_record_batch does; see the comments on the PR that was merged a few hours ago.

@javierluraschi
Contributor Author

Closing; the functionality was implemented in #2749.
