Workaround for Kafka Avro Deserializer NullReferenceException
The Problem
AvroDeserializer<GenericRecord> throws a cryptic NullReferenceException when deserializing null byte arrays:
Unhandled exception. Confluent.Kafka.ConsumeException: Local: Value deserialization error
---> System.NullReferenceException: Object reference not set to an instance of an object.
at Confluent.SchemaRegistry.Serdes.GenericDeserializerImpl.Deserialize(String topic, Byte[] array)
at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
Root cause: GenericDeserializerImpl.Deserialize() doesn’t validate null input before accessing the byte array.
The Solution
Create a wrapper that intercepts null values before they reach the deserializer:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
public class AvroGenericRecordDeserializerWrapper : IAsyncDeserializer<GenericRecord>
{
private readonly ISchemaRegistryClient _schemaRegistryClient;
private readonly AvroDeserializer<GenericRecord> _avroDeserializer;
public AvroGenericRecordDeserializerWrapper(ISchemaRegistryClient schemaRegistryClient, IEnumerable<KeyValuePair<string, string>> config = null)
{
_avroDeserializer = new AvroDeserializer<GenericRecord>(schemaRegistryClient, config);
_schemaRegistryClient = schemaRegistryClient;
}
public async Task<GenericRecord> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
{
try
{
if (isNull)
{
Console.WriteLine($"isNull=true. This would throw NullReferenceException down the line with the provided implementation. Skipping downstream call and just returning null GenericRecord. (data.Lenght={data.Length.ToString()})");
return await Task.FromResult<GenericRecord>(null);
}
return await _avroDeserializer.DeserializeAsync(data, isNull, context);
}
catch (Exception ex)
{
Console.WriteLine($"Failed executing DeserializeAsync(data, isNull, context). Returning null GenericRecord ex => {ex.ToString()}");
return await Task.FromResult<GenericRecord>(null);
}
}
}
Usage
Replace AvroDeserializer<GenericRecord> with the wrapper:
var builder = new ConsumerBuilder<GenericRecord, GenericRecord>(_consumerConfig)
// .SetKeyDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
// .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
.SetKeyDeserializer(new AvroGenericRecordDeserializerWrapper(schemaRegistry).AsSyncOverAsync())
.SetValueDeserializer(new AvroGenericRecordDeserializerWrapper(schemaRegistry).AsSyncOverAsync())
.SetErrorHandler((c, e) => {
Console.WriteLine($"Consumer error handler: {e.Reason}");
});