When building message-processing systems, you will often need to handle a batch of heterogeneous messages (i.e. messages of varying shapes or types), for example when a Lambda function consumes messages from an SQS queue. This post shows how this can be done cleanly with Pydantic, Python’s typing system, and some helpers from the Python standard library.

Categorizing messages of unknown type

The first thing you likely need to do is identify the type of an incoming message by its properties. We can use Pydantic to model the types of messages we expect to have coming into our system. We can then use Pydantic’s parse_obj_as function to validate these messages and convert them to their respective Pydantic classes. (parse_obj_as belongs to the Pydantic v1 API; Pydantic v2 deprecates it in favor of TypeAdapter.)

In the following example, we are able to distinguish between messages based on the attributes that they contain:

import pydantic
import typing

class Email(pydantic.BaseModel):
    to: typing.List[str]
    subject: str
    message: str

class Sms(pydantic.BaseModel):
    to: str
    message: str

# Create a type demonstrating all of the expected messages
SupportedMessages = typing.Union[Email, Sms]

# Pass in our new type with a list of uncategorized messages
messages = pydantic.parse_obj_as(
    typing.List[SupportedMessages],
    [
        {
            "to": ['[email protected]', '[email protected]'],
            "subject": "BBQ Emergency",
            "message": "Need more ketchup!"
        },
        {
            "to": "911",
            "message": "Burnt finger at BBQ :("
        }
    ]
)

# They have now been cast to their appropriate types
print(messages)
#> [Email(to=['[email protected]', '[email protected]'], subject='BBQ Emergency', message='Need more ketchup!'), Sms(to='911', message='Burnt finger at BBQ :(')]

Other times, messages are differentiated based on the value of a particular attribute. The same pattern applies:

Example of value-based differentiation
import pydantic
import typing

class Email(pydantic.BaseModel):
    type: typing.Literal["email"]
    to: str
    message: str

class Sms(pydantic.BaseModel):
    type: typing.Literal["sms"]
    to: str
    message: str

# Create a type demonstrating all of the expected messages
SupportedMessages = typing.Union[Email, Sms]

# Pass in our new type with a list of uncategorized messages
messages = pydantic.parse_obj_as(
    typing.List[SupportedMessages],
    [
        {
            "type": "email",
            "to": "[email protected]",
            "message": "Head's up, Ringo has a new idea"
        },
        {
            "type": "sms",
            "to": "867-5309",
            "message": "New phone, who dis?"
        }
    ]
)

# They have now been cast to their appropriate types
print(messages)
#> [Email(type='email', to='[email protected]', message="Head's up, Ringo has a new idea"), Sms(type='sms', to='867-5309', message='New phone, who dis?')]

Edge Cases

Unknown types

If a message does not fit any model, a pydantic.ValidationError is raised:

Example of an unexpected message
import pydantic
import typing

class Email(pydantic.BaseModel):
    to: typing.List[str]
    subject: str
    message: str

pydantic.parse_obj_as(Email, {"foo": "bar"})
#> Traceback (most recent call last):
#    File "<stdin>", line 1, in <module>
#    File "pydantic/tools.py", line 38, in pydantic.tools.parse_obj_as
#    File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
#  pydantic.error_wrappers.ValidationError: 3 validation errors for ParsingModel[Email]
#  __root__ -> to
#    field required (type=value_error.missing)
#  __root__ -> subject
#    field required (type=value_error.missing)
#  __root__ -> message
#    field required (type=value_error.missing)

Similar types

Things get trickier when the fields of one type are a subset of another’s:

Example of a situation where Pydantic cannot differentiate between similar message types
import pydantic
import typing

class Person(pydantic.BaseModel):
    name: str

class Pet(pydantic.BaseModel):
    name: str
    breed: str

print(pydantic.parse_obj_as(typing.Union[Person, Pet], {"name": "Bob"}))
#> Person(name='Bob')
print(pydantic.parse_obj_as(typing.Union[Person, Pet], {"name": "Fido", "breed": "poodle"}))
#> Person(name='Fido')

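By default, Pydantic ignores extra attributes instead of rejecting them, so the Fido payload is also a valid Person. Because the Union lists Person first and, as the output above shows, it is matched first, the breed is silently dropped. By forbidding extra attributes via the extra option, we can help Pydantic narrow in on the correct type.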

Example of using `extra` to help differentiate between similar message types
import pydantic
import typing

class Person(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    name: str

class Pet(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    name: str
    breed: str

print(pydantic.parse_obj_as(typing.Union[Person, Pet], {"name": "Bob"}))
#> Person(name='Bob')
print(pydantic.parse_obj_as(typing.Union[Person, Pet], {"name": "Fido", "breed": "poodle"}))
#> Pet(name='Fido', breed='poodle')

Processing Messages

Now that our messages are categorized, we will likely want to process each one according to its type. We could write a long chain of if isinstance(msg, TypeA): ... elif isinstance(msg, TypeB): ..., but that’s no fun. Instead, we can reach for Python’s functools module, which has a convenient singledispatch decorator.

For those of us who aren’t functional programming wizards (e.g. myself), here are some helpful definitions from Python’s glossary:

single dispatch
A form of generic function dispatch where the implementation is chosen based on the type of a single argument.
generic function
A function composed of multiple functions implementing the same operation for different types. Which implementation should be used during a call is determined by the dispatch algorithm.

Let’s take a look at how that could work:

import functools
import pydantic
import typing

class Foo(pydantic.BaseModel):
    type: typing.Literal["foo"]

class Bar(pydantic.BaseModel):
    type: typing.Literal["bar"]


@functools.singledispatch
def send(msg):
    # this is the default sender, which should only ever be called if a message
    # comes in with a type for which we haven't registered a handler. in this
    # situation, we may want to throw an error to signal that we don't know how
    # to handle this message; alternatively we may want to have a default handler
    # that applies to any types without explicitly registered handlers.
    ...

@send.register
def handle_foo(msg: Foo):
    ...

@send.register
def handle_bar(msg: Bar):
    ...
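
One detail that is easy to miss: singledispatch resolves handlers through the class hierarchy, so a subclass without its own handler uses the handler of its nearest registered parent, and only types with no registered ancestor reach the default. The sketch below illustrates this with plain classes standing in for Pydantic models; Message and MarketingEmail are hypothetical names that do not appear elsewhere in this post.

```python
import functools


class Message:
    """Hypothetical base class standing in for a Pydantic model."""


class Email(Message):
    pass


class MarketingEmail(Email):
    """Deliberately has no handler of its own."""


@functools.singledispatch
def send(msg) -> str:
    # Fallback for any type without a registered handler.
    raise TypeError(f"No handler registered for {type(msg).__name__}")


@send.register
def _(msg: Email) -> str:
    return "email handler"


# A subclass without its own handler uses its nearest registered parent's handler.
print(send(MarketingEmail()))
#> email handler

# dispatch() reports which implementation a given class resolves to.
print(send.dispatch(MarketingEmail) is send.dispatch(Email))
#> True

# Message has no handler, so it resolves to the default implementation.
print(send.dispatch(Message) is send.registry[object])
#> True
```

send.dispatch and send.registry are part of the standard singledispatch API, which makes them handy for checking routing without actually calling a handler.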

Example of applying this pattern to our previous Email/SMS types
import functools
import pydantic
import typing


class Email(pydantic.BaseModel):
    to: typing.List[str]
    subject: str
    message: str


class Sms(pydantic.BaseModel):
    to: str
    message: str


@functools.singledispatch
def send(msg):
    # this is the default sender, which should only ever be called if a message
    # comes in with a type for which we haven't registered a handler. in this
    # situation, we may want to throw an error to signal that we don't know how
    # to handle this message; alternatively we may want to have a default handler
    # that applies to any types without explicitly registered handlers.
    raise Exception(f"Unexpected message type ({type(msg)=}, {msg})")

@send.register
def send_email(msg: Email):
    print(f"Sending email to {' and '.join(msg.to)}")

@send.register
def send_sms(msg: Sms):
    print(f"Sending SMS to {msg.to}")

def handle_message(message: typing.Dict[str, typing.Any]):
    parsed_message = pydantic.parse_obj_as(typing.Union[Email, Sms], message)
    send(parsed_message)

handle_message({
    "to": ['[email protected]', '[email protected]'],
    "subject": "BBQ Emergency",
    "message": "Need more ketchup!"
})
#> Sending email to [email protected] and [email protected]
handle_message({
    "to": "911",
    "message": "Burnt finger at BBQ :("
})
#> Sending SMS to 911
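
To tie this back to the SQS and Lambda scenario from the introduction, here is a rough sketch of how a handle_message function like the one above might be driven from a batch of SQS records. It rests on a few assumptions: process_records is a hypothetical helper name, message bodies are assumed to be JSON, and the returned batchItemFailures structure only has an effect when ReportBatchItemFailures is enabled on the event source mapping. The handler is passed in as an argument so the sketch runs without Pydantic.

```python
import json
import typing


def process_records(
    event: typing.Dict[str, typing.Any],
    handle_message: typing.Callable[[typing.Dict[str, typing.Any]], None],
) -> typing.Dict[str, typing.Any]:
    """Run handle_message on every SQS record, collecting the ones that fail."""
    failures = []
    for record in event.get("Records", []):
        try:
            # SQS delivers each message body as a string; JSON is assumed here.
            handle_message(json.loads(record["body"]))
        except Exception:
            # Covers malformed JSON, validation errors, and unhandled types.
            failures.append({"itemIdentifier": record["messageId"]})
    # Only meaningful if ReportBatchItemFailures is enabled for the queue trigger.
    return {"batchItemFailures": failures}


# Usage with a stand-in handler that simply records what it was given.
seen = []
event = {
    "Records": [
        {"messageId": "1", "body": '{"to": "911", "message": "Burnt finger at BBQ :("}'},
        {"messageId": "2", "body": "not json"},
    ]
}
print(process_records(event, seen.append))
#> {'batchItemFailures': [{'itemIdentifier': '2'}]}
```

Catching a broad Exception here is a deliberate simplification so that one bad message does not fail the whole batch; a real handler would probably log the error and be more selective.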

Testing

This type-based message handling is neat and all, but can we test it? I found it a bit challenging to integrate mocking with @functools.singledispatch: the dispatcher keeps its own reference to each registered handler, so patching the handler’s name on the module does not change what gets called. I ended up using a simple context manager to conveniently swap out registered type handlers with mocks:

import contextlib
import typing
import unittest.mock

@contextlib.contextmanager
def override_registry(
    dispatch_callable: "_SingleDispatchCallable[Any]",
    cls: typing.Type,
    mock: unittest.mock.Mock,
):
    """
    Helper to override a singledispatch function with a mock for testing.
    """
    original = dispatch_callable.registry[cls]
    dispatch_callable.register(cls, mock)
    try:
        yield mock
    finally:
        dispatch_callable.register(cls, original)
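
As a quick sanity check, the following standalone sketch exercises the helper against a toy generic function and confirms that the original handler is restored afterwards. The describe and describe_int names are made up for illustration, and the helper is repeated (without type hints) so the snippet runs on its own.

```python
import contextlib
import functools
import unittest.mock


@contextlib.contextmanager
def override_registry(dispatch_callable, cls, mock):
    # Same helper as above, repeated so this snippet is self-contained.
    original = dispatch_callable.registry[cls]
    dispatch_callable.register(cls, mock)
    try:
        yield mock
    finally:
        dispatch_callable.register(cls, original)


@functools.singledispatch
def describe(value) -> str:  # hypothetical generic function
    return "something else"


@describe.register
def describe_int(value: int) -> str:
    return "an integer"


with override_registry(
    describe, int, unittest.mock.MagicMock(return_value="mocked")
) as mock:
    # While the block is active, calls with an int are routed to the mock.
    assert describe(1) == "mocked"
    mock.assert_called_once_with(1)

# The original handler is back once the block exits.
assert describe(1) == "an integer"
assert describe.registry[int] is describe_int
```

Note that the helper looks up registry[cls] directly, so it raises a KeyError for a class that has no explicitly registered handler, including subclasses that only inherit one.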

Example of a full set of tests to validate that routing logic is appropriately configured
import functools
import pydantic
import typing
import contextlib
import unittest.mock


class Email(pydantic.BaseModel):
    to: typing.List[str]
    subject: str
    message: str


class Sms(pydantic.BaseModel):
    to: str
    message: str


@functools.singledispatch
def send(msg):
    # this is the default sender, which should only ever be called if a message
    # comes in with a type for which we haven't registered a handler. in this
    # situation, we may want to throw an error to signal that we don't know how
    # to handle this message; alternatively we may want to have a default handler
    # that applies to any types without explicitly registered handlers.
    raise Exception(f"Unexpected message type ({type(msg)=}, {msg})")

@send.register
def send_email(msg: Email):
    print(f"Sending email to {' and '.join(msg.to)}")

@send.register
def send_sms(msg: Sms):
    print(f"Sending SMS to {msg.to}")

def handle_message(message: typing.Dict[str, typing.Any]):
    parsed_message = pydantic.parse_obj_as(typing.Union[Email, Sms], message)
    send(parsed_message)

@contextlib.contextmanager
def override_registry(
    dispatch_callable: "_SingleDispatchCallable[Any]",
    cls: typing.Type,
    mock: unittest.mock.Mock,
):
    """
    Helper to override a singledispatch function with a mock for testing.
    """
    original = dispatch_callable.registry[cls]
    dispatch_callable.register(cls, mock)
    try:
        yield mock
    finally:
        dispatch_callable.register(cls, original)

def test_handling_email():
    """
    Ensure that the system properly handles Email messages.
    """
    with override_registry(
        send, Email, unittest.mock.MagicMock()
    ) as called_mock, override_registry(
        send, Sms, unittest.mock.MagicMock()
    ) as not_called_mock:
        handle_message({
            "to": ['[email protected]', '[email protected]'],
            "subject": "BBQ Emergency",
            "message": "Need more ketchup!"
        })

    assert called_mock.call_count == 1
    assert not not_called_mock.call_count

def test_handling_sms():
    """
    Ensure that the system properly handles SMS messages.
    """
    with override_registry(
        send, Sms, unittest.mock.MagicMock()
    ) as called_mock, override_registry(
        send, Email, unittest.mock.MagicMock()
    ) as not_called_mock:
        handle_message({
            "to": "911",
            "message": "Burnt finger at BBQ :("
        })

    assert called_mock.call_count == 1
    assert not not_called_mock.call_count

test_handling_email()
test_handling_sms()

This pattern is all a bit new to me and fresh in my mind. Hoping it can prove useful to others. I’m very open to hearing concerns or suggestions for improvement from others if anyone has them.