
Commit ed59e6c

remove json path
1 parent 4965ac7 commit ed59e6c

5 files changed: +30 -157 lines

connector/connect/src/main/protobuf/spark/connect/base.proto

Lines changed: 1 addition & 13 deletions
@@ -139,11 +139,7 @@ message ExecutePlanRequest {
 message ExecutePlanResponse {
   string client_id = 1;

-  // Result type
-  oneof result_type {
-    ArrowBatch arrow_batch = 2;
-    JSONBatch json_batch = 3;
-  }
+  ArrowBatch arrow_batch = 2;

   // Metrics for the query execution. Typically, this field is only present in the last
   // batch of results and then represent the overall state of the query execution.
@@ -155,14 +151,6 @@ message ExecutePlanResponse {
     bytes data = 2;
   }

-  // Message type when the result is returned as JSON. This is essentially a bulk wrapper
-  // for the JSON result of a Spark DataFrame. All rows are returned in the JSON record format
-  // of `{col -> row}`.
-  message JSONBatch {
-    int64 row_count = 1;
-    bytes data = 2;
-  }
-
   message Metrics {

     repeated MetricObject metrics = 1;
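A note on compatibility: only the `oneof result_type` wrapper and the `json_batch` field (number 3) are removed, while `arrow_batch` keeps field number 2, so previously serialized arrow responses stay wire-compatible. A minimal sanity check against the regenerated Python stubs, assuming a pyspark build that includes this commit:

    from pyspark.sql.connect.proto import base_pb2

    desc = base_pb2.ExecutePlanResponse.DESCRIPTOR
    assert desc.fields_by_name["arrow_batch"].number == 2  # unchanged number, wire-compatible
    assert "json_batch" not in desc.fields_by_name         # field 3 removed outright
    assert not desc.oneofs                                 # the result_type oneof is gone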

connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala

Lines changed: 0 additions & 63 deletions
@@ -22,7 +22,6 @@ import scala.collection.JavaConverters._
 import com.google.protobuf.ByteString
 import io.grpc.stub.StreamObserver

-import org.apache.spark.SparkException
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
 import org.apache.spark.internal.Logging
@@ -58,68 +57,6 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     processAsArrowBatches(request.getClientId, dataframe)
   }

-  def processAsJsonBatches(clientId: String, dataframe: DataFrame): Unit = {
-    // Only process up to 10MB of data.
-    val sb = new StringBuilder
-    var rowCount = 0
-    dataframe.toJSON
-      .collect()
-      .foreach(row => {
-
-        // There are a few cases to cover here.
-        // 1. The aggregated buffer size is larger than the MAX_BATCH_SIZE
-        //    -> send the current batch and reset.
-        // 2. The aggregated buffer size is smaller than the MAX_BATCH_SIZE
-        //    -> append the row to the buffer.
-        // 3. The row in question is larger than the MAX_BATCH_SIZE
-        //    -> fail the query.
-
-        // Case 3. - Fail
-        if (row.size > MAX_BATCH_SIZE) {
-          throw SparkException.internalError(
-            s"Serialized row is larger than MAX_BATCH_SIZE: ${row.size} > ${MAX_BATCH_SIZE}")
-        }
-
-        // Case 1 - FLush and send.
-        if (sb.size + row.size > MAX_BATCH_SIZE) {
-          val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
-          val batch = proto.ExecutePlanResponse.JSONBatch
-            .newBuilder()
-            .setData(ByteString.copyFromUtf8(sb.toString()))
-            .setRowCount(rowCount)
-            .build()
-          response.setJsonBatch(batch)
-          responseObserver.onNext(response.build())
-          sb.clear()
-          sb.append(row)
-          rowCount = 1
-        } else {
-          // Case 2 - Append.
-          // Make sure to put the newline delimiters only between items and not at the end.
-          if (rowCount > 0) {
-            sb.append("\n")
-          }
-          sb.append(row)
-          rowCount += 1
-        }
-      })
-
-    // If the last batch is not empty, send out the data to the client.
-    if (sb.size > 0) {
-      val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
-      val batch = proto.ExecutePlanResponse.JSONBatch
-        .newBuilder()
-        .setData(ByteString.copyFromUtf8(sb.toString()))
-        .setRowCount(rowCount)
-        .build()
-      response.setJsonBatch(batch)
-      responseObserver.onNext(response.build())
-    }
-
-    responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
-    responseObserver.onCompleted()
-  }
-
   def processAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
     val spark = dataframe.sparkSession
     val schema = dataframe.schema
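For reference, the deleted method implemented a size-capped chunking scheme: flush when the next row would overflow the buffer, append otherwise, and fail outright on a single row larger than the cap. A standalone Python sketch of the same three-case pattern (`send_in_batches`, `send`, and `max_batch_size` are hypothetical stand-ins, not Spark APIs):

    def send_in_batches(rows, send, max_batch_size=10 * 1024 * 1024):
        """Chunk serialized rows into newline-delimited batches of at most max_batch_size bytes."""
        buf, buf_len, count = [], 0, 0
        for row in rows:
            if len(row) > max_batch_size:  # case 3: a single oversized row fails the query
                raise ValueError(f"serialized row is larger than max_batch_size: {len(row)}")
            if buf_len + len(row) > max_batch_size:  # case 1: flush the current batch, reset
                send("\n".join(buf), count)
                buf, buf_len, count = [], 0, 0
            buf.append(row)  # case 2: append; newline delimiters go only between rows
            buf_len += len(row) + (1 if count else 0)
            count += 1
        if count:  # ship the final, partially filled batch
            send("\n".join(buf), count)

The removed Scala code is this same loop with proto.ExecutePlanResponse.JSONBatch as the send target; processAsArrowBatches keeps an analogous shape on the arrow path.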

python/pyspark/sql/connect/proto/base_pb2.py

Lines changed: 14 additions & 27 deletions
@@ -36,7 +36,7 @@


 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xb5\x01\n\x07\x45xplain\x12\x45\n\x0c\x65xplain_mode\x18\x01 \x01(\x0e\x32".spark.connect.Explain.ExplainModeR\x0b\x65xplainMode"c\n\x0b\x45xplainMode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\n\n\x06SIMPLE\x10\x01\x12\x0c\n\x08\x45XTENDED\x10\x02\x12\x0b\n\x07\x43ODEGEN\x10\x03\x12\x08\n\x04\x43OST\x10\x04\x12\r\n\tFORMATTED\x10\x05"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x01(\tR\x08userName\x12\x35\n\nextensions\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\nextensions"\x81\x02\n\x12\x41nalyzePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x12\x30\n\x07\x65xplain\x18\x05 \x01(\x0b\x32\x16.spark.connect.ExplainR\x07\x65xplainB\x0e\n\x0c_client_type"\x8a\x01\n\x13\x41nalyzePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12/\n\x06schema\x18\x02 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x12%\n\x0e\x65xplain_string\x18\x03 \x01(\tR\rexplainString"\xcf\x01\n\x12\x45xecutePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x42\x0e\n\x0c_client_type"\xad\x07\n\x13\x45xecutePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12P\n\x0b\x61rrow_batch\x18\x02 \x01(\x0b\x32-.spark.connect.ExecutePlanResponse.ArrowBatchH\x00R\narrowBatch\x12M\n\njson_batch\x18\x03 \x01(\x0b\x32,.spark.connect.ExecutePlanResponse.JSONBatchH\x00R\tjsonBatch\x12\x44\n\x07metrics\x18\x04 \x01(\x0b\x32*.spark.connect.ExecutePlanResponse.MetricsR\x07metrics\x1a=\n\nArrowBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x1a<\n\tJSONBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x1a\x85\x04\n\x07Metrics\x12Q\n\x07metrics\x18\x01 \x03(\x0b\x32\x37.spark.connect.ExecutePlanResponse.Metrics.MetricObjectR\x07metrics\x1a\xcc\x02\n\x0cMetricObject\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x17\n\x07plan_id\x18\x02 \x01(\x03R\x06planId\x12\x16\n\x06parent\x18\x03 \x01(\x03R\x06parent\x12z\n\x11\x65xecution_metrics\x18\x04 \x03(\x0b\x32M.spark.connect.ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntryR\x10\x65xecutionMetrics\x1a{\n\x15\x45xecutionMetricsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ExecutePlanResponse.Metrics.MetricValueR\x05value:\x02\x38\x01\x1aX\n\x0bMetricValue\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n\x05value\x18\x02 \x01(\x03R\x05value\x12\x1f\n\x0bmetric_type\x18\x03 \x01(\tR\nmetricTypeB\r\n\x0bresult_type2\xc7\x01\n\x13SparkConnectService\x12X\n\x0b\x45xecutePlan\x12!.spark.connect.ExecutePlanRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12V\n\x0b\x41nalyzePlan\x12!.spark.connect.AnalyzePlanRequest\x1a".spark.connect.AnalyzePlanResponse"\x00\x42"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3'
+    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xb5\x01\n\x07\x45xplain\x12\x45\n\x0c\x65xplain_mode\x18\x01 \x01(\x0e\x32".spark.connect.Explain.ExplainModeR\x0b\x65xplainMode"c\n\x0b\x45xplainMode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\n\n\x06SIMPLE\x10\x01\x12\x0c\n\x08\x45XTENDED\x10\x02\x12\x0b\n\x07\x43ODEGEN\x10\x03\x12\x08\n\x04\x43OST\x10\x04\x12\r\n\tFORMATTED\x10\x05"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x01(\tR\x08userName\x12\x35\n\nextensions\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\nextensions"\x81\x02\n\x12\x41nalyzePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x12\x30\n\x07\x65xplain\x18\x05 \x01(\x0b\x32\x16.spark.connect.ExplainR\x07\x65xplainB\x0e\n\x0c_client_type"\x8a\x01\n\x13\x41nalyzePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12/\n\x06schema\x18\x02 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x12%\n\x0e\x65xplain_string\x18\x03 \x01(\tR\rexplainString"\xcf\x01\n\x12\x45xecutePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x42\x0e\n\x0c_client_type"\x8f\x06\n\x13\x45xecutePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12N\n\x0b\x61rrow_batch\x18\x02 \x01(\x0b\x32-.spark.connect.ExecutePlanResponse.ArrowBatchR\narrowBatch\x12\x44\n\x07metrics\x18\x04 \x01(\x0b\x32*.spark.connect.ExecutePlanResponse.MetricsR\x07metrics\x1a=\n\nArrowBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x1a\x85\x04\n\x07Metrics\x12Q\n\x07metrics\x18\x01 \x03(\x0b\x32\x37.spark.connect.ExecutePlanResponse.Metrics.MetricObjectR\x07metrics\x1a\xcc\x02\n\x0cMetricObject\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x17\n\x07plan_id\x18\x02 \x01(\x03R\x06planId\x12\x16\n\x06parent\x18\x03 \x01(\x03R\x06parent\x12z\n\x11\x65xecution_metrics\x18\x04 \x03(\x0b\x32M.spark.connect.ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntryR\x10\x65xecutionMetrics\x1a{\n\x15\x45xecutionMetricsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ExecutePlanResponse.Metrics.MetricValueR\x05value:\x02\x38\x01\x1aX\n\x0bMetricValue\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n\x05value\x18\x02 \x01(\x03R\x05value\x12\x1f\n\x0bmetric_type\x18\x03 \x01(\tR\nmetricType2\xc7\x01\n\x13SparkConnectService\x12X\n\x0b\x45xecutePlan\x12!.spark.connect.ExecutePlanRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12V\n\x0b\x41nalyzePlan\x12!.spark.connect.AnalyzePlanRequest\x1a".spark.connect.AnalyzePlanResponse"\x00\x42"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3'
 )


@@ -48,7 +48,6 @@
 _EXECUTEPLANREQUEST = DESCRIPTOR.message_types_by_name["ExecutePlanRequest"]
 _EXECUTEPLANRESPONSE = DESCRIPTOR.message_types_by_name["ExecutePlanResponse"]
 _EXECUTEPLANRESPONSE_ARROWBATCH = _EXECUTEPLANRESPONSE.nested_types_by_name["ArrowBatch"]
-_EXECUTEPLANRESPONSE_JSONBATCH = _EXECUTEPLANRESPONSE.nested_types_by_name["JSONBatch"]
 _EXECUTEPLANRESPONSE_METRICS = _EXECUTEPLANRESPONSE.nested_types_by_name["Metrics"]
 _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT = _EXECUTEPLANRESPONSE_METRICS.nested_types_by_name[
     "MetricObject"
@@ -139,15 +138,6 @@
                 # @@protoc_insertion_point(class_scope:spark.connect.ExecutePlanResponse.ArrowBatch)
             },
         ),
-        "JSONBatch": _reflection.GeneratedProtocolMessageType(
-            "JSONBatch",
-            (_message.Message,),
-            {
-                "DESCRIPTOR": _EXECUTEPLANRESPONSE_JSONBATCH,
-                "__module__": "spark.connect.base_pb2"
-                # @@protoc_insertion_point(class_scope:spark.connect.ExecutePlanResponse.JSONBatch)
-            },
-        ),
         "Metrics": _reflection.GeneratedProtocolMessageType(
             "Metrics",
             (_message.Message,),
@@ -191,7 +181,6 @@
 )
 _sym_db.RegisterMessage(ExecutePlanResponse)
 _sym_db.RegisterMessage(ExecutePlanResponse.ArrowBatch)
-_sym_db.RegisterMessage(ExecutePlanResponse.JSONBatch)
 _sym_db.RegisterMessage(ExecutePlanResponse.Metrics)
 _sym_db.RegisterMessage(ExecutePlanResponse.Metrics.MetricObject)
 _sym_db.RegisterMessage(ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntry)
@@ -219,19 +208,17 @@
     _EXECUTEPLANREQUEST._serialized_start = 986
     _EXECUTEPLANREQUEST._serialized_end = 1193
     _EXECUTEPLANRESPONSE._serialized_start = 1196
-    _EXECUTEPLANRESPONSE._serialized_end = 2137
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 1479
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 1540
-    _EXECUTEPLANRESPONSE_JSONBATCH._serialized_start = 1542
-    _EXECUTEPLANRESPONSE_JSONBATCH._serialized_end = 1602
-    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 1605
-    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 2122
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 1700
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 2032
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1909
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 2032
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 2034
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 2122
-    _SPARKCONNECTSERVICE._serialized_start = 2140
-    _SPARKCONNECTSERVICE._serialized_end = 2339
+    _EXECUTEPLANRESPONSE._serialized_end = 1979
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 1398
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 1459
+    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 1462
+    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 1979
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 1557
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 1889
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1766
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 1889
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 1891
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 1979
+    _SPARKCONNECTSERVICE._serialized_start = 1982
+    _SPARKCONNECTSERVICE._serialized_end = 2181
 # @@protoc_insertion_point(module_scope)
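base_pb2.py is protoc-generated; the shifted _serialized_start/_serialized_end offsets simply reflect the shorter file descriptor once JSONBatch is dropped, not hand edits. One way to confirm the regenerated module (same pyspark-build assumption as above):

    from pyspark.sql.connect.proto import base_pb2

    nested = [t.name for t in base_pb2.ExecutePlanResponse.DESCRIPTOR.nested_types]
    assert nested == ["ArrowBatch", "Metrics"]  # JSONBatch is no longer generated
    assert not hasattr(base_pb2.ExecutePlanResponse, "JSONBatch")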

python/pyspark/sql/connect/proto/base_pb2.pyi

Lines changed: 2 additions & 49 deletions
@@ -401,28 +401,6 @@ class ExecutePlanResponse(google.protobuf.message.Message):
             self, field_name: typing_extensions.Literal["data", b"data", "row_count", b"row_count"]
         ) -> None: ...

-    class JSONBatch(google.protobuf.message.Message):
-        """Message type when the result is returned as JSON. This is essentially a bulk wrapper
-        for the JSON result of a Spark DataFrame. All rows are returned in the JSON record format
-        of `{col -> row}`.
-        """
-
-        DESCRIPTOR: google.protobuf.descriptor.Descriptor
-
-        ROW_COUNT_FIELD_NUMBER: builtins.int
-        DATA_FIELD_NUMBER: builtins.int
-        row_count: builtins.int
-        data: builtins.bytes
-        def __init__(
-            self,
-            *,
-            row_count: builtins.int = ...,
-            data: builtins.bytes = ...,
-        ) -> None: ...
-        def ClearField(
-            self, field_name: typing_extensions.Literal["data", b"data", "row_count", b"row_count"]
-        ) -> None: ...
-
     class Metrics(google.protobuf.message.Message):
         DESCRIPTOR: google.protobuf.descriptor.Descriptor

@@ -530,14 +508,11 @@ class ExecutePlanResponse(google.protobuf.message.Message):

     CLIENT_ID_FIELD_NUMBER: builtins.int
     ARROW_BATCH_FIELD_NUMBER: builtins.int
-    JSON_BATCH_FIELD_NUMBER: builtins.int
     METRICS_FIELD_NUMBER: builtins.int
     client_id: builtins.str
     @property
     def arrow_batch(self) -> global___ExecutePlanResponse.ArrowBatch: ...
     @property
-    def json_batch(self) -> global___ExecutePlanResponse.JSONBatch: ...
-    @property
     def metrics(self) -> global___ExecutePlanResponse.Metrics:
         """Metrics for the query execution. Typically, this field is only present in the last
         batch of results and then represent the overall state of the query execution.
@@ -547,39 +522,17 @@ class ExecutePlanResponse(google.protobuf.message.Message):
         *,
         client_id: builtins.str = ...,
         arrow_batch: global___ExecutePlanResponse.ArrowBatch | None = ...,
-        json_batch: global___ExecutePlanResponse.JSONBatch | None = ...,
         metrics: global___ExecutePlanResponse.Metrics | None = ...,
     ) -> None: ...
     def HasField(
         self,
-        field_name: typing_extensions.Literal[
-            "arrow_batch",
-            b"arrow_batch",
-            "json_batch",
-            b"json_batch",
-            "metrics",
-            b"metrics",
-            "result_type",
-            b"result_type",
-        ],
+        field_name: typing_extensions.Literal["arrow_batch", b"arrow_batch", "metrics", b"metrics"],
     ) -> builtins.bool: ...
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
-            "arrow_batch",
-            b"arrow_batch",
-            "client_id",
-            b"client_id",
-            "json_batch",
-            b"json_batch",
-            "metrics",
-            b"metrics",
-            "result_type",
-            b"result_type",
+            "arrow_batch", b"arrow_batch", "client_id", b"client_id", "metrics", b"metrics"
         ],
     ) -> None: ...
-    def WhichOneof(
-        self, oneof_group: typing_extensions.Literal["result_type", b"result_type"]
-    ) -> typing_extensions.Literal["arrow_batch", "json_batch"] | None: ...

 global___ExecutePlanResponse = ExecutePlanResponse
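With the result_type oneof gone, WhichOneof disappears from the stub and callers switch to HasField on the now-ordinary arrow_batch message field. A migration sketch (`consume` is illustrative, not part of pyspark):

    from pyspark.sql.connect.proto import base_pb2

    def consume(response: base_pb2.ExecutePlanResponse) -> None:
        # Before this commit: response.WhichOneof("result_type") -> "arrow_batch" | "json_batch".
        # After it, HasField on the plain optional message field is enough.
        if response.HasField("arrow_batch"):
            print("arrow batch:", response.arrow_batch.row_count, "rows")
        if response.HasField("metrics"):
            print("metric objects:", len(response.metrics.metrics))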

python/pyspark/sql/tests/connect/test_connect_basic.py

Lines changed: 13 additions & 5 deletions
@@ -222,10 +222,12 @@ def test_create_global_temp_view(self):
         self.connect.sql("SELECT 1 AS X LIMIT 0").createGlobalTempView("view_1")

     def test_to_pandas(self):
-        # SPARK-XXXX: Test to pandas
+        # SPARK-41005: Test to pandas
         query = """
             SELECT * FROM VALUES
-            (false, 1, float(NULL)), (false, NULL, float(2.0)), (NULL, 3, float(3.0))
+            (false, 1, NULL),
+            (false, NULL, float(2.0)),
+            (NULL, 3, float(3.0))
             AS tab(a, b, c)
             """

@@ -236,7 +238,9 @@ def test_to_pandas(self):

         query = """
             SELECT * FROM VALUES
-            (1, 1, float(NULL)), (2, NULL, float(2.0)), (3, 3, float(3.0))
+            (1, 1, NULL),
+            (2, NULL, float(2.0)),
+            (3, 3, float(3.0))
             AS tab(a, b, c)
             """

@@ -247,7 +251,9 @@ def test_to_pandas(self):

         query = """
             SELECT * FROM VALUES
-            (1.0, 1, "1"), (NULL, NULL, NULL), (2.0, 3, "3")
+            (double(1.0), 1, "1"),
+            (NULL, NULL, NULL),
+            (double(2.0), 3, "3")
             AS tab(a, b, c)
             """

@@ -258,7 +264,9 @@ def test_to_pandas(self):

         query = """
             SELECT * FROM VALUES
-            (float(1.0), 1.0, 1, "1"), (float(2.0), 2.0, 2, "2"), (float(3.0), 2.0, 3, "3")
+            (float(1.0), double(1.0), 1, "1"),
+            (float(2.0), double(2.0), 2, "2"),
+            (float(3.0), double(3.0), 3, "3")
             AS tab(a, b, c, d)
             """
