When building systems that process messages, it’s not unusual to find yourself needing to handle a number of incoming heterogeneous messages (i.e., messages of varying shapes and types). For example, consider processing messages from an SQS queue with a Lambda function. This post shows how to do this cleanly and elegantly by using Pydantic, Python’s typing system, and a few helpers from the Python standard library.
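For context, when a Lambda function consumes an SQS queue, each invocation receives a batch of records under the event’s `Records` key, and each record’s `body` is a string. Below is a minimal sketch of turning such an event into the plain dicts used throughout this post. It assumes every body is a JSON-encoded object, and `extract_message_bodies` is a hypothetical helper name.

```python
import json
import typing


def extract_message_bodies(
    event: typing.Dict[str, typing.Any],
) -> typing.List[typing.Dict[str, typing.Any]]:
    """Decode the body of each SQS record in a Lambda event.

    Assumes each record's "body" is a JSON-encoded object; the resulting
    dicts are what we later hand to Pydantic for categorization.
    """
    return [json.loads(record["body"]) for record in event.get("Records", [])]


# A trimmed-down SQS event; real records carry more keys (attributes, md5OfBody, ...).
example_event = {
    "Records": [
        {"messageId": "1", "body": '{"to": "911", "message": "Burnt finger at BBQ :("}'},
        {"messageId": "2", "body": '{"type": "sms", "to": "867-5309", "message": "New phone, who dis?"}'},
    ]
}

print(extract_message_bodies(example_event))
```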
Categorizing messages of unknown type
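The first thing you’ll likely need to do is identify the type of an incoming message by its properties. We can use Pydantic to model the types of messages we expect to come into our system, and then use Pydantic’s `parse_obj_as` function to cast each message to its respective Pydantic class. (The examples in this post use the Pydantic v1 API. In Pydantic v2, `parse_obj_as` is deprecated in favor of `TypeAdapter`, and `Union` types are resolved with a different default strategy.)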
In the following example, we are able to distinguish between messages based on the attributes that they contain:
```python
import typing

import pydantic


class Email(pydantic.BaseModel):
    to: typing.List[str]
    subject: str
    message: str


class Sms(pydantic.BaseModel):
    to: str
    message: str


# Create a type describing all of the expected messages
SupportedMessages = typing.Union[Email, Sms]

# Pass in our new type along with a list of uncategorized messages
messages = pydantic.parse_obj_as(
    typing.List[SupportedMessages],
    [
        {
            "to": ["alice@example.com", "bob@example.com"],
            "subject": "BBQ Emergency",
            "message": "Need more ketchup!",
        },
        {
            "to": "911",
            "message": "Burnt finger at BBQ :(",
        },
    ],
)

# They have now been cast to their appropriate types
print(messages)
#> [Email(to=['alice@example.com', 'bob@example.com'], subject='BBQ Emergency', message='Need more ketchup!'), Sms(to='911', message='Burnt finger at BBQ :(')]
```
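Under the hood, Pydantic v1 validates a `Union` by trying each member from left to right and keeping the first one that validates. The stdlib-only sketch below imitates that idea with dataclasses and a deliberately crude check: every field must be present and hold a value of its annotated built-in type. `first_matching_type` and the simplified `Email`/`Sms` dataclasses are illustrative stand-ins, not Pydantic’s actual algorithm.

```python
import dataclasses
import typing


@dataclasses.dataclass
class Email:
    to: list  # simplified: Pydantic would also check the items (List[str])
    subject: str
    message: str


@dataclasses.dataclass
class Sms:
    to: str
    message: str


def first_matching_type(
    candidates: typing.Sequence[type], data: typing.Mapping[str, typing.Any]
) -> typing.Any:
    """Return an instance of the first candidate whose fields all validate.

    A crude stand-in for left-to-right Union validation: every field must be
    present in `data` and be an instance of its annotated (built-in) type.
    Extra keys are ignored, mirroring Pydantic's default behavior.
    """
    for cls in candidates:
        fields = dataclasses.fields(cls)
        if all(f.name in data and isinstance(data[f.name], f.type) for f in fields):
            return cls(**{f.name: data[f.name] for f in fields})
    raise ValueError(f"No candidate type matches {data!r}")


messages = [
    {"to": ["alice@example.com"], "subject": "BBQ Emergency", "message": "Need more ketchup!"},
    {"to": "911", "message": "Burnt finger at BBQ :("},
]
print([first_matching_type([Email, Sms], m) for m in messages])
```

The SMS payload fails the `Email` check because its `to` is a string and it has no `subject`, so the search falls through to `Sms`.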
Other times, messages are differentiated based on the value of a particular attribute. The same pattern applies:
Example of value-based differentiation
```python
import typing

import pydantic


class Email(pydantic.BaseModel):
    type: typing.Literal["email"]
    to: str
    message: str


class Sms(pydantic.BaseModel):
    type: typing.Literal["sms"]
    to: str
    message: str


# Create a type describing all of the expected messages
SupportedMessages = typing.Union[Email, Sms]

# Pass in our new type along with a list of uncategorized messages
messages = pydantic.parse_obj_as(
    typing.List[SupportedMessages],
    [
        {
            "type": "email",
            "to": "carol@example.com",
            "message": "Head's up, Ringo has a new idea",
        },
        {
            "type": "sms",
            "to": "867-5309",
            "message": "New phone, who dis?",
        },
    ],
)

# They have now been cast to their appropriate types
print(messages)
#> [Email(type='email', to='carol@example.com', message="Head's up, Ringo has a new idea"), Sms(type='sms', to='867-5309', message='New phone, who dis?')]
```
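When every message carries a `type` field like this, there is no real need to try each model in turn, because the value itself can select the class. Pydantic supports this directly with discriminated unions (available since v1.9 and declared with `Field(discriminator="type")`). The stdlib-only sketch below shows the underlying idea with a plain lookup table. `MODELS_BY_TYPE` and `categorize` are hypothetical names, and the dataclasses stand in for the Pydantic models above.

```python
import dataclasses
import typing


@dataclasses.dataclass
class Email:
    type: str
    to: str
    message: str


@dataclasses.dataclass
class Sms:
    type: str
    to: str
    message: str


# Map each discriminator value to the class that should represent it.
MODELS_BY_TYPE: typing.Dict[str, type] = {"email": Email, "sms": Sms}


def categorize(data: typing.Mapping[str, typing.Any]) -> typing.Any:
    """Pick the class from the message's "type" value with one dict lookup."""
    try:
        cls = MODELS_BY_TYPE[data["type"]]
    except KeyError:
        # Either the "type" key is missing or its value is not one we know.
        raise ValueError(f"Unknown or missing message type in {data!r}") from None
    return cls(**data)


print(categorize({"type": "sms", "to": "867-5309", "message": "New phone, who dis?"}))
```

A side benefit of selecting by value is that an unknown `type` produces one focused error, rather than a validation error for every candidate model.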
Edge Cases
Unknown types
If a message does not fit any model, Pydantic raises a `pydantic.ValidationError`:
Example of an unexpected message
```python
import typing

import pydantic


class Email(pydantic.BaseModel):
    to: typing.List[str]
    subject: str
    message: str


pydantic.parse_obj_as(Email, {"foo": "bar"})
#> Traceback (most recent call last):
#    File "<stdin>", line 1, in <module>
#    File "pydantic/tools.py", line 38, in pydantic.tools.parse_obj_as
#    File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
#  pydantic.error_wrappers.ValidationError: 3 validation errors for ParsingModel[Email]
#  __root__ -> to
#    field required (type=value_error.missing)
#  __root__ -> subject
#    field required (type=value_error.missing)
#  __root__ -> message
#    field required (type=value_error.missing)
```
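In the SQS/Lambda setting from the introduction, you usually don’t want one unparseable message to sink the whole batch. One option is Lambda’s partial batch response for SQS (enabled with `ReportBatchItemFailures` on the event source mapping). With it, the handler returns the IDs of the records that failed, so only those are retried. Below is a minimal sketch, assuming JSON bodies. `parse_message` is a hypothetical stand-in that raises `ValueError`; real code would call `pydantic.parse_obj_as` and catch `pydantic.ValidationError` (itself a `ValueError` subclass).

```python
import json
import typing


def parse_message(data: typing.Any) -> typing.Any:
    # Hypothetical stand-in for pydantic.parse_obj_as(SupportedMessages, data):
    # it only accepts dicts that have a "message" key.
    if not isinstance(data, dict) or "message" not in data:
        raise ValueError(f"Unrecognized message: {data!r}")
    return data


def handler(event: typing.Dict[str, typing.Any], context: typing.Any = None) -> typing.Dict[str, typing.Any]:
    """Process each SQS record independently and report the ones that failed."""
    failures = []
    for record in event.get("Records", []):
        try:
            parsed = parse_message(json.loads(record["body"]))
            # ...dispatch `parsed` to its handler here...
        except ValueError:  # json.JSONDecodeError is also a ValueError subclass
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


event = {
    "Records": [
        {"messageId": "ok-1", "body": '{"to": "911", "message": "Burnt finger at BBQ :("}'},
        {"messageId": "bad-1", "body": '{"foo": "bar"}'},
        {"messageId": "bad-2", "body": "not json"},
    ]
}
print(handler(event))
```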
Similar types
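Things get trickier when one type’s fields are a subset of another’s. Pydantic v1 tries the members of a `Union` in order and returns the first one that validates. Because extra attributes are ignored by default, a `Pet` payload validates just fine as a `Person`:
Example of a situation where you can’t differentiate between similar message types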
```python
import typing

import pydantic


class Person(pydantic.BaseModel):
    name: str


class Pet(pydantic.BaseModel):
    name: str
    breed: str


pydantic.parse_obj_as(typing.Union[Person, Pet], {"name": "Bob"})
#> Person(name='Bob')
pydantic.parse_obj_as(typing.Union[Person, Pet], {"name": "Fido", "breed": "poodle"})
#> Person(name='Fido')
```
By default, Pydantic ignores extra attributes on models. By forbidding extra attributes via the `extra` config option, we can help Pydantic narrow in on the correct type.
Example of using `extra` to help differentiate between similar message types
```python
import typing

import pydantic


class Person(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    name: str


class Pet(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    name: str
    breed: str


pydantic.parse_obj_as(typing.Union[Person, Pet], {"name": "Bob"})
#> Person(name='Bob')
pydantic.parse_obj_as(typing.Union[Person, Pet], {"name": "Fido", "breed": "poodle"})
#> Pet(name='Fido', breed='poodle')
```
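To see why forbidding extras helps, here is a stdlib-only sketch of first-match-wins resolution with a switch for how extra keys are treated. `resolve` and its `forbid_extra` flag are hypothetical. The point is only that when extras are ignored, the `Pet` payload satisfies `Person` first, and when they are forbidden, it does not.

```python
import dataclasses
import typing


@dataclasses.dataclass
class Person:
    name: str


@dataclasses.dataclass
class Pet:
    name: str
    breed: str


def resolve(
    candidates: typing.Sequence[type],
    data: typing.Mapping[str, typing.Any],
    forbid_extra: bool,
) -> typing.Any:
    """Return the first candidate whose fields match `data`, trying them in order."""
    for cls in candidates:
        names = {f.name for f in dataclasses.fields(cls)}
        if not names.issubset(data):
            continue  # a required field is missing
        if forbid_extra and set(data) - names:
            continue  # data has keys this class does not declare
        return cls(**{name: data[name] for name in names})
    raise ValueError(f"No candidate matches {data!r}")


pet_data = {"name": "Fido", "breed": "poodle"}
print(resolve([Person, Pet], pet_data, forbid_extra=False))  # Person(name='Fido')
print(resolve([Person, Pet], pet_data, forbid_extra=True))   # Pet(name='Fido', breed='poodle')
```

Listing the more specific type first (`Union[Pet, Person]`) would also work in this particular case. Forbidding extras has the advantage of not depending on the order of the union.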
Processing Messages
Now that our messages are categorized, you’ll likely want to process each one according to its type. We could write a long `if isinstance(msg, TypeA): ... elif isinstance(msg, TypeB): ...` chain, but that’s no fun. Instead, we can reach for Python’s `functools` module, which has a convenient `singledispatch` decorator.
For those of us who aren’t functional programming wizards (e.g., myself), here are some helpful definitions from Python’s glossary:
- single dispatch: A form of generic function dispatch where the implementation is chosen based on the type of a single argument.
- generic function: A function composed of multiple functions implementing the same operation for different types. Which implementation should be used during a call is determined by the dispatch algorithm.
Let’s take a look at how that could work:
```python
import functools
import typing

import pydantic


class Foo(pydantic.BaseModel):
    type: typing.Literal["foo"]


class Bar(pydantic.BaseModel):
    type: typing.Literal["bar"]


@functools.singledispatch
def send(msg):
    # This is the default sender, which should only ever be called if a message
    # comes in with a type for which we haven't registered a handler. In this
    # situation, we may want to throw an error to signal that we don't know how
    # to handle this message; alternatively, we may want to have a default handler
    # that applies to any types without explicitly registered handlers.
    ...


@send.register
def handle_foo(msg: Foo):
    ...


@send.register
def handle_bar(msg: Bar):
    ...
```
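It can help to poke at what `singledispatch` builds. The decorated function exposes `dispatch(cls)`, which returns the implementation that would be chosen for a type, and a read-only `registry` mapping from types to implementations. Dispatch follows the class’s MRO, so a subclass falls back to its parent’s handler. The `Message`/`Urgent` classes and the `describe` function below are illustrative.

```python
import functools


class Message:
    pass


class Urgent(Message):
    pass


@functools.singledispatch
def describe(msg):
    # Fallback, registered under `object`, for types without a closer match.
    return "unknown"


@describe.register
def describe_message(msg: Message):
    return "message"


print(describe(Urgent()))  # message (Urgent inherits Message's handler via its MRO)
print(describe(42))  # unknown
print(describe.dispatch(Urgent) is describe_message)  # True
print(sorted(cls.__name__ for cls in describe.registry))  # ['Message', 'object']
```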
Example of applying this pattern to our previous Email/SMS types
```python
import functools
import typing

import pydantic


class Email(pydantic.BaseModel):
    to: typing.List[str]
    subject: str
    message: str


class Sms(pydantic.BaseModel):
    to: str
    message: str


@functools.singledispatch
def send(msg):
    # This is the default sender, which should only ever be called if a message
    # comes in with a type for which we haven't registered a handler. In this
    # situation, we may want to throw an error to signal that we don't know how
    # to handle this message; alternatively, we may want to have a default handler
    # that applies to any types without explicitly registered handlers.
    raise Exception(f"Unexpected message type ({type(msg)=}, {msg})")


@send.register
def send_email(msg: Email):
    print(f"Sending email to {' and '.join(msg.to)}")


@send.register
def send_sms(msg: Sms):
    print(f"Sending SMS to {msg.to}")


def handle_message(message: typing.Dict[str, typing.Any]):
    parsed_message = pydantic.parse_obj_as(typing.Union[Email, Sms], message)
    send(parsed_message)


handle_message({
    "to": ["alice@example.com", "bob@example.com"],
    "subject": "BBQ Emergency",
    "message": "Need more ketchup!",
})
#> Sending email to alice@example.com and bob@example.com

handle_message({
    "to": "911",
    "message": "Burnt finger at BBQ :(",
})
#> Sending SMS to 911
```
Testing
This type-based message handling is neat and all, but can we test it? I found it a bit challenging to integrate mocking with `@functools.singledispatch`, but ended up using a simple context manager to conveniently swap out registered type handlers with mocks:
```python
import contextlib
import typing
import unittest.mock


@contextlib.contextmanager
def override_registry(
    dispatch_callable: "_SingleDispatchCallable[Any]",
    cls: typing.Type,
    mock: unittest.mock.Mock,
):
    """
    Helper to override a singledispatch function with a mock for testing.
    """
    original = dispatch_callable.registry[cls]
    dispatch_callable.register(cls, mock)
    try:
        yield mock
    finally:
        dispatch_callable.register(cls, original)
```
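To check that the helper restores the original handler once the `with` block exits, here is a self-contained run against a toy dispatcher. The helper is repeated with a looser `typing.Any` annotation so the snippet runs on its own, and `greet` and its handler are illustrative. Swapping handlers this way takes effect immediately because `register` also clears `singledispatch`’s internal dispatch cache.

```python
import contextlib
import functools
import typing
import unittest.mock


@contextlib.contextmanager
def override_registry(dispatch_callable: typing.Any, cls: type, mock: unittest.mock.Mock):
    """Temporarily register `mock` as the handler for `cls`, then restore the original."""
    original = dispatch_callable.registry[cls]
    dispatch_callable.register(cls, mock)
    try:
        yield mock
    finally:
        dispatch_callable.register(cls, original)


@functools.singledispatch
def greet(value):
    raise TypeError(f"No handler for {type(value)}")


@greet.register
def greet_str(value: str):
    return f"hello, {value}"


with override_registry(greet, str, unittest.mock.MagicMock(return_value="mocked")) as mock:
    print(greet("Ringo"))  # mocked
    mock.assert_called_once_with("Ringo")

print(greet("Ringo"))  # hello, Ringo
```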
Example of a full set of tests to validate that routing logic is appropriately configured
```python
import contextlib
import functools
import typing
import unittest.mock

import pydantic


class Email(pydantic.BaseModel):
    to: typing.List[str]
    subject: str
    message: str


class Sms(pydantic.BaseModel):
    to: str
    message: str


@functools.singledispatch
def send(msg):
    # This is the default sender, which should only ever be called if a message
    # comes in with a type for which we haven't registered a handler. In this
    # situation, we may want to throw an error to signal that we don't know how
    # to handle this message; alternatively, we may want to have a default handler
    # that applies to any types without explicitly registered handlers.
    raise Exception(f"Unexpected message type ({type(msg)=}, {msg})")


@send.register
def send_email(msg: Email):
    print(f"Sending email to {' and '.join(msg.to)}")


@send.register
def send_sms(msg: Sms):
    print(f"Sending SMS to {msg.to}")


def handle_message(message: typing.Dict[str, typing.Any]):
    parsed_message = pydantic.parse_obj_as(typing.Union[Email, Sms], message)
    send(parsed_message)


@contextlib.contextmanager
def override_registry(
    dispatch_callable: "_SingleDispatchCallable[Any]",
    cls: typing.Type,
    mock: unittest.mock.Mock,
):
    """
    Helper to override a singledispatch function with a mock for testing.
    """
    original = dispatch_callable.registry[cls]
    dispatch_callable.register(cls, mock)
    try:
        yield mock
    finally:
        dispatch_callable.register(cls, original)


def test_handling_email():
    """
    Ensure that the system properly handles Email messages.
    """
    with override_registry(
        send, Email, unittest.mock.MagicMock()
    ) as called_mock, override_registry(
        send, Sms, unittest.mock.MagicMock()
    ) as not_called_mock:
        handle_message({
            "to": ["alice@example.com", "bob@example.com"],
            "subject": "BBQ Emergency",
            "message": "Need more ketchup!",
        })
    called_mock.assert_called_once()
    not_called_mock.assert_not_called()


def test_handling_sms():
    """
    Ensure that the system properly handles SMS messages.
    """
    with override_registry(
        send, Sms, unittest.mock.MagicMock()
    ) as called_mock, override_registry(
        send, Email, unittest.mock.MagicMock()
    ) as not_called_mock:
        handle_message({
            "to": "911",
            "message": "Burnt finger at BBQ :(",
        })
    called_mock.assert_called_once()
    not_called_mock.assert_not_called()


test_handling_email()
test_handling_sms()
```
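One limitation of `override_registry` as written is that `dispatch_callable.registry[cls]` raises `KeyError` when `cls` has no handler of its own, such as a subclass that relies on its parent’s registration. One possible variant, offered as a sketch rather than a drop-in replacement, looks up the original handler with `dispatch(cls)` instead. `dispatch(cls)` resolves through the MRO exactly as a real call would. The trade-off is that after the block exits, `cls` remains explicitly registered to the handler it previously inherited, because the public API offers no way to unregister a type. Calls on `cls` behave the same afterwards, unless a closer handler is registered later. `override_dispatch` is a hypothetical name.

```python
import contextlib
import functools
import typing
import unittest.mock


@contextlib.contextmanager
def override_dispatch(dispatch_callable: typing.Any, cls: type, mock: unittest.mock.Mock):
    """Like override_registry, but also works for types that only inherit a handler."""
    # dispatch() resolves via the MRO, so it doesn't raise KeyError for an unregistered class.
    original = dispatch_callable.dispatch(cls)
    dispatch_callable.register(cls, mock)
    try:
        yield mock
    finally:
        # Re-register the previously resolved handler, now explicitly for `cls`.
        dispatch_callable.register(cls, original)


class Base:
    pass


class Child(Base):
    pass


@functools.singledispatch
def handle(obj):
    return "default"


@handle.register
def handle_base(obj: Base):
    return "base"


with override_dispatch(handle, Child, unittest.mock.MagicMock(return_value="mocked")):
    assert handle(Child()) == "mocked"
    assert handle(Base()) == "base"  # the parent's handler is untouched

assert handle(Child()) == "base"
```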
This pattern is still fairly new to me, and I hope it proves useful to others. I’m very open to hearing concerns or suggestions for improvement if anyone has them.