16 Feb 23

Using Avro in Ruby

At a previous company, we were evolving our monolith to be more event-driven. We didn’t exactly break it up, but we were getting a few things outside of it.

We had already put events and pub/sub in place using ActiveSupport::Notifictions. We decided to use events in response to coupling that was happening in ActiveRecord callbacks.

I’ve always wanted to write a gem that would watch all callbacks and if you operated on something other than self an exception would be thrown. In every Rails app I have worked in, inevitably a callback gets added that upon save of one model, operates on an instance(s) of another model.

For example, I find this pretty regularly:

class SomeModel < ApplicationRecord

  after_save do
        SomeOtherModel.where(...).update_all
    end
end

The problem is the coupling that creates. We can no longer test the main class in isolation. We have to have a SomeOtherModel class defined. Maybe I’ll write that gem someday. Maybe it already exists in some form.

Back to events. We put a pattern in place where events are published and subscribers subscribe to them. For example, part of the application was a questionnaire. When the questionnaire was submitted, we would save it and then we would publish an event. Again, it used ActiveSupport::Notifications.

With events, when a model was saved, we didn’t have to use a callback to update some other model. We instead published an event (in the controller) broadcasting a message that a particular model was saved and what changed about it and a subscriber that has a single responsibility of updating the other model performs that action.

This happened synchronously but not in a transaction in case you were wondering. That is the one potential con of the pattern vs callbacks which happen in a transaction. But, we safeguard that some with a conditional in the controller letting us know the original model was in fact saved. It’s not exactly apples to apples. But each subscriber is easy to define and find. And we can now test the original model in isolation.

In the above scenario, we need to do a number of things when the questionnaire is submitted. We calculate a score. We send an email. We create an event in an audit log, etc. Each of those things is handled by a separate subscriber. They each perform a single responsibility. But it is synchronous and we’d benefit from running them all in parallel asynchronously.

OK, so where does Avro come into the picture? When we implemented our events pattern, the team decided to model our events after Debezium events. If you are unfamiliar, Debezium is a process that watches a database like mysql or Postgres and when changes occur, it will publish an event. It watches the log. The intention is that Debezium will publish these events to a system like Kafka. As the data in a database changes (i.e. a User record gets created or a Comment gets updated), Debezium will stream these events to a downstream system to be processed. Debezium uses Avro as the format for the messages it produces.

We implemented that in Rails. We defined our events using the Avro-builder gem. So in app/events, you’d find an event that captures the fact that a Comment was created for example. The event contains a payload. We decided to validate that the payloads adhered to the schemas we defined for the events. That prevents us from firing off a bunch of events and have the subscribers fail because they expected data that wasn’t being included all of the sudden. The schema is the contract between the event producer and the subscribers.

Here is an example of what a simple event definition looks like:

module Events
  module Comments
    class Created
      include Events::Event

      EVENT_NAME    = "created.comment"
      EVENT_VERSION = "0.1"

      attr_reader :comment, :user

      def initialize(comment:, user: nil)
        @comment = comment
        @user    = user
      end

      def schema
        Avro::Builder.build do
          namespace EVENT_NAME

          record :comment do
            optional :before, :record

            required :after, :record do
              required :uuid, :string, logical_type: :uuid
              required :body, :string
              required :author_id, :int
              required :commentable_type, :string
              required :commentable_id, :int
              optional :actor, :string
            end

            required :source, :record do
              required :version, :string
              required :name, :string
              required :ts_ms, :long, logical_type: 'timestamp-millis'
              required :table, :string
              optional :query, :string
            end

            required :op, :enum, symbols: [:c, :u, :d]
            required :ts_ms, :long, logical_type: 'timestamp-millis'
          end
        end
      end

      def payload
        {
          before: nil,
          after: {
            uuid: comment.uuid,
            body: comment.body,
            author_id: comment.author_id,
            commentable_type: comment.commentable_type,
            commentable_id: comment.commentable_id,
            actor: user&.uuid # actor is optional for now
          },
          source: {
            version: EVENT_VERSION,
            name:    EVENT_NAME,
            ts_ms:   ts_ms(comment.updated_at), # persistence timestamp
            table:   comment.class.table_name,
          },
          op:    'c',
          ts_ms: ts_ms(Time.zone.now) # event timestamp
        }
      end
    end
  end
end

We abstracted a number of things as you can tell by the mixin mentioned. But the important parts are the schema and the payload. An Avro message coming from Debezium will also contain the schema and the payload and will look very similar.

