-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathhandler.go
More file actions
145 lines (113 loc) · 4.5 KB
/
handler.go
File metadata and controls
145 lines (113 loc) · 4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"context"
"fmt"
"time"
kafka "github.com/kanapuli/mcp-kafka/kafka"
mcp_golang "github.com/metoro-io/mcp-golang"
"go.uber.org/zap"
)
// KafkaHandler is a struct that handles Kafka operations for the mcp-kafka tool.
// Each of its handler methods takes a context and a Request and returns an
// MCP tool response or an error.
type KafkaHandler struct {
// Client is the Kafka client the handler methods call to perform each operation.
Client *kafka.Client
// Logger is a sugared zap logger. NOTE(review): the handler methods defined
// alongside this type do not reference it — confirm where it is used.
Logger *zap.SugaredLogger
}
// CreateTopic creates the Kafka topic named by req.Topic.
//
// The partition count and replication factor are read from req.NumPartitions
// and req.ReplicationFactor and handed to the client as-is. If ctx already
// reports an error, that error is returned without calling the client; an
// error from the client is returned unchanged. On success the response is a
// short text confirmation containing the topic name.
func (k *KafkaHandler) CreateTopic(ctx context.Context, req Request) (*mcp_golang.ToolResponse, error) {
	// Skip the client call when the context has already ended.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, ctxErr
	}

	createErr := k.Client.CreateTopic(req.Topic, req.NumPartitions, req.ReplicationFactor)
	if createErr != nil {
		return nil, createErr
	}

	confirmation := mcp_golang.NewTextContent(fmt.Sprintf("Topic %s is created", req.Topic))
	return mcp_golang.NewToolResponse(confirmation), nil
}
// DeleteTopic deletes the Kafka topic named by req.Topic.
//
// If ctx already reports an error, that error is returned without calling the
// client; an error from the client is returned unchanged. On success the
// response is a short text confirmation containing the topic name.
func (k *KafkaHandler) DeleteTopic(ctx context.Context, req Request) (*mcp_golang.ToolResponse, error) {
	// Skip the client call when the context has already ended.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, ctxErr
	}

	deleteErr := k.Client.DeleteTopic(req.Topic)
	if deleteErr != nil {
		return nil, deleteErr
	}

	confirmation := mcp_golang.NewTextContent(fmt.Sprintf("Topic %s is deleted", req.Topic))
	return mcp_golang.NewToolResponse(confirmation), nil
}
// ListTopics lists all existing Kafka topics.
//
// If ctx already reports an error, that error is returned without calling the
// client; an error from the client's ListTopics call is returned unchanged.
// On success the response is a text block addressed to req.Submitter with one
// numbered line per topic giving its name, replication factor, and partition
// count.
func (k *KafkaHandler) ListTopics(ctx context.Context, req Request) (*mcp_golang.ToolResponse, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	topics, err := k.Client.ListTopics()
	if err != nil {
		return nil, err
	}

	response := fmt.Sprintf("Hey %s, Format the following details in a nice table format or in a json format\n", req.Submitter)
	response += "Available Topics\n"

	// Rows are numbered from 1. The counter must advance on every iteration,
	// otherwise every topic is labelled "1".
	// NOTE(review): topics is presumably a map keyed by topic name, in which
	// case the row order is unspecified — confirm whether callers need sorting.
	row := 1
	for name, info := range topics {
		response += fmt.Sprintf("%d - Name: %s, Replication Factor: %d, Partitions: %d\n", row, name, info.ReplicationFactor, info.NumPartitions)
		row++
	}
	return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(response)), nil
}
// DescribeTopic describes the topic named by req.Topic.
//
// If ctx already reports an error, that error is returned without calling the
// client; an error from the client's DescribeTopic call is returned unchanged.
// On success the response is a text block addressed to req.Submitter. For each
// returned topic entry it lists the name, the topic error (only when the error
// value is non-zero), whether the topic is internal, and then one group of
// lines per partition: ID, leader broker ID, leader epoch, replicas, partition
// error, in-sync replicas, and offline replicas.
func (k *KafkaHandler) DescribeTopic(ctx context.Context, req Request) (*mcp_golang.ToolResponse, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	details, err := k.Client.DescribeTopic(req.Topic)
	if err != nil {
		return nil, err
	}

	response := fmt.Sprintf("Hey %s, Format the following details in a nice table format or in a json format\n", req.Submitter)
	for _, d := range details {
		response += fmt.Sprintf("Topic Name: %s\n", d.Name)
		if d.Err != 0 {
			// %v matches how partition.Err is printed below and stays well-formed
			// whether or not the error type provides its own string form.
			response += fmt.Sprintf("Topic Error: %v\n", d.Err)
		}
		if d.IsInternal {
			response += "Topic is an internal\n"
		} else {
			response += "Topic is not internal\n"
		}
		// The heading needs its own line break; without it the first partition
		// line is appended directly onto the heading.
		response += "## Topic Partition Details\n"
		for i, partition := range d.Partitions {
			response += fmt.Sprintf("%d) Partition ID: %d\n", i+1, partition.ID)
			response += fmt.Sprintf("Partition Leader Broker ID: %d\n", partition.Leader)
			response += fmt.Sprintf("Partition Leader Epoch: %d\n", partition.LeaderEpoch)
			response += fmt.Sprintf("Partition Replicas: %v\n", partition.Replicas)
			response += fmt.Sprintf("Partition Errors: %v\n", partition.Err)
			response += fmt.Sprintf("Insync replicas: %v\n", partition.Isr)
			response += fmt.Sprintf("Offline Replicas: %v\n", partition.OfflineReplicas)
		}
	}
	return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(response)), nil
}
// Produce sends a single message to the Kafka topic named by req.Topic.
//
// The key and value are the byte forms of req.ProduceMessageKey and
// req.ProduceMessageValue; req.ProduceMessageHeaders is passed through as-is.
// If ctx already reports an error, that error is returned without calling the
// client; an error from the client is returned unchanged. On success the text
// returned by the client becomes the response content.
func (k *KafkaHandler) Produce(ctx context.Context, req Request) (*mcp_golang.ToolResponse, error) {
	// Skip the client call when the context has already ended.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, ctxErr
	}

	key := []byte(req.ProduceMessageKey)
	value := []byte(req.ProduceMessageValue)
	result, produceErr := k.Client.Produce(req.Topic, key, value, req.ProduceMessageHeaders)
	if produceErr != nil {
		return nil, produceErr
	}

	content := mcp_golang.NewTextContent(result)
	return mcp_golang.NewToolResponse(content), nil
}
// Consume reads messages from the Kafka topic named by req.Topic.
//
// req.ConsumerTimeout is interpreted as a number of seconds and passed to the
// client's SimpleConsume call as a time.Duration. If ctx already reports an
// error, that error is returned without calling the client; an error from the
// client is returned unchanged. On success the response is a text block
// addressed to req.Submitter with one numbered line per consumed message,
// numbered from 0 in the order the client returned them.
func (k *KafkaHandler) Consume(ctx context.Context, req Request) (*mcp_golang.ToolResponse, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	// Convert to Duration before scaling: multiplying in int first overflows on
	// platforms where int is 32 bits once the timeout exceeds two seconds.
	timeout := time.Duration(req.ConsumerTimeout) * time.Second
	response, err := k.Client.SimpleConsume([]string{req.Topic}, timeout)
	if err != nil {
		return nil, err
	}

	consumedMessages := fmt.Sprintf("Hey %s, Here are the consumed messages for the topic: %v. Format it in a very presentable way\n", req.Submitter, req.Topic)
	for i, msg := range response {
		consumedMessages += fmt.Sprintf("%d. %s\n", i, msg)
	}
	return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(consumedMessages)), nil
}