Producing

Introduction

Producing messages to the Axual Platform consists of three main steps which are discussed in separate segments of this document.

The sequence of steps that need to be taken to produce a message is:

1. Instantiating an AxualProducer
2. Instantiating a Message
3. Producing the Message

Instantiating an AxualProducer

Namespace: Axual.Kafka.Proxy.Proxies.Axual

The Producer is the object used for producing messages.

To create such an object we use the builder pattern from our code base, like so:

var producer =
    new AxualProducerBuilder<GenericRecord, GenericRecord>(config) (1)
            .SetKeySerializer(new GenericAvroSerializer<GenericRecord>()) (2)
            .SetValueSerializer(new GenericAvroSerializer<GenericRecord>()) (3)
            .SetLogHandler((_, l) => Console.WriteLine($"[{l.Level}]: {l.Message}")) (4)
            .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}")) (5)
            .Build(); (6)
1 Uses the AxualProducerBuilder; the configuration is explained in Producer Configuration
2 Sets the serializer for the key objects
3 Sets the serializer for the value objects
4 Callback for log handling; in this case we simply log everything
5 Callback for error handling; in this case we simply log the exceptions
6 Build the producer object
It is recommended to wrap the above code in a using statement, so that correct resource disposal via IDisposable is guaranteed.

Instantiating a Message

Now that our producer is available, we can proceed to create some meaningful information to publish to the Axual Platform. In the Kafka realm this is done in the form of key-value pairs, and those key-value pairs are called messages or events.

The payload of those messages depends on the stream definition. We will not go into detail on why AVRO is preferred, since that is covered in the Self Service documentation. Briefly put, AVRO provides a more efficient means of encapsulating object types than STRING, which is why it has our preference.

For our example we are producing to a stream in which the key and value are defined as Application and ApplicationLogEvent respectively. In other words, each event or message on the stream consists of an Application object as the key and an ApplicationLogEvent object as the value.

The previously mentioned data types are formalized in .avsc files that describe the data they consist of.

Application.avsc :

{
  "type": "record",
  "name": "Application",
  "namespace": "io.axual.client.example.schema",
  "fields": [
    {
      "name": "name",
      "doc": "The name of the application",
      "type": "string"
    },
    {
      "name": "version",
      "doc": "(Optional) The application version",
      "default": null,
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "owner",
      "doc": "The owner of the application",
      "default": null,
      "type": [
        "null",
        "string"
      ]
    }
  ]
}

ApplicationLogEvent.avsc:

{
  "type": "record",
  "name": "ApplicationLogEvent",
  "namespace": "io.axual.client.example.schema",
  "fields": [
    {
      "name": "timestamp",
      "doc": "Timestamp of the event",
      "type": "long"
    },
    {
      "name": "source",
      "doc": "The application that sent the event",
      "type": {
        "type": "record",
        "name": "Application",
        "namespace": "io.axual.client.example.schema",
        "fields": [
          {
            "name": "name",
            "doc": "The name of the application",
            "type": "string"
          },
          {
            "name": "version",
            "doc": "(Optional) The application version",
            "default": null,
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "owner",
            "doc": "The owner of the application",
            "default": null,
            "type": [
              "null",
              "string"
            ]
          }
        ]
      }
    },
    {
      "name": "context",
      "doc": "The application context, contains application-specific key-value pairs",
      "type": {
        "type": "map",
        "values": "string"
      }
    },
    {
      "name": "level",
      "doc": "The log level, being either DEBUG, INFO, WARN or ERROR",
      "type": {
        "type": "enum",
        "name": "ApplicationLogLevel",
        "namespace": "io.axual.client.example.schema",
        "symbols": [
          "DEBUG",
          "INFO",
          "WARN",
          "ERROR",
          "FATAL"
        ]
      }
    },
    {
      "name": "message",
      "doc": "The log message",
      "type": "string"
    }
  ]
}

Next we define some utility methods that populate GenericRecord objects conforming to the .avsc definitions above, so they can be wrapped in the Message to be produced:

private static GenericRecord CreateApplicationRecord(
    string applicationName,
    string version = null,
    string owner = null)
{
    RecordSchema ApplicationSchema =
            (RecordSchema) RecordSchema.Parse(File.ReadAllText(APPLICATION_SCHEMA_PATH)); (1)

    var application = new GenericRecord(ApplicationSchema); (2)
    application.Add("name", applicationName); (3)
    application.Add("version", version);
    application.Add("owner", owner);
    return application;
}

private static GenericRecord CreateApplicationLogEventRecord(
    GenericRecord application,
    long timestamp,
    string logLevel,
    string message,
    Dictionary<string, object> context)
{
    RecordSchema ApplicationLogEventSchema =
            (RecordSchema) RecordSchema.Parse(File.ReadAllText(APPLICATIONLOGEVENT_SCHEMA_PATH)); (1)

    var levelSchema = (EnumSchema) ApplicationLogEventSchema["level"].Schema;

    var logEvent = new GenericRecord(ApplicationLogEventSchema); (2)
    logEvent.Add("timestamp", timestamp); (3)
    logEvent.Add("source", application);
    if (logLevel != null)
    {
        GenericEnum enumLogLevel = new GenericEnum(levelSchema, logLevel);
        logEvent.Add("level", enumLogLevel);
    }

    logEvent.Add("message", message);
    logEvent.Add("context", context);

    return logEvent;
}
1 Load the schema files. Ideally this happens only once, outside the scope of this method
2 Creates the GenericRecord that will be embedded into the message
3 Custom code to populate the object values
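As noted in callout 1, re-parsing the schema files on every call is wasteful. A minimal sketch of parsing each schema only once, caching it in a static field (the class name Schemas and the path constants are illustrative; reuse the constants from your own code), could look like:

```csharp
using System;
using System.IO;
using Avro;          // Apache Avro C# library (RecordSchema)

internal static class Schemas
{
    // Hypothetical paths; substitute the constants used in the examples above.
    private const string APPLICATION_SCHEMA_PATH = "Application.avsc";
    private const string APPLICATIONLOGEVENT_SCHEMA_PATH = "ApplicationLogEvent.avsc";

    // Lazy<T> gives thread-safe, on-first-use initialization,
    // so each file is read and parsed at most once.
    internal static readonly Lazy<RecordSchema> Application =
        new Lazy<RecordSchema>(() =>
            (RecordSchema) RecordSchema.Parse(File.ReadAllText(APPLICATION_SCHEMA_PATH)));

    internal static readonly Lazy<RecordSchema> ApplicationLogEvent =
        new Lazy<RecordSchema>(() =>
            (RecordSchema) RecordSchema.Parse(File.ReadAllText(APPLICATIONLOGEVENT_SCHEMA_PATH)));
}
```

The utility methods would then construct their records with `new GenericRecord(Schemas.Application.Value)` instead of parsing the file on each invocation.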

Having defined those utility methods we proceed to use them for instantiating our message as follows:

GenericRecord k = CreateApplicationRecord(applicationName, "1.9.9", "none"); (1)
GenericRecord v = CreateApplicationLogEventRecord(k, timestamp, logLevel, message, context); (2)
var message = new Message<GenericRecord, GenericRecord>() {Key = k, Value = v}; (3)
1 Create key object
2 Create value object
3 Populate the message with the payload created

Producing the Message

Here we use the previously created objects to produce the message with the producer. The only thing we need that has not yet been mentioned is the stream we are producing to. This is the unresolved name of the stream as it appears in the Self Service portal.

Most commonly this is done within a loop, but for simplicity we include only a single call to the produce method:

var topicName = "applicationlogevents"; (1)
using (var producer = new AxualProducerBuilder<GenericRecord, GenericRecord>(config)
                    .SetKeySerializer(new GenericAvroSerializer<GenericRecord>())
                    .SetValueSerializer(new GenericAvroSerializer<GenericRecord>())
                    .SetLogHandler((_, l) => Console.WriteLine($"[{l.Level}]: {l.Message}"))
                    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                    .Build())
{
    try
    {
        var res = producer.ProduceAsync(topicName, message).Result; (2)
        Console.WriteLine(
            $"Successful produce to topic {res.Topic} partition {res.Partition} offset {res.Offset}"); (3)
    }
    catch (Exception ex)
    {
        Console.Write(ex); (4)
    }
}
1 Stream name as found in Self Service
2 Use the producer object to produce asynchronously to the given topic; accessing .Result blocks until the delivery result is available
3 Log part of the metadata from the result of the ProduceAsync call
4 Custom exception handling logic
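Blocking on .Result wraps any failure in an AggregateException and can deadlock in environments with a synchronization context. A sketch of the same produce call using async/await instead (assuming the same producer, topicName and message objects as above; whether the delivery failure surfaces as a typed ProduceException depends on the underlying client, so a general catch is used here):

```csharp
// Sketch: awaiting ProduceAsync instead of blocking on .Result.
// 'producer', 'topicName' and 'message' are the objects created above.
try
{
    var res = await producer.ProduceAsync(topicName, message);
    Console.WriteLine(
        $"Successful produce to topic {res.Topic} partition {res.Partition} offset {res.Offset}");
}
catch (Exception ex)
{
    // await rethrows the original exception rather than an AggregateException
    Console.WriteLine($"Delivery failed: {ex.Message}");
}
```

This requires the enclosing method to be declared async (e.g. `static async Task Main(...)`).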