mirror of
https://github.com/juspay/services-flake.git
synced 2024-10-26 13:09:04 +03:00
[apache-kafka] Fix the configuration to allow kafka to run on non-default port (#76)
Tests are inspired from: https://github.com/NixOS/nixpkgs/blob/master/nixos/tests/kafka.nix The module is based on: https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/apache-kafka.nix
This commit is contained in:
parent
2e58b4a5b6
commit
c7bed59b87
@ -1,20 +1,26 @@
|
||||
# Based on: https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/apache-kafka.nix
|
||||
{ config, lib, pkgs, name, ... }:
|
||||
let
|
||||
mkPropertyString =
|
||||
let
|
||||
render = {
|
||||
bool = lib.boolToString;
|
||||
int = toString;
|
||||
list = lib.concatMapStringsSep "," mkPropertyString;
|
||||
string = lib.id;
|
||||
};
|
||||
in
|
||||
v: render.${lib.strings.typeOf v} v;
|
||||
|
||||
stringlySettings = lib.mapAttrs (_: mkPropertyString)
|
||||
(lib.filterAttrs (_: v: v != null) config.settings);
|
||||
|
||||
generator = (pkgs.formats.javaProperties { }).generate;
|
||||
in
|
||||
with lib;
|
||||
{
|
||||
options = {
|
||||
enable = mkOption {
|
||||
description = lib.mdDoc "Whether to enable Apache Kafka.";
|
||||
default = false;
|
||||
type = types.bool;
|
||||
};
|
||||
|
||||
brokerId = mkOption {
|
||||
description = lib.mdDoc "Broker ID.";
|
||||
default = -1;
|
||||
type = types.int;
|
||||
};
|
||||
enable = mkEnableOption (lib.mdDoc "Apache Kafka event streaming broker");
|
||||
|
||||
port = mkOption {
|
||||
description = lib.mdDoc "Port number the broker should listen on.";
|
||||
@ -22,43 +28,97 @@ with lib;
|
||||
type = types.port;
|
||||
};
|
||||
|
||||
hostname = mkOption {
|
||||
description = lib.mdDoc "Hostname the broker should bind to.";
|
||||
default = "localhost";
|
||||
type = types.str;
|
||||
};
|
||||
|
||||
dataDir = lib.mkOption {
|
||||
type = types.str;
|
||||
default = "./data/${name}";
|
||||
description = "The apache-kafka data directory";
|
||||
description = lib.mdDoc "The apache-kafka data directory";
|
||||
};
|
||||
|
||||
logDirs = mkOption {
|
||||
description = lib.mdDoc "Log file directories inside the data directory.";
|
||||
default = [ "/kafka-logs" ];
|
||||
type = types.listOf types.path;
|
||||
};
|
||||
|
||||
zookeeper = mkOption {
|
||||
description = lib.mdDoc "Zookeeper connection string";
|
||||
default = "localhost:2181";
|
||||
type = types.str;
|
||||
};
|
||||
|
||||
extraProperties = mkOption {
|
||||
description = lib.mdDoc "Extra properties for server.properties.";
|
||||
type = types.nullOr types.lines;
|
||||
default = null;
|
||||
};
|
||||
|
||||
serverProperties = mkOption {
|
||||
settings = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
Complete server.properties content. Other server.properties config
|
||||
options will be ignored if this option is used.
|
||||
[Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
|
||||
{file}`server.properties`.
|
||||
|
||||
Note that .properties files contain mappings from string to string.
|
||||
Keys with dots are NOT represented by nested attrs in these settings,
|
||||
but instead as quoted strings (ie. `settings."broker.id"`, NOT
|
||||
`settings.broker.id`).
|
||||
'';
|
||||
type = types.nullOr types.lines;
|
||||
type = types.submodule {
|
||||
freeformType = with types; let
|
||||
primitive = oneOf [ bool int str ];
|
||||
in
|
||||
lazyAttrsOf (nullOr (either primitive (listOf primitive)));
|
||||
|
||||
options = {
|
||||
"broker.id" = mkOption {
|
||||
description = lib.mdDoc "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
|
||||
default = null;
|
||||
type = with types; nullOr int;
|
||||
};
|
||||
|
||||
"log.dirs" = mkOption {
|
||||
description = lib.mdDoc "Log file directories.";
|
||||
# Deliberaly leave out old default and use the rewrite opportunity
|
||||
# to have users choose a safer value -- /tmp might be volatile and is a
|
||||
# slightly scary default choice.
|
||||
# default = [ "/tmp/apache-kafka" ];
|
||||
type = with types; listOf string;
|
||||
default = [ (config.dataDir + "/logs") ];
|
||||
};
|
||||
|
||||
"listeners" = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
Kafka Listener List.
|
||||
See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
|
||||
'';
|
||||
type = types.listOf types.str;
|
||||
default = [ "PLAINTEXT://localhost:${builtins.toString config.port}" ];
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
clusterId = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
|
||||
'';
|
||||
type = with types; nullOr str;
|
||||
default = null;
|
||||
};
|
||||
|
||||
configFiles.serverProperties = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
Kafka server.properties configuration file path.
|
||||
Defaults to the rendered `settings`.
|
||||
'';
|
||||
type = types.path;
|
||||
default = generator "server.properties" stringlySettings;
|
||||
};
|
||||
|
||||
configFiles.log4jProperties = mkOption {
|
||||
description = lib.mdDoc "Kafka log4j property configuration file path";
|
||||
type = types.path;
|
||||
default = pkgs.writeText "log4j.properties" config.log4jProperties;
|
||||
defaultText = ''pkgs.writeText "log4j.properties" config.log4jProperties'';
|
||||
};
|
||||
|
||||
formatLogDirs = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
Whether to format log dirs in KRaft mode if all log dirs are
|
||||
unformatted, ie. they contain no meta.properties.
|
||||
'';
|
||||
type = types.bool;
|
||||
default = false;
|
||||
};
|
||||
|
||||
formatLogDirsIgnoreFormatted = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
Whether to ignore already formatted log dirs when formatting log dirs,
|
||||
instead of failing. Useful when replacing or adding disks.
|
||||
'';
|
||||
type = types.bool;
|
||||
default = false;
|
||||
};
|
||||
|
||||
log4jProperties = mkOption {
|
||||
@ -84,12 +144,7 @@ with lib;
|
||||
];
|
||||
};
|
||||
|
||||
package = mkOption {
|
||||
description = lib.mdDoc "The kafka package to use";
|
||||
default = pkgs.apacheKafka;
|
||||
defaultText = literalExpression "pkgs.apacheKafka";
|
||||
type = types.package;
|
||||
};
|
||||
package = mkPackageOption pkgs "apacheKafka" { };
|
||||
|
||||
jre = mkOption {
|
||||
description = lib.mdDoc "The JRE with which to run Kafka";
|
||||
@ -106,30 +161,13 @@ with lib;
|
||||
processes = {
|
||||
"${name}" =
|
||||
let
|
||||
serverProperties =
|
||||
if config.serverProperties != null then
|
||||
config.serverProperties
|
||||
else
|
||||
''
|
||||
# Generated by services-flake
|
||||
broker.id=${toString config.brokerId}
|
||||
port=${toString config.port}
|
||||
host.name=${config.hostname}
|
||||
log.dirs=${concatStringsSep "," (builtins.map (dir: "${config.dataDir}${dir}") config.logDirs)}
|
||||
zookeeper.connect=${config.zookeeper}
|
||||
${toString config.extraProperties}
|
||||
'';
|
||||
|
||||
serverConfig = pkgs.writeText "server.properties" serverProperties;
|
||||
logConfig = pkgs.writeText "log4j.properties" config.log4jProperties;
|
||||
|
||||
startScript = pkgs.writeShellScriptBin "start-kafka" ''
|
||||
${config.jre}/bin/java \
|
||||
-cp "${config.package}/libs/*" \
|
||||
-Dlog4j.configuration=file:${logConfig} \
|
||||
-Dlog4j.configuration=file:${config.configFiles.log4jProperties} \
|
||||
${toString config.jvmOptions} \
|
||||
kafka.Kafka \
|
||||
${serverConfig}
|
||||
${config.configFiles.serverProperties}
|
||||
'';
|
||||
in
|
||||
{
|
||||
@ -137,7 +175,7 @@ with lib;
|
||||
|
||||
readiness_probe = {
|
||||
# TODO: need to find a better way to check if kafka is ready. Maybe use one of the scripts in bin?
|
||||
exec.command = "${pkgs.netcat.nc}/bin/nc -z ${config.hostname} ${toString config.port}";
|
||||
exec.command = "${pkgs.netcat.nc}/bin/nc -z localhost ${builtins.toString config.port}";
|
||||
initial_delay_seconds = 2;
|
||||
period_seconds = 10;
|
||||
timeout_seconds = 4;
|
||||
|
@ -2,14 +2,32 @@
|
||||
services.zookeeper."z1".enable = true;
|
||||
# To avoid conflicting with `zookeeper_test.nix` in case the tests are run in parallel
|
||||
services.zookeeper."z1".port = 2182;
|
||||
services.apache-kafka."k1".enable = true;
|
||||
services.apache-kafka."k1".zookeeper = "localhost:2182";
|
||||
services.apache-kafka."k1" = {
|
||||
enable = true;
|
||||
port = 9094;
|
||||
settings = {
|
||||
# Since the available brokers are only 1
|
||||
"offsets.topic.replication.factor" = 1;
|
||||
"zookeeper.connect" = [ "localhost:2182" ];
|
||||
};
|
||||
};
|
||||
# kafka should start only after zookeeper is healthy
|
||||
settings.processes.k1.depends_on."z1".condition = "process_healthy";
|
||||
settings.processes.test =
|
||||
{
|
||||
command = pkgs.writeShellApplication {
|
||||
runtimeInputs = [ pkgs.bash config.services.apache-kafka.k1.package ];
|
||||
text = ''
|
||||
bash kafka-topics.sh --list --bootstrap-server localhost:9092
|
||||
# Create a topic
|
||||
kafka-topics.sh --create --bootstrap-server localhost:9094 --partitions 1 \
|
||||
--replication-factor 1 --topic testtopic
|
||||
|
||||
# Producer
|
||||
echo 'test 1' | kafka-console-producer.sh --broker-list localhost:9094 --topic testtopic
|
||||
|
||||
# Consumer
|
||||
kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic testtopic \
|
||||
--from-beginning --max-messages 1 | grep -q "test 1"
|
||||
'';
|
||||
name = "kafka-test";
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user