Skip to content

Commit dd5c025

Browse files
pdepaepebernd
authored andcommitted
Allow kafka group.id to be configured (Graylog2#5867)
Signed-off-by: Pierre De Paepe <[email protected]> Closes Graylog2#3976
1 parent 998cb6f commit dd5c025

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

graylog2-server/src/main/java/org/graylog2/inputs/transports/KafkaTransport.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,13 @@
7272
import static com.codahale.metrics.MetricRegistry.name;
7373

7474
public class KafkaTransport extends ThrottleableTransport {
75-
public static final String GROUP_ID = "graylog2";
7675
public static final String CK_FETCH_MIN_BYTES = "fetch_min_bytes";
7776
public static final String CK_FETCH_WAIT_MAX = "fetch_wait_max";
7877
public static final String CK_ZOOKEEPER = "zookeeper";
7978
public static final String CK_TOPIC_FILTER = "topic_filter";
8079
public static final String CK_THREADS = "threads";
8180
public static final String CK_OFFSET_RESET = "offset_reset";
81+
public static final String CK_GROUP_ID = "group_id";
8282

8383
// See https://kafka.apache.org/090/documentation.html for available values for "auto.offset.reset".
8484
private static final ImmutableMap<String, String> OFFSET_RESET_VALUES = ImmutableMap.of(
@@ -87,6 +87,7 @@ public class KafkaTransport extends ThrottleableTransport {
8787
);
8888

8989
private static final String DEFAULT_OFFSET_RESET = "largest";
90+
private static final String DEFAULT_GROUP_ID = "graylog2";
9091

9192
private static final Logger LOG = LoggerFactory.getLogger(KafkaTransport.class);
9293

@@ -185,7 +186,7 @@ public void run() {
185186

186187
final Properties props = new Properties();
187188

188-
props.put("group.id", GROUP_ID);
189+
props.put("group.id", configuration.getString(CK_GROUP_ID, DEFAULT_GROUP_ID));
189190
props.put("client.id", "gl2-" + nodeId + "-" + input.getId());
190191

191192
props.put("fetch.min.bytes", String.valueOf(configuration.getInt(CK_FETCH_MIN_BYTES)));
@@ -382,6 +383,13 @@ public ConfigurationRequest getRequestedConfiguration() {
382383
"What to do when there is no initial offset in ZooKeeper or if an offset is out of range",
383384
ConfigurationField.Optional.OPTIONAL));
384385

386+
cr.addField(new TextField(
387+
CK_GROUP_ID,
388+
"Consumer group id",
389+
DEFAULT_GROUP_ID,
390+
"Name of the consumer group the Kafka input belongs to",
391+
ConfigurationField.Optional.OPTIONAL));
392+
385393
return cr;
386394
}
387395
}

0 commit comments

Comments
 (0)