diff --git a/nix/apache-kafka.nix b/nix/apache-kafka.nix index 63eb8f8..6e382f9 100644 --- a/nix/apache-kafka.nix +++ b/nix/apache-kafka.nix @@ -1,20 +1,26 @@ # Based on: https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/apache-kafka.nix { config, lib, pkgs, name, ... }: +let + mkPropertyString = + let + render = { + bool = lib.boolToString; + int = toString; + list = lib.concatMapStringsSep "," mkPropertyString; + string = lib.id; + }; + in + v: render.${lib.strings.typeOf v} v; + stringlySettings = lib.mapAttrs (_: mkPropertyString) + (lib.filterAttrs (_: v: v != null) config.settings); + + generator = (pkgs.formats.javaProperties { }).generate; +in with lib; { options = { - enable = mkOption { - description = lib.mdDoc "Whether to enable Apache Kafka."; - default = false; - type = types.bool; - }; - - brokerId = mkOption { - description = lib.mdDoc "Broker ID."; - default = -1; - type = types.int; - }; + enable = mkEnableOption (lib.mdDoc "Apache Kafka event streaming broker"); port = mkOption { description = lib.mdDoc "Port number the broker should listen on."; @@ -22,45 +28,99 @@ with lib; type = types.port; }; - hostname = mkOption { - description = lib.mdDoc "Hostname the broker should bind to."; - default = "localhost"; - type = types.str; - }; - dataDir = lib.mkOption { type = types.str; default = "./data/${name}"; - description = "The apache-kafka data directory"; + description = lib.mdDoc "The apache-kafka data directory"; }; - logDirs = mkOption { - description = lib.mdDoc "Log file directories inside the data directory."; - default = [ "/kafka-logs" ]; - type = types.listOf types.path; - }; - - zookeeper = mkOption { - description = lib.mdDoc "Zookeeper connection string"; - default = "localhost:2181"; - type = types.str; - }; - - extraProperties = mkOption { - description = lib.mdDoc "Extra properties for server.properties."; - type = types.nullOr types.lines; - default = null; - }; - - serverProperties = mkOption { + settings = 
mkOption { description = lib.mdDoc '' - Complete server.properties content. Other server.properties config - options will be ignored if this option is used. + [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs) + {file}`server.properties`. + + Note that .properties files contain mappings from string to string. + Keys with dots are NOT represented by nested attrs in these settings, + but instead as quoted strings (ie. `settings."broker.id"`, NOT + `settings.broker.id`). ''; - type = types.nullOr types.lines; + type = types.submodule { + freeformType = with types; let + primitive = oneOf [ bool int str ]; + in + lazyAttrsOf (nullOr (either primitive (listOf primitive))); + + options = { + "broker.id" = mkOption { + description = lib.mdDoc "Broker ID. -1 or null to auto-allocate in zookeeper mode."; + default = null; + type = with types; nullOr int; + }; + + "log.dirs" = mkOption { + description = lib.mdDoc "Log file directories."; + # Deliberately leave out old default and use the rewrite opportunity + # to have users choose a safer value -- /tmp might be volatile and is a + # slightly scary default choice. + # default = [ "/tmp/apache-kafka" ]; + type = with types; listOf str; + default = [ (config.dataDir + "/logs") ]; + }; + + "listeners" = mkOption { + description = lib.mdDoc '' + Kafka Listener List. + See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners). + ''; + type = types.listOf types.str; + default = [ "PLAINTEXT://localhost:${builtins.toString config.port}" ]; + }; + }; + }; + }; + + clusterId = mkOption { + description = lib.mdDoc '' + KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid` + ''; + type = with types; nullOr str; default = null; }; + configFiles.serverProperties = mkOption { + description = lib.mdDoc '' + Kafka server.properties configuration file path. + Defaults to the rendered `settings`. 
+ ''; + type = types.path; + default = generator "server.properties" stringlySettings; + }; + + configFiles.log4jProperties = mkOption { + description = lib.mdDoc "Kafka log4j property configuration file path"; + type = types.path; + default = pkgs.writeText "log4j.properties" config.log4jProperties; + defaultText = ''pkgs.writeText "log4j.properties" config.log4jProperties''; + }; + + formatLogDirs = mkOption { + description = lib.mdDoc '' + Whether to format log dirs in KRaft mode if all log dirs are + unformatted, ie. they contain no meta.properties. + ''; + type = types.bool; + default = false; + }; + + formatLogDirsIgnoreFormatted = mkOption { + description = lib.mdDoc '' + Whether to ignore already formatted log dirs when formatting log dirs, + instead of failing. Useful when replacing or adding disks. + ''; + type = types.bool; + default = false; + }; + log4jProperties = mkOption { description = lib.mdDoc "Kafka log4j property configuration."; default = '' @@ -84,12 +144,7 @@ with lib; ]; }; - package = mkOption { - description = lib.mdDoc "The kafka package to use"; - default = pkgs.apacheKafka; - defaultText = literalExpression "pkgs.apacheKafka"; - type = types.package; - }; + package = mkPackageOption pkgs "apacheKafka" { }; jre = mkOption { description = lib.mdDoc "The JRE with which to run Kafka"; @@ -106,30 +161,13 @@ with lib; processes = { "${name}" = let - serverProperties = - if config.serverProperties != null then - config.serverProperties - else - '' - # Generated by services-flake - broker.id=${toString config.brokerId} - port=${toString config.port} - host.name=${config.hostname} - log.dirs=${concatStringsSep "," (builtins.map (dir: "${config.dataDir}${dir}") config.logDirs)} - zookeeper.connect=${config.zookeeper} - ${toString config.extraProperties} - ''; - - serverConfig = pkgs.writeText "server.properties" serverProperties; - logConfig = pkgs.writeText "log4j.properties" config.log4jProperties; - startScript = pkgs.writeShellScriptBin 
"start-kafka" '' ${config.jre}/bin/java \ -cp "${config.package}/libs/*" \ - -Dlog4j.configuration=file:${logConfig} \ + -Dlog4j.configuration=file:${config.configFiles.log4jProperties} \ ${toString config.jvmOptions} \ kafka.Kafka \ - ${serverConfig} + ${config.configFiles.serverProperties} ''; in { @@ -137,7 +175,7 @@ with lib; readiness_probe = { # TODO: need to find a better way to check if kafka is ready. Maybe use one of the scripts in bin? - exec.command = "${pkgs.netcat.nc}/bin/nc -z ${config.hostname} ${toString config.port}"; + exec.command = "${pkgs.netcat.nc}/bin/nc -z localhost ${builtins.toString config.port}"; initial_delay_seconds = 2; period_seconds = 10; timeout_seconds = 4; diff --git a/nix/apache-kafka_test.nix b/nix/apache-kafka_test.nix index 70dd7e9..5bd3f18 100644 --- a/nix/apache-kafka_test.nix +++ b/nix/apache-kafka_test.nix @@ -2,14 +2,32 @@ services.zookeeper."z1".enable = true; # To avoid conflicting with `zookeeper_test.nix` in case the tests are run in parallel services.zookeeper."z1".port = 2182; - services.apache-kafka."k1".enable = true; - services.apache-kafka."k1".zookeeper = "localhost:2182"; + services.apache-kafka."k1" = { + enable = true; + port = 9094; + settings = { + # Since the available brokers are only 1 + "offsets.topic.replication.factor" = 1; + "zookeeper.connect" = [ "localhost:2182" ]; + }; + }; + # kafka should start only after zookeeper is healthy + settings.processes.k1.depends_on."z1".condition = "process_healthy"; settings.processes.test = { command = pkgs.writeShellApplication { runtimeInputs = [ pkgs.bash config.services.apache-kafka.k1.package ]; text = '' - bash kafka-topics.sh --list --bootstrap-server localhost:9092 + # Create a topic + kafka-topics.sh --create --bootstrap-server localhost:9094 --partitions 1 \ + --replication-factor 1 --topic testtopic + + # Producer + echo 'test 1' | kafka-console-producer.sh --broker-list localhost:9094 --topic testtopic + + # Consumer + kafka-console-consumer.sh 
--bootstrap-server localhost:9094 --topic testtopic \ + --from-beginning --max-messages 1 | grep -q "test 1" ''; name = "kafka-test"; };