Ingestion microservice #528
Conversation
doctor-eval
left a comment
I don't think this code runs, but I assume you've run it, so I'm a bit confused! See inline comments. I've stuck to main.go for this one.
	continue
}
if fetches.IsClientClosed() {
	return
You probably want to do os.Exit(1) here, since the application will be exiting with an error. Or, consistent with the rest of the code, do panic(errors.New("client is closed")).
}

// Execute the batch insert statement with the values slice
_, err = stmt.Exec(values...)
It would surprise me if this works, unless I'm missing something. AFAIK you can't pass multiple rows' worth of values to a single-row insert. It feels like this code is held over from the Copy code.
If you do insert into x (a) values ($1) then you can only pass one argument to Exec (being the value of a).
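To make the point concrete, here's a minimal sketch of the one-Exec-per-row shape, assuming a single-placeholder insert; the interface and all names are illustrative stand-ins (the simplified return type lets the shape run without a database, whereas the real `*sql.Stmt` also returns an `sql.Result`):

```go
package main

import "fmt"

// rowExecer is an illustrative stand-in for the subset of *sql.Stmt
// used here (return value simplified), so the shape runs without a DB.
type rowExecer interface {
	Exec(args ...any) error
}

// insertEach calls the single-row insert once per event: with a
// statement like `INSERT INTO events (payload) VALUES ($1)`, each
// Exec call binds exactly one row's parameters.
func insertEach(stmt rowExecer, payloads []string) error {
	for _, p := range payloads {
		if err := stmt.Exec(p); err != nil {
			return fmt.Errorf("insert %q: %w", p, err)
		}
	}
	return nil
}

// countingStmt records how many rows were "inserted".
type countingStmt struct{ calls int }

func (c *countingStmt) Exec(args ...any) error { c.calls++; return nil }

func main() {
	stmt := &countingStmt{}
	if err := insertEach(stmt, []string{"a", "b", "c"}); err != nil {
		panic(err)
	}
	fmt.Println(stmt.calls) // 3: one Exec per row
}
```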
}
event.CustomerID = nil
event.InsertedAt = time.Now()
events = append(events, event)
I don't think there's any benefit to batching these records. It will use memory but not improve performance.
As a first approximation, I'd recommend starting the database transaction within this loop, and simply executing the insert for each row, because regardless of how you do it, you still end up sending a command to the database for every row you get from Kafka. You can't do it in a single Exec.
If you really wanted to do this in a single operation then you could write your own stored procedure. But I would want to do some microbenchmarking before adding that complexity, because I doubt it will be faster. I'd recommend getting it working using regular inserts and seeing how you go. The performance will probably surprise you.
if len(events) >= batchSize {
	err = insertEvents(stmt, events, db, ctx)
	if err != nil {
		fmt.Printf("Error inserting events into database: %s\n", err)
As noted above (and below), you probably want to do the individual inserts before this check, and in this check all you need to do is a tx.Commit().
However, if the commit fails, you want to panic here - because if you don't, you'll lose data (the Kafka records will be committed). As long as your ingress table is simple, the only reason you'll get an error here is if something really bad happens.
Wouldn't a simple return work better than a panic? That way we can keep running. That's what we do in the other one.
}
events = make([]*Event, 0)
if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
	fmt.Printf("commit records failed: %v", err)
Definitely want to panic here. If you can't commit to Kafka you have some serious problems.
	}
}
func insertEvents(stmt *sql.Stmt, events []*Event, db *sql.DB, ctx context.Context) error {
As noted above, batching the records is unnecessary. You can do it but I don't think it adds anything, and probably uses more memory. Also, I don't see how the code works, but I'm not super confident since I guess you have actually run it?
In terms of batching, if you want to be a bit fancy, you could encapsulate the batching code in a type, like this, which would hide the actual mechanism away from the main loop, and let you change the algo later. This isn't so much a recommendation as just a suggestion in case you want to try a few different things:
type batch struct {
	tx         *sql.Tx
	insertStmt *sql.Stmt
	count      int
}

func (b *batch) addRecord(r *record) error {
	if _, err := b.insertStmt.Exec(r.a, r.b /* , ... */); err != nil {
		return err
	}
	b.count++
	if b.count < 1000 {
		return nil
	}
	// a real version would also begin a fresh transaction and
	// reset count after a successful commit
	return b.tx.Commit()
}
doctor-eval
left a comment
Looks pretty good. I suspect you've forgotten to do a Kafka commit when you do a batch commit (which won't cause any problems really) - but I'm fairly sure there is a nasty heisenbug near the end of the main consumer loop, which I've commented on.
The only other issue is that you're continuing when I think most of the time you should panic. Happy to discuss further.
	os.Setenv("POSTGRES_DB", "lotus")
}
dbURL = fmt.Sprintf("postgres://%s:%s@%s:5432/%s?sslmode=disable", os.Getenv("POSTGRES_USER"), os.Getenv("POSTGRES_PASSWORD"), host, os.Getenv("POSTGRES_DB"))
pretty sure you don't need to do this; libpq will use the environment variables directly if they are set.
Not sure about this; when I was testing it yesterday it was freaking out a little if I didn't specify stuff explicitly. I'll leave it in for now and take it out later if necessary.
tx, err := db.Begin()
if err != nil {
	fmt.Printf("Error starting transaction: %s\n", err)
The only reason that db.Begin() will fail is either bad config (PGxxx), a network error, or some other catastrophic condition. By printing an error and continuing, you are effectively discarding the contents of Kafka, which you probably don't want.
There is a technique called "let it fail" that applies here. If you panic, Docker can attempt to restart the process - using a backoff algorithm - and whatever caused it to fail will either resolve, or you will get alerts (if not from monitoring then from customers). But at least you haven't lost data. (You might want to look at the restart policy in compose if you haven't set it already.)
There's an article about "let it fail" here if you're interested. Article is about Erlang but also applies to an orchestrated microservices architecture like docker compose. https://yiming.dev/blog/2022/07/10/how-let-it-fail-leads-to-simpler-code/
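The "let it fail" pattern can be sketched as a panic-on-error wrapper around the transaction start; `beginTx`, its fail flag, and all names here are illustrative stand-ins, not the PR's code:

```go
package main

import (
	"errors"
	"fmt"
)

// beginTx stands in for db.Begin(); flip fail to simulate the
// database being unreachable.
func beginTx(fail bool) (string, error) {
	if fail {
		return "", errors.New("connection refused")
	}
	return "tx", nil
}

// mustBegin applies "let it fail": any error here means bad config,
// a network error, or something catastrophic, so panic and let the
// orchestrator (e.g. a compose restart policy) bring the process
// back up in a known state instead of silently dropping Kafka data.
func mustBegin(fail bool) string {
	tx, err := beginTx(fail)
	if err != nil {
		panic(fmt.Errorf("begin transaction: %w", err))
	}
	return tx
}

func main() {
	fmt.Println(mustBegin(false)) // tx

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r)
		}
	}()
	mustBegin(true) // panics; the orchestrator would restart us
}
```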
var streamEvents StreamEvents
err := json.Unmarshal(r.Value, &streamEvents)
if err != nil {
	fmt.Printf("Error unmarshalling event: %s\n", err)
If this errors out then there is something wrong with the JSON. What I would typically do in this case is produce the message onto some kind of "bad message" topic, so that at least you have something to look at when the customer calls to complain. But my guess is that the JSON is checked on the front end, which means this error actually should not happen. So if it does happen, something is badly wrong and I would probably still panic.
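A dead-letter sketch of this idea might look like the following; the `producer` interface abstracts the one franz-go call needed (`cl.Produce`), and the topic name, the trimmed-down struct, and all other names are assumptions for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// producer is a stand-in for the franz-go client's Produce call.
type producer interface {
	Produce(topic string, value []byte) error
}

// streamEvents is a trimmed-down envelope for illustration.
type streamEvents struct {
	OrganizationID int64 `json:"organization_id"`
}

// parkOrParse tries to decode the record; on failure it copies the
// raw bytes onto a dead-letter topic instead of silently dropping them.
func parkOrParse(p producer, value []byte) (*streamEvents, error) {
	var se streamEvents
	if err := json.Unmarshal(value, &se); err != nil {
		return nil, p.Produce("ingest.dead-letter", value)
	}
	return &se, nil
}

// memProducer records what was parked, standing in for a Kafka client.
type memProducer struct {
	parked map[string][][]byte
}

func (m *memProducer) Produce(topic string, value []byte) error {
	if m.parked == nil {
		m.parked = map[string][][]byte{}
	}
	m.parked[topic] = append(m.parked[topic], value)
	return nil
}

func main() {
	p := &memProducer{}
	if se, _ := parkOrParse(p, []byte(`{"organization_id": 3}`)); se != nil {
		fmt.Println("parsed org", se.OrganizationID)
	}
	parkOrParse(p, []byte(`not json`))
	fmt.Println("dead-lettered:", len(p.parked["ingest.dead-letter"]))
}
```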
if len(streamEvents.Events) > 0 {
	streamEvents.Event = &streamEvents.Events[0]
} else {
	fmt.Println("Error: both event and events fields are missing from stream_events")
See above. Dropping messages is a bad idea; I recommend copying the message onto a dead message queue and then continuing.
This is actually a quirk of how I first implemented the messages. I was originally putting the entire input batch into Kafka as one message instead of sending them one by one. So for backwards compat I'm accepting Events too; it will be removed soon, once there are no more messages like that in Kafka.
@meehawk for the edge service, I would now switch over to just using the Event field instead of Events. I believe all the ones with the old format have been cleaned out.
type StreamEvents struct {
	Events         *[]Event `json:"events"`
	OrganizationID int64    `json:"organization_id"`
	Event          *Event   `json:"event"`
}
should now not include the Events field
event.InsertedAt = time.Now()

if err := batch.addRecord(event); err != nil {
	fmt.Printf("Error inserting event: %s\n", err)
Looking at the code for batch, I would definitely panic in this case. AFAICS the only thing that can go wrong in batch is either bugs in your code or a serious database failure/network partition of some kind. Because the usual referential integrity issues are already dealt with (on conflict do nothing), all that's left is bad stuff.
If you panic then the application goes back to a known state and can have another crack at it.
if batch.count > 0 {
	if err := tx.Commit(); err != nil {
		fmt.Printf("Error inserting events into database: %s\n", err)
A failure here should panic like the others. A failure on commit is basically only going to happen if something is Horribly Wrong™, and it's certainly not because of input data or a bug in your code.
	fmt.Printf("Error inserting events into database: %s\n", err)
	return
}
if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
I think this should be outside of the if batch.count > 0 conditional. Because if you get an exact multiple of the batch size then the count will be zero but AFAICS you haven't committed the Kafka consumer other than in this line of code.
Co-authored-by: doctor-eval <[email protected]>
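The fix being described can be sketched as follows; the function and both commit callbacks are illustrative stand-ins for `tx.Commit` and `cl.CommitUncommittedOffsets`:

```go
package main

import "fmt"

// flushAtEnd commits the DB transaction only when there are
// uncommitted rows, but commits Kafka offsets unconditionally:
// otherwise an exact multiple of the batch size leaves pending at
// zero and the consumer offsets never get committed.
func flushAtEnd(pending int, commitDB, commitKafka func() error) error {
	if pending > 0 {
		if err := commitDB(); err != nil {
			return err
		}
	}
	return commitKafka()
}

func main() {
	var dbCommits, kafkaCommits int
	db := func() error { dbCommits++; return nil }
	kafka := func() error { kafkaCommits++; return nil }

	// exact multiple of the batch size: nothing pending in the DB,
	// but the Kafka offset commit must still happen
	flushAtEnd(0, db, kafka)
	fmt.Println(dbCommits, kafkaCommits) // 0 1
}
```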
type batch struct {
	tx         *sql.Tx
	insertStmt *sql.Stmt
[nit] but can we name this in full, as in insertStatement? What do you think @diego-escobedo? Though I could make an argument to name it in line with sql.Stmt.
func main() {
	var kafkaURL string
	if kafkaURL = os.Getenv("KAFKA_URL"); kafkaURL == "" {
We should probably use some library like https://github.com/spf13/viper so that it is easier to define and share configs than reading from env, but would love some thoughts!
nice suggestion!
	kgo.ConsumeTopics(kafkaTopic),
	kgo.DisableAutoCommit(),
)
if err != nil {
[nit] can we add a space here
defer cl.Close()

var dbURL string
if dbURL = os.Getenv("DATABASE_URL"); dbURL == "" {
Using https://github.com/spf13/viper would make it easier to define these values in a single place. Also wondering if it might make sense to move the config-reading logic into a separate module/getter function, e.g. a config.go.
	insertStmt: insertStmt,
}

fetches.EachRecord(func(r *kgo.Record) {
Very minor nit, but I think the logic in main.go could be further broken down, which might also make it easier to unit test different pieces.
satya-nutella
left a comment
I left some comments because I was going over the codebase to understand what it is doing 🙈 but feel free to mark them as resolved if they don't seem high priority, since I don't have full context here.
Resolves LOT-524