Skip to content

Commit eb80a87

Browse files
[C++] Implement CommandGetImportedKeys and CommandGetExportedKeys (apache#163)
* Implement CommandGetImportedKeys and CommandGetExportedKeys on Flight SQL Server example * Refactor DoGet methods to reduce code duplication
1 parent 7556b64 commit eb80a87

File tree

5 files changed

+195
-18
lines changed

5 files changed

+195
-18
lines changed

cpp/src/arrow/flight/flight-sql/example/sqlite_server.cc

Lines changed: 93 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,21 @@ void SQLiteFlightSqlServer::ExecuteSql(const std::string& sql) {
106106
}
107107
}
108108

109+
Status DoGetSQLiteQuery(sqlite3* db, const std::string& query,
110+
const std::shared_ptr<Schema>& schema,
111+
std::unique_ptr<FlightDataStream>* result) {
112+
std::shared_ptr<SqliteStatement> statement;
113+
ARROW_RETURN_NOT_OK(SqliteStatement::Create(db, query, &statement));
114+
115+
std::shared_ptr<SqliteStatementBatchReader> reader;
116+
ARROW_RETURN_NOT_OK(SqliteStatementBatchReader::Create(
117+
statement, schema, &reader));
118+
119+
*result = std::unique_ptr<FlightDataStream>(new RecordBatchStream(reader));
120+
121+
return Status::OK();
122+
}
123+
109124
Status GetFlightInfoForCommand(const FlightDescriptor& descriptor,
110125
std::unique_ptr<FlightInfo>* info,
111126
const google::protobuf::Message& command,
@@ -299,16 +314,7 @@ Status SQLiteFlightSqlServer::DoGetTableTypes(const ServerCallContext& context,
299314
std::unique_ptr<FlightDataStream>* result) {
300315
std::string query = "SELECT DISTINCT type as table_type FROM sqlite_master";
301316

302-
std::shared_ptr<SqliteStatement> statement;
303-
ARROW_RETURN_NOT_OK(SqliteStatement::Create(db_, query, &statement));
304-
305-
std::shared_ptr<SqliteStatementBatchReader> reader;
306-
ARROW_RETURN_NOT_OK(SqliteStatementBatchReader::Create(
307-
statement, SqlSchema::GetTableTypesSchema(), &reader));
308-
309-
*result = std::unique_ptr<FlightDataStream>(new RecordBatchStream(reader));
310-
311-
return Status::OK();
317+
return DoGetSQLiteQuery(db_, query, SqlSchema::GetTableTypesSchema(), result);
312318
}
313319

314320
Status SQLiteFlightSqlServer::GetFlightInfoPrimaryKeys(
@@ -343,17 +349,86 @@ SQLiteFlightSqlServer::DoGetPrimaryKeys(const pb::sql::CommandGetPrimaryKeys &co
343349

344350
table_query << " and table_name LIKE '" << command.table() << "'";
345351

346-
std::shared_ptr<SqliteStatement> statement;
347-
ARROW_RETURN_NOT_OK(SqliteStatement::Create(db_, table_query.str(), &statement));
352+
return DoGetSQLiteQuery(db_, table_query.str(), SqlSchema::GetPrimaryKeysSchema(),
353+
result);
354+
}
348355

349-
std::shared_ptr<SqliteStatementBatchReader> reader;
350-
ARROW_RETURN_NOT_OK(SqliteStatementBatchReader::Create(
351-
statement, SqlSchema::GetPrimaryKeysSchema(), &reader));
356+
std::string PrepareQueryForGetImportedOrExportedKeys(const std::string& filter) {
357+
return R"(SELECT * FROM (SELECT NULL AS pk_catalog_name,
358+
NULL AS pk_schema_name,
359+
p."table" AS pk_table_name,
360+
p."to" AS pk_column_name,
361+
NULL AS fk_catalog_name,
362+
NULL AS fk_schema_name,
363+
m.name AS fk_table_name,
364+
p."from" AS fk_column_name,
365+
p.seq AS key_sequence,
366+
NULL AS pk_key_name,
367+
NULL AS fk_key_name,
368+
CASE
369+
WHEN p.on_update = 'CASCADE' THEN 0
370+
WHEN p.on_update = 'RESTRICT' THEN 1
371+
WHEN p.on_update = 'SET NULL' THEN 2
372+
WHEN p.on_update = 'NO ACTION' THEN 3
373+
WHEN p.on_update = 'SET DEFAULT' THEN 4
374+
END AS update_rule,
375+
CASE
376+
WHEN p.on_delete = 'CASCADE' THEN 0
377+
WHEN p.on_delete = 'RESTRICT' THEN 1
378+
WHEN p.on_delete = 'SET NULL' THEN 2
379+
WHEN p.on_delete = 'NO ACTION' THEN 3
380+
WHEN p.on_delete = 'SET DEFAULT' THEN 4
381+
END AS delete_rule
382+
FROM sqlite_master m
383+
JOIN pragma_foreign_key_list(m.name) p ON m.name != p."table"
384+
WHERE m.type = 'table') WHERE )" + filter + R"( ORDER BY
385+
pk_catalog_name, pk_schema_name, pk_table_name, pk_key_name, key_sequence)";
386+
}
352387

