@bersprockets
Contributor

@bersprockets bersprockets commented Nov 2, 2020

What changes were proposed in this pull request?

This PR adds a check to RowReader#hasNextRow so that multiple calls to RowReader#hasNextRow with no intervening call to RowReader#nextRow consume at most one record.

This PR also modifies RowReader#nextRow such that consecutive calls will return new rows (previously consecutive calls would return the same row).

Why are the changes needed?

SPARK-32346 slightly refactored the AvroFileFormat and AvroPartitionReaderFactory to use a new iterator-like trait called AvroUtils#RowReader. RowReader#hasNextRow consumes a raw input record and stores the deserialized row for the next call to RowReader#nextRow. Unfortunately, sometimes hasNextRow is called twice before nextRow is called, resulting in a lost row.

For example (assuming the V1 Avro reader):

val df = spark.range(0, 25).toDF("index")
df.write.mode("overwrite").format("avro").save("index_avro")
val loaded = spark.read.format("avro").load("index_avro")
// The following will give the expected size
loaded.collect.size
// The following will give the wrong size
loaded.orderBy("index").collect.size
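
To make the failure mode concrete without Spark, here is a minimal sketch of the pre-fix behavior. It is an illustration only: EagerRowReader and EagerRowReaderDemo are hypothetical names, and a plain Iterator[Int] stands in for the Avro file reader and deserializer. The only properties carried over from the description above are that each hasNextRow call consumes one record and that nextRow returns the stored row without advancing.

```scala
// Simplified stand-in for the pre-fix RowReader (hypothetical, not Spark code).
// The input is a plain Iterator[Int] rather than an Avro file reader.
class EagerRowReader(records: Iterator[Int]) {
  private var currentRow: Option[Int] = None

  // Every call consumes one record, even if the previous row was never read.
  def hasNextRow: Boolean = {
    if (records.hasNext) {
      currentRow = Some(records.next())
      true
    } else {
      currentRow = None
      false
    }
  }

  // Returns whatever the last hasNextRow call stored; it does not advance.
  def nextRow: Int = currentRow.getOrElse(
    throw new NoSuchElementException("next on empty iterator"))
}

object EagerRowReaderDemo {
  // Drains a five-record reader, optionally calling hasNextRow one extra
  // time before the usual hasNextRow/nextRow loop starts.
  def drain(extraHasNextCall: Boolean): List[Int] = {
    val reader = new EagerRowReader(Iterator(0, 1, 2, 3, 4))
    if (extraHasNextCall) reader.hasNextRow
    val out = scala.collection.mutable.ListBuffer.empty[Int]
    while (reader.hasNextRow) out += reader.nextRow
    out.toList
  }
}
```

With one extra hasNextRow call up front, drain(extraHasNextCall = true) yields one row fewer than drain(extraHasNextCall = false), which is analogous to the lost row in the orderBy case.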

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added tests, which fail without the fix.

Member

@HyukjinKwon HyukjinKwon left a comment

The fix seems good

@SparkQA

SparkQA commented Nov 2, 2020

Test build #130511 has finished for PR 30221 at commit 9597080.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bersprockets bersprockets changed the title from "[SPARK-33314][SQL][WIP] Avoid dropping rows in Avro reader" to "[SPARK-33314][SQL] Avoid dropping rows in Avro reader" Nov 2, 2020
@SparkQA

SparkQA commented Nov 2, 2020

Test build #130512 has finished for PR 30221 at commit 57e10c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

cc @gengliangwang FYI

Comment on lines 179 to 182
    if (!interveningNext) {
      // until a row is consumed, return previous result of hasNextRow
      return prevHasNextRow
    }
Member

Can't we just reset currentRow in nextRow and check currentRow.isDefined here?

Contributor

@HeartSaVioR HeartSaVioR Nov 2, 2020

I also feel @viirya's suggestion would be simpler.

In addition, it looks like the implementation doesn't respect the Iterator contract: calling hasNextRow explicitly shouldn't be a prerequisite for calling nextRow.

The code below would fix the original issue (it passes the new test), and it would also work for a caller that only calls nextRow and handles NoSuchElementException.

    def hasNextRow: Boolean = {
      while (!completed && currentRow.isEmpty) {
        val r = fileReader.hasNext && !fileReader.pastSync(stopPosition)
        if (!r) {
          fileReader.close()
          completed = true
          currentRow = None
        } else {
          val record = fileReader.next()
          currentRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]]
        }
      }

      currentRow.isDefined
    }

    def nextRow: InternalRow = {
      if (currentRow.isEmpty) {
        if (!hasNextRow) {
          throw new NoSuchElementException("next on empty iterator")
        }
      }

      val row = currentRow.get
      currentRow = None
      row
    }
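
The same pattern can be exercised outside Spark. The sketch below is a simplified model of the suggestion above, not the actual patch: BufferedRowReader is a hypothetical name, a plain Iterator stands in for fileReader (so there is no pastSync or close handling), and the deserialize function returning an Option models a deserializer that may produce no row for a record.

```scala
// Hypothetical, Spark-free model of the buffered reader pattern shown above.
class BufferedRowReader[A, B](records: Iterator[A], deserialize: A => Option[B]) {
  private var completed = false
  private var currentRow: Option[B] = None

  // Idempotent: once a row is buffered, further calls consume nothing.
  def hasNextRow: Boolean = {
    while (!completed && currentRow.isEmpty) {
      if (records.hasNext) {
        // May yield None, in which case the loop moves on to the next record.
        currentRow = deserialize(records.next())
      } else {
        completed = true
      }
    }
    currentRow.isDefined
  }

  // Works without a prior hasNextRow call and never returns a row twice.
  def nextRow: B = {
    if (!hasNextRow) {
      throw new NoSuchElementException("next on empty iterator")
    }
    val row = currentRow.get
    currentRow = None // clear the buffer so the next call advances
    row
  }
}
```

For example, a reader built over Iterator(0, 1, 2, 3) with a deserialize function that keeps only even numbers answers true to repeated hasNextRow calls without losing 0, and consecutive nextRow calls return 0 and then 2.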

Member

@HyukjinKwon HyukjinKwon Nov 2, 2020

I had the same thought initially, but then I realised that @bersprockets might have wanted to make the least invasive change. For example, previously we could get the same row from a repeated call to nextRow. With the approach above we cannot, although I think that's fine. I don't mind either way.

Contributor Author

@bersprockets bersprockets Nov 2, 2020

Yeah, this approach is better because it also handles the case where nextRow is called multiple times. Previously that would return the same row over and over. As @HyukjinKwon pointed out, leaving it that way would keep the status quo, but it probably wouldn't be correct, since the users of this code are Iterator implementations.

Member

From the API semantics perspective, shouldn't nextRow return the next row? It looks okay if hasNextRow has been called multiple times before nextRow is called. But it sounds weird that nextRow will be called with the same row. As a fix this looks fine, but an API that can be called that way seems like a weird design, particularly since it is documented as an iterator-like interface.

Contributor Author

But it sounds weird that nextRow will be called with the same row.

Yes, I agree. As I mentioned above, leaving it that way would keep the status quo, but probably wouldn't be correct since the users of this RowReader are Iterator implementations.

Contributor Author

Follow up to my previous comment: As @HyukjinKwon pointed out, I was earlier trying to fix the stated bug without changing other behavior. But I should probably fix nextRow while I am at it, which makes RowReader follow a more recognized pattern and makes the actual Iterators that use it more correct.

@gengliangwang
Member

gengliangwang commented Nov 2, 2020

So, if there is a scenario that calls hasNextRow multiple times, the code changes in #29145 also bring in a perf regression:

I am sorry but shall we consider reverting #29145? CC @MaxGekk @cloud-fan

@bersprockets
Contributor Author

I am sorry but shall we consider reverting #29145? CC @MaxGekk @cloud-fan

@gengliangwang I noted only a single extra call to hasNextRow per task, so the issue was not performance but dropped records (I suppose there could be some scenario I don't know about where hasNextRow is called many extra times).

Anyway, both the fix I proposed and the suggested improvements to my proposed fix would alleviate that concern, since deserialization would be called only once per Avro record (regardless of how many times hasNextRow is called).

@SparkQA

SparkQA commented Nov 3, 2020

Test build #130545 has finished for PR 30221 at commit 6d1b468.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

I think I'm not qualified to give +1 (as the code change is now closer to my suggestion), but I think this should be OK.

Regarding reverting #29145, I'm not sure how much it hurts to deserialize one element ahead of time, as it will need to be deserialized anyway in nextRow. If there are cases where the caller calls hasNext multiple times and expects each call to point to a "different" row (for whatever reason, such as performance), that no longer matches the Iterator contract, and we'd be better off defining another interface for that.

@gengliangwang
Member

FYI I just tried and can't find a scenario that has multiple method calls on hasNext() without next(). Now I think we can merge this one instead of reverting #29145.

@bersprockets
Contributor Author

FYI I just tried and can't find a scenario that has multiple method calls on hasNext() without next().

@gengliangwang My repro case is such an example. When BypassMergeSortShuffleWriter#write is driving the scan, there will be multiple consecutive calls to hasNext (at the start of each task). This causes trouble only with V1 Avro. In datasource V2, there seems to be some intervening iterator that properly handles the multiple hasNext calls, thereby protecting the iterator in AvroPartitionReaderFactory from them.

I know of no case where there are consecutive calls to next without an intervening hasNext, but the latest commit to this PR handles it.
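
As a rough illustration of how an intervening iterator can shield a reader from repeated hasNext calls, here is a sketch under stated assumptions. It is not the actual datasource V2 class: FragileIterator and HasNextCachingIterator are hypothetical names, and FragileIterator only imitates the pre-fix behavior in which every hasNext call consumes a record.

```scala
// Hypothetical iterator that imitates the pre-fix reader: each hasNext call
// consumes one record, and next() returns the last stored record.
class FragileIterator(records: Iterator[Int]) extends Iterator[Int] {
  private var current: Option[Int] = None

  def hasNext: Boolean = {
    current = if (records.hasNext) Some(records.next()) else None
    current.isDefined
  }

  def next(): Int = current.getOrElse(
    throw new NoSuchElementException("next on empty iterator"))
}

// Hypothetical wrapper that asks the underlying iterator at most once per
// element, so repeated hasNext calls from a caller are harmless.
class HasNextCachingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  private var cached: Option[Boolean] = None

  def hasNext: Boolean = {
    if (cached.isEmpty) cached = Some(underlying.hasNext)
    cached.get
  }

  def next(): T = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    cached = None // forget the answer so the following hasNext advances
    underlying.next()
  }
}
```

Calling hasNext twice on a bare FragileIterator loses the first record, while the same two calls on a HasNextCachingIterator wrapped around it do not. The fix in this PR makes the reader itself robust, so it no longer depends on a wrapper of this kind.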

@SparkQA

SparkQA commented Nov 4, 2020

Test build #130585 has finished for PR 30221 at commit 134c12c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

}
}

class AvroRowReaderSuite
Member

@bersprockets @HyukjinKwon I just noticed recently that this suite is in AvroSuite.scala. Is there any specific reason not to place it in AvroRowReaderSuite.scala?

Contributor

I don't think so. Feel free to move it there.

Member

Please review PR #32607.

@bersprockets bersprockets deleted the avro_iterator_play branch November 2, 2022 00:25