diff --git a/Cargo.lock b/Cargo.lock index 2fea348dbbf..64ab4b1c5e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2960,6 +2960,7 @@ dependencies = [ "parking_lot", "pretty_assertions", "rand 0.8.5", + "rstest", "schema", "serde", "serde_json", diff --git a/influxdb3/tests/server/snapshots/lib__server__write__special_chars_query_results.snap b/influxdb3/tests/server/snapshots/lib__server__write__special_chars_query_results.snap new file mode 100644 index 00000000000..ddca94128fb --- /dev/null +++ b/influxdb3/tests/server/snapshots/lib__server__write__special_chars_query_results.snap @@ -0,0 +1,26 @@ +--- +source: influxdb3/tests/server/write.rs +expression: query_result +--- +[ + { + "field with all,special=chars": 100, + "field with spaces": 42, + "field,with,commas": "test", + "field=with=equals": true, + "normal_field": 1.0, + "normal_tag": "value1", + "tag with spaces": "value2", + "tag with spaces,and=all": "value5", + "tag,with,commas": "value3", + "tag=with=equals": "value4", + "time": "1970-01-01T00:00:01" + }, + { + "field with spaces": 84, + "normal_field": 2.0, + "normal_tag": "value1", + "tag with spaces": "value with spaces", + "time": "1970-01-01T00:00:02" + } +] diff --git a/influxdb3/tests/server/snapshots/lib__server__write__special_chars_table_columns.snap b/influxdb3/tests/server/snapshots/lib__server__write__special_chars_table_columns.snap new file mode 100644 index 00000000000..e5bcb9bd5d7 --- /dev/null +++ b/influxdb3/tests/server/snapshots/lib__server__write__special_chars_table_columns.snap @@ -0,0 +1,94 @@ +--- +source: influxdb3/tests/server/write.rs +expression: table_info +--- +[ + { + "column_name": "field with all,special=chars", + "data_type": "UInt64", + "is_nullable": "YES", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + }, + { + "column_name": "field with spaces", + "data_type": "Int64", + "is_nullable": "YES", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + }, + { 
+ "column_name": "field,with,commas", + "data_type": "Utf8", + "is_nullable": "YES", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + }, + { + "column_name": "field=with=equals", + "data_type": "Boolean", + "is_nullable": "YES", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + }, + { + "column_name": "normal_field", + "data_type": "Float64", + "is_nullable": "YES", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + }, + { + "column_name": "normal_tag", + "data_type": "Dictionary(Int32, Utf8)", + "is_nullable": "YES", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + }, + { + "column_name": "tag with spaces", + "data_type": "Dictionary(Int32, Utf8)", + "is_nullable": "YES", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + }, + { + "column_name": "tag with spaces,and=all", + "data_type": "Dictionary(Int32, Utf8)", + "is_nullable": "YES", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + }, + { + "column_name": "tag,with,commas", + "data_type": "Dictionary(Int32, Utf8)", + "is_nullable": "YES", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + }, + { + "column_name": "tag=with=equals", + "data_type": "Dictionary(Int32, Utf8)", + "is_nullable": "YES", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + }, + { + "column_name": "time", + "data_type": "Timestamp(Nanosecond, None)", + "is_nullable": "NO", + "table_catalog": "public", + "table_name": "metrics", + "table_schema": "iox" + } +] diff --git a/influxdb3/tests/server/write.rs b/influxdb3/tests/server/write.rs index 3a2b5336643..672e53c9aa7 100644 --- a/influxdb3/tests/server/write.rs +++ b/influxdb3/tests/server/write.rs @@ -393,3 +393,76 @@ async fn api_no_sync_param() { println!("Response [{status}]:\n{body}"); assert_eq!(status, StatusCode::NO_CONTENT); } + 
+#[test_log::test(tokio::test)]
+async fn test_table_creation_with_special_characters_line_protocol_compatibility() {
+    let server = TestServer::spawn().await;
+    let db_name = "test_special_chars";
+    let table_name = "metrics";
+
+    // Create database
+    let result = server.create_database(db_name).run().unwrap();
+    assert!(result.contains(&format!("Database \"{db_name}\" created successfully")));
+
+    // Create table with tag and field names containing special characters that require escaping in line protocol
+    // Using the API directly to avoid CLI parsing issues with special characters
+    server
+        .create_table(db_name, table_name)
+        .with_tags([
+            "normal_tag",              // Normal tag name
+            "tag with spaces",         // Tag with spaces
+            "tag,with,commas",         // Tag with commas
+            "tag=with=equals",         // Tag with equals signs
+            "tag with spaces,and=all", // Tag with all special chars
+        ])
+        .with_fields([
+            ("normal_field", "float64"),                // Normal field name
+            ("field with spaces", "int64"),             // Field with spaces
+            ("field,with,commas", "utf8"),              // Field with commas
+            ("field=with=equals", "bool"),              // Field with equals signs
+            ("field with all,special=chars", "uint64"), // Field with all special chars
+        ])
+        .run_api()
+        .await
+        .unwrap();
+
+    // Now test writing to the table using valid line protocol with proper escaping
+    // Line protocol format: measurement,tag_set field_set timestamp
+    // Tag keys/values and field keys need escaping for spaces, commas, and equals signs
+
+    // Test 1: Write with all fields and tags using proper escaping
+    let lp = r#"metrics,normal_tag=value1,tag\ with\ spaces=value2,tag\,with\,commas=value3,tag\=with\=equals=value4,tag\ with\ spaces\,and\=all=value5 normal_field=1.0,field\ with\ spaces=42i,field\,with\,commas="test",field\=with\=equals=true,field\ with\ all\,special\=chars=100u 1000000000"#;
+
+    server
+        .write_lp_to_db(db_name, lp, Precision::Nanosecond)
+        .await
+        .expect("Failed to write first line protocol");
+
+    // Test 2: Write with subset of fields to verify partial writes work
+    let lp2 = r#"metrics,normal_tag=value1,tag\ with\ spaces=value\ with\ spaces normal_field=2.0,field\ with\ spaces=84i 2000000000"#;
+
+    server
+        .write_lp_to_db(db_name, lp2, Precision::Nanosecond)
+        .await
+        .expect("Failed to write second line protocol");
+
+    // Query the data to verify it was written correctly
+    let query_result = server
+        .query_sql(db_name)
+        .with_sql(format!("SELECT * FROM {table_name} ORDER BY time"))
+        .run()
+        .unwrap();
+
+    // Use insta snapshot to verify the query results
+    insta::assert_json_snapshot!("special_chars_query_results", query_result);
+
+    // Test 3: Verify table schema shows the correct column names
+    let table_info = server
+        .query_sql(db_name)
+        .with_sql(format!("SHOW COLUMNS FROM {table_name}"))
+        .run()
+        .unwrap();
+
+    // Use insta snapshot to verify the table columns
+    insta::assert_json_snapshot!("special_chars_table_columns", table_info);
+}
diff --git a/influxdb3_catalog/Cargo.toml b/influxdb3_catalog/Cargo.toml
index aec3f0435b3..ae41fccc381 100644
--- a/influxdb3_catalog/Cargo.toml
+++ b/influxdb3_catalog/Cargo.toml
@@ -56,6 +56,7 @@ influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }
 # crates.io deps
 insta.workspace = true
 pretty_assertions.workspace = true