353-
*result = std::unique_ptr<FlightDataStream>(
354-
new RecordBatchStream(reader));
388+
Status SQLiteFlightSqlServer::GetFlightInfoImportedKeys(
389+
const pb::sql::CommandGetImportedKeys& command, const ServerCallContext& context,
390+
const FlightDescriptor& descriptor, std::unique_ptr<FlightInfo>* info) {
391+
return GetFlightInfoForCommand(descriptor, info, command,
392+
SqlSchema::GetImportedAndExportedKeysSchema());
393+
}
355394

356-
return Status::OK();
395+
Status SQLiteFlightSqlServer::DoGetImportedKeys(
396+
const pb::sql::CommandGetImportedKeys& command, const ServerCallContext& context,
397+
std::unique_ptr<FlightDataStream>* result) {
398+
std::string filter = "fk_table_name = '" + command.table() + "'";
399+
if (command.has_catalog()) {
400+
filter += " AND fk_catalog_name = '" + command.catalog() + "'";
401+
}
402+
if (command.has_schema()) {
403+
filter += " AND fk_schema_name = '" + command.schema() + "'";
404+
}
405+
std::string query = PrepareQueryForGetImportedOrExportedKeys(filter);
406+
407+
return DoGetSQLiteQuery(db_, query, SqlSchema::GetImportedAndExportedKeysSchema(),
408+
result);
409+
}
410+
411+
Status SQLiteFlightSqlServer::GetFlightInfoExportedKeys(
412+
const pb::sql::CommandGetExportedKeys& command, const ServerCallContext& context,
413+
const FlightDescriptor& descriptor, std::unique_ptr<FlightInfo>* info) {
414+
return GetFlightInfoForCommand(descriptor, info, command,
415+
SqlSchema::GetImportedAndExportedKeysSchema());
416+
}
417+
418+
Status SQLiteFlightSqlServer::DoGetExportedKeys(
419+
const pb::sql::CommandGetExportedKeys& command, const ServerCallContext& context,
420+
std::unique_ptr<FlightDataStream>* result) {
421+
std::string filter = "pk_table_name = '" + command.table() + "'";
422+
if (command.has_catalog()) {
423+
filter += " AND pk_catalog_name = '" + command.catalog() + "'";
424+
}
425+
if (command.has_schema()) {
426+
filter += " AND pk_schema_name = '" + command.schema() + "'";
427+
}
428+
std::string query = PrepareQueryForGetImportedOrExportedKeys(filter);
429+
430+
return DoGetSQLiteQuery(db_, query, SqlSchema::GetImportedAndExportedKeysSchema(),
431+
result);
357432
}
358433