Notice the before block is empty. That is because this is a “created” event. Nothing existed before. In an “updated” event, the before block would contain values that represented the state of the object being updated before the update was applied. The after block obviously contains the values after the update was applied. There is some additional data that comes along such as timestamps.

To summarize so far, we published events in Rails to the internal memory queue implemented via ActiveSupport::Notifications. We used Avro as the format. And modeled the events after Debezium events. You could use a Ruby hash, JSON, etc. But we selected Avro and utilized the Avro-builder gem. It makes for an easy transition which I’ll get to. We have subscribers that listen for the various events being published and perform single responsibility business logic.

The subscribers aren’t all that interesting, they look something like:

module Subscribers
  module Comments
    module Created
      class Logger
        include Subscribers::Subscriber

        EVENT_NAME = Events::Comments::Created::EVENT_NAME

        def initialize
          subscribe
        end

        def process(event:)
          payload    = JSON.parse(event.payload.to_json, object_class: OpenStruct)
          comment    = Comment.find_by(uuid: payload.after.uuid)

              # perform some business logic
        end
      end
    end
  end
end 

Now, back to transitioning the monolith. We started to implement a system where we no longer define the events in Rails manually. We started actually utilizing Debezium after some needed updates to our data model. Debezium published events to Kafka a la CDC.

So the pattern was, open a PR with the migration to create the table. Run the migration. Open a PR that creates the model and introduced a consumer to consume the event that Debezium broadcasts when the model is saved or updated, or deleted. Along with some configuration. Once merged the following will happen:

  • A model instance is saved or updated in the Rails app
  • Transaction happens in the database and written to the write ahead log
  • Debezium sees the change and publishes a message to Kafka
  • Kafka places the message on the topic
  • Our Karafka based consumers defined in the Rails app receive the message and perform each of their responsibilities in parallel

The application implementing this pattern was an API. So all the endpoint had to be responsible for is mutating the database record. It didn’t have to publish an event. It no longer had to email anyone or update another associated record. It simply performs the save or update or delete and moves on. Quick responses to the API consumer. All of the side effects of the save or update or delete are handled by consumers in different parallel processes.

The other added benefit is that if a bug is discovered in a consumer, just that consumer can be paused until fixed, we don’t have to prevent access to the API endpoint. I suppose we could also nerd out and mention that consumers can be written for these events completely outside of Ruby/Rails.

Consumers were declared as a consumer of a specific Kafka topic. You could define multiple consumers per topic. So we were going to have topics for say the Users table. We’d declare the topic and any consumers in the Karakfa initializers file.

Each consumer received a copy of a message published to the assigned topic at least once. Our consumers then processed the messages if they were relevant to the responsibility of the consumer. Some consumers may save records. Some consumers may publish a “command” event. If you aren’t familiar with command/query (aka CQRS), it’s an interesting pattern.

Because we no longer needed to define the events because we’d consume events published by Debezium, we could remove the Avro-builder gem. We’ll still need the base Avro gem. We’ll also be using the Avro-turf gem to read schemas out of the schema registry. If you are unfamiliar with Kafka, it registers all schemas for all events in a schema registry.

The messages being consumed, because they were in Avro format, contained their schema. But there were going to be some times where we’d want to look up a specific schema, and specifically a version, of a schema from the registry directly. But that will end up being it as far as Avro goes.

The Salsify team did a lot of work on various Avro gems. Really cool stuff. We Experimented with using their Avromatic gem for defining models/objects based on Avro schemas. Our use case was a little different than theirs based on who is responsible for defining the schemas for events. If you are wanting to define schemas and back models with them, definitely check it out. If you’re curious, the class definition would look something like this:

class PersonEvent
  # NOTE: this loads the schema by name from the local schema store
  # include Avromatic::Model.build(schema_name: :person)

  # NOTE: this pulls the schema from the registry and builds schema at runtime
  # subject = Avromatic.schema_registry.subject_version("person")
  # schema = Avro::Schema.parse(subject["schema"])
  # include Avromatic::Model.build(schema: schema, subject: "person")

  # NOTE: this pulls the latest value schema from registry using messaging API
  schema = Avromatic.messaging.fetch_schema(subject: "app.public.persons-value")
  include Avromatic::Model.build(schema: schema.first)
end

We were excited about the decoupling we were able to achieve and the parallel processing. The evolution was made pretty smooth because of the team’s decision to leverage Avro for our current events. Moving our subscribers to consumers was pretty easy since the event format was essentially the same, just originating from Debezium rather than our manual publishing.