Representing Changes in a Database as a Stream of Events

I'll be updating this post with a link to an example repository. I need time to finish up a few edge-cases and it will be up as soon as it is useful. I'll be tweeting it out when ready so please follow if interested!

Background

So, why would you want to do this? Great question!

In environments that have been around for a while, there will be a substantial build-up of automation supporting day-to-day business processes. This might include core identity management infrastructure, business services built to support core processes and even simple applications built on top of data produced by these core service providers. Given certain environmental pressures, the organization can (over time) develop a sizable enterprise comprised of many systems of record (SoR); each authoritative over its own little piece of the puzzle. These SoRs likely consume data produced by other systems as well as produce their own. The problem is that it's highly likely that these core services are going to be resistant to your new-fangled shiny hopes and dreams for greenfield.

In many distributed organizations, there will be a process for attribute release by which a service provider may request access to a set of attributes for which a given system of record is authoritative. For an educational domain, think in terms of things like "The HR System", or perhaps a "Student Information System". These are monoliths that are resistant to change. There will likely be some attribute release process and you (as a developer) might request a subset of attributes about an identity, sign an SLA and the rest is magic. Somewhere in "magic land", some poor DBA / analyst is going to process your request, create a view in an "Enterprisey" database management system and grant an account access to it; sending you the credentials with a jovial "Good luck!"

You're happy! You have all the data you need to build the "Next Big Thing"; so you set off to take this data and build something out of it. The first thing you might think is, "Hey, I'm using another DBMS and this 'Enterprisey' stuff doesn't mesh well. I'll write a script to mirror data to my database and query it directly!"

This is the point where you've lost the war. The application is now immediately and tightly coupled to a system that is likely managed by another organization, is upstream and probably has no intention of "meeting you in the middle" because of your deadlines or "business needs" or whatever. Your application will likely also have a difficult time running in isolation, as it requires access to possibly many shared databases and other resources.

What I describe in this post is not a silver bullet for decoupling your application from a shared resource. In fact, there are better solutions that may offer a higher return for the amount of work required. A traditional approach to this problem might include a simple anti-corruption layer backed by repository mappings. This is fairly detail-oriented but is also simple to implement and easy to understand. It also results in a fairly "thick" and "complex" layer in-between contexts, as there is simply a lot of translation happening. It's also fairly resource-intensive, as you're live-querying through that mapping layer regularly. As with all things, it's a trade-off.

This is an integration pattern aimed at integrating event-oriented and Legacy systems. It is a higher upfront investment with a payoff of a thinner and much less complex ACL between bounded contexts. It can also open the opportunity to treat an entire Legacy system or Big Ball of Mud architecture as any other bounded context that you'd see in a CQRS/ES system. Because we're now only interested in events coming from this "Legacy bounded context bubble", we are able to test any integration a bit more reasonably. We're not having to worry about building huge amounts of seed data to populate local copies of the Legacy data-set. We actually don't even need a database underneath our integration, as we can just fake events, publish them to a consuming bounded context and test boundaries that way.

Getting Started

I'm going to walk through how you might approach representing an arbitrary Legacy system's changes as a stream of events that we have interest in. In the end, we should be able to apply this pattern to almost any legacy system where flat-state can be queried regularly. First, a brief overview of the process we'll be following:

  1. Build Understanding of Legacy
  2. Discover Interesting Hypothetical Domain Events
  3. Produce Events from State
  4. (bonus) Expose Events as a JSON API

Build Understanding of Legacy

So, imagine we're in the domain of higher education and we're building a piece of software responsible for matching students to scholarships that they are eligible for. You might imagine that GPA would be a factor in determining eligibility. Pushing this further, we've previously requested GPA as a released attribute about our students and we have a denormalized view we can query. We need a reliable means of querying this information. Let's start with what our schema looks like:

Name: DNRM_STUDENT_GPA_VW

Column       Type           Comments
STUDENT_ID   VARCHAR(11)    Globally unique identifier within Legacy system.
CUM_GPA      DECIMAL(8,3)   Numeric GPA in decimal to three digits of precision.

Pretty simple stuff! We can query current cumulative GPA, given a student identifier. Let's write up an interface for that:

interface GpaRegistrar
{
    /**
     * @param StudentId $id value object uniquely identifying a student
     *
     * @return GPA value object representing GPA
     */
    public function gpaByStudentId(StudentId $id);
}
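To make the interface concrete, here's a minimal sketch of a PDO-backed implementation querying the released view directly. The small `StudentId`, `Gpa` and `StudentNotFound` stand-ins are my own assumptions (repeated inline so the snippet runs on its own); the real value objects would live elsewhere:

```php
<?php
// Minimal stand-ins for the value objects (hypothetical; yours will differ).
interface GpaRegistrar
{
    public function gpaByStudentId(StudentId $id);
}

class StudentId
{
    private $id;
    public function __construct($id) { $this->id = (string) $id; }
    public function __toString() { return $this->id; }
}

class Gpa
{
    private $points;
    private function __construct($points) { $this->points = (float) $points; }
    public static function fromPoints($points) { return new self($points); }
    public function points() { return $this->points; }
}

class StudentNotFound extends RuntimeException {}

// PDO-backed implementation querying the released view.
class DatabaseGpaRegistrar implements GpaRegistrar
{
    private $pdo;

    public function __construct(PDO $pdo)
    {
        $this->pdo = $pdo;
    }

    public function gpaByStudentId(StudentId $id)
    {
        $stmt = $this->pdo->prepare(
            'SELECT CUM_GPA FROM DNRM_STUDENT_GPA_VW WHERE STUDENT_ID = ?'
        );
        $stmt->execute([(string) $id]);

        $points = $stmt->fetchColumn();

        if ($points === false) {
            // No row: the student is not (or is no longer) in the dataset.
            throw new StudentNotFound((string) $id);
        }

        return Gpa::fromPoints($points);
    }
}
```

Note the "no row" branch: as we'll see below, a student disappearing from the view carries semantic weight, so we surface it explicitly rather than returning null.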

Before we move on, realize that this is purposefully a simple example. However, even in this example there are several nuanced questions we have yet to ask about our denormalization of this data. It is critically important to have a complete understanding of the semantics of a given denormalization. Here are some questions (and a few example answers) I would have, given this denormalization with no other information:

  • Is this for the current semester? How do I know? We'll assume it's always current for this example.
  • What happens when a student graduates or otherwise leaves the University?
  • What happens when a student joins? Do they come in with a perfect GPA? We'll assume that the first time they show up in the dataset, the GPA is accurate.
  • What is a "perfect" GPA? Not really important in a first iteration, but it's 4.333 (A+). We'll come back to this at the end.
  • Do identifiers ever change? Can a student change this? Assume that for this example, the answer is no! However, consider the impact after you finish the article.

Some of these seem completely obvious; I would agree. However, the point is that you need to ask questions about even the most seemingly obvious points so that you are sure you have as complete a picture as is reasonable. We'll move forward and see if we hit any snags.

Discover Interesting Hypothetical Domain Events

This next part is probably the most fun as it's the most thought provoking (at least for me). You need to ask yourself:

Given my bounded context which will eventually be consuming events from this "GPA Legacy Context", what am I most interested in knowing about?

Our current solution needs to respond to semantic changes in GPA in order to evaluate a student's eligibility for a set of scholarships. However, it's not just "changes" we're interested in. That's not fun! We want events that make explicit the answers to some of the questions from our legacy discovery process. Having a bunch of StudentGpaChanged events to consume will not be very interesting and possibly not as useful as something more expressive. So...

We ask ourselves, as the consumer, why do we care when a GPA value for a student fluctuates? Any number of things might happen as a result:

  1. A student may become eligible for a scholarship when their GPA improves.
  2. A student may become ineligible for a scholarship when their GPA worsens.

Is there anything else? What about when records just "show up and leave" the dataset? What does that mean? Notice that we asked these questions during discovery for the attribute release we requested. It is now more important than ever to REALLY have answers to those questions in order to proceed. For purposes of example, we'll say it works this way:

  1. A student may be eligible for a scholarship as soon as they arrive at the University.
  2. A student becomes ineligible for all scholarships when they leave the University.

We'll stop here for a second and reduce. We have a few statements that contain what we're interested in and why. Now we just have to explicate what exactly we want to consume...

Produce Events from State

We've decided we're interested in the following events and want to start working towards an implementation:

  • StudentGpaImproved
  • StudentGpaWorsened
  • StudentArrivedAtUniversity
  • StudentLeftTheUniversity

These are pretty basic data-transfer objects that have a few public attributes. Depending on our needs for consumption, we need to include different properties on each event. This is a simple example, so this will be straightforward:

class StudentGpaImproved
{
    /**
     * @var StudentId
     */
    public $studentId;

    /**
     * @var GPA
     */
    public $gpa;

    /* ... */
}

We make a DTO for each of our event types. Next, the question is: "Given our Legacy interface that only allows querying for current state, how do we produce events?" The answer is pretty straightforward. We need to be able to codify the changes in current state that map to each of our events. We know that a StudentGpaImproved event occurs when the current queried state is above a previously queried state. We know that a StudentArrivedAtUniversity event occurs when we have no previously queried state matching the current queried state. StudentLeftTheUniversity is pretty much the opposite of this.

The basic idea then becomes that we must keep a cache of current state and, under a regularly run process, compare queried state against that cache; using the difference to inform production of events. So you can imagine how you might go about this:

  1. Grab all the things we currently cache and compare against current state. Produce "improve/worsen" events.
  2. Detect things in the cache that are missing from current state. Produce "left university" events.
  3. Detect things that exist in current state, but not in the cache. Produce "arrived at university" events.

This is a pretty naive algorithm, but let's set forward implementing with one optimization up-front... What if we had an object that could calculate the difference between itself and another instance of the same class, represented as our events? You might try something like:

class StudentTest extends PHPUnit_Framework_TestCase
{
    /** @test */
    public function it_can_represent_differences_as_events()
    {
        $studentLastSemester = new Student('123456789', Gpa::fromGrade('C'));
        $studentToday = new Student('123456789', Gpa::fromGrade('B'));

        // Do the diff and pluck first event.
        $event = $studentToday->diff($studentLastSemester)[0];

        // Should be the right event and should have properties
        $this->assertInstanceOf('StudentGpaImproved', $event);
        $this->assertEquals('123456789', $event->studentId);
        $this->assertEquals(3.0, $event->gpa->points());        // B == 3.0
        $this->assertEquals(1.0, $event->delta);                // 3.0 - 2.0 = 1.0
    }
}
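The test above leans on a Gpa value object with a fromGrade factory, a points() accessor and a gt() comparison. None of that is shown in the article, so here is a minimal sketch under my own assumptions; the letter-grade mapping is hypothetical (your institution's scale, including the 4.333 A+ from earlier, belongs here):

```php
<?php
class Gpa
{
    // Hypothetical grade-point mapping; a real one covers the full +/- scale up to A+ (4.333).
    private static $scale = ['A+' => 4.333, 'A' => 4.0, 'B' => 3.0, 'C' => 2.0, 'D' => 1.0, 'F' => 0.0];

    private $points;

    private function __construct($points)
    {
        $this->points = (float) $points;
    }

    public static function fromGrade($grade)
    {
        if (!isset(self::$scale[$grade])) {
            throw new InvalidArgumentException("Unknown grade: {$grade}");
        }

        return new self(self::$scale[$grade]);
    }

    public static function fromPoints($points)
    {
        return new self($points);
    }

    public function points()
    {
        return $this->points;
    }

    public function gt(Gpa $other)
    {
        return $this->points > $other->points;
    }
}
```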

We have a test! Before we make it pass, think about the following:

Now that we're on our way to a single model that can produce the events we want, do we care about where it's persisted?

Would it be valid to say it could be retrieved from our MySQL database OR could it be sourced from the "summation" of every event we've calculated? I think so!

class Student
{
    /**
     * @var string
     */
    private $studentId;

    /**
     * @var GPA
     */
    private $gpa;

    public function __construct($studentId, GPA $gpa)
    {
        // ...
        $this->studentId = $studentId;
        $this->gpa = $gpa;
    }

    /**
     * Given another student, produce semantic events from differences in GPA.
     */
    public function diff(Student $anotherStudent)
    {
        // ...

        // Current GPA improved over last semester...
        if ($this->gpa->gt($anotherStudent->gpa)) {
            $this->recordThat(new StudentGpaImproved($this->studentId, $this->gpa));
        }

        // ...

        return $this->uncommittedEvents();
    }

    // ...
}
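The diff method above leans on recordThat() and uncommittedEvents(). Those names mirror common event sourcing libraries, but if you're not pulling in a package, a minimal sketch might look like this (my own assumption, not any specific package's API):

```php
<?php
trait RecordsEvents
{
    /** @var object[] Events recorded but not yet committed to a store. */
    private $recordedEvents = [];

    protected function recordThat($event)
    {
        $this->recordedEvents[] = $event;
    }

    public function uncommittedEvents()
    {
        $events = $this->recordedEvents;
        $this->recordedEvents = [];   // Hand them off exactly once.

        return $events;
    }
}
```

Clearing the buffer on read keeps a long-running diff process from committing the same events twice.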

Again, this is a trivial example and there will be times where additional complexity might necessarily creep in. For example, what if GPA being missing from our attribute release database wasn't the only requirement (and it isn't) for being able to say that a StudentLeftTheUniversity event occurred? We'd need to take that into account.

Once we have a test-suite that validates the expectations we have for this model, we can start to talk about implementing mappings of this model to our traditional database, as well as implementing an integration with an event store. I'm including the implementation of these in the GitHub repository. Obviously, if your team is already using packages that implement an event store, use them! Implementing GpaRegistrar on the traditional database side should be straightforward (similar to our test, actually, as far as reconstitution of the Student model goes). Implementing GpaRegistrar against an event store is perhaps a little more interesting. That said, this starts to dip a little more into the technical details of Event Sourcing, and rather than re-explain that here, just check out some of the resources I've linked below. Basically, we put a factory method on the Student model, making it responsible for reconstituting a Student given a stream of events, in order.
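To make that factory method concrete, here's a sketch of reconstitution by replaying events in order. The Student class is compressed here for the sake of the snippet, and the assumption that each event carries a public studentId (and gpa where relevant) is mine:

```php
<?php
class Student
{
    private $studentId;
    private $gpa;

    public function __construct($studentId, $gpa)
    {
        $this->studentId = $studentId;
        $this->gpa = $gpa;
    }

    /**
     * Reconstitute a Student by replaying its event stream, in order.
     * Assumes a well-ordered stream that starts with an "arrived" event.
     */
    public static function fromEvents(array $events)
    {
        $student = null;

        foreach ($events as $event) {
            switch (get_class($event)) {
                case 'StudentArrivedAtUniversity':
                    $student = new self($event->studentId, $event->gpa);
                    break;
                case 'StudentGpaImproved':
                case 'StudentGpaWorsened':
                    $student->gpa = $event->gpa;
                    break;
                case 'StudentLeftTheUniversity':
                    $student = null;   // No longer part of the dataset.
                    break;
            }
        }

        return $student;
    }

    public function gpa()
    {
        return $this->gpa;
    }
}
```

The "summation" of every event we've calculated really does reproduce the same model we'd get from a flat-state query; that's the whole point.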

After implementing mappings, we'll need a driver that actually carries out the long-running process of producing events when changes are detected between current and previous state. This can be accomplished a few ways, algorithmically. Again, check out the repository for examples of how I approach this:

  • Option #1: Source everything from the traditional schema. That is, you would maintain two copies of the attribute release table(s); one representing current state and one representing previous. You would use your traditional mapper to resolve current and previous instances of Student, diff them and commit those events to the event store for consumption.
  • Option #2: Project the cache from the event stream to compare against the traditional schema. You would use your traditional mapper to resolve the current Student. You would use a similar mapper to resolve the previous Student from an internal cache projected from your event stream. This projection represents "Students we're aware of", not necessarily the full details. In fact, previous state is still queried off the event store to compare against new. The projection is used almost as simply a lookup of "things to process".
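The driver for Option #1 can be sketched as follows. The StudentMapper and EventStore port names are mine, not from any particular package, and the event constructors are assumed to take the obvious arguments:

```php
<?php
// Hypothetical ports; names are mine, not from a specific package.
interface StudentMapper
{
    /** @return Student[] */
    public function all();

    /** @return Student|null */
    public function withId($id);

    public function replaceWith(array $students);
}

interface EventStore
{
    public function commit(array $events);
}

// Driver sketch for Option #1: diff every student between the "previous"
// snapshot and current state, committing the resulting events.
function produceEvents(StudentMapper $current, StudentMapper $previous, EventStore $store)
{
    foreach ($current->all() as $student) {
        $before = $previous->withId($student->id());

        if ($before === null) {
            // Not in the snapshot yet: they arrived at the University.
            $store->commit([new StudentArrivedAtUniversity($student->id(), $student->gpa())]);
            continue;
        }

        // Known student: commit improve/worsen events, if any.
        $store->commit($student->diff($before));
    }

    foreach ($previous->all() as $student) {
        if ($current->withId($student->id()) === null) {
            // In the snapshot but gone from current state: they left.
            $store->commit([new StudentLeftTheUniversity($student->id())]);
        }
    }

    // Refresh the snapshot for the next run (in Option #2, this would be
    // a projection rebuilt from the event stream instead).
    $previous->replaceWith($current->all());
}
```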

I prefer the second option for a variety of reasons. Probably the biggest (and most valuable) is that I'm "dog-fooding" the events I'm providing to other bounded contexts. This is a good practice for validating their usefulness and making sure they are just "fat" enough (that they include enough semantic detail). There is a little more implementation involved, but I believe the immediate validation is reason enough to justify it. On a more technical note, consider how expensive this process is for more complex examples. You're basically saying, "I'm going to do this O(N^2) (best-case) comparison because I'm greedy". If we use a projection, we can start to represent a priority queue of "Students to check diffs on" and limit the runtime of our greedy algorithm.

Maybe this event stream (we're publishing to other BCs) doesn't need to be immediately consistent. Maybe an "hour old" is okay? Maybe a "day old" is okay? This is just a matter of establishing an SLA (or agreement) on what's acceptable. For this trivial example, it's a GPA... it changes once a semester... so we might decide to just run the process every semester and be greedy because it's a rare process! Of course, IT DEPENDS!

Conclusion

I know this wrapped up pretty quickly and "all of a sudden". This was actually a lot longer than intended. The point of the article was to show that by thinking through the problem, you can build a model that isn't actually what's in your environment. It's an abstraction; and through that abstraction, we afford ourselves the opportunity to do some pretty powerful stuff using even the simplest of implementation strategies. If there are any questions, please feel free to grab me on Twitter at @mdwheele and we can talk. You can also leave a comment below. More importantly, if you have counter-examples where this type of approach falls down, let's talk about it! I am always interested in different domains and I fully recognize the fallacy in any "silver bullet"!

Things Our Implementation Doesn't Cover

  • How to expose these events as a JSON API for consumption (coming soon)
  • How to handle failures from connected systems. What happens when our attribute release database is simply unavailable for any reason? Does that mean that "EVERYBODY LEFT THE UNIVERSITY"? Probably not, haha.
  • There are some edge-cases or missed opportunities for elegance for sure, but the purpose is to paint the broad strokes. There's always room for improvement!
  • We talked about "what makes a perfect GPA". Perhaps there is room for more interesting events like StudentAchievedPerfectGpa or StudentDippedBelowUniversityStandard.

Resources