Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ COMMIT?=HEAD
MODSET?=contrib-core
[email protected]:open-telemetry/opentelemetry-collector-contrib.git
.PHONY: push-tags
push-tags: $(MULITMOD)
push-tags: $(MULITMOD)
$(MULITMOD) verify
set -e; for tag in `$(MULITMOD) tag -m ${MODSET} -c ${COMMIT} --print-tags | grep -v "Using" `; do \
echo "pushing tag $${tag}"; \
Expand Down
3 changes: 2 additions & 1 deletion cmd/otelcontribcol/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ ARG USER_UID=10001
USER ${USER_UID}

COPY --from=prep /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY otelcontribcol /
COPY otelcontribcol /otelcol-contrib
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Their Dockerfiles are confusing: the Helm chart expects to run the command /otelcol-contrib, but the binary that gets built and copied into the image is /otelcontribcol.

EXPOSE 4317 55680 55679

ENTRYPOINT ["/otelcontribcol"]
CMD ["--config", "/etc/otel/config.yaml"]
3 changes: 2 additions & 1 deletion cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ processors:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.74.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor v0.74.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.74.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor v0.74.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricsgenerationprocessor v0.74.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor v0.74.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.74.0
Expand Down Expand Up @@ -360,6 +361,7 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor => ../../processor/datadogprocessor
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sshcheckreceiver => ../../receiver/sshcheckreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver => ../../receiver/datadogreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor => ../../processor/logstransformprocessor
# see https://github.com/mattn/go-ieproxy/issues/45
- github.com/mattn/go-ieproxy => github.com/mattn/go-ieproxy v0.0.1
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
Expand All @@ -368,4 +370,3 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector => ../../connector/spanmetricsconnector
- github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37
- github.com/outcaste-io/ristretto v0.2.0 => github.com/outcaste-io/ristretto v0.2.1

2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.74.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor v0.74.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.74.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor v0.74.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricsgenerationprocessor v0.74.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor v0.74.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.74.0
Expand Down Expand Up @@ -997,6 +998,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sshch

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver => ../../receiver/datadogreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor => ../../processor/logstransformprocessor

replace github.com/mattn/go-ieproxy => github.com/mattn/go-ieproxy v0.0.1

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
Expand Down
129 changes: 80 additions & 49 deletions exporter/clickhouseexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"context"
"database/sql"
"fmt"
"net/url"
"strings"
"time"

_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -39,7 +41,7 @@ type logsExporter struct {
}

