Skip to content

Commit 702a0b1

Browse files
Message
1 parent 365fedc commit 702a0b1

10 files changed

Lines changed: 38 additions & 65 deletions

r/NAMESPACE

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ S3method("==","arrow::Message")
77
S3method("==","arrow::RecordBatch")
88
S3method("==","arrow::Schema")
99
S3method("==",Array)
10-
S3method(MessageReader,InputStream)
11-
S3method(MessageReader,default)
1210
S3method(RecordBatchFileReader,Buffer)
1311
S3method(RecordBatchFileReader,RandomAccessFile)
1412
S3method(RecordBatchFileReader,character)
@@ -31,12 +29,12 @@ S3method(parquet_file_reader,RandomAccessFile)
3129
S3method(parquet_file_reader,character)
3230
S3method(parquet_file_reader,raw)
3331
S3method(print,"arrow-enum")
34-
S3method(read_message,"arrow::MessageReader")
3532
S3method(read_message,InputStream)
33+
S3method(read_message,MessageReader)
3634
S3method(read_message,default)
37-
S3method(read_record_batch,"arrow::Message")
3835
S3method(read_record_batch,Buffer)
3936
S3method(read_record_batch,InputStream)
37+
S3method(read_record_batch,Message)
4038
S3method(read_record_batch,raw)
4139
S3method(read_schema,"arrow::Message")
4240
S3method(read_schema,Buffer)

