Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/prometheus/prompb"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var ErrDiagnosticsValueMissing = errors.New("expected diagnostic value missing")
Expand Down Expand Up @@ -594,6 +595,19 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U

epoch := strings.TrimSpace(r.FormValue("epoch"))

timeFormats := []string{
"rfc3339",
"epoch",
}
// timeFormat should default to "epoch"
timeFormat := strings.TrimSpace(r.FormValue("time_format"))
if timeFormat == "" {
timeFormat = "epoch"
} else if !slices.Contains(timeFormats, timeFormat) {
h.httpError(rw, fmt.Sprintf("Time format must be one of the following: %s", strings.Join(timeFormats, ",")), http.StatusBadRequest)
return
}

p := influxql.NewParser(qr)
db := r.FormValue("db")

Expand Down Expand Up @@ -747,8 +761,12 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
}

// if requested, convert result timestamps to epoch
if epoch != "" {
if epoch != "" && timeFormat == "epoch" {
convertToEpoch(r, epoch)
} else if timeFormat == "rfc3339" {
if err := convertToTimeFormat(r, time.RFC3339Nano); err != nil {
h.httpError(rw, fmt.Sprintf("error converting time to RFC3339Nano: %s", err.Error()), http.StatusBadRequest)
}
}

// Write out result immediately if chunked.
Expand Down Expand Up @@ -1812,6 +1830,23 @@ func convertToEpoch(r *query.Result, epoch string) {
}
}

func convertToTimeFormat(r *query.Result, format string) error {
for _, s := range r.Series {
for _, v := range s.Values {
switch format {
case time.RFC3339Nano:
if ts, ok := v[0].(time.Time); ok {
v[0] = ts.Format(time.RFC3339Nano)
}
default:
return fmt.Errorf("unknown time format: %s", format)
}
}
}

return nil
}

// servePromWrite receives data in the Prometheus remote write protocol and writes it
// to the database
func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
Expand Down
78 changes: 78 additions & 0 deletions services/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/stretchr/testify/require"
"io"
"log"
"math"
Expand Down Expand Up @@ -596,6 +597,83 @@ func TestHandler_Query_CloseNotify(t *testing.T) {
}
}

// Ensure the handler returns results with RFC3339 timestamp format when requested.
func TestHandler_Query_RFC3339(t *testing.T) {
testTime1 := time.Date(2021, 1, 1, 12, 0, 0, 0, time.UTC)
testTime2 := time.Date(2021, 1, 2, 12, 0, 0, 0, time.UTC)
testTimeNano := time.Date(2021, 1, 1, 12, 0, 0, 123456789, time.UTC)

tests := []struct {
name string
series []*models.Row
expectedResult string
}{
{
name: "single series",
series: []*models.Row{{
Name: "series0",
Columns: []string{"time", "value"},
Values: [][]interface{}{
{testTime1, 42},
},
}},
expectedResult: fmt.Sprintf(`{"results":[{"statement_id":1,"series":[{"name":"series0","columns":["time","value"],"values":[["%s",42]]}]}]}`, testTime1.Format(time.RFC3339Nano)),
},
{
name: "multiple series",
series: []*models.Row{
{
Name: "series0",
Columns: []string{"time", "value"},
Values: [][]interface{}{
{testTime1, 42},
{testTime2, 43},
},
},
{
Name: "series1",
Columns: []string{"time", "value"},
Values: [][]interface{}{
{testTime1, 100},
},
},
},
expectedResult: fmt.Sprintf(`{"results":[{"statement_id":1,"series":[{"name":"series0","columns":["time","value"],"values":[["%s",42],["%s",43]]},{"name":"series1","columns":["time","value"],"values":[["%s",100]]}]}]}`, testTime1.Format(time.RFC3339Nano), testTime2.Format(time.RFC3339Nano), testTime1.Format(time.RFC3339Nano)),
},
{
name: "nanosecond precision",
series: []*models.Row{{
Name: "series0",
Columns: []string{"time", "value"},
Values: [][]interface{}{
{testTimeNano, 42},
},
}},
expectedResult: fmt.Sprintf(`{"results":[{"statement_id":1,"series":[{"name":"series0","columns":["time","value"],"values":[["%s",42]]}]}]}`, testTimeNano.Format(time.RFC3339Nano)),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
ctx.Results <- &query.Result{
StatementID: 1,
Series: tt.series,
}
return nil
}

w := httptest.NewRecorder()
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&time_format=rfc3339", nil))
require.Equal(t, http.StatusOK, w.Code, "response status")

body := strings.TrimSpace(w.Body.String())
require.Equal(t, tt.expectedResult, body, "response body")
})
}
}

// Ensure the handler returns an appropriate 401 status when authentication
// fails on ping endpoints.
func TestHandler_Ping_ErrAuthorize(t *testing.T) {
Expand Down