Step 6: Consuming Data

Choose your client

For the producer application you have chosen a certain method. Use the links below to continue the Getting Started for the preferred method.+ Use the selector below to follow the right steps for your client type.

java consumer
NET consumer
REST consumer

Simple Java Consumer Application using the latest version of the Java Axual Client

Simple .NET Consumer Application using the latest version of the .NET Axual Client

Simple REST Consumer using any tool that can perform REST calls

building a Java application yourself using Axual Java Client to consume data from the stream.

Building a .NET application yourself using Axual .NET Client to consume data from the stream.

Creating REST calls to consume data from the stream.

Creating A Java Consumer Application

When you have completed this step, you will have set up a consumer application that is consuming data from the stream configured in step 2 in Avro format. To get some data onto the stream, follow step 5: Create A Producer Application.

To create a consumer application, the start is similar to creating a producer application. You start by including the maven dependency on the Axual Client and create the configuration for it:

<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client</artifactId>
    <version>5.4.2</version>
</dependency>

In order to create a very basic consumer application, this is all you need. Next step is to create the configuration for the AxualClient and also for the Consumer that will be used.

ClientConfig config = ClientConfig.newBuilder()
        // This is the app ID as it is configured in the self service
        .setApplicationId("io.axual.example.client.avro.consumer")
        .setApplicationVersion("0.0.1")
        // The endoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL etc...)
        .setEndpoint("https://192.168.99.100:443")
        // The tenant you are part of
        .setTenant("demo")
        // The environment corresponding to the shortname of the env in the self service
        .setEnvironment("example")
        // The ssl configuration for your application. The certificate used should have a DN
        // matching the DN that was configured in self service
        .setSslConfig(
                SslConfig.newBuilder()
                        .setEnableHostnameVerification(false)
                        .setKeystoreLocation("example-consumer.client.jks") //(1)
                        .setKeystorePassword(new PasswordConfig("notsecret"))
                        .setKeyPassword(new PasswordConfig("notsecret"))
                        .setTruststoreLocation("common-truststore.jks") //(2)
                        .setTruststorePassword(new PasswordConfig("notsecret"))
                        .build()
        )
        .build();
  1. Refer this folder /example-consumer/jks.

  2. Refer this folder /example-truststore/jks.

The next step is creating a ConsumerConfig, similar to creating the ProducerConfig in step 5.

SpecificAvroConsumerConfig<Application, ApplicationLogEvent> specificAvroConsumerConfig =
        SpecificAvroConsumerConfig.<Application, ApplicationLogEvent>builder()
            // We want to make sure we get all the messages at least once. On a Kafka level, this means
            // that the offsets are committed once the message have been processed bey the application.
            .setDeliveryStrategy(DeliveryStrategy.AT_LEAST_ONCE)
            .setStream("applicationlogevents")
            .setProxyChain(ProxyChain.newBuilder()
                    .append(SWITCHING_PROXY_ID)
                    .append(RESOLVING_PROXY_ID)
                    .append(LINEAGE_PROXY_ID)
                    .append(HEADER_PROXY_ID)
                    .build())
            .build();

A consumer will also need a Processor. This is where all the business logic is defined on how to handle the consumed messages. In this example, we will use a simple LogEventSpecificConsumer that implements the Processor that logs the key and value for each consumed message.

public class LogEventSpecificConsumer implements Processor<Application, ApplicationLogEvent>, AutoCloseable {
    public static final Logger LOG = LoggerFactory.getLogger(LogEventSpecificConsumer.class);
    private final Consumer<Application, ApplicationLogEvent> consumer;
    private final LinkedList<ConsumerMessage<Application, ApplicationLogEvent>> received = new LinkedList<>();

    public LogEventSpecificConsumer(
            final AxualClient axualClient
            , final SpecificAvroConsumerConfig<Application
            , ApplicationLogEvent> consumerConfig) {
        this.consumer = axualClient.buildConsumer(consumerConfig, this);
        // This will start a user thread that does the actual consumption and processing of messages
        this.consumer.startConsuming();
    }

    // This method is used to process the message.
    @Override
    public void processMessage(ConsumerMessage<Application, ApplicationLogEvent> msg) {

        LOG.info("Received message on topic {} partition {} offset {} key {} value {}", msg.getSystem(), msg.getPartition(), msg.getOffset(), msg.getKey(), msg.getValue());
        received.add(msg);
    }