r/R/enums.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ FileMode <- enum("FileMode",
6969

7070
#' @rdname enums
7171
#' @export
72-
MessageType <- enum("arrow::Message::Type",
72+
MessageType <- enum("Message::Type",
7373
NONE = 0L, SCHEMA = 1L, DICTIONARY_BATCH = 2L, RECORD_BATCH = 3L, TENSOR = 4L
7474
)
7575

r/R/message.R

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
#'
3030
#' @rdname arrow__ipc__Message
3131
#' @name arrow__ipc__Message
32-
`arrow::Message` <- R6Class("arrow::Message", inherit = Object,
32+
Message <- R6Class("Message", inherit = Object,
3333
public = list(
3434
Equals = function(other){
35-
assert_that(inherits(other, "arrow::Message"))
35+
assert_that(inherits(other, "Message"))
3636
ipc___Message__Equals(self, other)
3737
},
3838
body_length = function() ipc___Message__body_length(self),
@@ -60,29 +60,18 @@
6060
#'
6161
#' @rdname arrow__ipc__MessageReader
6262
#' @name arrow__ipc__MessageReader
63-
`arrow::MessageReader` <- R6Class("arrow::MessageReader", inherit = Object,
63+
#' @export
64+
MessageReader <- R6Class("MessageReader", inherit = Object,
6465
public = list(
65-
ReadNextMessage = function() unique_ptr(`arrow::Message`, ipc___MessageReader__ReadNextMessage(self))
66+
ReadNextMessage = function() unique_ptr(Message, ipc___MessageReader__ReadNextMessage(self))
6667
)
6768
)
6869

69-
#' Open a MessageReader that reads from a stream
70-
#'
71-
#' @param stream an InputStream
72-
#'
73-
#' @export
74-
MessageReader <- function(stream) {
75-
UseMethod("MessageReader")
76-
}
77-
78-
#' @export
79-
MessageReader.default <- function(stream) {
80-
MessageReader(BufferReader$create(stream))
81-
}
82-
83-
#' @export
84-
MessageReader.InputStream <- function(stream) {
85-
unique_ptr(`arrow::MessageReader`, ipc___MessageReader__Open(stream))
70+
MessageReader$create <- function(stream) {
71+
if (!inherits(stream, "InputStream")) {
72+
stream <- BufferReader$create(stream)
73+
}
74+
unique_ptr(MessageReader, ipc___MessageReader__Open(stream))
8675
}
8776

8877
#' Read a Message from a stream
@@ -95,16 +84,16 @@ read_message <- function(stream) {
9584
}
9685

9786
#' @export
98-
read_message.default<- function(stream) {
87+
read_message.default <- function(stream) {
9988
read_message(BufferReader$create(stream))
10089
}
10190

10291
#' @export
10392
read_message.InputStream <- function(stream) {
104-
unique_ptr(`arrow::Message`, ipc___ReadMessage(stream) )
93+
unique_ptr(Message, ipc___ReadMessage(stream) )
10594
}
10695

10796
#' @export
108-
`read_message.arrow::MessageReader` <- function(stream) {
97+
read_message.MessageReader <- function(stream) {
10998
stream$ReadNextMessage()
11099
}

r/R/read_record_batch.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ read_record_batch <- function(obj, schema){
2828
}
2929

3030
#' @export
31-
`read_record_batch.arrow::Message` <- function(obj, schema) {
31+
read_record_batch.Message <- function(obj, schema) {
3232
assert_that(inherits(schema, "arrow::Schema"))
3333
shared_ptr(`arrow::RecordBatch`, ipc___ReadRecordBatch__Message__Schema(obj, schema))
3434
}
@@ -47,7 +47,7 @@ read_record_batch.raw <- function(obj, schema){
4747
}
4848

4949
#' @export
50-
`read_record_batch.Buffer` <- function(obj, schema){
50+
read_record_batch.Buffer <- function(obj, schema){
5151
stream <- BufferReader$create(obj)
5252
on.exit(stream$close())
5353
read_record_batch(stream, schema)

r/man/MessageReader.Rd

Lines changed: 0 additions & 14 deletions
This file was deleted.

r/man/arrow__ipc__Message.Rd

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

r/man/arrow__ipc__MessageReader.Rd

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

r/tests/testthat/test-message.R

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
context("arrow::Message")
18+
context("Message")
1919

2020
test_that("read_message can read from input stream", {
2121
batch <- record_batch(x = 1:10)
2222
bytes <- batch$serialize()
2323
stream <- BufferReader$create(bytes)
2424

2525
message <- read_message(stream)
26-
expect_is(message, "arrow::Message")
26+
expect_is(message, "Message")
2727
expect_equal(message$type, MessageType$RECORD_BATCH)
2828
expect_is(message$body, "Buffer")
2929
expect_is(message$metadata, "Buffer")
@@ -37,7 +37,7 @@ test_that("read_message() can read Schema messages", {
3737
stream <- BufferReader$create(bytes)
3838
message <- read_message(stream)
3939

40-
expect_is(message, "arrow::Message")
40+
expect_is(message, "Message")
4141
expect_equal(message$type, MessageType$SCHEMA)
4242
expect_is(message$body, "Buffer")
4343
expect_is(message$metadata, "Buffer")

r/tests/testthat/test-messagereader.R

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
context("arrow::MessageReader")
18+
context("MessageReader")
1919

2020
test_that("MessageReader can be created from raw vectors", {
2121
batch <- record_batch(x = 1:10)
2222
bytes <- batch$serialize()
2323

24-
reader <- MessageReader(bytes)
24+
reader <- MessageReader$create(bytes)
2525

2626
message <- reader$ReadNextMessage()
27-
expect_is(message, "arrow::Message")
27+
expect_is(message, "Message")
2828
expect_equal(message$type, MessageType$RECORD_BATCH)
2929
expect_is(message$body, "Buffer")
3030
expect_is(message$metadata, "Buffer")
@@ -35,10 +35,10 @@ test_that("MessageReader can be created from raw vectors", {
3535
schema <- schema(x = int32())
3636
bytes <- schema$serialize()
3737

38-
reader <- MessageReader(bytes)
38+
reader <- MessageReader$create(bytes)
3939

4040
message <- reader$ReadNextMessage()
41-
expect_is(message, "arrow::Message")
41+
expect_is(message, "Message")
4242
expect_equal(message$type, MessageType$SCHEMA)
4343
expect_is(message$body, "Buffer")
4444
expect_is(message$metadata, "Buffer")
@@ -54,11 +54,11 @@ test_that("MessageReader can be created from input stream", {
5454
stream <- BufferReader$create(bytes)
5555
expect_is(stream, "BufferReader")
5656

57-
reader <- MessageReader(stream)
58-
expect_is(reader, "arrow::MessageReader")
57+
reader <- MessageReader$create(stream)
58+
expect_is(reader, "MessageReader")
5959

6060
message <- reader$ReadNextMessage()
61-
expect_is(message, "arrow::Message")
61+
expect_is(message, "Message")
6262
expect_equal(message$type, MessageType$RECORD_BATCH)
6363
expect_is(message$body, "Buffer")
6464
expect_is(message$metadata, "Buffer")
@@ -72,11 +72,11 @@ test_that("MessageReader can be created from input stream", {
7272
stream <- BufferReader$create(bytes)
7373
expect_is(stream, "BufferReader")
7474

75-
reader <- MessageReader(stream)
76-
expect_is(reader, "arrow::MessageReader")
75+
reader <- MessageReader$create(stream)
76+
expect_is(reader, "MessageReader")
7777

7878
message <- reader$ReadNextMessage()
79-
expect_is(message, "arrow::Message")
79+
expect_is(message, "Message")
8080
expect_equal(message$type, MessageType$SCHEMA)
8181
expect_is(message$body, "Buffer")
8282
expect_is(message$metadata, "Buffer")

r/tests/testthat/test-schema.R

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,17 @@ test_that("reading schema from Buffer", {
3838
buffer <- stream$getvalue()
3939
expect_is(buffer, "Buffer")
4040

41-
reader <- MessageReader(buffer)
42-
expect_is(reader, "arrow::MessageReader")
41+
reader <- MessageReader$create(buffer)
42+
expect_is(reader, "MessageReader")
4343

4444
message <- reader$ReadNextMessage()
45-
expect_is(message, "arrow::Message")
45+
expect_is(message, "Message")
4646
expect_equal(message$type, MessageType$SCHEMA)
4747

4848
stream <- BufferReader$create(buffer)
4949
expect_is(stream, "BufferReader")
5050
message <- read_message(stream)
51-
expect_is(message, "arrow::Message")
51+
expect_is(message, "Message")
5252
expect_equal(message$type, MessageType$SCHEMA)
5353
})
5454

0 commit comments

Comments
 (0)