+rstest.workspace = true
 test_helpers.workspace = true
 test-log.workspace = true
 
diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index f94abf16be1..d861b2af547 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -2587,6 +2587,7 @@ mod tests {
     use iox_time::MockProvider;
     use object_store::{local::LocalFileSystem, memory::InMemory};
     use pretty_assertions::assert_eq;
+    use rstest::rstest;
     use test_helpers::assert_contains;
 
     #[test_log::test(tokio::test)]
@@ -5731,4 +5732,130 @@ mod tests {
         assert!(table_def.deleted);
         assert_eq!(table_def.hard_delete_time, Some(new_specific_time));
     }
+
+    #[rstest]
+    #[case::empty_tag_name(&[""], &[("field1", FieldDataType::String)], "tag key cannot be empty")]
+    #[case::empty_field_name(&["tag1"], &[("", FieldDataType::String)], "field key cannot be empty")]
+    #[case::multiple_with_empty_tag(&["tag1", "", "tag3"], &[("field1", FieldDataType::String)], "tag key cannot be empty")]
+    #[case::multiple_with_empty_field(&["tag1"], &[("field1", FieldDataType::String), ("", FieldDataType::Integer)], "field key cannot be empty")]
+    #[case::tag_with_newline(&["tag\nname"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
+    #[case::field_with_newline(&["tag1"], &[("field\nname", FieldDataType::String)], "field key cannot contain control characters")]
+    #[case::tag_with_tab(&["tag\tname"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
+    #[case::field_with_tab(&["tag1"], &[("field\tname", FieldDataType::String)], "field key cannot contain control characters")]
+    #[case::tag_with_carriage_return(&["tag\rname"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
+    #[case::field_with_carriage_return(&["tag1"], &[("field\rname", FieldDataType::String)], "field key cannot contain control characters")]
+    #[case::tag_with_null(&["tag\0name"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
+    #[case::field_with_null(&["tag1"], &[("field\0name", FieldDataType::String)], "field key cannot contain control characters")]
+    #[case::tag_with_form_feed(&["tag\x0Cname"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
+    #[case::field_with_form_feed(&["tag1"], &[("field\x0Cname", FieldDataType::String)], "field key cannot contain control characters")]
+    #[case::tag_with_del(&["tag\x7Fname"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
+    #[case::field_with_del(&["tag1"], &[("field\x7Fname", FieldDataType::String)], "field key cannot contain control characters")]
+    #[test_log::test(tokio::test)]
+    async fn test_create_table_validates_tag_and_field_names(
+        #[case] tags: &[&str],
+        #[case] fields: &[(&str, FieldDataType)],
+        #[case] expected_error: &str,
+    ) {
+        let catalog = Catalog::new_in_memory("test-host").await.unwrap();
+        catalog.create_database("test_db").await.unwrap();
+
+        let result = catalog
+            .create_table("test_db", "test_table", tags, fields)
+            .await;
+
+        assert!(result.is_err());
+        let err_string = result.unwrap_err().to_string();
+        assert_contains!(err_string, expected_error);
+    }
+
+    // Valid tag and field names, including ones that need escaping in line protocol, must be accepted and stored verbatim.
+    #[rstest]
+    // Simple valid names
+    #[case::valid_simple(&["tag1"], &[("field1", FieldDataType::String)], &["tag1"], &["field1"])]
+    #[case::valid_multiple_tags(&["tag1", "tag2", "tag3"], &[("field1", FieldDataType::String)], &["tag1", "tag2", "tag3"], &["field1"])]
+    #[case::valid_multiple_fields(&["tag1"], &[("field1", FieldDataType::String), ("field2", FieldDataType::Integer), ("field3", FieldDataType::Float)], &["tag1"], &["field1", "field2", "field3"])]
+    #[case::valid_underscore(&["tag_1"], &[("field_1", FieldDataType::String)], &["tag_1"], &["field_1"])]
+    #[case::valid_numbers(&["tag123"], &[("field456", FieldDataType::String)], &["tag123"], &["field456"])]
+    #[case::valid_mixed(&["tag_123"], &[("field_456", FieldDataType::String)], &["tag_123"], &["field_456"])]
+    // Mixed case, punctuation, non-ASCII text, and long names; none of these require escaping in line protocol
+    #[case::valid_camelcase(&["tagName"], &[("fieldName", FieldDataType::String)], &["tagName"], &["fieldName"])]
+    #[case::valid_dots(&["tag.name"], &[("field.name", FieldDataType::String)], &["tag.name"], &["field.name"])]
+    #[case::valid_hyphens(&["tag-name"], &[("field-name", FieldDataType::String)], &["tag-name"], &["field-name"])]
+    #[case::valid_colons(&["tag:name"], &[("field:name", FieldDataType::String)], &["tag:name"], &["field:name"])]
+    #[case::valid_slashes(&["tag/name"], &[("field/name", FieldDataType::String)], &["tag/name"], &["field/name"])]
+    #[case::valid_brackets(&["tag[0]"], &[("field[0]", FieldDataType::String)], &["tag[0]"], &["field[0]"])]
+    #[case::valid_parentheses(&["tag(1)"], &[("field(1)", FieldDataType::String)], &["tag(1)"], &["field(1)"])]
+    #[case::valid_special_chars(&["tag@host"], &[("field#1", FieldDataType::String)], &["tag@host"], &["field#1"])]
+    #[case::valid_unicode(&["tag_名前"], &[("field_值", FieldDataType::String)], &["tag_名前"], &["field_值"])]
+    #[case::valid_long_names(&["this_is_a_very_long_tag_name_with_many_characters"], &[("this_is_a_very_long_field_name_with_many_characters", FieldDataType::String)], &["this_is_a_very_long_tag_name_with_many_characters"], &["this_is_a_very_long_field_name_with_many_characters"])]
+    // Names that require escaping in line protocol; the catalog stores the unescaped form of names:
+    // - "tag\ with\ space" in line protocol -> "tag with space" in catalog
+    // - "tag\,comma" in line protocol -> "tag,comma" in catalog
+    // - "tag\=equals" in line protocol -> "tag=equals" in catalog
+    // - "tag\\backslash" in line protocol -> "tag\backslash" in catalog
+    #[case::escaped_space(&["tag with space"], &[("field with space", FieldDataType::String)], &["tag with space"], &["field with space"])]
+    #[case::escaped_comma(&["tag,comma"], &[("field,comma", FieldDataType::String)], &["tag,comma"], &["field,comma"])]
+    #[case::escaped_equals(&["tag=equals"], &[("field=equals", FieldDataType::String)], &["tag=equals"], &["field=equals"])]
+    #[case::literal_backslash(&["tag\\backslash"], &[("field\\backslash", FieldDataType::String)], &["tag\\backslash"], &["field\\backslash"])]
+    #[case::multiple_special(&["tag with,comma=equals"], &[("field=with space,and=comma", FieldDataType::String)], &["tag with,comma=equals"], &["field=with space,and=comma"])]
+    #[test_log::test(tokio::test)]
+    async fn test_create_table_with_valid_names(
+        #[case] tags: &[&str],
+        #[case] fields: &[(&str, FieldDataType)],
+        #[case] expected_tag_names: &[&str],
+        #[case] expected_field_names: &[&str],
+    ) {
+        let catalog = Catalog::new_in_memory("test-host").await.unwrap();
+        catalog.create_database("test_db").await.unwrap();
+
+        let result = catalog
+            .create_table("test_db", "test_table", tags, fields)
+            .await;
+
+        assert!(
+            result.is_ok(),
+            "Failed to create table with names that are valid in escaped line protocol: {result:?}"
+        );
+
+        // Verify the table was created correctly
+        let db_schema = catalog.db_schema("test_db").unwrap();
+        let table_def = db_schema.table_definition("test_table").unwrap();
+
+        // Verify tag names match expectations, compared position by position
+        let tag_columns: Vec<_> = table_def
+            .columns
+            .resource_iter()
+            .filter(|col| col.data_type == InfluxColumnType::Tag)
+            .collect();
+
+        assert_eq!(tag_columns.len(), expected_tag_names.len());
+        for (i, expected_name) in expected_tag_names.iter().enumerate() {
+            assert_eq!(
+                tag_columns[i].name.as_ref(),
+                *expected_name,
+                "Tag name mismatch at index {i}"
+            );
+        }
+
+        // Verify field names match expectations; a field is any column that is neither a tag nor the timestamp
+        let field_columns: Vec<_> = table_def
+            .columns
+            .resource_iter()
+            .filter(|col| {
+                !matches!(
+                    col.data_type,
+                    InfluxColumnType::Tag | InfluxColumnType::Timestamp
+                )
+            })
+            .collect();
+
+        assert_eq!(field_columns.len(), expected_field_names.len());
+        for (i, expected_name) in expected_field_names.iter().enumerate() {
+            assert_eq!(
+                field_columns[i].name.as_ref(),
+                *expected_name,
+                "Field name mismatch at index {i}"
+            );
+        }
+    }
 }