    public LinkedList<ConsumerMessage<Application, ApplicationLogEvent>> getReceived() {
        return received;
    }

    @Override
    public void close() {
        this.consumer.stopConsuming();
    }

    public boolean isConsuming() {
        return this.consumer.isConsuming();
    }
}

The consumer is then started like this:

/* Both the AxualClient and LogEventSpecificConsumer are AutoClosable and can be used in try-with-resources.*/
try (final AxualClient axualClient = new AxualClient(config);
     final LogEventSpecificConsumer consumer = new LogEventSpecificConsumer(axualClient, specificAvroConsumerConfig)) {
    // We want to prevent that the main thread exits the try-with-resources block, since that
    // will cause the consumer and axualClient to close, ending the user thread that started
    // the consumption. So in this example we make the main thread sleep for a second as long
    // as the consumer is still consuming.
    while (consumer.isConsuming()) {
        sleep(1000);
    }
}

When all of the above steps have been done correctly, start your consumer app.
With the logging level for the io.axual package to INFO, your app will produce logging that will look like this:

[main] INFO io.axual.client.consumer.base.BaseConsumer - Created consumer with source of class: io.axual.client.consumer.avro.AvroMessageSource.
[pool-1-thread-1] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - TTL updated to: 600000 (was: 0)
[pool-1-thread-1] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - Fetched discovery properties: DiscoveryResult: {...}
[pool-1-thread-1] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Received new DiscoveryResult for SwitchingConsumer: DiscoveryResult: {...}
[pool-1-thread-1] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Switching SwitchingConsumer from null to local
[pool-1-thread-1] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Creating new backing consumer with Discovery API result: DiscoveryResult: {...}
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.ConsumerSwitcher - Creating a new consumer with properties: {...}
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: {...}

[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}

[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}

[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}

[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}

