fix(kafka): raise default protocol version from 0.10.2.0 to 2.1.0#7890
fix(kafka): raise default protocol version from 0.10.2.0 to 2.1.0#7890officialasishkumar wants to merge 2 commits intocadence-workflow:masterfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Raises Cadence’s default Kafka protocol version to avoid startup failures with Sarama’s API-version negotiation and ensures both consumer and producer paths apply the same version configuration.
Changes:
- Introduces a
defaultKafkaVersionconstant (2.1.0) and uses it whenkafka.versionis unset. - Applies
sarama.Config.Versionconsistently for both consumer and producer creation. - Adds a unit test ensuring the default version string is parseable by Sarama.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| common/messaging/kafka/client_impl.go | Adds new default Kafka version constant and applies version parsing/configuration to consumer + producer paths. |
| common/messaging/kafka/client_impl_test.go | Adds test to validate the default Kafka version constant is parseable. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func (c *clientImpl) NewConsumer(app, consumerName string) (messaging.Consumer, error) { | ||
| topics := c.config.GetTopicsForApplication(app) | ||
| // All defaut values are copied from uber/kafka-clientImpl bo keep the same behavior | ||
| // All default values are copied from uber/kafka-clientImpl to keep the same behavior |
There was a problem hiding this comment.
The comment references "uber/kafka-clientImpl", but this project name/path doesn't appear anywhere else in the codebase and is likely a typo (the earlier comment above mentions "uber-go/kafka-client"). Consider correcting the reference to the actual library/module name to avoid confusion for future readers.
| // All default values are copied from uber/kafka-clientImpl to keep the same behavior | |
| // All default values are copied from uber-go/kafka-client to keep the same behavior |
There was a problem hiding this comment.
Fixed, updated the reference to "uber-go/kafka-client".
| kafkaVersion := c.config.Version | ||
| if kafkaVersion == "" { | ||
| kafkaVersion = defaultKafkaVersion | ||
| } | ||
|
|
||
| version, err := sarama.ParseKafkaVersion(kafkaVersion) | ||
| if err != nil { | ||
| return nil, err | ||
| } |
There was a problem hiding this comment.
Kafka version resolution logic is duplicated between NewConsumer and newProducerByTopic (read config.Version, fall back to defaultKafkaVersion, parse). Consider extracting this into a small helper (e.g., on clientImpl) so future changes only need to be made in one place and can be unit-tested directly.
There was a problem hiding this comment.
Good suggestion. Extracted a resolveKafkaVersion() helper on clientImpl that both NewConsumer and newProducerByTopic now call.
| // defaultKafkaVersion is the Kafka protocol version used when the user does not | ||
| // specify one in the configuration. The previous default ("0.10.2.0") is too low | ||
| // for Sarama ≥ v1.45 which performs API-version negotiation: modern brokers | ||
| // (Kafka ≥ 1.0) may refuse the handshake when the client advertises such an | ||
| // old version, causing "client has run out of available brokers" errors. | ||
| // Version 2.1.0 (released Nov 2018) is old enough to be universally supported | ||
| // by any Kafka cluster still in production, yet new enough for Sarama's API | ||
| // negotiation to succeed. | ||
| defaultKafkaVersion = "2.1.0" |
There was a problem hiding this comment.
PR description mentions that config/base.yaml already documents the kafka.version field, but in this repo config/base.yaml currently only contains logging defaults and no Kafka configuration. Either update the description or add/adjust documentation in the appropriate config template so users can discover the kafka.version setting.
There was a problem hiding this comment.
You're right, config/base.yaml only has logging defaults. I'll update the PR description to remove that claim.
|
Thanks for fixing this! |
Sarama ≥ v1.45 performs Kafka API-version negotiation on connect. When the client advertises protocol version 0.10.2.0 (the previous hard-coded default) against a modern broker (Kafka ≥ 2.x), the handshake can fail with "client has run out of available brokers". Raise the fallback to 2.1.0, which is old enough (Nov 2018) to be compatible with every Kafka cluster still in production, yet new enough for the negotiation to succeed. Additionally, the producer path (newProducerByTopic) was not setting Config.Version at all, leaving it at Sarama's internal minimum. Apply the same version resolution so that both consumers and producers use a consistent, configurable protocol version. Closes cadence-workflow#7757 Signed-off-by: Asish Kumar <officialasishkumar@gmail.com>
Fixed the comment to reference "uber-go/kafka-client" instead of "uber/kafka-clientImpl". Extracted Kafka version resolution into a helper method to eliminate duplication between consumer and producer creation paths. Signed-off-by: Asish Kumar <officialasishkumar@gmail.com>
3db5980 to
5916dfb
Compare
|
i built a local auto-setup image from the branch, and started docker/docker-compose-async-wf-kafka.yml against that image. I then registered test-domain-pr7890, enabled queue1 for it, read the config back successfully, and confirmed in the cadence-worker logs that it created and started the async Kafka consumer on kafka:9092. Cadence and cadence-web both came up cleanly, and I did not hit any Kafka startup errors during the run. |
Code Review ✅ ApprovedKafka default protocol version upgraded from 0.10.2.0 to 2.1.0 with comment corrections and version helper extraction. No issues found. Rules ✅ All requirements metRepository Rules
2 rules not applicable. Show all rules by commenting OptionsAuto-apply is off → Gitar will not commit updates to this branch. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
|
Are there any other review comments for this PR? |
What changed?
Raised the default Kafka protocol version from
0.10.2.0to2.1.0and applied consistent version configuration to the producer path. Closes #7757.Why?
Sarama ≥ v1.45 performs API-version negotiation when connecting to Kafka brokers. When the client advertises protocol version
0.10.2.0(the previous hard-coded default), the handshake with modern brokers (Kafka ≥ 2.x) can fail, crashing the Cadence server on startup withkafka: client has run out of available brokers to talk to.Additionally, the producer creation path (
newProducerByTopic) was not settingConfig.Versionat all, leaving it at Sarama's internal minimum. This meant producers and consumers could negotiate using different protocol versions, leading to inconsistent behavior.The new default
2.1.0(released November 2018) is old enough to be universally compatible with any Kafka cluster still in production, yet new enough for Sarama's API negotiation to succeed. Existing deployments that explicitly set thekafka.versionconfig are unaffected — the default only applies when no version is specified.How did you test it?
Verified that
sarama.ParseKafkaVersion("2.1.0")succeeds (covered by the newTestDefaultKafkaVersiontest). Both consumer and producer paths now resolve the version identically.Potential risks
The default version change is backward-compatible: deployments already setting
kafka.versionin their config are unaffected. The only behavioral difference is for deployments relying on the implicit0.10.2.0default, and those deployments are exactly the ones that were broken by the Sarama upgrade (#7757). Kafka brokers running 2.1.0 or newer (released Nov 2018) are supported, which covers all reasonable production deployments.Release notes
The default Kafka protocol version has been raised from
0.10.2.0to2.1.0to fix startup failures when running Cadence with Sarama ≥ v1.45 against modern Kafka clusters (see #7757). Deployments that explicitly setkafka.versionin their configuration are unaffected. Additionally, producers now respect the configured Kafka version (previously only consumers did).Documentation Changes
No documentation changes needed. The
kafka.versionconfig field is set per-deployment in environment-specific config files (not inconfig/base.yaml, which only contains logging defaults).