16 Feb 23


At Blue Lava, we’re evolving our monolith to be more event-driven. We’re not exactly breaking it up, but we are getting a few things outside of it.

We had already put events and pub/sub in place using ActiveSupport::Notifications. We adopted events in response to coupling that was creeping into ActiveRecord callbacks.

I’ve always wanted to write a gem that would watch all callbacks and throw an exception if you operated on anything other than self. In every Rails app I have worked on, inevitably a callback gets added that, upon saving one model, operates on instances of another model.

For example, I find this pretty regularly:

class SomeModel < ApplicationRecord
  after_save do
    SomeOtherModel.where(...).update_all(...)
  end
end

The problem is the coupling that creates. We can no longer test the main class in isolation. We have to have a SomeOtherModel class defined. Maybe I’ll write that gem someday. Maybe it already exists in some form.

Back to events. We put a pattern in place where events are published and subscribers subscribe to them. For example, part of our application is a questionnaire. When the questionnaire is submitted, we save it and then we publish an event. Again, it uses ActiveSupport::Notifications at the moment.

Now when a model is saved, we don’t have to use a callback to update some other model. Instead, we publish an event (in the controller) broadcasting that a particular model was saved and what changed about it, and a subscriber whose single responsibility is updating the other model performs that action.

This happens synchronously, but not in a transaction, in case you were wondering. That is the one potential con of the pattern versus callbacks, which run inside a transaction. But we safeguard against it somewhat with a conditional in the controller confirming the original model was in fact saved. It’s not exactly apples to apples. But each subscriber is easy to define and find, and we can now test the original model in isolation.
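A minimal, self-contained sketch of that controller-level guard, with plain-Ruby stand-ins rather than our actual classes: the event is only published after a confirmed save.

```ruby
# Toy stand-in for a model whose save can fail.
Questionnaire = Struct.new(:answers, :valid) do
  def save
    valid # pretend persistence succeeds only when the record is valid
  end
end

# The guard: broadcast the event only once the save is known to have
# happened, so subscribers never act on unpersisted data.
def submit(questionnaire, published_events)
  if questionnaire.save
    published_events << { name: "submitted.questionnaire", answers: questionnaire.answers }
    :created
  else
    :unprocessable_entity
  end
end

published = []
submit(Questionnaire.new(%w[a b], true), published)  # saved, event published
submit(Questionnaire.new([], false), published)      # failed save, no event
published.length # => 1
```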

In the above scenario, we need to do a number of things when the questionnaire is submitted. We calculate a score. We send an email. We create an event in an audit log, etc. Each of those things is handled by a separate subscriber now. They each perform a single responsibility. But it is synchronous and we’d benefit from running them all in parallel asynchronously.

OK, so where does Avro come into the picture? When we implemented our events pattern, we decided to model our events after Debezium events. If you are unfamiliar, Debezium is a process that watches a database like MySQL or Postgres and publishes an event when changes occur. It watches the database’s transaction log. The intention is that Debezium will publish these events to a system like Kafka. As the data in a database changes (i.e. a User record gets created or a Comment gets updated), Debezium will stream these events to a downstream system to be processed. Debezium uses Avro as the format for the messages it produces.

We implemented that in Rails. We define our events using the avro-builder gem. So in app/events, you’ll find an event that captures the fact that a Comment was created, for example. The event contains a payload. We decided to validate that the payloads adhered to the schemas we defined for the events. That prevents us from firing off a bunch of events and having subscribers fail because the data they expected suddenly wasn’t included. The schema is the contract between the event producer and the subscribers.
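To illustrate the contract idea without pulling in the Avro machinery, here is a deliberately simplified stand-in for that validation step (the real check validates the whole payload against the Avro schema; the field names come from the comment example below):

```ruby
# Required fields the subscribers rely on, per the event's schema.
REQUIRED_AFTER_FIELDS = %i[uuid body author_id commentable_type commentable_id].freeze

# Simplified contract check: a real implementation validates the full
# payload against the Avro schema before the event is published.
def valid_comment_payload?(payload)
  after = payload[:after]
  return false unless after.is_a?(Hash)

  REQUIRED_AFTER_FIELDS.all? { |field| after.key?(field) }
end

good = { after: { uuid: "u", body: "b", author_id: 1, commentable_type: "Post", commentable_id: 2 } }
bad  = { after: { uuid: "u" } } # missing fields the subscribers expect

valid_comment_payload?(good) # => true
valid_comment_payload?(bad)  # => false
```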

Here is an example of what a simple event definition looks like:

module Events
  module Comments
    class Created
      include Events::Event

      EVENT_NAME = "created.comment"
      EVENT_VERSION = "0.1"

      attr_reader :comment, :user

      def initialize(comment:, user: nil)
        @comment = comment
        @user = user
      end

      def schema
        Avro::Builder.build do
          namespace EVENT_NAME

          record :comment do
            optional :before, :record
            required :after, :record do
              required :uuid, :string, logical_type: :uuid
              required :body, :string
              required :author_id, :int
              required :commentable_type, :string
              required :commentable_id, :int
              optional :actor, :string
            end
            required :source, :record do
              required :version, :string
              required :name, :string
              required :ts_ms, :long, logical_type: 'timestamp-millis'
              required :table, :string
              optional :query, :string
            end
            required :op, :enum, symbols: [:c, :u, :d]
            required :ts_ms, :long, logical_type: 'timestamp-millis'
          end
        end
      end

      def payload
        {
          before: nil,
          after: {
            uuid: comment.uuid,
            body: comment.body,
            author_id: comment.author_id,
            commentable_type: comment.commentable_type,
            commentable_id: comment.commentable_id,
            actor: user&.uuid # actor is optional for now
          },
          source: {
            version: EVENT_VERSION,
            name: EVENT_NAME,
            ts_ms: ts_ms(comment.updated_at), # persistence timestamp
            table: comment.class.table_name
          },
          op: 'c',
          ts_ms: ts_ms(Time.zone.now) # event timestamp
        }
      end
    end
  end
end

We abstracted a number of things as you can tell by the mixin mentioned. But the important parts are the schema and the payload. An Avro message coming from Debezium will also contain the schema and the payload and will look very similar.

Notice the before block is empty. That is because this is a “created” event. Nothing existed before. In an “updated” event, the before block would contain values that represented the state of the object being updated before the update was applied. The after block obviously contains the values after the update was applied. There is some additional data that comes along such as timestamps.
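For illustration, a hypothetical “updated” payload for the same comment record might look like this (the values here are invented, and the source block is elided):

```ruby
updated_payload = {
  before: { uuid: "abc-123", body: "First draft", author_id: 7 },  # state prior to the update
  after:  { uuid: "abc-123", body: "Edited draft", author_id: 7 }, # state after the update
  op: "u",                  # update, vs "c" for create and "d" for delete
  ts_ms: 1_676_500_000_000  # event timestamp in epoch milliseconds
}

# Only the changed attributes differ between before and after.
changed = updated_payload[:after].reject { |k, v| updated_payload[:before][k] == v }
changed # => { body: "Edited draft" }
```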

To summarize so far: we publish events in Rails to the internal memory queue implemented via ActiveSupport::Notifications. We use Avro as the format, and we modeled the events after Debezium events. You could use a plain Ruby hash, JSON, etc., but we selected Avro and utilize the avro-builder gem. It makes for an easy transition, which I’ll get to. We have subscribers that listen for the various events being published and perform single-responsibility business logic.

The subscribers aren’t all that interesting; they look something like:

module Subscribers
  module Comments
    module Created
      class Logger
        include Subscribers::Subscriber

        EVENT_NAME = Events::Comments::Created::EVENT_NAME

        def initialize
          subscribe
        end

        def process(event:)
          payload = JSON.parse(event.payload.to_json, object_class: OpenStruct)
          comment = Comment.find_by(uuid: payload.after.uuid)
          # perform some business logic
        end
      end
    end
  end
end

Now, back to transitioning the monolith. We are starting to implement a system where we no longer define the events in Rails manually. We are going to start actually utilizing Debezium after some needed updates to our data model. Debezium is going to publish events to Kafka. A number of us on the team have implemented similar data processing systems in the past at Recurly and Nike so running these various services is not new. The Kafka ecosystem is really mature and handles scenarios like replay-ability should it ever be needed.

So the pattern will be: open a PR with the migration to create the table. Run the migration. Open a PR that creates the model and introduces a consumer, along with some configuration, to consume the event that Debezium will broadcast when the model is saved, updated, or deleted. Once merged, the following will happen:

1. A model instance is saved or updated in the Rails app
2. The transaction is committed in the database and written to the write-ahead log
3. Debezium sees the change and publishes a message to Kafka
4. Kafka places the message on the topic
5. Our Karafka based consumers defined in the Rails app receive the message and perform each of their responsibilities in parallel

The application implementing this pattern is an API. So all the endpoint has to be responsible for is mutating the database record. It doesn’t have to publish an event. It no longer has to email anyone or update another associated record. It simply performs the save or update or delete and moves on. Quick responses to the API consumer. All of the side effects of the save or update or delete are handled by consumers in different parallel processes.

The other added benefit is that if a bug is discovered in a consumer, just that consumer can be paused until it’s fixed; we don’t have to prevent access to the API endpoint. I suppose we could also nerd out and mention that consumers for these events can be written completely outside of Ruby/Rails. We’ll save that for another post, maybe one where I can discuss our data warehousing.

We’ve decided to use Karafka as the library to run our consumers. The consumers are defined in app/consumers.

They look something like this:

# NOTE: topics are configured in: /karafka.rb
class ExampleConsumer < ApplicationConsumer
  def consume
    messages.each { |message| process(message) }
  end

  def process(message)
    Rails.logger.info "payload: #{message.payload} received on topic: #{message.topic}"
    # perform some business logic
  end
end

Consumers are declared as consumers of a specific Kafka topic. You can of course define multiple consumers per topic. So we’re going to have topics for, say, the users table. We’ll declare the topic and any consumers in the Karafka initializer file.
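A sketch of what that declaration might look like in karafka.rb, assuming Karafka 2.x conventions; the topic and consumer names here are hypothetical, not our real ones:

```ruby
# karafka.rb (sketch)
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = { 'bootstrap.servers': '127.0.0.1:9092' }
  end

  routes.draw do
    # Debezium topic for the users table; additional consumers of the
    # same topic would be declared in separate consumer groups.
    topic "app.public.users" do
      consumer UsersAuditConsumer
    end
  end
end
```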

Each consumer will receive a copy of a message published to the assigned topic at least once. Our consumers will then process the messages if they are relevant to the responsibility of the consumer. Some consumers may save records. Some consumers may publish a “command” event. If you aren’t familiar with command/query responsibility segregation (aka CQRS), it’s an interesting pattern.
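Because delivery is at least once, each consumer has to handle redelivered messages safely. A toy sketch of one approach, tracking processed keys in memory (in practice a database uniqueness constraint or an upsert usually does this job):

```ruby
require "set"

class IdempotentConsumer
  attr_reader :processed

  def initialize
    @seen = Set.new
    @processed = []
  end

  def consume(message)
    # Kafka guarantees at-least-once delivery, so duplicates are
    # expected; skip any message key we've already handled.
    return if @seen.include?(message[:key])

    @seen << message[:key]
    @processed << message
  end
end

consumer = IdempotentConsumer.new
consumer.consume(key: "comment-1", op: "c")
consumer.consume(key: "comment-1", op: "c") # redelivery of the same message
consumer.processed.length # => 1
```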

Because we no longer need to define the events ourselves (we’ll consume events published by Debezium), we can remove the avro-builder gem. We’ll still need the base avro gem. We’ll also be using the avro_turf gem to read schemas out of the schema registry. If you are unfamiliar with Kafka, its ecosystem includes a schema registry that registers the schemas for all events.

The messages being consumed, because they are in Avro format, carry their schema with them. But there will be times when we want to look up a specific schema, and specifically a version of a schema, from the registry directly. That will end up being it as far as Avro goes.

I have to give a huge shout-out to the Salsify team for the work they’ve done on various Avro gems. We’ve experimented with using their Avromatic gem for defining models/objects based on Avro schemas. It’s possible we’ll use it, but we’re not implementing it at the moment. An OpenStruct gives us enough of an object definition for the time being. Our use case is a little different than theirs based on who is responsible for defining the schemas for events. If you want to define schemas and back models with them, definitely check it out. If you’re curious, the class definition would look something like this:

class PersonEvent
  # NOTE: this loads the schema by name from the local schema store
  # include Avromatic::Model.build(schema_name: :person)

  # NOTE: this pulls the schema from the registry and builds the schema at runtime
  # subject = Avromatic.schema_registry.subject_version("person")
  # schema = Avro::Schema.parse(subject["schema"])
  # include Avromatic::Model.build(schema: schema, subject: "person")

  # NOTE: this pulls the latest value schema from the registry using the messaging API
  schema = Avromatic.messaging.fetch_schema(subject: "app.public.persons-value")
  include Avromatic::Model.build(schema: schema.first)
end

That’s it for now. As time goes on, maybe I’ll write another post about our experience moving our events out of Rails and leveraging Debezium, Kafka, and Karafka. I imagine there will be some interesting bits about race conditions, recursion, batch processing, comparisons to background jobs with Sidekiq, etc.

Right now, we’re excited about the decoupling and parallel processing we’ll be able to achieve. This evolution was made pretty smooth by the decision to leverage Avro for our current events. Moving our subscribers to consumers will be pretty easy since the event format is essentially the same, just originating from Debezium rather than our manual publishing.