Testing Apache Avro Compatibility
Experimenting in Python with specific Avro compatibility scenarios
There is a thorough article on Avro compatibility and what to do under the various compatibility modes.
In my opinion, that article is really good and makes it easy to understand how to make a breaking change to an Avro schema. Still, I wanted to actually test what results those steps produce, so this article records my experimental process.
First, we build an experimental environment; essentially, all we need is a schema registry. I chose Confluent Schema Registry, mainly because it is simple to deploy and use, and the official Confluent documentation provides a complete local experimental environment. Nevertheless, I only need the core components: the broker, the schema registry, and the ZooKeeper instance the broker depends on.
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:7.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema-registry:
    image: confluentinc/cp-schema-registry:7.2.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
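After bringing the stack up with docker compose up -d, a quick way to confirm the schema registry is reachable is to list its subjects. The snippet below is just a sanity check of my own; the URL assumes the port mapping from the compose file above.

# Sanity check: list registered subjects on the local schema registry.
# Assumes the registry is exposed on localhost:8081 as in the compose file above.
from confluent_kafka.schema_registry import SchemaRegistryClient

sr = SchemaRegistryClient({"url": "http://localhost:8081"})
print(sr.get_subjects())  # expected to print [] on a fresh environment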
Next, let’s follow the steps in the reference document to actually evolve a field; in other words, to evolve a string field into a record field.
I want to see what happens in the following scenarios under the default BACKWARD compatibility mode, and also how the unworkable scenarios described in the article actually manifest.
- The consumer always uses the producer’s latest schema.
- The consumer only uses the initial schema.
- The consumer lags behind the producer’s version.
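To make this concrete, the two ends of the evolution look roughly like the sketch below. These are my own reconstructions from the schemas that appear in the compatibility errors later in this article; the names PERSON_V1 and PERSON_FINAL are just labels for the example.

# Starting point: "name" is a plain string field.
PERSON_V1 = {
    "type": "record",
    "name": "Person",
    "fields": [{"name": "name", "type": "string"}],
}

# End goal: "name" is replaced by a nested "person_name" record.
PERSON_FINAL = {
    "type": "record",
    "name": "Person",
    "fields": [
        {
            "name": "person_name",
            "type": {
                "type": "record",
                "name": "Name",
                "fields": [
                    {"name": "first_name", "type": "string"},
                    {"name": "last_name", "type": "string"},
                ],
            },
        }
    ],
}

In between, the reference document walks through intermediate versions that add default values (for example, keeping the old name field with a deprecation marker as its default), so that each consecutive pair of versions stays compatible.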
The whole experiment process is as follows.
To run the script, install the dependencies first.
pip install confluent-kafka fastavro
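The original script is not reproduced here, but a minimal sketch of one round of the experiment might look like the following. It assumes the local broker and registry from the compose file above; the topic name, the consumer group, and the reuse of the "<DEPRECATED>" placeholder from the error output below are my own choices, not anything mandated by the reference document.

import json

from confluent_kafka import Consumer, Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

TOPIC = "example.test"  # hypothetical topic name for this experiment
sr = SchemaRegistryClient({"url": "http://localhost:8081"})

# An intermediate writer schema: the old string field kept, but with a
# default marking it as deprecated.
schema_v2 = json.dumps({
    "type": "record",
    "name": "Person",
    "namespace": "example.test",
    "fields": [{"name": "name", "type": "string", "default": "<DEPRECATED>"}],
})

# Produce one record with this schema; the serializer registers it under
# the "example.test-value" subject automatically.
producer = Producer({"bootstrap.servers": "localhost:9092"})
serializer = AvroSerializer(sr, schema_v2)
ctx = SerializationContext(TOPIC, MessageField.VALUE)
producer.produce(TOPIC, value=serializer({"name": "Alice"}, ctx))
producer.flush()

# Consume it back. With no reader schema given, AvroDeserializer uses the
# writer schema stored in the registry; passing an older schema instead
# simulates a consumer that lags behind the producer.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "avro-compat-test",
    "auto.offset.reset": "earliest",
})
consumer.subscribe([TOPIC])
deserializer = AvroDeserializer(sr)
msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    print(deserializer(msg.value(), ctx))
consumer.close()

Repeating this loop for each schema version, with different reader schemas on the consumer side, covers the three scenarios listed above.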
The execution results are as expected: nothing breaks, and the consumer only parses the parts it can understand.
In addition, if the compatibility mode is BACKWARD_TRANSITIVE, the following error will be encountered.
SchemaRegistryError: Schema being registered is incompatible with an earlier schema for subject "example.test.BACKWARD_TRANSITIVE-value", details: [Incompatibility{type:READER_FIELD_MISSING_DEFAULT_VALUE, location:/fields/0, message:person_name, reader:{"type":"record","name":"Person","namespace":"example.test.BACKWARD_TRANSITIVE","fields":[{"name":"person_name","type":{"type":"record","name":"Name","fields":[{"name":"first_name","type":"string"},{"name":"last_name","type":"string"}]}}]}, writer:{"type":"record","name":"Person","namespace":"example.test.BACKWARD_TRANSITIVE","fields":[{"name":"name","type":"string","default":"<DEPRECATED>"}]}}] (HTTP status code 409, SR code 409)
On the other hand, if it is FORWARD_TRANSITIVE, we will encounter the following.
SchemaRegistryError: Schema being registered is incompatible with an earlier schema for subject "example.test.FORWARD_TRANSITIVE-value", details: [Incompatibility{type:READER_FIELD_MISSING_DEFAULT_VALUE, location:/fields/0, message:name, reader:{"type":"record","name":"Person","namespace":"example.test.FORWARD_TRANSITIVE","fields":[{"name":"name","type":"string"}]}, writer:{"type":"record","name":"Person","namespace":"example.test.FORWARD_TRANSITIVE","fields":[{"name":"person_name","type":{"type":"record","name":"Name","fields":[{"name":"first_name","type":"string","default":"<NOT_IN_USE>"},{"name":"last_name","type":"string","default":"<NOT_IN_USE>"}]},"default":{"first_name":"<??>","last_name":"<??>"}}]}}] (HTTP status code 409, SR code 409)
As stated in the reference document, each of these errors occurs at the corresponding step, and the evolution cannot continue from there.
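For reference, the non-default compatibility modes in these two runs were configured per subject before attempting to register the new schemas. Below is a minimal sketch using the same client library, assuming the installed confluent-kafka version exposes set_compatibility on SchemaRegistryClient; the subject names match the ones in the errors above.

from confluent_kafka.schema_registry import SchemaRegistryClient

sr = SchemaRegistryClient({"url": "http://localhost:8081"})

# Set the per-subject compatibility level before re-running the evolution steps.
sr.set_compatibility("example.test.BACKWARD_TRANSITIVE-value", "BACKWARD_TRANSITIVE")
sr.set_compatibility("example.test.FORWARD_TRANSITIVE-value", "FORWARD_TRANSITIVE")
print(sr.get_compatibility("example.test.BACKWARD_TRANSITIVE-value"))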
Conclusion
This article is mainly a record of the experimental process, but after going through it, the best practices to follow for schema evolution are also easier to understand.
The best-practice principles for schema evolution mentioned in that article are:
- Make your primary key required.
- Give default values to all the fields that could be removed in the future.
- Be very careful when using ENUM, as they cannot evolve over time.
- Do not rename fields. You can add aliases instead.
- When evolving a schema, ALWAYS give default values.
- When evolving a schema, NEVER remove or rename a required field, or change its type.
I’m not sure what his sources are, but these points match my past experience quite well.
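As a small illustration of the alias and default-value rules above, a field rename handled the recommended way might look like this sketch; the new field name is made up for the example.

# Hypothetical example of "aliases instead of rename" plus "always give defaults":
# the new field name carries an alias pointing at the old one, and a default
# so that data missing the field can still be read.
PERSON_WITH_ALIAS = {
    "type": "record",
    "name": "Person",
    "fields": [
        {
            "name": "full_name",   # new name (made up for this example)
            "aliases": ["name"],   # old name kept as an alias
            "type": "string",
            "default": "",         # default so readers never miss the field
        }
    ],
}

Keeping the old name as an alias lets a reader with the new schema still decode data written with the old one, which is exactly the kind of change the experiments above exercise.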