Break the chain asynchronously

Different variations of the Chain of Responsibility pattern can be found in middleware like OWIN and ASP.NET Core. They all share a common approach: nesting functions inside functions, also known as functional composition. In this talk, we’ll build the Chain of Responsibility from scratch, apply it to a message pump for a service bus library and combine it with Async / Await to unleash the full power of asynchronous I/O.

Join this talk to learn how async and recursion play together. Discover how composition allows changing behavior at runtime. Finally, float state into your Chain of Responsibility so that you don’t sacrifice thread safety. Break the Chain of Responsibility with me.


Okay. Cool. Can everybody hear me? Cool. About the font size: later I'm going to do a lot of coding. If the font size is not big enough for the last row, please tell me and I'm going to increase it. It's currently 22. It should be enough, but we'll see. Okay. A warm welcome from my side to my talk about the chain of responsibility pattern combined with async/await in I/O-bound domains. Who was in my talk on Wednesday? A few. Okay. Cool. So my name is Daniel Marbach. I'm a solution architect and engineer at Particular Software. I live in central Switzerland in Lucerne, not Lausanne. That's the French-speaking part. I'm in the German-speaking part of Switzerland. If you want to know more about me and what I do in my free time, I suggest you listen to David Rael's podcast Developer On Fire, episode 77.
You can reach me on Twitter under @danielmarbach and I blog on and also on the Particular blog, I hope you subscribe to these two blogs. This talk is divided into three sections. First, I'm going to show the pattern, the chain of responsibility itself. Then I'm going to build it live on stage: first a synchronous version of the chain of responsibility and then an asynchronous version. And then we do a little wrap-up at the end. And of course questions, if you have any. I think since we are a pretty small group, if you have any questions and you feel it's blocking you from understanding what's happening on the screen, please feel free to shout. I'm going to answer the question, and if other questions come up, then please just ask them at the end. So, who has ever used NancyFX?
Hands up. Okay. Who has ever used NancyFX before and after module hooks? Okay. Just a few. Okay. Who has used FubuMVC from Jeremy Miller before? Okay. Who has used behaviors in FubuMVC? Just one. Okay. Who has used OWIN before? Hands up. A few. Okay. Who has written the type of code I'm showing here on the slides? Okay. A few. So what this code does: it registers a function on the OWIN middleware, which is an async lambda function. You get passed in a context and the next delegate. And if you want to continue the chain, you call await next(). If you want to do something before all the other things in the chain are called, you put that code in the "do something" part here, before the next call. And if you want to do something at the end of the chain, then you add your code after the next call.
So who has used Web API or MVC action filters before? The async versions? Hands up. Okay. Only a few. So this is a bunch of code from Web API action filters. It's also pretty similar. You implement the action filter and then you implement a method which is called ExecuteActionFilterAsync. You get in an HttpActionContext, you get in a CancellationToken, and you get in a function, a Func of Task of HttpResponseMessage, which is called continuation. It's not called next like in the OWIN middleware, but it's called continuation. And then you do whatever you need to do in your action filter. And when you feel it's time to call the continuation function, you call await continuation(). And you can, for example, wrap it in a try/catch or whatever. So this in the end is going to invoke your API controllers and the other stuff that is currently in the HTTP pipeline with the Web API stuff.
So all these variations, we just discussed NancyFX before and after module hooks, OWIN pipeline, OWIN Middleware. And of course also action filters. These are variations and implementations of the chain of responsibility pattern. So the goal of this talk today is I want to show you how the chain of responsibility pattern works. I want to show you how you can build it yourself. This sounds a bit weird. Why do we want to go that deep? I'm a firm believer that if you're using something like action filters, OWIN Middleware, it's crucial to understand the patterns behind it, because if it's going to fail in production and you don't know how it actually works under the hood, you're going to be screwed pretty much. That's my opinion. Of course, you can make up your own opinion, but that's why I'm currently standing here and trying to explain this pattern to you guys.
I hope you find it useful. And if not, tell me at the end. The chain of responsibility pattern is a nice pattern. It allows you to extend behavior during runtime, for example. Like the action filters, you can plug in your own action filters into the whole HTTP pipeline and do whatever you need to do for your own business logic, like identification and so on. So the pattern itself is defined like this: you have a client which calls the chain, and then the chain has a few different link elements. What's important in this pattern is that each link element has an abstract notion of the next element in the chain. And we saw that it's the continuation delegate that is passed into the action filter, or the next delegate which is passed into the OWIN pipeline.
So they usually all share a common approach. They nest functions inside functions. So it's kind of a functional design pattern. A brief explanation, how you can find the chain of responsibility in your own life or at least in my life. So when I empty the dishwasher, I sometimes play a little game with my son and with my wife. My son has about that height maybe a little bit more than a meter. And then my wife is 163-ish or maybe a bit more. And I'm one meter and 92 centimeters. So we can build a chain of responsibility in our kitchen. So my son is the first element, the first link in the chain. Then my wife is the next element. And then I'm the last element. And what we want to do is we want to empty the dishwasher. And since we have different cupboards in the kitchen on different heights.
So when my son takes something out of the dishwasher, he can look at it and he knows that, oh, it's something that goes in the cupboards which he can reach. And then he puts it into the cupboards. If he can't reach a cupboard, he can hand it over to my wife. And if my wife can reach the cupboards, then she puts it into the cupboards. And if she can't reach the cupboards, she hands it over to me. For example, our wine glasses: because we don't use them that often, we have them in the highest cupboards in the kitchen, which are only reachable by me. So if she finds a wine glass in the dishwasher, then she hands it over to me and I put it into the cupboards. So that's a chain of responsibility pattern. What's important is that each person in this chain of responsibility in our kitchen fulfills the single responsibility principle.
So, my son has a responsibility, like I said, the lowest cupboards, and I have a responsibility, maybe the highest cupboards in the kitchen. And like I said, each element knows the next one: my son knows that he has to hand it over to my wife and my wife knows that she has to hand it over to me. So if we were to translate this to code, it would basically look like this. We have a method, static void Person, which gets an Action delegate here called next. And then we have an implementation. And then whenever we are done, we call the next delegate. So we are calling the next link in the chain. So the dishwasher unloading process in our kitchen at home would probably look like this code here. My son would declare a lambda delegate to call my wife. My wife would declare a lambda delegate to call me, the husband. And at the end, the chain is done. So I would call the done delegate. So let's build this thing in Visual Studio.
Can you all see it? Is the font size good enough? Okay. So I have here prepared son, wife, and husband. So like I said, if my son wants to call the next, we declare an Action delegate, and then we call it next or whatever. And at some point when my son is done executing his part of the chain of responsibility, he's going to call next, the next delegate. Okay. And of course the same applies for my wife: we pass in an Action delegate and she calls the Action delegate next. And then the same applies for me, the husband. So I'll call it next or next action. It doesn't really matter what we call it, and I call next again. So now all the individual elements are prepared to actually be hooked together. So let's call the thing. So, like I said, we need to call son. And since it's a method which accepts a method again, which fulfills the signature of returning void and getting in an Action delegate, we can then declare a lambda of type Action and call the next one, which is wife.
And then again an Action delegate and we call husband, and now we need to have a done condition, right? So we can do this by, for example, declaring an anonymous Action delegate. I call it done here and let's implement it pretty quickly. And let's just output done. Output is an extension method which does nothing more than just calling Console.WriteLine in a simpler way. And then I just pass in the done delegate. And of course I need to execute it. So this is a simple synchronous chain of responsibility. Let's execute it in Visual Studio. So what we see now, we get son, wife, husband, done. Really simple. So let's go back to the slides and do a brief recap.
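The demo code itself isn't part of this transcript, so here is a minimal sketch of what the hand-wired synchronous chain might look like. It assumes C# top-level statements; the `output` list and the local `Output` helper are stand-ins for the extension method mentioned on stage, not the code shown in the talk.

```csharp
using System;
using System.Collections.Generic;

var output = new List<string>(); // collected so the call order can be inspected

// Stand-in for the Output helper from the talk: prints and records the text.
void Output(string text) { output.Add(text); Console.WriteLine(text); }

// Each link does its own work and then calls the next link in the chain.
void Son(Action next)     { Output("Son");     next(); }
void Wife(Action next)    { Output("Wife");    next(); }
void Husband(Action next) { Output("Husband"); next(); }

// The end of the chain.
Action done = () => Output("Done");

// Son wraps Wife, Wife wraps Husband, Husband calls done.
Son(() => Wife(() => Husband(done)));
```

Running it prints Son, Wife, Husband, Done, one per line, which matches the order seen in the demo.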
So what's going to happen during runtime is that we get pretty deep call stacks. So we get here a depth of three elements, well, done as well, but I omitted that on this drawing. Basically we go down: my son is the outer element and the outer element wraps all the inner elements, and then calls wife next and then husband. So what happens during runtime is that we basically go down the call stack and then we go up again. The benefit of this pattern is that, because each link element has access to the next one, each link element can wrap the execution of the entire pipeline which is coming after that element. So for example, we can wrap it in try/catch blocks, we can use using statements around the next execution and so on. But of course, if you needed to write the code I just wrote in the previous examples by hand, each and every time we rebuild this chain of responsibility, this would be really, really cumbersome, right?
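To make the wrapping benefit concrete, here is a small sketch under assumed names: the outer link puts a try/catch around `next()`, so it sees any exception thrown by a link further down. The failing `Husband` and its "glass dropped" message are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

var output = new List<string>();
void Output(string text) { output.Add(text); Console.WriteLine(text); }

// Outer link: wraps everything that comes after it in a try/catch.
void Son(Action next)
{
    try
    {
        next();
    }
    catch (InvalidOperationException ex)
    {
        Output("Son caught: " + ex.Message);
    }
}

void Wife(Action next)
{
    Output("Wife before");
    next();
    Output("Wife after"); // skipped when a later link throws
}

// Hypothetical failing link: never reaches next.
void Husband(Action next)
{
    throw new InvalidOperationException("glass dropped");
}

Son(() => Wife(() => Husband(() => Output("Done"))));
```

The exception travels back up the call stack through `Wife` and is handled in `Son`, so the program prints "Wife before" followed by "Son caught: glass dropped".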
And in a real production scenario, we want to be able to compose this by some maybe abstract notion, by maybe applying some reflection stuff to load all the link elements and compose them together dynamically. Because we want to benefit from the open-closed principle, or the extensibility of this pattern. So how do we write this pattern in a more generic way? I'm sorry. I'm missing ConfigureAwait(false) here on these slides. I'm going to fix this later. So, okay. So let's build this one in code. So the more generic version. The goal is we want to get the same output: we want to see on the screen son, wife, husband, done. So how do we do that? What we need is a container for these actions. What would be a good container for these actions, so that we can call them in a more generic way? Any ideas from the audience?
00:13:57 Daniel Marbach
List? Yeah. Cool. So I call it actions. Yup.
Yes. Thanks. You're my hero. Here it is. Okay. So I call a list of actions. So, but what kind of types would we pass in here? Any ideas?
Probably Action. Is this enough? No, it's not, because we want to put into this list methods which accept a parameter of type Action. So what we can do is declare this one as Action of type Action. So we have a list of Action of Action. So what does that mean? That means we have delegates in there which return void and which accept a parameter of type Action. Okay? Everyone still here? Good. And then of course we can add our stuff to this list. So we can enter it here.
We can add, for example, my son and then we can add my wife. And then we can add myself. And of course we can also declare the done delegate. I'm going to copy-paste this one. Forgive me for copy-pasting code. And we're going to add this one here as well. So, of type done. Of course, unfortunately, this doesn't really compile, right? Because these methods don't have the signature of Action of Action. So what we need to do is basically insert a lambda delegate here which then calls the action. And then it should work. It doesn't.
That said action.
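The part of the recording where the list is actually invoked is missing from this transcript, so the following is only a sketch of one way it could work. It assumes a recursive helper, here called `Invoke` (a hypothetical name), that runs the link at a given index and hands it a delegate that runs the rest of the list. The lambda adapter around `done` is also an assumption about what "insert a lambda" referred to.

```csharp
using System;
using System.Collections.Generic;

var output = new List<string>();
void Output(string text) { output.Add(text); Console.WriteLine(text); }

void Son(Action next)     { Output("Son");     next(); }
void Wife(Action next)    { Output("Wife");    next(); }
void Husband(Action next) { Output("Husband"); next(); }
Action done = () => Output("Done");

// The container: delegates that return void and accept the next Action.
var actions = new List<Action<Action>>
{
    Son,
    Wife,
    Husband,
    next => done(), // adapter: done is a plain Action, so it ignores next
};

// Hypothetical helper: runs the link at `index` and passes it a delegate
// that, when called, runs the remainder of the list.
void Invoke(int index)
{
    if (index == actions.Count) return; // past the last link, nothing left to do
    actions[index](() => Invoke(index + 1));
}

Invoke(0);
```

The call stack still nests exactly like the hand-written version, but the links now live in a list that could be filled at runtime.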
00:16:09 Speaker 5
00:16:12 Daniel Marbach
00:17:02 Daniel Marbach
00:17:58 Speaker 7
00:18:00 Daniel Marbach
00:18:46 Daniel Marbach
00:20:08 Daniel Marbach
00:21:15 Daniel Marbach
00:22:28 Daniel Marbach
00:23:30 Daniel Marbach
00:24:11 Daniel Marbach
00:25:06 Daniel Marbach
00:26:25 Speaker 9
Task. Exactly. So we return a Task. So that we can float the asynchronous execution context through everything, we need to be async all the way, remember? In order to do that, we also need to change the next delegate. The next delegate in this example needs to become a Func of Task. So next needs to return a Task. And what we can do now is call await next. So let's do that. We call await next. And of course, because we now have an await statement here, we need to mark the method as async. Now, and that's really beneficial if you have deep chains of responsibility: if you only do something before the await call and you have only one single await call in a method, the best thing you can do is return the task from next instead of awaiting it.
And then you need to remove the async keyword from the method. So what's going to happen here is the compiler will not generate the state machine under the hood. And therefore it will not generate the necessary classes, which do the allocations and yada yada for the state machine. So you will just return the task here. And it's a bit more efficient during runtime. But of course, when we do that, we can no longer wrap the next inside a try/catch or a using statement. That only works if we only have one next and we only do something before it. Okay. But now let's change this on all the things. So here again, we return a Task and we return the call to next in wife, and then here again, the same.
Okay. And also Task-returning, and then return next. So that's it. So how do we call this one? Well, first thing, await, right? So again, we call son and then we call wife, but of course async, so wife async and then again husband async, and then of course we also need a done async. So let's copy-paste and let's briefly change the signature of this done thing to be async-compatible. So we make it a Func of Task here and we call it done, and now we need to return a Task. Since we're not doing anything async in here, what we can do, instead of marking this one as async, is just return a task. If you're on, for example, .NET 4.6, you can use Task.CompletedTask, which is a cached, already completed task you can return. Or if you have a lower .NET version you can use Task.FromResult with zero, one, false, true, whatever you want to use, right?
So as long as you make it consistent. And then again, we call that done delegate inside this pipeline and that's it. As you can see here, of course, again, we could mark these delegates as well async and we could, again, await inside these lambda expressions. Again, if we do that, we would also increase the call stack unnecessarily because we have a lot of await statements in there. We have a lot of compiler-generated code, which then during runtime bloats the allocations. So this optimization is really important if you do high throughput systems, but if not, it doesn't really matter for most of the cases. So now we have son, wife, husband, done. So the same is now async. So what we can do is while this took maybe 60 milliseconds. So let's do a quick demonstration that you actually believe that the thing is async.
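As a rough sketch of the async chain described above (not the exact demo code, which this transcript doesn't contain): `Son` awaits `next`, while `Wife` and `Husband` only do work before `next` and therefore return its task directly. The `output` list and the local `Output` helper are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var output = new List<string>();
void Output(string text) { output.Add(text); Console.WriteLine(text); }

// Awaits next, so the compiler generates a state machine. In exchange,
// the call could be wrapped in try/catch or a using statement.
async Task Son(Func<Task> next)
{
    Output("Son");
    await next().ConfigureAwait(false);
}

// Only do work before next, so they return the task directly (no async keyword).
Task Wife(Func<Task> next)    { Output("Wife");    return next(); }
Task Husband(Func<Task> next) { Output("Husband"); return next(); }

Func<Task> done = () =>
{
    Output("Done");
    return Task.CompletedTask; // .NET 4.6 and later; Task.FromResult(0) on older versions
};

await Son(() => Wife(() => Husband(done)));
```

The output order is the same as in the synchronous version; only the return types and the way `next` is handed back have changed.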
So let's do, for example, await next. And then let's mark this one as async and we can do an await Task.Delay of, let's say, one second, a thousand milliseconds, and now if we execute this one again, what you are going to see is that it's going to take more than one second. But it's not blocking. We're not blocking any threads. A completely async-enabled pipeline. But again, we want a generic version of this one because we don't want to write this every time again. So let us copy-paste the previous synchronous version because, surprise, surprise, it's not really different. So let us do this. So what do we need to change? Let me copy-paste parts of this test for the infrastructure up here. And let's call it async automatic dishwasher unloading. Of course, we need this done delegate again. So any ideas what we need to change? Before, we had a list of Action of Action. What do we have now for the asynchronous version?
You need to await your invoke.
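The answer given on stage is not preserved in this transcript. One plausible shape, offered as an assumption, is a list of `Func<Func<Task>, Task>` with a recursive `Invoke` helper (hypothetical name) that returns the task of the link it starts. The short `Task.Delay` in `Son` stands in for real I/O.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var output = new List<string>();
void Output(string text) { output.Add(text); Console.WriteLine(text); }

async Task Son(Func<Task> next)
{
    Output("Son");
    await Task.Delay(10).ConfigureAwait(false); // simulated asynchronous work
    await next().ConfigureAwait(false);
}

Task Wife(Func<Task> next)    { Output("Wife");    return next(); }
Task Husband(Func<Task> next) { Output("Husband"); return next(); }
Func<Task> done = () => { Output("Done"); return Task.CompletedTask; };

// Each link now accepts a Func<Task> as next and returns a Task itself.
var actions = new List<Func<Func<Task>, Task>>
{
    Son,
    Wife,
    Husband,
    next => done(), // adapter: done ignores next
};

// Hypothetical helper: starts the link at `index` and returns its task, so
// the caller can await the remainder of the chain.
Task Invoke(int index)
{
    if (index == actions.Count) return Task.CompletedTask;
    return actions[index](() => Invoke(index + 1));
}

await Invoke(0); // the audience's point: the invoke has to be awaited
```

Compared with the synchronous sketch, only the delegate types and the return statements differ, which fits the remark that the two versions are not really different.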
00:32:06 Speaker 10
00:32:09 Daniel Marbach
00:32:23 Daniel Marbach
00:32:24 Speaker 11
00:32:26 Daniel Marbach
00:33:25 Daniel Marbach
00:34:15 Daniel Marbach
00:35:42 Daniel Marbach
00:36:40 Daniel Marbach
00:37:16 Daniel Marbach
00:37:59 Daniel Marbach
00:39:08 Speaker 3
00:39:56 Speaker 3
00:40:49 Speaker 3
00:41:55 Speaker 3
00:43:17 Speaker 3
00:44:13 Speaker 3
00:45:26 Speaker 3
00:46:35 Speaker 3
00:47:48 Speaker 3
00:48:48 Speaker 3
00:50:01 Speaker 3
00:51:02 Speaker 3
00:51:49 Speaker 3
00:52:32 Speaker 3
00:53:25 Speaker 3
00:54:09 Speaker 3
00:54:42 Speaker 3
00:55:17 Speaker 3
00:56:12 Speaker 3
00:57:08 Speaker 3
It's async all the way. It uses the chain of responsibility pattern heavily. If you're curious, you can go to ./particularnet/nservicebus/pipeline customizing v6 and you'll see how this pattern is implemented. We call them behaviors, not link elements. And of course the code is also on GitHub. And the brief recap. I think the chain of responsibility pattern or some others called them also Russian dolls coming from JavaScript is a really flextensible pattern, ideally suited to build robust I/O bound pipelines and domains. The pattern is used as I showed in many open source projects and infrastructure things. And I'll say, know it, learn it, love it. I hope that's not copyrighted by the .NET Rocks guys, but maybe it is. So, yeah, my slides' links are available on You can have a look at it.
I have also other implementations of a potential approach to reduce, for example, the nesting in the call stack. I call that partial dolce or partial chain of responsibility. If you're curious how that's implemented and a few other attempts to make it, for example, recursion free and so on. So if you want to deep dive into that stuff, feel free to have a look at the codes. And if you want to do a brief recap on async/await and how, for example, a message pump works. I haven't shown that. I implement in my webinar on I implement with TPL and async a complete message pump live in this webinar. You can just register on this link. And yeah, if you have any questions feel free to shoot me any questions. I will stay here until roughly 6:00 PM at the Particular booth. If you have questions afterwards just approach me or right now. Any questions?
Sorry. Which one?
This one? Any other questions?
I have one.
Why we move it out?
00:59:26 Daniel Marbach
Okay. Well, so for example, if we have per end point what we call, it's basically a thing that consumes off a queue, we have one message pump which consumes messages. So if you have a thousand messages, let's say you consume five and then you've one message in there which is going to fail. And you retry and you retry. And let's say you have just concurrency set to one, right? You're going to retry this one indefinitely, right?
So what we do is, instead of at some point we say, "Okay, this one has now failed a number of times." It doesn't really make sense to retry it again because it's more of a permanent failure. So we move it out of the queue so that the processing can continue. And all the other messages in the queue are continued. And then we basically raise an event to the operators of the system saying, "Hey, there is a message in there which you need to have a look at and decide for yourself what you want to do with that message."
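A toy sketch of that idea, with everything in it assumed for illustration: a fixed retry limit, an in-memory list standing in for the error queue, and a handler that always fails for one particular message. This is not how NServiceBus implements it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

const int MaxAttempts = 3;            // hypothetical retry limit
var errorQueue = new List<string>();  // stands in for a real error queue
var processed = new List<string>();

// Hypothetical handler: the message "poison" fails on every attempt.
Task Handle(string message)
{
    if (message == "poison") throw new InvalidOperationException("cannot process");
    processed.Add(message);
    return Task.CompletedTask;
}

async Task Process(string message)
{
    for (var attempt = 1; attempt <= MaxAttempts; attempt++)
    {
        try
        {
            await Handle(message).ConfigureAwait(false);
            return; // success, nothing more to do
        }
        catch (InvalidOperationException)
        {
            // swallow the failure and try again
        }
    }

    // Treated as a permanent failure: move the message out so the queue keeps flowing.
    errorQueue.Add(message);
    Console.WriteLine($"Moved '{message}' to the error queue after {MaxAttempts} attempts");
}

foreach (var message in new[] { "a", "poison", "b" })
{
    await Process(message);
}
```

Even with one message at a time, "b" still gets processed because "poison" is moved aside instead of being retried forever.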
Yes you can. Yeah. Of course. Yeah.
If you have it inside.
I have it inside. Yes. Yeah. But that's an implementation detail. Yeah. Yeah. Any other questions? Yeah.
How do you get the concurrency?
How do you get the concurrency?
In parallel? Okay. So what I like to say is, it's concurrent. It's not parallel, because we are doing async/await. It can be parallel but it doesn't necessarily have to be. But what you can do, for example, if you have a tree of responsibilities, is that each fork connector can, for example, decide to create multiple forks. And instead of awaiting each fork, you can basically spawn off all the forks and then do a Task.WhenAll on these forks. And then you have concurrent execution of forks.
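A minimal sketch of that fork idea, assuming a hypothetical `Fork` method that simulates some I/O: all forks are started first and only then awaited together with `Task.WhenAll`. The fork names are made up.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical fork: simulates I/O and returns its own name when finished.
async Task<string> Fork(string name)
{
    await Task.Delay(50).ConfigureAwait(false);
    return name;
}

// Start every fork without awaiting it individually...
var forks = new[] { "audit", "dispatch", "log" }
    .Select(name => Fork(name))
    .ToArray();

// ...then await them all together. Results come back in the order of the input tasks.
string[] results = await Task.WhenAll(forks);

Console.WriteLine(string.Join(", ", results));
```

The three delays overlap, so the total wait is roughly one delay rather than three, and no thread is blocked while waiting.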
But what, for example, we also do... Let me briefly show you this here on this picture. So what the message pump does is it has a concurrency setting. For example, it can be 100, right? So what we do is we basically pick 100 messages and then we have 100 concurrent chains of responsibility, with potentially multiple forks inside each chain of responsibility, running on one or multiple threads. Does that answer your question?
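The talk doesn't show how the pump enforces its concurrency setting. One common way to cap the number of in-flight chains, given here purely as an assumption, is a `SemaphoreSlim`. The limit of 3, the ten messages and the `RunChain` stand-in are all invented for the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

const int Concurrency = 3; // the talk mentions 100; kept small here
var limiter = new SemaphoreSlim(Concurrency);
var gate = new object();
var running = 0;
var peak = 0;
var handled = 0;

// Stand-in for one chain of responsibility handling one message.
async Task RunChain(int message)
{
    lock (gate) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(20).ConfigureAwait(false); // simulated I/O inside the chain
    lock (gate) { running--; handled++; }
}

// Runs the chain and frees the slot afterwards, even if the chain throws.
async Task RunAndRelease(int message)
{
    try { await RunChain(message).ConfigureAwait(false); }
    finally { limiter.Release(); }
}

var inFlight = new List<Task>();
for (var message = 0; message < 10; message++)
{
    await limiter.WaitAsync();             // waits while Concurrency chains are in flight
    inFlight.Add(RunAndRelease(message));  // start the chain without awaiting it here
}

await Task.WhenAll(inFlight);
Console.WriteLine($"handled={handled}, peak={peak}");
```

All ten messages get handled, and the recorded peak never exceeds the configured limit, because a slot is only released after its chain has finished.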
Yeah. I think so.
Cool. Any other questions? Oh, then thanks.