Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[
}
// no topic yet, let's create it
if len(partitions) == 0 {

// (or not)
if m.topicManagerConfig.NoCreate {
return fmt.Errorf("topic does not exist but the manager is configured with NoCreate, so it will not attempt to create it")
}

return m.createTopic(topic,
npar,
rfactor,
Expand Down Expand Up @@ -361,6 +367,10 @@ type TopicManagerConfig struct {
// TMConfigMismatchBehavior configures how configuration mismatches of a topic (replication, num partitions, compaction) should be
// treated
MismatchBehavior TMConfigMismatchBehavior

// If set to true, the topic manager will not attempt to create the topic.
// This can be used if topic creation should be done externally.
NoCreate bool
}

func (tmc *TopicManagerConfig) streamCleanupPolicy() string {
Expand Down
22 changes: 22 additions & 0 deletions topic_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,28 @@ func TestTM_EnsureStreamExists(t *testing.T) {
err := tm.EnsureStreamExists(topic, npar)
require.NoError(t, err)
})
t.Run("no-create", func(t *testing.T) {
tm, bm, ctrl := createTopicManager(t)
defer ctrl.Finish()
var (
topic = "some-topic"
npar = 1
rfactor = 1
)

tm.topicManagerConfig.Stream.Replication = rfactor
tm.topicManagerConfig.Stream.Retention = time.Second
tm.topicManagerConfig.NoCreate = true

bm.client.EXPECT().RefreshMetadata().Return(nil).AnyTimes()

gomock.InOrder(
bm.client.EXPECT().Topics().Return(nil, nil),
)

err := tm.EnsureStreamExists(topic, npar)
require.ErrorContains(t, err, "will not attempt to create it")
})
t.Run("fail", func(t *testing.T) {
tm, bm, ctrl := createTopicManager(t)
defer ctrl.Finish()
Expand Down