diff --git a/influxdb3_catalog/src/catalog/update.rs b/influxdb3_catalog/src/catalog/update.rs
index 03eeb7bcecd..0aa31371159 100644
--- a/influxdb3_catalog/src/catalog/update.rs
+++ b/influxdb3_catalog/src/catalog/update.rs
@@ -1282,6 +1282,19 @@ impl DatabaseCatalogTransaction {
         if tags.len() + fields.len() > self.columns_per_table_limit - 1 {
             return Err(CatalogError::TooManyColumns(self.columns_per_table_limit));
         }
+
+        // Validate tag names using the same rules as the line protocol
parser
+        for tag in tags {
+            let tag_name = tag.as_ref();
+            validate_key(tag_name, "tag")?;
+        }
+
+        // Validate field names using the same rules as the line protocol parser
+        for (field_name, _) in fields {
+            let field_name_str = field_name.as_ref();
+            validate_key(field_name_str, "field")?;
+        }
+
         let db_schema = Arc::make_mut(&mut self.database_schema);
         let mut table_def_arc = db_schema.create_new_empty_table(table_name)?;
         let table_def = Arc::make_mut(&mut table_def_arc);
@@ -1373,3 +1386,28 @@ impl Prompt {
         s
     }
 }
+
+/// Validates a tag or field key before the table that declares it is created.
+///
+/// `key_type` is the label (`"tag"` or `"field"` at the call sites) that prefixes the error text.
+/// The check is meant to keep names compatible with line protocol writes; only the two rules
+/// below are enforced, so spaces, commas, equals signs and backslashes are all accepted.
+///
+/// # Errors
+///
+/// Returns [`CatalogError::InvalidConfiguration`] when `key` is empty, or when it contains a
+/// character for which [`char::is_control`] is true. That is the whole Unicode `Cc` category:
+/// U+0000..=U+001F, U+007F and the C1 range U+0080..=U+009F, not only the ASCII controls.
+fn validate_key(key: &str, key_type: &str) -> Result<()> {
+    let message = if key.is_empty() {
+        format!("{key_type} key cannot be empty")
+    } else if key.chars().any(char::is_control) {
+        format!("{key_type} key cannot contain control characters")
+    } else {
+        return Ok(());
+    };
+
+    Err(CatalogError::InvalidConfiguration {
+        message: message.into(),
+    })
+}