[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'acl.principal.builder' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'cluster' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'instance' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'resolvingconsumerpartitionassignor.topic.resolver' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'topic.pattern' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'schema.registry.url' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'endpoint' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'resolvingconsumerpartitionassignor.backing.assignor' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'tenant' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'group.id.pattern' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'app.id' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'distributor.distance' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'ttl' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'enable.value.headers' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'app.version' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'rest.proxy.url' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'environment' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'system' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'distributor.timeout' was supplied but isn't a known config.
[pool-1-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.3.0
[pool-1-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: fc1aaa116b661c8a
[pool-1-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1585853689619
[pool-1-thread-1] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Created new backing consumer
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.ConsumerSwitcher - Consumer switched, applying assignments and subscriptions
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.ConsumerSwitcher - Consumer switch finished
[pool-1-thread-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Cluster ID: oA8Db59vRi21SnnMO2-G3g
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.SwitchingConsumer - Subscribing to topics: [applicationlogevents]
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Subscribed to topic(s): demo-Local-example-applicationlogevents
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Discovered group coordinator 192.168.99.100:9096 (id: 2147483646 rack: null)
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Revoking previously assigned partitions []
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] (Re-)joining group
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] (Re-)joining group
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Successfully joined group with generation 5
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Setting newly assigned partitions: demo-Local-example-applicationlogevents-0
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Setting offset for partition demo-Local-example-applicationlogevents-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.99.100:9096 (id: 1 rack: rack-1), epoch=0}}
[pool-1-thread-1] INFO io.axual.client.example.axualclient.avro.LogEventSpecificConsumer - Received message on topic demo-system partition 0 offset 10 key {"name": "app_0", "version": "1.9.9", "owner": "none"} value {"timestamp": 1000, "source": {"name": "logeventproducer", "version": "0.0.1", "owner": "Team Log"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Message 0"}

This is all the coding required to start a basic consumer!

Creating A .NET Consumer Application

When you have completed this step, you will have set up a consumer application that is consuming data from the stream configured in step 2 in Avro format. To get some data onto the stream, follow step 5: Create A Producer Application.

Setting up NuGet and credentials

Please make sure to include the following packageSources tag in your NuGet.Config, and make sure you have configured the right credentials for that repository as indicated below.

<configuration>
    <packageSources>
        <add key="axual" value="https://dev.axual.io/nexus/repository/axual-nuget/" />
    </packageSources>
    <packageSourceCredentials>
        <axual>
        <add key="Username" value="[USERNAME]" />
        <add key="ClearTextPassword" value="[PASSWORD]" />
        </axual>
    </packageSourceCredentials>
</configuration>

Setting up Dependency

Start by including the dependency on the Axual .NET client library as you would do with any dependency or
by adding PackageReference tag in your .csproj project.

<ItemGroup>
      <PackageReference Include="Axual.Kafka.Proxy" Version="1.0.1" />
      <PackageReference Include="Axual.SchemaRegistry.Serdes.Avro" Version="1.0.1" />
</ItemGroup>

Building The Application

In order to create a very basic consumer application, this is all you need.
Next step is to create the configuration for the AxualConsumerBuilder.

Consumer Configuration

var config = new AxualConsumerConfig
{
    // This is the app ID as it is configured in the self service
    ApplicationId = "loganalyzer",
    // The endoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL etc...)
    EndPoint = new UriBuilder("https", "192.168.99.100", 443).Uri,
    // The tenant you are part of
    Tenant = "demo",
    // The environment corresponding to the shortname of the env in the self service
    Environment = "example",

    SecurityProtocol = SecurityProtocol.Ssl,
    SslKeystorePassword = "notsecret",
    SslKeystoreLocation = ServerSettings.KEYSTORE_RESOURCE_PATH, (1)
    EnableSslCertificateVerification = true,
    // Client verifies the identity of the broker
    SslCaLocation = ServerSettings.SSL_CA_PATH, (2)
    SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
};
1 For KEYSTORE_RESOURCE_PATH file refer to this folder /example-consumer/example-consumer.client.p12.cer.
2 For SSL_CA_PATH file refer to this folder /example-consumer/example_consumer.cer.

Creating The Consumer

With the above configurations, we can instantiate the AxualConsumerBuilder to build a AxualConsumer instantiate.

var consumer = new AxualConsumerBuilder<Application, ApplicationLogEvent>(config)
    .SetKeyDeserializer(new SpecificAvroDeserializer<Application>())
    .SetValueDeserializer(new SpecificAvroDeserializer<ApplicationLogEvent>())
    .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
    .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
    .Build()

Consuming Messages

Now, we are ready to start receive records using the AxualConsumer.

consumer.Subscribe("applicationlogevents");

try
{
    while (true)
    {
        try
        {
            var consumeResult = consumer.Consume(cancellationToken);

            if (consumeResult.IsPartitionEOF)
            {
                Console.WriteLine(
                    $"> Reached end of stream {consumeResult.Topic}, partition " +
                    $"{consumeResult.Partition}, offset {consumeResult.Offset}.");

                continue;
            }

            Console.WriteLine(
                $"> Received message at {consumeResult.TopicPartitionOffset}: " + Environment.NewLine +
                $"Key: {consumeResult.Message.Key}" + Environment.NewLine +
                $"Value: {consumeResult.Message.Value}");

            try
            {
                consumer.Commit(consumeResult);
            }
            catch (KafkaException e)
            {
                Console.WriteLine($"> Commit error: {e.Error.Reason}");
            }
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"> Consume error: {e.Error.Reason}");
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("> Closing consumer.");
    consumer.Close();
}

When all of the above steps have been done correctly, start your consumer app, your app will produce logging that will look like this:

----------------------------------------------------------------------------------------
'axual_client_proxy_specific_avro_consumer' consuming from stream 'applicationlogevents'
----------------------------------------------------------------------------------------
Started consumer, Ctrl-C to stop consuming
> Received message at applicationlogevents [[1]] @41677:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1000, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 0 = some value 0}, log level = INFO, message = Message 0
> Received message at applicationlogevents [[1]] @41678:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1001, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 1 = some value 1}, log level = INFO, message = Message 1
> Received message at applicationlogevents [[1]] @41679:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1002, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 2 = some value 2}, log level = INFO, message = Message 2
> Received message at applicationlogevents [[1]] @41680:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1003, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 3 = some value 3}, log level = INFO, message = Message 3
> Received message at applicationlogevents [[1]] @41681:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1004, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 4 = some value 4}, log level = INFO, message = Message 4
> Received message at applicationlogevents [[1]] @41682:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1005, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 5 = some value 5}, log level = INFO, message = Message 5
> Received message at applicationlogevents [[1]] @41683:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1006, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 6 = some value 6}, log level = INFO, message = Message 6
> Received message at applicationlogevents [[1]] @41684:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1007, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 7 = some value 7}, log level = INFO, message = Message 7
> Received message at applicationlogevents [[1]] @41685:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1008, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 8 = some value 8}, log level = INFO, message = Message 8
> Received message at applicationlogevents [[1]] @41686:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1009, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 9 = some value 9}, log level = INFO, message = Message 9

