Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
*/
package org.apache.beam.sdk.schemas.transforms;

import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -70,7 +73,9 @@ public static <OutputT> PTransform<PCollection<Row>, PCollection<OutputT>> fromR
*
* <p>This function allows converting between two types as long as the two types have
* <i>compatible</i> schemas. Two schemas are said to be <i>compatible</i> if they recursively
* have fields with the same names, but possibly different orders.
* have fields with the same names, but possibly different orders. If the source schema can be
* unboxed to match the target schema (i.e. the source schema contains a single field that is
* compatible with the target schema), then conversion also succeeds.
*/
public static <InputT, OutputT> PTransform<PCollection<InputT>, PCollection<OutputT>> to(
Class<OutputT> clazz) {
Expand All @@ -82,7 +87,9 @@ public static <InputT, OutputT> PTransform<PCollection<InputT>, PCollection<Outp
*
* <p>This function allows converting between two types as long as the two types have
* <i>compatible</i> schemas. Two schemas are said to be <i>compatible</i> if they recursively
* have fields with the same names, but possibly different orders.
* have fields with the same names, but possibly different orders. If the source schema can be
* unboxed to match the target schema (i.e. the source schema contains a single field that is
* compatible with the target schema), then conversion also succeeds.
*/
public static <InputT, OutputT> PTransform<PCollection<InputT>, PCollection<OutputT>> to(
TypeDescriptor<OutputT> typeDescriptor) {
Expand All @@ -92,11 +99,24 @@ public static <InputT, OutputT> PTransform<PCollection<InputT>, PCollection<Outp
private static class ConvertTransform<InputT, OutputT>
extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
TypeDescriptor<OutputT> outputTypeDescriptor;
Schema unboxedSchema = null;

/**
* Creates a transform whose output element type is described by {@code outputTypeDescriptor}.
*
* @param outputTypeDescriptor descriptor of the output type; stored as-is, without validation
*/
ConvertTransform(TypeDescriptor<OutputT> outputTypeDescriptor) {
this.outputTypeDescriptor = outputTypeDescriptor;
}

/**
* Returns the schema nested inside a "boxed" schema, i.e. a schema made up of exactly one field
* whose type is a composite type.
*
* <p>Nullability of the single field is not inspected here.
*
* @param schema the schema to try to unbox
* @return the row schema of the single field, or {@code null} if {@code schema} does not have
* exactly one field or that field is not a composite type
*/
@Nullable
private static Schema getBoxedNestedSchema(Schema schema) {
// Only a schema that wraps exactly one field can be unboxed.
if (schema.getFieldCount() != 1) {
return null;
}
FieldType fieldType = schema.getField(0).getType();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would it work if the field is nullable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now the entire Convert transform ignores nullable (this was already a bit of a bug with it). We appear to be missing a JIRA on this issue, so I'll file one.

// A non-composite field has no nested row schema to hand back.
if (!fieldType.getTypeName().isCompositeType()) {
return null;
}
return fieldType.getRowSchema();
}

@Override
@SuppressWarnings("unchecked")
public PCollection<OutputT> expand(PCollection<InputT> input) {
Expand Down Expand Up @@ -124,15 +144,21 @@ public PCollection<OutputT> expand(PCollection<InputT> input) {
registry.getSchema(outputTypeDescriptor),
registry.getToRowFunction(outputTypeDescriptor),
registry.getFromRowFunction(outputTypeDescriptor));
// assert matches input schema.
// TODO: Properly handle nullable.
if (!outputSchemaCoder.getSchema().assignableToIgnoreNullable(input.getSchema())) {
throw new RuntimeException(
"Cannot convert between types that don't have equivalent schemas."
+ " input schema: "
+ input.getSchema()
+ " output schema: "
+ outputSchemaCoder.getSchema());

Schema outputSchema = outputSchemaCoder.getSchema();
if (!outputSchema.assignableToIgnoreNullable(input.getSchema())) {
// We also support unboxing nested Row schemas, so attempt that.
// TODO: Support unboxing to primitive types as well.
unboxedSchema = getBoxedNestedSchema(input.getSchema());
if (unboxedSchema == null || !outputSchema.assignableToIgnoreNullable(unboxedSchema)) {
Schema checked = (unboxedSchema == null) ? input.getSchema() : unboxedSchema;
throw new RuntimeException(
"Cannot convert between types that don't have equivalent schemas."
+ " input schema: "
+ checked
+ " output schema: "
+ outputSchemaCoder.getSchema());
}
}
} catch (NoSuchSchemaException e) {
throw new RuntimeException("No schema registered for " + outputTypeDescriptor);
Expand All @@ -145,7 +171,9 @@ public PCollection<OutputT> expand(PCollection<InputT> input) {
new DoFn<InputT, OutputT>() {
@ProcessElement
public void processElement(@Element Row row, OutputReceiver<OutputT> o) {
o.output(outputSchemaCoder.getFromRowFunction().apply(row));
// Read the row, potentially unboxing if necessary.
Row input = (unboxedSchema == null) ? row : row.getValue(0);
o.output(outputSchemaCoder.getFromRowFunction().apply(input));
}
}))
.setSchema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
*
* <pre>{@code
* PCollection<UserEvent> events = readUserEvents();
* PCollection<Row> rows = event.apply(Select.fieldNames("location.*"));
* PCollection<Location> locations = events.apply(Select.fieldNames("location"))
* .apply(Convert.to(Location.class));
* }</pre>
*/
@Experimental(Kind.SCHEMAS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,86 @@
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;

/** Helper methods to select fields from a Schema. */
/** Helper methods to select subrows out of rows. */
public class SelectHelpers {
// Currently we don't flatten selected nested fields.

/**
 * Builds a single schema holding the fields of every schema in {@code schemas}, appended in
 * iteration order.
 */
private static Schema union(Iterable<Schema> schemas) {
  Schema.Builder combined = Schema.builder();
  schemas.forEach(part -> combined.addFields(part.getFields()));
  return combined.build();
}

/**
* Get the output schema resulting from selecting the given {@link FieldAccessDescriptor} from the
* given schema.
*
* <p>Fields are always extracted and then stored in a new Row. For example, consider the
* following Java POJOs:
*
* <pre>{@code
* class UserEvent {
* String userId;
* String eventId;
* int eventType;
* Location location;
* }
* }</pre>
*
* <pre>{@code
* class Location {
* double latitude;
* double longitude;
* }
* }</pre>
*
* <p>If selecting just the location field, then the returned schema will wrap that of the
* singular field being selected; in this case the returned schema will be a Row containing a
* single Location field. If location.latitude is selected, then the returned Schema will be a Row
* containing a double latitude field.
*
* <p>The same holds true when selecting from lists or maps. For example:
*
* <pre>{@code
* class EventList {
* List<UserEvent> events;
* }
* }</pre>
*
* <p>If selecting events.location.latitude, the returned schema will contain a single array of
* Row, where that Row contains a single double latitude field; it will not contain an array of
* double.
*/
public static Schema getOutputSchema(
Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) {
if (fieldAccessDescriptor.getAllFields()) {
return inputSchema;
}
Schema.Builder builder = new Schema.Builder();

List<Schema> schemas = Lists.newArrayList();
Schema.Builder builder = Schema.builder();
for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
builder.addField(inputSchema.getField(fieldId));
}
schemas.add(builder.build());

for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> nested :
fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) {
FieldDescriptor fieldDescriptor = nested.getKey();
FieldAccessDescriptor nestedAccess = nested.getValue();
Field field = inputSchema.getField(checkNotNull(fieldDescriptor.getFieldId()));

FieldType outputType =
getOutputSchemaHelper(
field.getType(), nested.getValue(), fieldDescriptor.getQualifiers(), 0);
builder.addField(field.getName(), outputType);
getOutputSchemaHelper(field.getType(), nestedAccess, fieldDescriptor.getQualifiers(), 0);
if (outputType.getTypeName().isCompositeType()) {
schemas.add(outputType.getRowSchema());
} else {
schemas.add(Schema.builder().addField(field.getName(), outputType).build());
}
}
return builder.build();

return union(schemas);
}

private static FieldType getOutputSchemaHelper(
Expand Down Expand Up @@ -96,6 +153,7 @@ private static FieldType getOutputSchemaHelper(
}
}

/** Select a sub Row from an input Row. */
public static Row selectRow(
Row input,
FieldAccessDescriptor fieldAccessDescriptor,
Expand All @@ -106,47 +164,73 @@ public static Row selectRow(
}

Row.Builder output = Row.withSchema(outputSchema);
selectIntoRow(input, output, fieldAccessDescriptor);
return output.build();
}

/** Select out of a given {@link Row} object. */
public static void selectIntoRow(
Row input, Row.Builder output, FieldAccessDescriptor fieldAccessDescriptor) {
if (fieldAccessDescriptor.getAllFields()) {
output.addValues(input.getValues());
return;
}

for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
// TODO: Once we support specific qualifiers (like array slices), extract them here.
output.addValue(input.getValue(fieldId));
}

Schema outputSchema = output.getSchema();
for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> nested :
fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) {
FieldDescriptor field = nested.getKey();
String fieldName = inputSchema.nameOf(checkNotNull(field.getFieldId()));
FieldType nestedInputType = inputSchema.getField(field.getFieldId()).getType();
FieldType nestedOutputType = outputSchema.getField(fieldName).getType();
Object value =
selectRowHelper(
field.getQualifiers(),
0,
input.getValue(fieldName),
nested.getValue(),
nestedInputType,
nestedOutputType);
output.addValue(value);
FieldAccessDescriptor nestedAccess = nested.getValue();
FieldType nestedInputType = input.getSchema().getField(field.getFieldId()).getType();
FieldType nestedOutputType = outputSchema.getField(output.nextFieldId()).getType();
selectIntoRowHelper(
field.getQualifiers(),
input.getValue(field.getFieldId()),
output,
nestedAccess,
nestedInputType,
nestedOutputType);
}
return output.build();
}

@SuppressWarnings("unchecked")
private static Object selectRowHelper(
private static void selectIntoRowHelper(
List<Qualifier> qualifiers,
int qualifierPosition,
Object value,
Row.Builder output,
FieldAccessDescriptor fieldAccessDescriptor,
FieldType inputType,
FieldType outputType) {
if (qualifierPosition >= qualifiers.size()) {
if (qualifiers.isEmpty()) {
Row row = (Row) value;
return selectRow(
row, fieldAccessDescriptor, inputType.getRowSchema(), outputType.getRowSchema());
selectIntoRow(row, output, fieldAccessDescriptor);
return;
}

if (fieldAccessDescriptor.getAllFields()) {
// Since we are selecting all fields (and we do not yet support array slicing), short circuit.
return value;
// There are qualifiers. That means that the result will be either a list or a map, so
// construct the result and add that to our Row.
output.addValue(
selectValueHelper(qualifiers, 0, value, fieldAccessDescriptor, inputType, outputType));
}

private static Object selectValueHelper(
List<Qualifier> qualifiers,
int qualifierPosition,
Object value,
FieldAccessDescriptor fieldAccessDescriptor,
FieldType inputType,
FieldType outputType) {
if (qualifierPosition >= qualifiers.size()) {
// We have already constructed all arrays and maps. What remains must be a Row.
Row row = (Row) value;
Row.Builder output = Row.withSchema(outputType.getRowSchema());
selectIntoRow(row, output, fieldAccessDescriptor);
return output.build();
}

Qualifier qualifier = qualifiers.get(qualifierPosition);
Expand All @@ -156,10 +240,10 @@ private static Object selectRowHelper(
FieldType nestedInputType = checkNotNull(inputType.getCollectionElementType());
FieldType nestedOutputType = checkNotNull(outputType.getCollectionElementType());
List<Object> list = (List) value;
List selectedList = Lists.newArrayListWithCapacity(list.size());
List<Object> selectedList = Lists.newArrayListWithCapacity(list.size());
for (Object o : list) {
Object selected =
selectRowHelper(
selectValueHelper(
qualifiers,
qualifierPosition + 1,
o,
Expand All @@ -178,7 +262,7 @@ private static Object selectRowHelper(
Map selectedMap = Maps.newHashMapWithExpectedSize(map.size());
for (Map.Entry<Object, Object> entry : map.entrySet()) {
Object selected =
selectRowHelper(
selectValueHelper(
qualifiers,
qualifierPosition + 1,
entry.getValue(),
Expand Down
11 changes: 11 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,17 @@ public static class Builder {
this.schema = schema;
}

/**
 * Returns the number of values added to this builder so far, which is the positional index the
 * next appended value will occupy.
 *
 * @return the index of the next value to be added
 * @throws UnsupportedOperationException if a {@code fieldValueGetterFactory} has been set on
 *     this builder
 */
public int nextFieldId() {
  if (fieldValueGetterFactory != null) {
    // UnsupportedOperationException is a RuntimeException, so existing catch sites still match.
    throw new UnsupportedOperationException(
        "nextFieldId is not supported when a fieldValueGetterFactory is set");
  }
  return values.size();
}

/** Returns the schema held by this builder. */
public Schema getSchema() {
  return this.schema;
}

public Builder addValue(@Nullable Object values) {
this.values.add(values);
return this;
Expand Down
Loading