func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
client, err := newClickhouseClient(cfg)
client, err := newClickHouseConn(cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -73,28 +75,40 @@ func (e *logsExporter) shutdown(_ context.Context) error {

func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
start := time.Now()
err := doWithTx(ctx, e.client, func(tx *sql.Tx) error {
statement, err := tx.PrepareContext(ctx, e.insertSQL)
err := func() error {
scope, err := e.client.Begin()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is apparently the way to do batch inserts using the database/sql interface of the clickhouse driver https://clickhouse.com/docs/en/integrations/go/clickhouse-go/database-sql-api#batch-insert

if err != nil {
return fmt.Errorf("PrepareContext:%w", err)
return fmt.Errorf("Begin:%w", err)
}
defer func() {
_ = statement.Close()
}()

batch, err := scope.Prepare(e.insertSQL)
if err != nil {
return fmt.Errorf("Prepare:%w", err)
}

var serviceName string
for i := 0; i < ld.ResourceLogs().Len(); i++ {
logs := ld.ResourceLogs().At(i)
resAttr := make(map[string]string)

resourceLogs := ld.ResourceLogs()
for i := 0; i < resourceLogs.Len(); i++ {
logs := resourceLogs.At(i)
res := logs.Resource()
resAttr := attributesToMap(res.Attributes())
if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {

attrs := res.Attributes()
attributesToMap(attrs, resAttr)

if v, ok := attrs.Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}
for j := 0; j < logs.ScopeLogs().Len(); j++ {
rs := logs.ScopeLogs().At(j).LogRecords()
for k := 0; k < rs.Len(); k++ {
r := rs.At(k)
logAttr := attributesToMap(r.Attributes())
_, err = statement.ExecContext(ctx,

logAttr := make(map[string]string, attrs.Len())
attributesToMap(r.Attributes(), logAttr)

_, err = batch.Exec(
r.Timestamp().AsTime(),
traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
traceutil.SpanIDToHexOrEmptyString(r.SpanID()),
Expand All @@ -107,26 +121,31 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
logAttr,
)
if err != nil {
return fmt.Errorf("ExecContext:%w", err)
return fmt.Errorf("Append:%w", err)
}
}
}

// clear map for reuse
for k := range resAttr {
delete(resAttr, k)
}
}
return nil
})

return scope.Commit()
}()

duration := time.Since(start)
e.logger.Info("insert logs", zap.Int("records", ld.LogRecordCount()),
zap.String("cost", duration.String()))
return err
}

func attributesToMap(attributes pcommon.Map) map[string]string {
m := make(map[string]string, attributes.Len())
func attributesToMap(attributes pcommon.Map, dest map[string]string) {
attributes.Range(func(k string, v pcommon.Value) bool {
m[k] = v.AsString()
dest[k] = v.AsString()
return true
})
return m
}

const (
Expand Down Expand Up @@ -155,6 +174,7 @@ PARTITION BY toDate(Timestamp)
ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId)
SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
`

// language=ClickHouse SQL
insertLogsSQLTemplate = `INSERT INTO %s (
Timestamp,
Expand All @@ -167,31 +187,56 @@ SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
Body,
ResourceAttributes,
LogAttributes
) VALUES (
?,
?,
?,
?,
?,
?,
?,
?,
?,
?
)`
)`
)

var driverName = "clickhouse" // for testing

// newClickhouseClient create a clickhouse client.
func newClickhouseClient(cfg *Config) (*sql.DB, error) {
// newClickHouseClient create a clickhouse client.
// used by metrics and traces:
func newClickHouseClient(cfg *Config) (*sql.DB, error) {
db, err := cfg.buildDB(cfg.Database)
if err != nil {
return nil, err
}
return db, nil
}

// newClickHouseConn opens a database handle for the logs exporter using the
// native clickhouse-go options API (OpenDB), which is what the driver's
// batched-insert path requires.
func newClickHouseConn(cfg *Config) (*sql.DB, error) {
	endpoint := cfg.Endpoint

	if len(cfg.ConnectionParams) > 0 {
		values := make(url.Values, len(cfg.ConnectionParams))
		for k, v := range cfg.ConnectionParams {
			values.Add(k, v)
		}

		// Append the extra parameters with the right separator: "?" when the
		// endpoint has no query string yet, "&" when it has one that does not
		// already end in a separator, and nothing when it already ends in
		// "?" or "&". (The previous check produced a malformed "...?&k=v"
		// when the endpoint ended in a bare "?".)
		switch {
		case !strings.Contains(endpoint, "?"):
			endpoint += "?"
		case !strings.HasSuffix(endpoint, "?") && !strings.HasSuffix(endpoint, "&"):
			endpoint += "&"
		}

		endpoint += values.Encode()
	}

	opts, err := clickhouse.ParseDSN(endpoint)
	if err != nil {
		return nil, fmt.Errorf("unable to parse endpoint: %w", err)
	}

	opts.Auth = clickhouse.Auth{
		Database: cfg.Database,
		Username: cfg.Username,
		Password: cfg.Password,
	}

	// OpenDB does not dial; a misconfigured connection only surfaces on the
	// first Ping, Exec, Query, etc.
	return clickhouse.OpenDB(opts), nil
}

func createDatabase(ctx context.Context, cfg *Config) error {
// use default database to create new database
if cfg.Database == defaultDatabase {
Expand Down Expand Up @@ -231,17 +276,3 @@ func renderCreateLogsTableSQL(cfg *Config) string {
// renderInsertLogsSQL renders the INSERT statement for the configured logs table.
func renderInsertLogsSQL(cfg *Config) string {
	query := fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName)
	return query
}

func doWithTx(_ context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not the proper way: they used a transaction but issued one INSERT per row, which is incorrect. The right approach is a single prepared INSERT, appending each row to the batch with Exec().

tx, err := db.Begin()
if err != nil {
return fmt.Errorf("db.Begin: %w", err)
}
defer func() {
_ = tx.Rollback()
}()
if err := fn(tx); err != nil {
return err
}
return tx.Commit()
}
23 changes: 20 additions & 3 deletions exporter/clickhouseexporter/exporter_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestLogsExporter_New(t *testing.T) {
}{
"no dsn": {
config: withDefaultConfig(),
want: failWithMsg("exec create logs table sql: parse dsn address failed"),
want: failWithMsg("parse dsn address failed"),
},
}

Expand All @@ -90,6 +90,7 @@ func TestExporter_pushLogsData(t *testing.T) {
t.Run("push success", func(t *testing.T) {
var items int
initClickhouseTestServer(t, func(query string, values []driver.Value) error {
t.Logf(query)
t.Logf("%d, values:%+v", items, values)
if strings.HasPrefix(query, "INSERT") {
items++
Expand All @@ -106,7 +107,12 @@ func TestExporter_pushLogsData(t *testing.T) {
}

func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter {
exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn))
cfg := withTestExporterConfig(fns...)(dsn)
exporter, err := newLogsExporter(zaptest.NewLogger(t), cfg)
require.NoError(t, err)

// need to use the dummy driver driver for testing
exporter.client, err = newClickHouseClient(cfg)
require.NoError(t, err)
require.NoError(t, exporter.start(context.TODO(), nil))

Expand Down Expand Up @@ -194,7 +200,18 @@ func (*testClickhouseDriverStmt) Close() error {
}

// NumInput reports how many placeholder parameters the statement expects so
// database/sql can validate argument counts in tests.
func (t *testClickhouseDriverStmt) NumInput() int {
	// Only INSERT statements carry bind parameters in these tests.
	if !strings.HasPrefix(t.query, `INSERT`) {
		return 0
	}

	n := strings.Count(t.query, "?")
	if n > 0 {
		return n
	}

	// Batched INSERT queries carry no "?" placeholders; infer the parameter
	// count from the comma-separated column list (columns = commas + 1).
	return strings.Count(t.query, ",") + 1
}

func (t *testClickhouseDriverStmt) Exec(args []driver.Value) (driver.Result, error) {
Expand Down
8 changes: 6 additions & 2 deletions exporter/clickhouseexporter/exporter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type metricsExporter struct {
}

func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, error) {
client, err := newClickhouseClient(cfg)
client, err := newClickHouseClient(cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -73,7 +73,11 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
metaData := internal.MetricsMetaData{}
metrics := md.ResourceMetrics().At(i)
res := metrics.Resource()
metaData.ResAttr = attributesToMap(res.Attributes())

resAttr := res.Attributes()
attr := make(map[string]string, resAttr.Len())
attributesToMap(resAttr, attr)
metaData.ResAttr = attr
metaData.ResURL = metrics.SchemaUrl()
for j := 0; j < metrics.ScopeMetrics().Len(); j++ {
rs := metrics.ScopeMetrics().At(j).Metrics()
Expand Down
Loading