This is all the coding required to start a basic consumer!

Creating A REST Consumer

When you have completed this step, you will start consuming data from the stream configured in step 2 in Avro format.

Prerequisites

Tool to send/receive HTTP messages.

Consuming data via REST

For the following steps, we’re going to use curl tool to consume data.

curl --request GET \
    --url "https://192.168.99.100:18100/stream/example/applicationlogevents" \
    --header "axual-application-id: io.axual.example.client.avro.consumer" \
    --header "axual-application-version: 0.0.1" \
    --header "axual-consumer-uuid: log-consumer1" \
    --header "axual-key-type: AVRO" \
    --header "axual-value-type: AVRO" \
    --header "axual-commit-strategy: AFTER_READ" \
    --header "axual-polling-timeout-ms: 10000" \
    --header "Content-Type: application/json" \
    --key ../client-cert/local-config/security/applications/example-consumer/pem/example_consumer.key \
    --cert ../client-cert/local-config/security/applications/example-consumer/cer/example_consumer.cer \
    --cacert ../client-cert/local-config/security/applications/common-truststore/cachain/common-truststore.pem
  1. For key refer this folder /example-consumer/pem.

  2. For cert refer this folder /example-consumer/cer.

  3. For cacert refer this folder /common-truststore/cachain.

When you execute above command, this will return response that will look like this:

{
   "cluster":"local",
   "messages":[
      {
         "messageId":"bf4ea734-4cd5-49e8-8b76-67bd63311d5d",
         "produceTimestamp":1585904295792,
         "partition":0,
         "offset":10,
         "produceCluster":"local",
         "consumeCluster":"local",
         "headers":{
            "Axual-Producer-Version":[
               "MC4wLjE="
            ],
            "Axual-Message-Id":[
               "v06nNEzVSeiLdme9YzEdXQ=="
            ],
            "Axual-Tenant":[
               "ZGVtbw=="
            ],
            "Axual-Serialization-Time":[
               "AAABcT9DCmk="
            ],
            "Axual-Environment":[
               "ZXhhbXBsZQ=="
            ],
            "Axual-Deserialization-Time":[
               "AAABcT9DWhQ="
            ],
            "Axual-Intermediate-Version":[
               "MC4wLjE="
            ],
            "Axual-System":[
               "ZGVtby1zeXN0ZW0="
            ],
            "Axual-Intermediate-Id":[
               "aW8uYXh1YWwuZXhhbXBsZS5jbGllbnQuYXZyby5jb25zdW1lcg=="
            ],
            "Axual-Cluster":[
               "bG9jYWw="
            ],
            "Axual-Instance":[
               "bG9jYWw="
            ],
            "Axual-Producer-Id":[
               "aW8uYXh1YWwuZXhhbXBsZS5jbGllbnQuYXZyby5wcm9kdWNlcg=="
            ]
         },
         "keyMessage":{
            "type":"AVRO",
            "schema":null,
            "schemaId":null,
            "message":"{\"name\": \"app_0\", \"version\": \"1.9.9\", \"owner\": \"none\"}"
         },
         "valueMessage":{
            "type":"AVRO",
            "schema":null,
            "schemaId":null,
            "message":"{\"timestamp\": 1000, \"source\": {\"name\": \"logeventproducer\", \"version\": \"0.0.1\", \"owner\": \"Team Log\"}, \"context\": {\"Some key\": \"Some Value\"}, \"level\": \"INFO\", \"message\": \"Message 0\"}"
         }
      }
   ]
}

For more details, please refer Rest-Proxy Consume Service

Wrapping up

You have concluded the getting started section by preparing your stream & applications, requesting access to the stream and actually producing and consuming some data. If you are going to deploy your application in another environment, it is advised to enable monitoring. Read more about this in 7. Enabling monitoring.

You can also dive into other documentation sections using the menu on the left, such as Self-Service.