359434
} // namespace example

cpp/src/arrow/flight/flight-sql/example/sqlite_server.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,20 @@ class SQLiteFlightSqlServer : public FlightSqlServerBase {
7979
std::unique_ptr<FlightInfo> *info) override;
8080
Status DoGetTableTypes(const ServerCallContext &context,
8181
std::unique_ptr<FlightDataStream> *result) override;
82+
Status GetFlightInfoImportedKeys(const pb::sql::CommandGetImportedKeys &command,
83+
const ServerCallContext &context,
84+
const FlightDescriptor &descriptor,
85+
std::unique_ptr<FlightInfo> *info) override;
86+
Status DoGetImportedKeys(const pb::sql::CommandGetImportedKeys &command,
87+
const ServerCallContext &context,
88+
std::unique_ptr<FlightDataStream> *result) override;
89+
Status GetFlightInfoExportedKeys(const pb::sql::CommandGetExportedKeys &command,
90+
const ServerCallContext &context,
91+
const FlightDescriptor &descriptor,
92+
std::unique_ptr<FlightInfo> *info) override;
93+
Status DoGetExportedKeys(const pb::sql::CommandGetExportedKeys &command,
94+
const ServerCallContext &context,
95+
std::unique_ptr<FlightDataStream> *result) override;
8296

8397
Status GetFlightInfoPrimaryKeys(const pb::sql::CommandGetPrimaryKeys &command,
8498
const ServerCallContext &context,

cpp/src/arrow/flight/flight-sql/sql_server.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,24 @@ return arrow::schema({field("catalog_name", utf8()), field("schema_name", utf8()
300300
field("key_sequence", int64()), field("key_name", utf8())});
301301
}
302302

303+
std::shared_ptr<Schema> SqlSchema::GetImportedAndExportedKeysSchema() {
304+
return arrow::schema({
305+
field("pk_catalog_name", utf8(), true),
306+
field("pk_schema_name", utf8(), true),
307+
field("pk_table_name", utf8(), false),
308+
field("pk_column_name", utf8(), false),
309+
field("fk_catalog_name", utf8(), true),
310+
field("fk_schema_name", utf8(), true),
311+
field("fk_table_name", utf8(), false),
312+
field("fk_column_name", utf8(), false),
313+
field("key_sequence", int32(), false),
314+
field("fk_key_name", utf8(), true),
315+
field("pk_key_name", utf8(), true),
316+
field("update_rule", uint8(), false),
317+
field("delete_rule", uint8(), false)
318+
});
319+
}
320+
303321
} // namespace sql
304322
} // namespace flight
305323
} // namespace arrow

cpp/src/arrow/flight/flight-sql/sql_server.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,11 @@ class SqlSchema {
303303
/// flags is set to true.
304304
/// \return The default schema template.
305305
static std::shared_ptr<Schema> GetPrimaryKeysSchema();
306+
307+
/// \brief Gets the Schema used on CommandGetImportedKeys and CommandGetExportedKeys
308+
/// response.
309+
/// \return The default schema template.
310+
static std::shared_ptr<Schema> GetImportedAndExportedKeysSchema();
306311
};
307312
} // namespace sql
308313
} // namespace flight

cpp/src/arrow/flight/flight-sql/sql_server_test.cc

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,71 @@ TEST(TestFlightSqlServer, TestCommandGetPrimaryKeys) {
349349
ASSERT_TRUE(expected_table->Equals(*table));
350350
}
351351

