Workaround for Kafka Avro Deserializer NullReferenceException

The Problem

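AvroDeserializer<GenericRecord> (Confluent.SchemaRegistry.Serdes) throws a cryptic NullReferenceException, wrapped in a ConsumeException, when a message key or value is null (for example, a tombstone record):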

Unhandled exception. Confluent.Kafka.ConsumeException: Local: Value deserialization error
 ---> System.NullReferenceException: Object reference not set to an instance of an object.
   at Confluent.SchemaRegistry.Serdes.GenericDeserializerImpl.Deserialize(String topic, Byte[] array)
   at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)

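Root cause: GenericDeserializerImpl.Deserialize() accesses the byte array without first checking it for null. A null key or value therefore fails with a NullReferenceException instead of deserializing to a null record, and Consume() rethrows it as a ConsumeException.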

The Solution

Create a wrapper that implements IAsyncDeserializer<GenericRecord>, delegates to AvroDeserializer<GenericRecord>, and intercepts null keys and values before they reach it:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Decorates AvroDeserializer<GenericRecord> so that null keys/values (e.g. tombstones)
// never reach GenericDeserializerImpl.Deserialize().
public class AvroGenericRecordDeserializerWrapper : IAsyncDeserializer<GenericRecord>
{
    private readonly AvroDeserializer<GenericRecord> _avroDeserializer;

    public AvroGenericRecordDeserializerWrapper(ISchemaRegistryClient schemaRegistryClient, IEnumerable<KeyValuePair<string, string>> config = null)
    {
        _avroDeserializer = new AvroDeserializer<GenericRecord>(schemaRegistryClient, config);
    }

    public async Task<GenericRecord> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
    {
        try
        {
            if (isNull)
            {
                // Passing a null key/value downstream would throw NullReferenceException,
                // so skip the inner deserializer and return a null record instead.
                Console.WriteLine($"isNull=true: skipping the inner deserializer and returning a null GenericRecord (data.Length={data.Length}).");
                return null;
            }

            return await _avroDeserializer.DeserializeAsync(data, isNull, context).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Caution: this also swallows every other deserialization failure (for example a
            // malformed payload or a Schema Registry lookup error) and turns that message into null.
            // Remove this catch if such errors should surface as ConsumeException.
            Console.WriteLine($"DeserializeAsync(data, isNull, context) failed; returning null GenericRecord. Exception: {ex}");
            return null;
        }
    }
}
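The null short-circuit itself does not depend on Avro. The sketch below is a hypothetical generic variant (NullGuardDeserializer<T> is not part of Confluent.Kafka) that wraps any IAsyncDeserializer<T>. Unlike the wrapper above, it only intercepts nulls and lets every other exception propagate, so malformed messages still surface as ConsumeException. ThrowsOnNullStub is a made-up stand-in that simulates the failure, so the guard can be exercised without Schema Registry or a broker.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical generic null guard: short-circuits null keys/values and
// delegates everything else, unchanged, to the wrapped deserializer.
public sealed class NullGuardDeserializer<T> : IAsyncDeserializer<T> where T : class
{
    private readonly IAsyncDeserializer<T> _inner;

    public NullGuardDeserializer(IAsyncDeserializer<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
    {
        // Null key/value (e.g. a tombstone): never hand it to the inner deserializer.
        if (isNull)
        {
            return Task.FromResult<T>(null);
        }

        // Non-null payloads pass through; any exception propagates to the consumer.
        return _inner.DeserializeAsync(data, isNull, context);
    }
}

// Hypothetical stub that simulates the bug: it throws on null input
// and decodes UTF-8 text otherwise.
public sealed class ThrowsOnNullStub : IAsyncDeserializer<string>
{
    public Task<string> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull)
        {
            throw new NullReferenceException("simulated failure on a null payload");
        }

        return Task.FromResult(Encoding.UTF8.GetString(data.ToArray()));
    }
}
```

With Avro you would wrap the stock deserializer directly, for example `new NullGuardDeserializer<GenericRecord>(new AvroDeserializer<GenericRecord>(schemaRegistry)).AsSyncOverAsync()`.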

Usage

Replace AvroDeserializer<GenericRecord> with the wrapper:

using Confluent.Kafka.SyncOverAsync; // provides the AsSyncOverAsync() extension method

var builder = new ConsumerBuilder<GenericRecord, GenericRecord>(_consumerConfig)
    // Before (throws on null keys/values):
    // .SetKeyDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    // .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .SetKeyDeserializer(new AvroGenericRecordDeserializerWrapper(schemaRegistry).AsSyncOverAsync())
    .SetValueDeserializer(new AvroGenericRecordDeserializerWrapper(schemaRegistry).AsSyncOverAsync())
    .SetErrorHandler((c, e) =>
    {
        Console.WriteLine($"Consumer error handler: {e.Reason}");
    });
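Because the wrapper turns null keys and values (and, through its catch-all, any failed deserialization) into a null GenericRecord, consuming code has to check for null before reading fields. A minimal sketch, assuming a consumer built from the builder above; TombstoneAwareLoop and Run are hypothetical names, and how you treat null records is an application decision.

```csharp
using System;
using System.Threading;
using Avro.Generic;
using Confluent.Kafka;

public static class TombstoneAwareLoop
{
    // Hypothetical helper: consumes until cancelled and skips records whose
    // value came back null from the wrapper.
    public static void Run(IConsumer<GenericRecord, GenericRecord> consumer, string topic, CancellationToken ct)
    {
        consumer.Subscribe(topic);
        try
        {
            while (!ct.IsCancellationRequested)
            {
                ConsumeResult<GenericRecord, GenericRecord> result = consumer.Consume(ct);

                // Only relevant when EnablePartitionEof is set; such results carry no message.
                if (result.IsPartitionEOF)
                {
                    continue;
                }

                if (result.Message.Value == null)
                {
                    // Tombstone, or a message the wrapper failed to deserialize.
                    Console.WriteLine($"Null value at {result.TopicPartitionOffset}");
                    continue;
                }

                Console.WriteLine($"{result.Message.Value.Schema.Fullname} record at {result.TopicPartitionOffset}");
            }
        }
        catch (OperationCanceledException)
        {
            // Consume(ct) throws this when the token is cancelled: normal shutdown.
        }
        finally
        {
            // Leave the group cleanly and commit final offsets (if auto-commit is enabled).
            consumer.Close();
        }
    }
}
```

Calling `consumer.Close()` in `finally` lets the group rebalance promptly instead of waiting for the session timeout.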

Issue reported here