Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
source: influxdb3/tests/server/write.rs
expression: query_result
---
[
{
"field with all,special=chars": 100,
"field with spaces": 42,
"field,with,commas": "test",
"field=with=equals": true,
"normal_field": 1.0,
"normal_tag": "value1",
"tag with spaces": "value2",
"tag with spaces,and=all": "value5",
"tag,with,commas": "value3",
"tag=with=equals": "value4",
"time": "1970-01-01T00:00:01"
},
{
"field with spaces": 84,
"normal_field": 2.0,
"normal_tag": "value1",
"tag with spaces": "value with spaces",
"time": "1970-01-01T00:00:02"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
---
source: influxdb3/tests/server/write.rs
expression: table_info
---
[
{
"column_name": "field with all,special=chars",
"data_type": "UInt64",
"is_nullable": "YES",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
},
{
"column_name": "field with spaces",
"data_type": "Int64",
"is_nullable": "YES",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
},
{
"column_name": "field,with,commas",
"data_type": "Utf8",
"is_nullable": "YES",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
},
{
"column_name": "field=with=equals",
"data_type": "Boolean",
"is_nullable": "YES",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
},
{
"column_name": "normal_field",
"data_type": "Float64",
"is_nullable": "YES",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
},
{
"column_name": "normal_tag",
"data_type": "Dictionary(Int32, Utf8)",
"is_nullable": "YES",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
},
{
"column_name": "tag with spaces",
"data_type": "Dictionary(Int32, Utf8)",
"is_nullable": "YES",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
},
{
"column_name": "tag with spaces,and=all",
"data_type": "Dictionary(Int32, Utf8)",
"is_nullable": "YES",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
},
{
"column_name": "tag,with,commas",
"data_type": "Dictionary(Int32, Utf8)",
"is_nullable": "YES",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
},
{
"column_name": "tag=with=equals",
"data_type": "Dictionary(Int32, Utf8)",
"is_nullable": "YES",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
},
{
"column_name": "time",
"data_type": "Timestamp(Nanosecond, None)",
"is_nullable": "NO",
"table_catalog": "public",
"table_name": "metrics",
"table_schema": "iox"
}
]
73 changes: 73 additions & 0 deletions influxdb3/tests/server/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,76 @@ async fn api_no_sync_param() {
println!("Response [{status}]:\n{body}");
assert_eq!(status, StatusCode::NO_CONTENT);
}

#[test_log::test(tokio::test)]
async fn test_table_creation_with_special_characters_line_protocol_compatibility() {
    let server = TestServer::spawn().await;
    let db_name = "test_special_chars";
    let table_name = "metrics";

    // The database must exist before the table can be created.
    let create_db_output = server.create_database(db_name).run().unwrap();
    assert!(create_db_output.contains(&format!("Database \"{db_name}\" created successfully")));

    // Column names below deliberately contain spaces, commas, and equals signs —
    // exactly the characters that require backslash-escaping in line protocol.
    // The table is created through the HTTP API so these names reach the server
    // verbatim, sidestepping CLI argument parsing issues.
    let tags = [
        "normal_tag",
        "tag with spaces",
        "tag,with,commas",
        "tag=with=equals",
        "tag with spaces,and=all",
    ];
    let fields = [
        ("normal_field", "float64"),
        ("field with spaces", "int64"),
        ("field,with,commas", "utf8"),
        ("field=with=equals", "bool"),
        ("field with all,special=chars", "uint64"),
    ];
    server
        .create_table(db_name, table_name)
        .with_tags(tags)
        .with_fields(fields)
        .run_api()
        .await
        .unwrap();

    // Exercise writes against the table using valid, properly escaped line
    // protocol (format: measurement,tag_set field_set timestamp; spaces,
    // commas, and equals signs in keys/values are escaped with a backslash).

    // First write: every tag and field populated.
    let lp = r#"metrics,normal_tag=value1,tag\ with\ spaces=value2,tag\,with\,commas=value3,tag\=with\=equals=value4,tag\ with\ spaces\,and\=all=value5 normal_field=1.0,field\ with\ spaces=42i,field\,with\,commas="test",field\=with\=equals=true,field\ with\ all\,special\=chars=100u 1000000000"#;
    server
        .write_lp_to_db(db_name, lp, Precision::Nanosecond)
        .await
        .expect("Failed to write first line protocol");

    // Second write: only a subset of the columns, to confirm partial rows work.
    let lp2 = r#"metrics,normal_tag=value1,tag\ with\ spaces=value\ with\ spaces normal_field=2.0,field\ with\ spaces=84i 2000000000"#;
    server
        .write_lp_to_db(db_name, lp2, Precision::Nanosecond)
        .await
        .expect("Failed to write second line protocol");

    // Read everything back and pin the rows with an insta snapshot.
    let query_result = server
        .query_sql(db_name)
        .with_sql(format!("SELECT * FROM {table_name} ORDER BY time"))
        .run()
        .unwrap();
    insta::assert_json_snapshot!("special_chars_query_results", query_result);

    // Finally, snapshot the schema to confirm the column names round-tripped
    // into the catalog unescaped.
    let table_info = server
        .query_sql(db_name)
        .with_sql(format!("SHOW COLUMNS FROM {table_name}"))
        .run()
        .unwrap();
    insta::assert_json_snapshot!("special_chars_table_columns", table_info);
}
1 change: 1 addition & 0 deletions influxdb3_catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }
# crates.io deps
insta.workspace = true
pretty_assertions.workspace = true
rstest.workspace = true
test_helpers.workspace = true
test-log.workspace = true

Expand Down
127 changes: 127 additions & 0 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2587,6 +2587,7 @@ mod tests {
use iox_time::MockProvider;
use object_store::{local::LocalFileSystem, memory::InMemory};
use pretty_assertions::assert_eq;
use rstest::rstest;
use test_helpers::assert_contains;

#[test_log::test(tokio::test)]
Expand Down Expand Up @@ -5731,4 +5732,130 @@ mod tests {
assert!(table_def.deleted);
assert_eq!(table_def.hard_delete_time, Some(new_specific_time));
}

#[rstest]
#[case::empty_tag_name(&[""], &[("field1", FieldDataType::String)], "tag key cannot be empty")]
#[case::empty_field_name(&["tag1"], &[("", FieldDataType::String)], "field key cannot be empty")]
#[case::multiple_with_empty_tag(&["tag1", "", "tag3"], &[("field1", FieldDataType::String)], "tag key cannot be empty")]
#[case::multiple_with_empty_field(&["tag1"], &[("field1", FieldDataType::String), ("", FieldDataType::Integer)], "field key cannot be empty")]
#[case::tag_with_newline(&["tag\nname"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
#[case::field_with_newline(&["tag1"], &[("field\nname", FieldDataType::String)], "field key cannot contain control characters")]
#[case::tag_with_tab(&["tag\tname"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
#[case::field_with_tab(&["tag1"], &[("field\tname", FieldDataType::String)], "field key cannot contain control characters")]
#[case::tag_with_carriage_return(&["tag\rname"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
#[case::field_with_carriage_return(&["tag1"], &[("field\rname", FieldDataType::String)], "field key cannot contain control characters")]
#[case::tag_with_null(&["tag\0name"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
#[case::field_with_null(&["tag1"], &[("field\0name", FieldDataType::String)], "field key cannot contain control characters")]
#[case::tag_with_form_feed(&["tag\x0Cname"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
#[case::field_with_form_feed(&["tag1"], &[("field\x0Cname", FieldDataType::String)], "field key cannot contain control characters")]
#[case::tag_with_del(&["tag\x7Fname"], &[("field1", FieldDataType::String)], "tag key cannot contain control characters")]
#[case::field_with_del(&["tag1"], &[("field\x7Fname", FieldDataType::String)], "field key cannot contain control characters")]
#[test_log::test(tokio::test)]
async fn test_create_table_validates_tag_and_field_names(
    #[case] tags: &[&str],
    #[case] fields: &[(&str, FieldDataType)],
    #[case] expected_error: &str,
) {
    // A fresh in-memory catalog with a database for the table to land in.
    let catalog = Catalog::new_in_memory("test-host").await.unwrap();
    catalog.create_database("test_db").await.unwrap();

    // Attempt the table creation with the invalid tag/field names under test.
    let creation = catalog
        .create_table("test_db", "test_table", tags, fields)
        .await;

    // The catalog must reject the request with the expected validation message.
    assert!(creation.is_err());
    assert_contains!(creation.unwrap_err().to_string(), expected_error);
}

// Test valid tag and field names, including those that require escaping in line protocol.
#[rstest]
// Simple valid names
#[case::valid_simple(&["tag1"], &[("field1", FieldDataType::String)], &["tag1"], &["field1"])]
#[case::valid_multiple_tags(&["tag1", "tag2", "tag3"], &[("field1", FieldDataType::String)], &["tag1", "tag2", "tag3"], &["field1"])]
#[case::valid_multiple_fields(&["tag1"], &[("field1", FieldDataType::String), ("field2", FieldDataType::Integer), ("field3", FieldDataType::Float)], &["tag1"], &["field1", "field2", "field3"])]
#[case::valid_underscore(&["tag_1"], &[("field_1", FieldDataType::String)], &["tag_1"], &["field_1"])]
#[case::valid_numbers(&["tag123"], &[("field456", FieldDataType::String)], &["tag123"], &["field456"])]
#[case::valid_mixed(&["tag_123"], &[("field_456", FieldDataType::String)], &["tag_123"], &["field_456"])]
// Special characters that don't require escaping
#[case::valid_camelcase(&["tagName"], &[("fieldName", FieldDataType::String)], &["tagName"], &["fieldName"])]
#[case::valid_dots(&["tag.name"], &[("field.name", FieldDataType::String)], &["tag.name"], &["field.name"])]
#[case::valid_hyphens(&["tag-name"], &[("field-name", FieldDataType::String)], &["tag-name"], &["field-name"])]
#[case::valid_colons(&["tag:name"], &[("field:name", FieldDataType::String)], &["tag:name"], &["field:name"])]
#[case::valid_slashes(&["tag/name"], &[("field/name", FieldDataType::String)], &["tag/name"], &["field/name"])]
#[case::valid_brackets(&["tag[0]"], &[("field[0]", FieldDataType::String)], &["tag[0]"], &["field[0]"])]
#[case::valid_parentheses(&["tag(1)"], &[("field(1)", FieldDataType::String)], &["tag(1)"], &["field(1)"])]
#[case::valid_special_chars(&["tag@host"], &[("field#1", FieldDataType::String)], &["tag@host"], &["field#1"])]
#[case::valid_unicode(&["tag_名前"], &[("field_值", FieldDataType::String)], &["tag_名前"], &["field_值"])]
#[case::valid_long_names(&["this_is_a_very_long_tag_name_with_many_characters"], &[("this_is_a_very_long_field_name_with_many_characters", FieldDataType::String)], &["this_is_a_very_long_tag_name_with_many_characters"], &["this_is_a_very_long_field_name_with_many_characters"])]
// Names that require escaping in line protocol; the catalog stores the unescaped form of names:
// - "tag\ with\ space" in line protocol -> "tag with space" in catalog
// - "tag\,comma" in line protocol -> "tag,comma" in catalog
// - "tag\=equals" in line protocol -> "tag=equals" in catalog
// - "tag\\backslash" in line protocol -> "tag\backslash" in catalog
#[case::escaped_space(&["tag with space"], &[("field with space", FieldDataType::String)], &["tag with space"], &["field with space"])]
#[case::escaped_comma(&["tag,comma"], &[("field,comma", FieldDataType::String)], &["tag,comma"], &["field,comma"])]
#[case::escaped_equals(&["tag=equals"], &[("field=equals", FieldDataType::String)], &["tag=equals"], &["field=equals"])]
#[case::literal_backslash(&["tag\\backslash"], &[("field\\backslash", FieldDataType::String)], &["tag\\backslash"], &["field\\backslash"])]
#[case::multiple_special(&["tag with,comma=equals"], &[("field=with space,and=comma", FieldDataType::String)], &["tag with,comma=equals"], &["field=with space,and=comma"])]
#[test_log::test(tokio::test)]
async fn test_create_table_with_valid_names(
#[case] tags: &[&str],
#[case] fields: &[(&str, FieldDataType)],
#[case] expected_tag_names: &[&str],
#[case] expected_field_names: &[&str],
) {
let catalog = Catalog::new_in_memory("test-host").await.unwrap();
catalog.create_database("test_db").await.unwrap();

let result = catalog
.create_table("test_db", "test_table", tags, fields)
.await;

assert!(
result.is_ok(),
"Failed to create table with names that are valid in escaped line protocol: {result:?}"
);

// Verify the table was created correctly
let db_schema = catalog.db_schema("test_db").unwrap();
let table_def = db_schema.table_definition("test_table").unwrap();

// Verify tag names match expectations
let tag_columns: Vec<_> = table_def
.columns
.resource_iter()
.filter(|col| col.data_type == InfluxColumnType::Tag)
.collect();

assert_eq!(tag_columns.len(), expected_tag_names.len());
for (i, expected_name) in expected_tag_names.iter().enumerate() {
assert_eq!(
tag_columns[i].name.as_ref(),
*expected_name,
"Tag name mismatch at index {i}"
);
}

// Verify field names match expectations
let field_columns: Vec<_> = table_def
.columns
.resource_iter()
.filter(|col| {
!matches!(
col.data_type,
InfluxColumnType::Tag | InfluxColumnType::Timestamp
)
})
.collect();

assert_eq!(field_columns.len(), expected_field_names.len());
for (i, expected_name) in expected_field_names.iter().enumerate() {
assert_eq!(
field_columns[i].name.as_ref(),
*expected_name,
"Field name mismatch at index {i}"
);
}
}
}
33 changes: 33 additions & 0 deletions influxdb3_catalog/src/catalog/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,19 @@ impl DatabaseCatalogTransaction {
if tags.len() + fields.len() > self.columns_per_table_limit - 1 {
return Err(CatalogError::TooManyColumns(self.columns_per_table_limit));
}

// Validate tag names using the same rules as the line protocol parser
for tag in tags {
let tag_name = tag.as_ref();
validate_key(tag_name, "tag")?;
}

// Validate field names using the same rules as the line protocol parser
for (field_name, _) in fields {
let field_name_str = field_name.as_ref();
validate_key(field_name_str, "field")?;
}

let db_schema = Arc::make_mut(&mut self.database_schema);
let mut table_def_arc = db_schema.create_new_empty_table(table_name)?;
let table_def = Arc::make_mut(&mut table_def_arc);
Expand Down Expand Up @@ -1373,3 +1386,23 @@ impl<S, R> Prompt<S, R> {
s
}
}

/// Validate that a key (tag or field) is valid.
///
/// Keys cannot be empty and cannot contain control characters. Note that
/// [`char::is_control`] matches the Unicode `Cc` category: the C0 controls
/// (0x00-0x1F), DEL (0x7F), and also the C1 controls (U+0080-U+009F) — so
/// slightly more than the ASCII range is rejected.
/// This ensures compatibility with the line protocol parser.
///
/// `key_type` ("tag" or "field") is interpolated into the error message only.
fn validate_key(key: &str, key_type: &str) -> Result<()> {
    if key.is_empty() {
        return Err(CatalogError::InvalidConfiguration {
            message: format!("{key_type} key cannot be empty").into(),
        });
    }

    // Check for control characters (Unicode `Cc` category — see doc comment).
    if key.chars().any(|c| c.is_control()) {
        return Err(CatalogError::InvalidConfiguration {
            message: format!("{key_type} key cannot contain control characters").into(),
        });
    }

    Ok(())
}