352+
TEST(TestFlightSqlServer, TestCommandGetImportedKeys) {
353+
std::unique_ptr<FlightInfo> flight_info;
354+
ASSERT_OK(sql_client->GetImportedKeys({}, NULLPTR, NULLPTR, "intTable", &flight_info));
355+
356+
std::unique_ptr<FlightStreamReader> stream;
357+
ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, &stream));
358+
359+
std::shared_ptr<Table> table;
360+
ASSERT_OK(stream->ReadAll(&table));
361+
362+
DECLARE_NULL_ARRAY(pk_catalog_name, String, 1);
363+
DECLARE_NULL_ARRAY(pk_schema_name, String, 1);
364+
DECLARE_ARRAY(pk_table_name, String, ({"foreignTable"}));
365+
DECLARE_ARRAY(pk_column_name, String, ({"id"}));
366+
DECLARE_NULL_ARRAY(fk_catalog_name, String, 1);
367+
DECLARE_NULL_ARRAY(fk_schema_name, String, 1);
368+
DECLARE_ARRAY(fk_table_name, String, ({"intTable"}));
369+
DECLARE_ARRAY(fk_column_name, String, ({"foreignId"}));
370+
DECLARE_ARRAY(key_sequence, Int32, ({0}));
371+
DECLARE_NULL_ARRAY(fk_key_name, String, 1);
372+
DECLARE_NULL_ARRAY(pk_key_name, String, 1);
373+
DECLARE_ARRAY(update_rule, UInt8, ({3}));
374+
DECLARE_ARRAY(delete_rule, UInt8, ({3}));
375+
376+
const std::shared_ptr<Table>& expected_table =
377+
Table::Make(SqlSchema::GetImportedAndExportedKeysSchema(),
378+
{pk_catalog_name, pk_schema_name, pk_table_name, pk_column_name,
379+
fk_catalog_name, fk_schema_name, fk_table_name, fk_column_name,
380+
key_sequence, fk_key_name, pk_key_name, update_rule, delete_rule});
381+
ASSERT_TRUE(expected_table->Equals(*table));
382+
}
383+
384+
TEST(TestFlightSqlServer, TestCommandGetExportedKeys) {
385+
std::unique_ptr<FlightInfo> flight_info;
386+
ASSERT_OK(
387+
sql_client->GetExportedKeys({}, NULLPTR, NULLPTR, "foreignTable", &flight_info));
388+
389+
std::unique_ptr<FlightStreamReader> stream;
390+
ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, &stream));
391+
392+
std::shared_ptr<Table> table;
393+
ASSERT_OK(stream->ReadAll(&table));
394+
395+
DECLARE_NULL_ARRAY(pk_catalog_name, String, 1);
396+
DECLARE_NULL_ARRAY(pk_schema_name, String, 1);
397+
DECLARE_ARRAY(pk_table_name, String, ({"foreignTable"}));
398+
DECLARE_ARRAY(pk_column_name, String, ({"id"}));
399+
DECLARE_NULL_ARRAY(fk_catalog_name, String, 1);
400+
DECLARE_NULL_ARRAY(fk_schema_name, String, 1);
401+
DECLARE_ARRAY(fk_table_name, String, ({"intTable"}));
402+
DECLARE_ARRAY(fk_column_name, String, ({"foreignId"}));
403+
DECLARE_ARRAY(key_sequence, Int32, ({0}));
404+
DECLARE_NULL_ARRAY(fk_key_name, String, 1);
405+
DECLARE_NULL_ARRAY(pk_key_name, String, 1);
406+
DECLARE_ARRAY(update_rule, UInt8, ({3}));
407+
DECLARE_ARRAY(delete_rule, UInt8, ({3}));
408+
409+
const std::shared_ptr<Table>& expected_table =
410+
Table::Make(SqlSchema::GetImportedAndExportedKeysSchema(),
411+
{pk_catalog_name, pk_schema_name, pk_table_name, pk_column_name,
412+
fk_catalog_name, fk_schema_name, fk_table_name, fk_column_name,
413+
key_sequence, fk_key_name, pk_key_name, update_rule, delete_rule});
414+
ASSERT_TRUE(expected_table->Equals(*table));
415+
}
416+
352417
auto env =
353418
::testing::AddGlobalTestEnvironment(new TestFlightSqlServer);
354419

0 commit comments

Comments
 (0)