Skip to content

Commit 39f57a0

Browse files
Support --delete option for reset-offsets command
1 parent 8ad90a7 commit 39f57a0

File tree

4 files changed

+232
-73
lines changed

4 files changed

+232
-73
lines changed

README.md

+40-4
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ docker-compose up -d
7777
3. Apply the topic configs in [`examples/local-cluster/topics`](/examples/local-cluster/topics):
7878

7979
```
80-
topicctl apply --skip-confirm examples/local-cluster/topics/*yaml
80+
topicctl apply --skip-confirm examples/local-cluster/topics/*.yaml
8181
```
8282

8383
4. Send some test messages to the `topic-default` topic:
@@ -225,13 +225,49 @@ subcommands interactively.
225225
topicctl reset-offsets [topic] [group] [flags]
226226
```
227227

228-
The `reset-offsets` subcommand allows resetting the offsets for a consumer group in a topic. There are 2 main approaches for setting the offsets:
228+
The `reset-offsets` subcommand allows resetting the offsets
229+
for a consumer group in a topic.
230+
There are a few typical approaches for setting the offsets:
229231

230-
1. Use a combination of `--partitions`, `--offset`, `--to-earliest` and `--to-latest` flags. `--partitions` flag specifies a list of partitions to be reset e.g. `1,2,3 ...`. If not used, the command defaults to resetting consumer group offsets for ALL of the partitions. `--offset` flag indicates the specific value that all desired consumer group partitions will be set to. If not set, it will default to -2. Finally, `--to-earliest` flag resets offsets of consumer group members to earliest offsets of partitions while `--to-latest` resets offsets of consumer group members to latest offsets of partitions. However, only one of the `--to-earliest`, `--to-latest` and `--offset` flags can be used at a time. This approach is easy to use but lacks the ability for detailed offset configuration.
232+
1. Use `--partitions` and combine it with one of the offset operators:
233+
`--delete`, `--offset`, `--to-earliest` or `--to-latest`.
234+
2. Use `--partition-offset-map` to pass specific offsets per partition.
235+
For example, `1=5,2=10` means that the consumer group offset
236+
for partition 1 must be set to 5, and partition 2 to offset 10.
237+
This is mainly used for replays of specific traffic,
238+
such as when a deploy has mishandled or corrupted state,
239+
and the prior release must be rerun
240+
starting at a specific offset per partition.
241+
This is the most flexible approach for offset setting.
231242

232-
2. Use `--partition-offset-map` flag to specify a detailed offset configuration for individual partitions. For example, `1=5,2=10,7=12,...` means that the consumer group offset for partition 1 must be set to 5, partition 2 to offset 10, partition 7 to offset 12 and so on. This approach provides greater flexibility and fine-grained control for this operation. Note that `--partition-offset-map` flag is standalone and cannot be coupled with any of the previous flags.
243+
Note that `--partition-offset-map` flag is standalone
244+
and cannot be coupled with other flags.
233245

246+
##### Partition selection flags
234247

248+
At most one of the following may be selected:
249+
250+
* `--partitions` specifies a comma-separated list of partitions IDs.
251+
252+
If none of these are specified,
253+
the command defaults to selecting ALL of the partitions.
254+
255+
##### Offset selection flags
256+
257+
At most one of the following may be selected:
258+
259+
* `--delete` removes stored group offsets.
260+
This will generally have the same effect as `--to-earliest` or `--to-latest`,
261+
depending on the consumer group configuration.
262+
However, `--delete` is more reliable and convenient,
263+
since `--to-earliest` in particular involves a race with message retention
264+
that may require numerous attempts.
265+
* `--offset` indicates the specific value that all selected
266+
consumer group partitions will be set to.
267+
* `--to-earliest` resets group offsets to oldest still-retained per partition.
268+
* `--to-latest` resets group offsets to newest per partitions.
269+
270+
If none of these are specified, `--to-earliest` will be the default.
235271

236272
#### tail
237273

cmd/topicctl/subcmd/reset.go

+136-69
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,19 @@ import (
66
"fmt"
77
"strconv"
88

9+
log "github.com/sirupsen/logrus"
10+
"github.com/spf13/cobra"
11+
12+
"github.com/segmentio/topicctl/pkg/admin"
913
"github.com/segmentio/topicctl/pkg/cli"
1014
"github.com/segmentio/topicctl/pkg/groups"
1115
"github.com/segmentio/topicctl/pkg/util"
12-
log "github.com/sirupsen/logrus"
13-
"github.com/spf13/cobra"
1416
)
1517

1618
var resetOffsetsCmd = &cobra.Command{
17-
Use: "reset-offsets [topic name] [group name]",
19+
Use: "reset-offsets <topic-name> <group-name>",
1820
Short: "reset consumer group offsets",
19-
Args: cobra.MinimumNArgs(2),
21+
Args: cobra.ExactArgs(2),
2022
PreRunE: resetOffsetsPreRun,
2123
RunE: resetOffsetsRun,
2224
}
@@ -27,6 +29,7 @@ type resetOffsetsCmdConfig struct {
2729
partitionOffsetMap map[string]int64
2830
toEarliest bool
2931
toLatest bool
32+
delete bool
3033

3134
shared sharedOptions
3235
}
@@ -62,121 +65,185 @@ func init() {
6265
"to-latest",
6366
false,
6467
"Resets offsets of consumer group members to latest offsets of partitions")
68+
resetOffsetsCmd.Flags().BoolVar(
69+
&resetOffsetsConfig.delete,
70+
"delete",
71+
false,
72+
"Deletes offsets for the given consumer group")
6573

6674
addSharedFlags(resetOffsetsCmd, &resetOffsetsConfig.shared)
6775
RootCmd.AddCommand(resetOffsetsCmd)
6876
}
6977

7078
func resetOffsetsPreRun(cmd *cobra.Command, args []string) error {
71-
resetOffsetSpecification := "You must choose only one of the following reset-offset specifications: --to-earliest, --to-latest, --offset."
72-
offsetMapSpecification := "--partition-offset-map option cannot be coupled with any of the following options: --partitions, --to-earliest, --to-latest, --offset."
73-
74-
if len(resetOffsetsConfig.partitionOffsetMap) > 0 && (cmd.Flags().Changed("offset") ||
75-
len(resetOffsetsConfig.partitions) > 0 ||
76-
resetOffsetsConfig.toEarliest ||
77-
resetOffsetsConfig.toLatest) {
78-
return errors.New(offsetMapSpecification)
79+
resetOffsetSpec := "You must choose only one of the following " +
80+
"reset-offset specifications: --delete, --to-earliest, --to-latest, " +
81+
"--offset, or --partition-offset-map."
82+
offsetMapSpec := "--partition-offset-map option cannot be used with --partitions."
83+
84+
cfg := resetOffsetsConfig
85+
86+
numOffsetSpecs := numTrue(
87+
cfg.toEarliest,
88+
cfg.toLatest,
89+
cfg.delete,
90+
cmd.Flags().Changed("offset"),
91+
len(cfg.partitionOffsetMap) > 0,
92+
)
7993

80-
} else if resetOffsetsConfig.toEarliest && resetOffsetsConfig.toLatest {
81-
return errors.New(resetOffsetSpecification)
94+
if numOffsetSpecs > 1 {
95+
return errors.New(resetOffsetSpec)
96+
}
8297

83-
} else if cmd.Flags().Changed("offset") && (resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest) {
84-
return errors.New(resetOffsetSpecification)
98+
if len(cfg.partitionOffsetMap) > 0 && len(cfg.partitions) > 0 {
99+
return errors.New(offsetMapSpec)
85100
}
86-
return resetOffsetsConfig.shared.validate()
101+
102+
return cfg.shared.validate()
87103
}
88104

89105
func resetOffsetsRun(cmd *cobra.Command, args []string) error {
90106
ctx, cancel := context.WithCancel(context.Background())
91107
defer cancel()
92108

93-
adminClient, err := resetOffsetsConfig.shared.getAdminClient(ctx, nil, true)
109+
cfg := resetOffsetsConfig
110+
111+
adminClient, err := cfg.shared.getAdminClient(ctx, nil, true)
94112
if err != nil {
95113
return err
96114
}
115+
97116
defer adminClient.Close()
98117

118+
connector := adminClient.GetConnector()
119+
99120
topic := args[0]
100121
group := args[1]
101122

102123
topicInfo, err := adminClient.GetTopic(ctx, topic, false)
103124
if err != nil {
104125
return err
105126
}
106-
partitionIDsMap := map[int]struct{}{}
127+
128+
partitionIDsMap := make(map[int]struct{}, len(topicInfo.Partitions))
107129
for _, partitionInfo := range topicInfo.Partitions {
108130
partitionIDsMap[partitionInfo.ID] = struct{}{}
109131
}
110-
var resetOffsetsStrategy string
111-
if resetOffsetsConfig.toLatest {
112-
resetOffsetsStrategy = groups.LatestResetOffsetsStrategy
113-
} else if resetOffsetsConfig.toEarliest {
114-
resetOffsetsStrategy = groups.EarliestResetOffsetsStrategy
132+
133+
var strategy string
134+
135+
switch {
136+
case resetOffsetsConfig.toLatest:
137+
strategy = groups.LatestResetOffsetsStrategy
138+
case resetOffsetsConfig.toEarliest:
139+
strategy = groups.EarliestResetOffsetsStrategy
115140
}
116-
partitionOffsets := map[int]int64{}
117141

118-
if len(resetOffsetsConfig.partitionOffsetMap) > 0 {
119-
for partition, offset := range resetOffsetsConfig.partitionOffsetMap {
120-
var partitionID int
121-
if partitionID, err = strconv.Atoi(partition); err != nil {
122-
return fmt.Errorf("Partition value %s must be a number", partition)
123-
}
124-
if _, ok := partitionIDsMap[partitionID]; !ok {
125-
return fmt.Errorf("Partition %d not found in topic %s", partitionID, topic)
126-
}
142+
// If explicit per-partition offsets were specified, set them now.
143+
partitionOffsets, err := parsePartitionOffsetMap(partitionIDsMap, cfg.partitionOffsetMap)
144+
if err != nil {
145+
return err
146+
}
127147

128-
partitionOffsets[partitionID] = offset
148+
// Set explicit partitions (without offsets) if specified,
149+
// otherwise operate on fetched partition info;
150+
// these will only take effect of per-partition offsets were not specified.
151+
partitions := cfg.partitions
152+
if len(partitions) == 0 && len(partitionOffsets) == 0 {
153+
convert := func(info admin.PartitionInfo) int { return info.ID }
154+
partitions = convertSlice(topicInfo.Partitions, convert)
155+
}
129156

157+
for _, partition := range partitions {
158+
_, ok := partitionIDsMap[partition]
159+
if !ok {
160+
format := "Partition %d not found in topic %s"
161+
return fmt.Errorf(format, partition, topic)
130162
}
131163

132-
} else if len(resetOffsetsConfig.partitions) > 0 {
133-
for _, partition := range resetOffsetsConfig.partitions {
134-
if _, ok := partitionIDsMap[partition]; !ok {
135-
return fmt.Errorf("Partition %d not found in topic %s", partition, topic)
136-
}
137-
if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest {
138-
partitionOffsets[partition], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partition)
139-
if err != nil {
140-
return err
141-
}
142-
} else {
143-
partitionOffsets[partition] = resetOffsetsConfig.offset
144-
}
145-
164+
if strategy == "" {
165+
partitionOffsets[partition] = cfg.offset
166+
return nil
146167
}
147-
} else {
148-
for _, partitionInfo := range topicInfo.Partitions {
149-
if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest {
150-
partitionOffsets[partitionInfo.ID], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partitionInfo.ID)
151-
if err != nil {
152-
return err
153-
}
154-
} else {
155-
partitionOffsets[partitionInfo.ID] = resetOffsetsConfig.offset
156-
}
168+
169+
offset, err := groups.GetEarliestOrLatestOffset(ctx, connector, topic, strategy, partition)
170+
if err != nil {
171+
return err
157172
}
173+
174+
partitionOffsets[partition] = offset
158175
}
159176

160177
log.Infof(
161-
"This will reset the offsets for the following partitions in topic %s for group %s:\n%s",
178+
"This will reset the offsets for the following partitions "+
179+
"in topic %s for group %s:\n%s",
162180
topic,
163181
group,
164182
groups.FormatPartitionOffsets(partitionOffsets),
165183
)
166-
log.Info(
167-
"Please ensure that all other consumers are stopped, otherwise the reset might be overridden.",
168-
)
184+
185+
log.Info("Please ensure that all other consumers are stopped, " +
186+
"otherwise the reset might be overridden.")
169187

170188
ok, _ := util.Confirm("OK to continue?", false)
171189
if !ok {
172190
return errors.New("Stopping because of user response")
173191
}
174192

175193
cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
176-
return cliRunner.ResetOffsets(
177-
ctx,
178-
topic,
179-
group,
180-
partitionOffsets,
181-
)
194+
195+
if resetOffsetsConfig.delete {
196+
input := groups.DeleteOffsetsInput{
197+
GroupID: group,
198+
Topic: topic,
199+
Partitions: partitions,
200+
}
201+
202+
return cliRunner.DeleteOffsets(ctx, &input)
203+
}
204+
205+
return cliRunner.ResetOffsets(ctx, topic, group, partitionOffsets)
206+
}
207+
208+
func numTrue(bools ...bool) int {
209+
var n int
210+
for _, b := range bools {
211+
if b {
212+
n++
213+
}
214+
}
215+
216+
return n
217+
}
218+
219+
func convertSlice[T1, T2 any](input []T1, fn func(T1) T2) []T2 {
220+
out := make([]T2, len(input))
221+
222+
for i, v := range input {
223+
out[i] = fn(v)
224+
}
225+
226+
return out
227+
}
228+
229+
func parsePartitionOffsetMap(partitionIDsMap map[int]struct{}, input map[string]int64) (map[int]int64, error) {
230+
out := make(map[int]int64, len(input))
231+
232+
for partition, offset := range input {
233+
partitionID, err := strconv.Atoi(partition)
234+
if err != nil {
235+
format := "Partition value %s must be an integer"
236+
return nil, fmt.Errorf(format, partition)
237+
}
238+
239+
_, ok := partitionIDsMap[partitionID]
240+
if !ok {
241+
format := "Partition %d not found"
242+
return nil, fmt.Errorf(format, partitionID)
243+
}
244+
245+
out[partitionID] = offset
246+
}
247+
248+
return out, nil
182249
}

pkg/cli/cli.go

+22
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,11 @@ func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error {
594594
return nil
595595
}
596596

597+
// DeleteOffsets removes offsets for a single consumer group / topic combination.
598+
func (c *CLIRunner) DeleteOffsets(ctx context.Context, input *groups.DeleteOffsetsInput) error {
599+
return invoke(ctx, c, input, groups.DeleteOffsets)
600+
}
601+
597602
// ResetOffsets resets the offsets for a single consumer group / topic combination.
598603
func (c *CLIRunner) ResetOffsets(
599604
ctx context.Context,
@@ -649,6 +654,7 @@ func (c *CLIRunner) Tail(
649654
10e3,
650655
10e6,
651656
)
657+
652658
stats, err := tailer.LogMessages(ctx, maxMessages, filterRegexp, raw, headers)
653659
filtered := filterRegexp != ""
654660

@@ -689,6 +695,22 @@ func (c *CLIRunner) stopSpinner() {
689695
}
690696
}
691697

698+
type invokeFunc[T any] func(context.Context, *admin.Connector, T) error
699+
700+
func invoke[T any](ctx context.Context, c *CLIRunner, v T, fn invokeFunc[T]) error {
701+
c.startSpinner()
702+
703+
err := fn(ctx, c.adminClient.GetConnector(), v)
704+
c.stopSpinner()
705+
if err != nil {
706+
return err
707+
}
708+
709+
c.printer("Success")
710+
711+
return nil
712+
}
713+
692714
func stringsToInts(strs []string) ([]int, error) {
693715
ints := []int{}
694716

0 commit comments

Comments
 (0)