Skip to content

Commit 89947fd

Browse files
authored
fix jsonToRowTest race condition (#36616)
* fix race condition * update based on comments
1 parent 1f63118 commit 89947fd

File tree

1 file changed

+16
-15
lines changed

1 file changed

+16
-15
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,26 +46,21 @@ public class RowJsonUtils {
4646
/**
4747
* Increase the default jackson-databind stream read constraint.
4848
*
49-
* <p>StreamReadConstraints was introduced in jackson 2.15 causing string > 20MB (5MB in 2.15.0)
50-
* parsing failure. This has caused regressions in its dependencies include Beam. Here we
51-
* overwrite the default buffer size limit to 100 MB, and exposes this interface for higher limit.
52-
* If needed, call this method during pipeline run time, e.g. in DoFn.setup.
49+
* <p>In Jackson 2.15, a new constraint is added on the max string length of JSON parsing, see
50+
* https://github.com/FasterXML/jackson-core/issues/863. The default is 20M characters. This is
51+
* too small for some of our users. This method allows users to increase this limit.
5352
*/
54-
public static void increaseDefaultStreamReadConstraints(int newLimit) {
55-
if (newLimit <= defaultBufferLimit) {
53+
public static synchronized void increaseDefaultStreamReadConstraints(int newLimit) {
54+
if (!STREAM_READ_CONSTRAINTS_AVAILABLE) {
5655
return;
5756
}
58-
try {
59-
Class<?> unused = Class.forName("com.fasterxml.jackson.core.StreamReadConstraints");
60-
57+
if (newLimit > defaultBufferLimit) {
6158
com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints(
6259
com.fasterxml.jackson.core.StreamReadConstraints.builder()
6360
.maxStringLength(newLimit)
6461
.build());
65-
} catch (ClassNotFoundException e) {
66-
// <2.15, do nothing
62+
defaultBufferLimit = newLimit;
6763
}
68-
defaultBufferLimit = newLimit;
6964
}
7065

7166
static {
@@ -103,11 +98,17 @@ static void setStreamReadConstraints(JsonFactory jsonFactory, int sizeLimit) {
10398
*/
10499
public static JsonFactory createJsonFactory(int sizeLimit) {
105100
sizeLimit = Math.max(sizeLimit, MAX_STRING_LENGTH);
106-
JsonFactory jsonFactory = new JsonFactory();
107101
if (STREAM_READ_CONSTRAINTS_AVAILABLE) {
108-
StreamReadConstraintsHelper.setStreamReadConstraints(jsonFactory, sizeLimit);
102+
// Synchronize to avoid race condition with increaseDefaultStreamReadConstraints
103+
// which modifies static defaults that builder() and new JsonFactory() may read.
104+
synchronized (RowJsonUtils.class) {
105+
JsonFactory jsonFactory = new JsonFactory();
106+
StreamReadConstraintsHelper.setStreamReadConstraints(jsonFactory, sizeLimit);
107+
return jsonFactory;
108+
}
109+
} else {
110+
return new JsonFactory();
109111
}
110-
return jsonFactory;
111112
}
112113

113114
public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) {

0 commit comments

Comments
 (0)