Kafka Avro Consumer issue workaround

I reported the issue here

Description

AvroDeserializer throws a NullReferenceException with no further information.

Unhandled exception. Confluent.Kafka.ConsumeException: Local: Value deserialization error
 ---> System.NullReferenceException: Object reference not set to an instance of an object.
   at Confluent.SchemaRegistry.Serdes.GenericDeserializerImpl.Deserialize(String topic, Byte[] array)
   at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   --- End of inner exception stack trace ---
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
   at Arch.Pocs.KafkaClient.Consumer.ConsoleUI.KafkaAvroConsumer.Poll(String topic)+MoveNext() in /app/KafkaAvroConsumer.cs:line 42
   at Arch.Pocs.KafkaClient.Consumer.ConsoleUI.Program.Main(String[] args) in /app/Program.cs:line 32
   at Arch.Pocs.KafkaClient.Consumer.ConsoleUI.Program.<Main>(String[] args)

The cause: the method Confluent.SchemaRegistry.Serdes.AvroDeserializer<T>.DeserializeAsync(...) (https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializer.cs) calls deserializerImpl.Deserialize(context.Topic, isNull ? null : data.ToArray()).ConfigureAwait(continueOnCapturedContext: false). Notice that the second argument can be null. For GenericRecord, deserializerImpl is of type GenericDeserializerImpl. The implementation of GenericDeserializerImpl.Deserialize(string topic, byte[] array) (https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs) does not check for null, so a null byte array causes a NullReferenceException, which is exactly what happened here.
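
The failure mode can be reproduced without Kafka. The following is only a minimal sketch, not the library's code: UnguardedDeserialize and GuardedDeserialize are hypothetical stand-ins, and the sketch assumes nothing more than that the real method reads the array (here, its length) before checking it for null.

```csharp
using System;

public static class NullPayloadSketch
{
    // Hypothetical stand-in for a deserializer that never checks for null.
    // Reading array.Length on a null reference throws NullReferenceException.
    public static string UnguardedDeserialize(string topic, byte[] array)
    {
        return $"{topic}: {array.Length} bytes";
    }

    // The same stand-in behind a guard: a null payload maps to a null result
    // and the unguarded code is never reached.
    public static string GuardedDeserialize(string topic, byte[] array)
    {
        return array == null ? null : UnguardedDeserialize(topic, array);
    }
}
```

Calling NullPayloadSketch.UnguardedDeserialize("demo", null) throws NullReferenceException, while NullPayloadSketch.GuardedDeserialize("demo", null) returns null. The wrapper in the next section applies the same kind of guard in front of the real deserializer.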

So:

A wrapper for AvroDeserializer<GenericRecord>

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

public class AvroGenericRecordDeserializerWrapper : IAsyncDeserializer<GenericRecord>
{
    private readonly AvroDeserializer<GenericRecord> _avroDeserializer;

    public AvroGenericRecordDeserializerWrapper(ISchemaRegistryClient schemaRegistryClient, IEnumerable<KeyValuePair<string, string>> config = null)
    {
        _avroDeserializer = new AvroDeserializer<GenericRecord>(schemaRegistryClient, config);
    }

    public async Task<GenericRecord> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
    {
        // A null payload makes the wrapped deserializer throw NullReferenceException,
        // so skip the downstream call and return a null GenericRecord instead.
        if (isNull)
        {
            Console.WriteLine($"isNull=true. Skipping downstream call and returning null GenericRecord. (data.Length={data.Length})");
            return null;
        }

        try
        {
            return await _avroDeserializer.DeserializeAsync(data, isNull, context).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Best effort: log any other failure and return null so the consumer keeps running.
            Console.WriteLine($"Failed executing DeserializeAsync(data, isNull, context). Returning null GenericRecord. ex => {ex}");
            return null;
        }
    }
}

Using the AvroGenericRecordDeserializerWrapper rather than AvroDeserializer<GenericRecord>

Just set AvroGenericRecordDeserializerWrapper as the key and value deserializer in place of AvroDeserializer<GenericRecord>:

// AsSyncOverAsync() requires: using Confluent.Kafka.SyncOverAsync;
var builder = new ConsumerBuilder<GenericRecord, GenericRecord>(_consumerConfig)
    // Original deserializers, replaced by the wrapper:
    // .SetKeyDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    // .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .SetKeyDeserializer(new AvroGenericRecordDeserializerWrapper(schemaRegistry).AsSyncOverAsync())
    .SetValueDeserializer(new AvroGenericRecordDeserializerWrapper(schemaRegistry).AsSyncOverAsync())
    .SetErrorHandler((c, e) =>
    {
        Console.WriteLine($"Consumer error handler: {e.Reason}");
    });
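
Because the wrapper returns null both for null payloads and for failed deserialization, the consuming code should no longer assume that a key or value is present. The helper below is a rough sketch of such a check: ConsumeResultSketch and Describe are hypothetical names, and the returned strings are purely illustrative.

```csharp
using System;
using Avro.Generic;
using Confluent.Kafka;

public static class ConsumeResultSketch
{
    // Hypothetical helper: describes one poll result without assuming Value is non-null.
    public static string Describe(ConsumeResult<GenericRecord, GenericRecord> result)
    {
        // Consume(TimeSpan) returns null when the timeout expires with no message.
        if (result == null || result.Message == null)
        {
            return "no message";
        }

        // The wrapper yields null for null payloads and for failed deserialization.
        if (result.Message.Value == null)
        {
            return $"null value at offset {result.Offset}";
        }

        return $"record of type {result.Message.Value.Schema.Name}";
    }
}
```

With a consumer obtained from builder.Build(), a poll loop could then call ConsumeResultSketch.Describe(consumer.Consume(TimeSpan.FromSeconds(1))) and skip anything that is not a record.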