Break the chain asynchronously

Different variations of the Chain of Responsibility pattern can be found in middleware like OWIN and ASP.NET Core. They all share a common approach: nesting functions inside functions, also known as functional composition. In this talk, we’ll build the Chain of Responsibility from scratch, apply it to a message pump for a service bus library and combine it with Async / Await to unleash the full power of asynchronous I/O.

Join this talk to learn how async and recursion play together. Discover how composition allows changing behavior at runtime. Finally, float state into your Chain of Responsibility so that you don’t sacrifice thread safety. Break the Chain of Responsibility with me.

Transcription

00:00:04 Daniel Marbach
Okay. Cool. Can everybody hear me? Cool. About the font size: later I'm going to do a lot of coding. If the font size is not big enough for the last row, please tell me and I'm going to increase it. It's currently 22. It should be enough, but we'll see. Okay. A warm welcome from my side to my talk about the chain of responsibility pattern combined with async/await in I/O-bound domains. Who was in my talk on Wednesday? A few. Okay. Cool. So my name is Daniel Marbach. I'm a solution architect and engineer at Particular Software. I live in central Switzerland in Lucerne, not Lausanne. That's the French-speaking part. I'm in the German-speaking part of Switzerland. If you want to know more about me and what I do in my free time, I suggest you listen to David Rael's podcast Developer On Fire, episode 77.
00:01:01 Daniel Marbach
You can reach me on Twitter under @danielmarbach and I blog on planetgeek.ch and also on the Particular blog, particular.net/blog. I hope you subscribe to these two blogs. This talk is divided into three sections. First, I'm going to show the pattern, the chain of responsibility itself. Then I'm going to build it live on stage: first a synchronous version of the chain of responsibility and then an asynchronous version of the chain of responsibility. And then we do a little wrap-up at the end. And of course questions, if you have any. I think since we are a pretty small group, if you have any questions and you feel it's blocking you from understanding what's happening on the screen, please feel free to shout. I'm going to answer the question, and if other questions arise, then please just ask them at the end. So, who has ever used NancyFX?
00:02:01 Daniel Marbach
Hands up. Okay. Who has ever used the NancyFX before and after module hooks? Okay. Just a few. Okay. Who has used FubuMVC from Jeremy Miller before? Okay. Who has used behaviors in FubuMVC? Just one. Okay. Who has used OWIN before? Hands up. A few. Okay. Who has written the type of code I'm showing here on the slides? Okay. A few. So what this code does is it registers a function on the OWIN middleware, which is an async lambda function. And you get passed in a context and the next delegate. And if you want to continue the chain, you call await next. And if you want to do something before all the other things in the chain are called, you put your code where the "do something" is here, before the next call. And if you want to do something at the end of the chain, then you add your code after the next call.
00:03:07 Daniel Marbach
So who has used Web API or MVC action filters before? The async versions? Hands up. Okay. Only a few. So this is a bit of code from Web API action filters. It's also pretty similar. You implement the action filter and then you implement a method which is called ExecuteActionFilterAsync. You get in an HttpActionContext. You get in a cancellation token and you get in a function, a Func of Task that returns an HttpResponseMessage, which is called continuation. It's not called next like in the OWIN middleware, but it's called continuation. And then you do whatever you need to do in your action filter. And when you feel it's time to call the continuation function, then you call await continuation. And you can, for example, wrap it in a try/catch or whatever. So this, in the end, is going to invoke your API controllers and your other stuff that is currently in the HTTP pipeline with the Web API stuff.
00:04:12 Daniel Marbach
So all these variations, we just discussed NancyFX before and after module hooks, OWIN pipeline, OWIN Middleware. And of course also action filters. These are variations and implementations of the chain of responsibility pattern. So the goal of this talk today is I want to show you how the chain of responsibility pattern works. I want to show you how you can build it yourself. This sounds a bit weird. Why do we want to go that deep? I'm a firm believer that if you're using something like action filters, OWIN Middleware, it's crucial to understand the patterns behind it, because if it's going to fail in production and you don't know how it actually works under the hood, you're going to be screwed pretty much. That's my opinion. Of course, you can make up your own opinion, but that's why I'm currently standing here and trying to explain this pattern to you guys.
00:05:04 Daniel Marbach
I hope you find it useful. And if not, tell me at the end. The chain of responsibility pattern is a nice pattern. It allows you to extend behavior during runtime, for example. Like with the action filters, you can plug your own action filters into the whole HTTP pipeline and do whatever you need to do for your own business logic, like identification and so on. So the pattern itself is defined like this: you have a client which calls the chain, and then the chain has a few different link elements. What's important in this pattern is that each link element has an abstract notion of the next element in the chain. And we saw that it's the next delegate or the continuation delegate that is passed into the action filter, or the next delegate which is passed into the OWIN pipeline.
00:06:05 Daniel Marbach
So they usually all share a common approach. They nest functions inside functions. So it's kind of a functional design pattern. A brief explanation, how you can find the chain of responsibility in your own life or at least in my life. So when I empty the dishwasher, I sometimes play a little game with my son and with my wife. My son has about that height maybe a little bit more than a meter. And then my wife is 163-ish or maybe a bit more. And I'm one meter and 92 centimeters. So we can build a chain of responsibility in our kitchen. So my son is the first element, the first link in the chain. Then my wife is the next element. And then I'm the last element. And what we want to do is we want to empty the dishwasher. And since we have different cupboards in the kitchen on different heights.
00:07:03 Daniel Marbach
So when my son takes something out of the dishwasher, he can look at it and he knows that, oh, it's something that goes in the cupboards which he can reach. And then he puts it into the cupboards. If he can't reach a cupboard, he can hand it over to my wife. And if my wife can reach the cupboards, then she puts it into the cupboards. And if she can't reach the cupboards, she hands it over to me. For example, our wine glasses: because we don't use them that often, we have them in the highest cupboards in the kitchen, which are only reachable by me. So if she finds a wine glass in the dishwasher, then she hands it over to me and I put it into the cupboards. So that's a chain of responsibility pattern. What's important is that each person in this chain of responsibility in our kitchen fulfills the single responsibility principle.
00:07:56 Daniel Marbach
So, my son has a responsibility, like I said, the lowest cupboards, and I have a responsibility, maybe the highest cupboards in the kitchen. And like I said, each element knows the next one: my son knows that he has to hand it over to my wife and my wife knows that she has to hand it over to me. So if we were to translate this to code, it would basically look like this. We have a method, static void Person, which gets an Action delegate, here called next. And then we have an implementation. And then whenever we are done, we call the next delegate. So we are calling the next link in the chain. So the dishwasher unloading process in our kitchen at home would probably look like this code here. My son would declare a lambda delegate to call my wife. My wife would declare a lambda delegate to call me, the husband. And at the end, the chain is done. So I would call the done delegate. So let's build this thing in Visual Studio.
00:09:20 Daniel Marbach
Can you all see it? Is the font size good enough? Okay. So I have prepared here son, wife, and husband. So like I said, if my son wants to call the next, we declare an Action delegate, and then we call it next or whatever. And at some point when my son is done executing his part of the chain of responsibility, he's going to call next, the next delegate. Okay. And of course the same applies for my wife, passing an Action delegate, and she calls the Action delegate next. And then the same applies for me, the husband. So I'll call it next or next action. It doesn't really matter what we call it, and I call next again. So now all the individual elements are prepared to actually be hooked together. So let's call the thing. So, like I said, we need to call son. And since it's a method which accepts a method again, one which fulfills the signature of returning void and getting in an Action delegate, we can then declare a delegate of type Action and call the next one, which is wife.
00:10:39 Daniel Marbach
And then again an Action delegate and we call husband, and now we need to have a done condition, right? So we can do this by, for example, declaring an anonymous Action delegate. I call it done here, and let's implement it pretty quickly. And let's just output done. Output is an extension method which does nothing more than just calling Console.WriteLine in a simpler way. And then I just pass in the done delegate. And of course I need to execute it. So this is a simple synchronous chain of responsibility. Let's execute it in Visual Studio. So what we see now is we get son, wife, husband, done. Really simple. So let's go back to the slides and do a brief recap.
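The hand-wired synchronous chain described above could look roughly like the following. This is a minimal sketch reconstructed from the spoken description, not the speaker's actual code: the class name `ManualChain`, the `Log` list, and the `Output` helper (a plain static method here rather than an extension method) are assumptions made so the call order can be inspected.

```csharp
using System;
using System.Collections.Generic;

public static class ManualChain
{
    // Assumed helper state: records the output so the call order can be checked.
    public static readonly List<string> Log = new List<string>();

    static void Output(string text)
    {
        Log.Add(text);
        Console.WriteLine(text);
    }

    // Each link does its own work and then hands over to the next link.
    static void Son(Action next) { Output("Son"); next(); }
    static void Wife(Action next) { Output("Wife"); next(); }
    static void Husband(Action next) { Output("Husband"); next(); }

    public static void Run()
    {
        Log.Clear();
        Action done = () => Output("Done");

        // The outer link wraps all inner links by nesting lambdas.
        Son(() => Wife(() => Husband(done)));
    }

    public static void Main() => Run();
}
```

Because every link receives the rest of the chain as a delegate, a link can also decide not to call `next` at all, which is how the dishwasher example stops once someone can reach the cupboard.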
00:11:39 Daniel Marbach
So what's going to happen during runtime is that we get pretty deep call stacks. So here we get a depth of three elements, well, done as well, but I omitted this on this drawing. Basically we go down: my son is the outer element and the outer element wraps all the inner elements, and then calls wife next and then husband. So what happens during runtime is that we basically go down the call stack and then we go up again. The benefit of this pattern is that each link element has access to the next one, so each link element can wrap the execution of the entire pipeline which is coming after that element. For example, we can wrap it in try/catch blocks, we can use using statements around the next execution, and so on. But of course, if you needed to write the code I just wrote in the previous code examples by hand, each and every time we rebuild this chain of responsibility, this would be really, really cumbersome, right?
00:12:48 Daniel Marbach
And in a real production scenario, we want to be able to compose this by some maybe abstract notion, maybe by applying some reflection stuff to load all the link elements and compose them together dynamically. Because we want to benefit from the open-closed principle or the extensibility of this pattern. So how do we write this pattern in a more generic way? I'm sorry. I'm missing ConfigureAwait(false) here on these slides. I'm going to fix this later. So, okay. Let's build this one in code, the more generic version. The goal is we want to get the same output: we want to see on the screen son, wife, husband, done. So how do we do that? What we need is a container for these actions. What would be a good container for these actions, so that we can call them in a more generic way? Any ideas from the audience?
00:13:54 Speaker 2
List.
00:13:57 Daniel Marbach
List? Yeah. Cool. So I call it actions. Yup.
00:14:02 Daniel Marbach
Yes. Thanks. You're my hero. Here it is. Okay. So I call a list of actions. So, but what kind of types would we pass in here? Any ideas?
00:14:20 Daniel Marbach
Probably Action. Is this enough? No, it's not, because we want to put into this list methods which accept a method of type Action. So what we can do is we declare this one as Action of type Action. So we have a list whose element type is Action of Action. So what does that mean? That means we have delegates in there which return void and which accept a parameter of type Action. Okay? Everyone still here? Good. And then of course we can add our stuff to this list. So we can enter it here.
00:15:11 Daniel Marbach
We can add, for example, my son and then we can add my wife. And then we can add myself. And of course we can also declare the done delegate. I'm going to copy-paste this one. Forgive me for copy-pasting code. And we're going to add this one here as well, the done one. Unfortunately, this doesn't really compile, right? Because this delegate doesn't have the signature of Action of Action. So what we need to do is basically insert a lambda delegate here which then calls the action. And then it should work. It doesn't.
00:16:05 Speaker 5
That said action.
00:16:08 Daniel Marbach
Sorry.
00:16:09 Speaker 5
That said action parameter.
00:16:12 Daniel Marbach
Yes, exactly. Because what we're doing is, this delegate actually gets in again, for example, the next one, and but here we don't want to call the next one we just ignore it and we just want to call the done methods. Correct. So now how do we execute this? Well, I have here a method invoke and I pass in here, the list of actions of actions. So I call this actions and the simplest way to do this is I declare current index and assign to zero. Sorry. Okay. And then I fixed my spelling mistakes. Thanks. And now what else do I need?
00:17:02 Daniel Marbach
Well, I now have a list of actions of actions and I have a current index. So I'm going to apply recursion here. Of course, for the recursion, what I need is a done condition or an exit condition to abort the recursion. So one way to do it is we could say, for example, if current index equals actions.Count, a really simple one, then I'm just returning from my method. And then the next thing I want to do is get the first thing I need to execute. So I take actions here and, through the indexer, I get the element at the current index. Now I have the action. So what do I execute right now to basically kick off the chain of responsibility? Any ideas? What do I need to execute?
00:17:58 Speaker 7
Current with the next?
00:18:00 Daniel Marbach
Current with the next. So current, like you said, here is action, right? So I'm executing this one. And action, again, accepts an Action, right? So what we can do is we can basically declare an inner delegate again and then we invoke ourselves. So we apply the recursion inside the lambda expression here: we call the Invoke method, passing in the action list again. And then we pass in the current index and increase the current index. Okay. Any questions so far? Is that clear for everyone? Because we're going to build on this pattern.
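The recursive `Invoke` method described above might be sketched as follows. This is an illustration under assumptions, not the code from the stage: the class name `GenericChain`, the `Log` list and the `Output` helper are made up here, and the index is advanced with `currentIndex + 1`.

```csharp
using System;
using System.Collections.Generic;

public static class GenericChain
{
    // Assumed helper state: records the output so the call order can be checked.
    public static readonly List<string> Log = new List<string>();

    static void Output(string text) { Log.Add(text); Console.WriteLine(text); }

    static void Son(Action next) { Output("Son"); next(); }
    static void Wife(Action next) { Output("Wife"); next(); }
    static void Husband(Action next) { Output("Husband"); next(); }

    // Executes the link at currentIndex and hands it a delegate that,
    // when called, recursively invokes the rest of the list.
    public static void Invoke(List<Action<Action>> actions, int currentIndex = 0)
    {
        if (currentIndex == actions.Count)
        {
            return; // exit condition: no more links
        }

        Action<Action> action = actions[currentIndex];
        action(() => Invoke(actions, currentIndex + 1));
    }

    public static void Run()
    {
        Log.Clear();
        Action done = () => Output("Done");

        var actions = new List<Action<Action>>
        {
            Son,
            Wife,
            Husband,
            next => done() // adapts done to Action<Action>; next is ignored
        };

        Invoke(actions);
    }

    public static void Main() => Run();
}
```

Adding another link is now a matter of adding one more entry to the list. Note that the index passed to the recursive call must actually advance; if it does not, the recursion never reaches the exit condition.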
00:18:46 Daniel Marbach
Cool. So now we have this chain of responsibility here, and we can call this Invoke method. We pass in the action list and, since current index is already set to zero by default, we don't need to pass in anything else, but when we call it internally, we increase the current index by one. So if I didn't do anything wrong, that should work. Let's try it out. Let's execute this test. And as we can see, surprise, surprise, we have the same result as before, and now we have a generic pattern which we can basically extend. So assuming my wife and I get more kids, more sons, or, wishful thinking, another wife. Let's see, as you can see here, we now have a much deeper call stack. Oh, it's not ending. It goes on forever until we have a stack overflow exception. Any ideas why?
00:20:08 Daniel Marbach
So, by the where? Like this? Cool. Just fixed the bug. Luckily we have unit tests. Okay. But what does this allow us to do? Well, we can now, for example, introduce filters into that one. I just showed you how I added more sons and, wishful thinking, another wife; let's reduce this a little bit again. So we could also, for example, say: if anything happens in there, we don't really want to abort the whole pipeline. We just want to, for example, catch exceptions. So we can do this by declaring another method. Let's call it static void FilterExceptions, and we accept an Action delegate called next. And then what we can do is a try/catch in which we call the next delegate.
00:21:15 Daniel Marbach
And well, instead of rethrowing we can basically do whatever we want, but let's see. We just output the exception message into the console window, but we don't want to raise the exception to the caller. We just swallow it. So let's assume we have somewhere, let's call it static void EvilMethod. Again this one gets an Action delegate called next, and this evil method basically just throws a new InvalidOperationException. And we plug it in somewhere. Let's say we plug it in here just before the done. When we execute this code again, of course we will see the InvalidOperationException being raised to the caller, which is the unit test. But if we add this exception filter at any place here in the pipeline before it, what's going to happen is that the test is now green.
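The exception filter and the evil method from this part of the talk can be sketched like this. It is an assumption-laden illustration: the class name `FilteredChain`, the `Log` list and the `Output` helper are hypothetical, and the filter is placed first in the list (the talk only says it can be added anywhere in the pipeline before the throwing link).

```csharp
using System;
using System.Collections.Generic;

public static class FilteredChain
{
    // Assumed helper state: records the output so the behavior can be checked.
    public static readonly List<string> Log = new List<string>();

    static void Output(string text) { Log.Add(text); Console.WriteLine(text); }

    static void Son(Action next) { Output("Son"); next(); }
    static void Wife(Action next) { Output("Wife"); next(); }
    static void Husband(Action next) { Output("Husband"); next(); }

    // Wraps everything that comes after it: exceptions thrown by later
    // links are logged here and not raised to the caller.
    static void FilterExceptions(Action next)
    {
        try
        {
            next();
        }
        catch (Exception ex)
        {
            Output(ex.Message);
        }
    }

    // Always fails and never calls next.
    static void EvilMethod(Action next)
    {
        throw new InvalidOperationException();
    }

    static void Invoke(List<Action<Action>> actions, int currentIndex = 0)
    {
        if (currentIndex == actions.Count) return;
        actions[currentIndex](() => Invoke(actions, currentIndex + 1));
    }

    public static void Run()
    {
        Log.Clear();
        Action done = () => Output("Done");

        Invoke(new List<Action<Action>>
        {
            FilterExceptions, Son, Wife, Husband, EvilMethod, next => done()
        });
    }

    public static void Main() => Run();
}
```

In this sketch `Done` is never reached, because the evil method throws before calling `next`; the filter only prevents the exception from escaping the chain.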
00:22:28 Daniel Marbach
We just log "Operation is not valid due to the current state of the object", because we filtered it out, but the exception will not be raised to the caller. Okay. So, let's see what happens during runtime in this piece of code. I'll set a breakpoint in this thing here, and let's execute it in the debugger and step into the whole process. So as we can see, we now have six items in that list and the current index is zero. And by the way, this is OzCode. There is a booth down there. I'm using that as well. So now I'm going into the son one and then I'm going into next, going back again, recursion, picking the next element and going to the next one, and the next one. Let's see. I'm diving into the call stack here, sorry, deeper and deeper into the call stack.
00:23:30 Daniel Marbach
So if you look into the call stack window, let's see, it's a bit small, but what you're going to see is the call stack. The more elements we have in that list, the deeper the call stack gets, right? So at some point, this pattern has a limitation because we're going to explode the call stack. And of course, if something happens deep down in the chain of responsibility, that exception is going to be raised up the call stack. And therefore each link element in that call stack can influence the behavior, but also can influence the call stack of that exception.
00:24:11 Daniel Marbach
Because, well, that's the size of the thing we were going to see. So you should try to avoid putting too many things into the chain of responsibility. It's hard to say how many things you should actually put in. But for example, in NServiceBus, we have an incoming pipeline, we're going to talk about that a bit later, and then an outgoing pipeline. Together we probably have 40 elements or so, not really much more. Because each individual element in that chain of responsibility will of course also influence the throughput of a message handling pipeline. Okay. So, let me abort that one and let's briefly go back to the slides. So, I showed you that we can essentially just add filter elements, like exception catching and so on.
00:25:06 Daniel Marbach
But since you know me and I've been talking a lot about async/await, and I'm also going to talk about message handling in this talk: well, the best thing to actually use in an I/O-bound domain is async/await and Task-returning APIs. So what I'm going to show you is how you take this synchronous version and refactor it into an async-enabled chain of responsibility pattern. We are going to build this together. So let me switch to Visual Studio again. So let's do the generic. No. Let's do the step-by-step approach first. So we have here again son, wife and husband. In order to not clash with too many things, what I'm going to do is copy-paste these methods here, and we're going to call them son async and wife async and husband async, just so. Okay. So, in order to have a fully async chain of responsibility, what we need to do is change the return type of this method. To what?
00:26:25 Speaker 9
Task.
00:26:28 Daniel Marbach
Task. Exactly. So we return a Task. So that we can float the asynchronous execution context through everything, we need to be async all the way, remember? In order to do that, we also need to change the next delegate. The next delegate in this example needs to become a Func of Task. So next needs to return a Task. And what we can do now is we can call await next. So let's do that. We call await next. And of course, because we now have an await statement here, we need to mark the method as async. And now, and that's really beneficial if you have deep chains of responsibility: if you only do something before the await call and you have only one single await call in a method, the best thing you can do is return the next task instead of awaiting it.
00:27:33 Daniel Marbach
And then you need to remove the async keyword from the method. So what's going to happen here is the compiler will not generate the state machine under the hood. And therefore it will not generate the necessary classes, which do the allocations and yada yada for the state machine. You will just return the task here. And it's a bit more efficient during runtime. But of course, when we do that, we can no longer wrap the next call inside a try/catch or a using statement. It only works if we have only one next call and we only do something before it. Okay. But now let's change this on all the things. So here again, we return a Task and we return next, and then here again, the same.
00:28:27 Daniel Marbach
Okay. And also Task returning and then return next. So that's it. So how do we call this one? Well, first thing: await, right? So again, we call son and then we call wife, but of course the async ones, so son async and wife async and then again husband async, and then of course we also need a done async. So let's copy-paste and let's briefly change the signature of this done thing to be async compatible. So we make it a Func of Task here and we call done, and now we need to return a Task. Since we're not doing anything async in here, what we can do, instead of marking this one as async, is just return a completed task. If you're, for example, on .NET 4.6, you can use Task.CompletedTask, which is a cached, already completed task you can return. Or if you have a lower .NET version you can use Task.FromResult with zero, one, false, true, whatever you want to use, right?
00:29:45 Daniel Marbach
So as long as you make it consistent. And then again, we call that done delegate inside this pipeline and that's it. As you can see here, of course, again, we could mark these delegates as well async and we could, again, await inside these lambda expressions. Again, if we do that, we would also increase the call stack unnecessarily because we have a lot of await statements in there. We have a lot of compiler-generated code, which then during runtime bloats the allocations. So this optimization is really important if you do high throughput systems, but if not, it doesn't really matter for most of the cases. So now we have son, wife, husband, done. So the same is now async. So what we can do is while this took maybe 60 milliseconds. So let's do a quick demonstration that you actually believe that the thing is async.
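To make the two method shapes concrete, here is a minimal sketch of the hand-wired async chain. It is an illustration, not the code from the stage: the class name `ManualAsyncChain`, the `Log` list and the `Output` helper are assumptions, and `SonAsync` deliberately keeps the `async`/`await` form so both variants can be compared side by side.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ManualAsyncChain
{
    // Assumed helper state: records the output so the call order can be checked.
    public static readonly List<string> Log = new List<string>();

    static void Output(string text) { Log.Add(text); Console.WriteLine(text); }

    // async/await form: the compiler generates a state machine, but the
    // await could be wrapped in try/catch or a using statement.
    static async Task SonAsync(Func<Task> next)
    {
        Output("Son");
        await next();
    }

    // Task-returning form without async: no state machine is generated.
    // Only suitable when all the work happens before the single next() call.
    static Task WifeAsync(Func<Task> next)
    {
        Output("Wife");
        return next();
    }

    static Task HusbandAsync(Func<Task> next)
    {
        Output("Husband");
        return next();
    }

    public static Task RunAsync()
    {
        Log.Clear();

        // Nothing asynchronous happens in done, so it returns a cached completed task.
        Func<Task> done = () =>
        {
            Output("Done");
            return Task.CompletedTask; // requires .NET 4.6 or later
        };

        return SonAsync(() => WifeAsync(() => HusbandAsync(done)));
    }

    public static void Main() => RunAsync().GetAwaiter().GetResult();
}
```

Blocking with `GetAwaiter().GetResult()` is only used in `Main` here to keep the sketch runnable as a console program; inside an async test or an async entry point the chain would simply be awaited.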
00:30:45 Daniel Marbach
So let's do, for example, await next. And then let's mark this one as async, and we can do an await Task.Delay of, let's say, one second, a thousand milliseconds, and now if we execute this one again, what you are going to see is that it's going to take more than one second. But it's not blocking. We're not blocking any threads. A completely async-enabled pipeline. But again, we want a generic version of this one because we don't want to write this every time again. So let us copy-paste the previous synchronous version because, surprise, surprise, it's not really different. So let us do this. What do we need to change? Let me copy-paste parts of this test for the infrastructure up here. And let's call it async automatic dishwasher unloading. Of course, we need this done delegate again. So any ideas what we need to change? Before, we had a list of Action of Action. What do we have now for the asynchronous version?
00:32:06 Speaker 10
You need to await your invoke.
00:32:06 Daniel Marbach
Sorry.
00:32:06 Speaker 10
You need to await your invoke.
00:32:09 Daniel Marbach
I need to await my invoke. Yes. Correct. So you're saying we need to return here task first step. Cool. Thanks.
00:32:23 Daniel Marbach
A list of what?
00:32:24 Speaker 11
Action func of action.
00:32:26 Daniel Marbach
List of Action of Func of Action. Okay. Let me read that to you. I have a list of methods which return void and which accept a function, which returns a function, which returns void. So what is the signature of this method? Remember? Have a look at it again. The signature is that it returns a Task. So it's a Func returning Task, which accepts a Func of Task. Okay. So let's change it. We call this one InvokeAsync because we are now returning a Task, and now we're saying, okay, we have a function which gets a function, which returns a Task.
00:33:25 Daniel Marbach
And now what we return is a Task. Okay. That's the signature we want here. And again, the same current index and stuff, but now we can use the same trick because we want to avoid awaiting unnecessarily. So what we have here is two exit points in this method. One is the exit condition for the recursion. So there we return Task.FromResult. Then we grab the first one. I'm still calling this one action, which is not entirely accurate, but please bear with me. And then I'm passing in the list of funcs again. But of course now I need to call the asynchronous version. And now it's important: I need to return the task.
00:34:15 Daniel Marbach
Okay. Remember, with async, Task-returning becomes the new void. So now we are able to entirely float the asynchronous execution through everything. So now we mark this one as async Task. And of course now we need to declare this list here as a list of Func of Func of Task, Task as well. And then we need to pass in the asynchronous versions of the methods. So son async and of course also wife async and of course also husband async. Of course the filter method, the evil method, and so on would also need to change in my example, but I'm not doing this for this specific example. So now let's execute this. And as we can see here, we have son, wife, husband, and it also takes a bit more than a second, because we still have a delay inside the son method. Okay. So that's an asynchronous chain of responsibility pattern implemented live on stage at NDC. Are there any questions here so far?
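The generic asynchronous version described above might look like the sketch below. The names `GenericAsyncChain`, `Log` and `Output` are assumptions for illustration. Two deliberate deviations from the spoken walkthrough: the exit condition returns `Task.CompletedTask` where the talk mentions `Task.FromResult` (either works on .NET 4.6 and later), and the delay is shortened from one second to 50 ms.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class GenericAsyncChain
{
    // Assumed helper state: records the output so the call order can be checked.
    public static readonly List<string> Log = new List<string>();

    static void Output(string text) { Log.Add(text); Console.WriteLine(text); }

    static async Task SonAsync(Func<Task> next)
    {
        Output("Son");
        await Task.Delay(50); // shortened stand-in for the one-second delay
        await next();
    }

    static Task WifeAsync(Func<Task> next) { Output("Wife"); return next(); }
    static Task HusbandAsync(Func<Task> next) { Output("Husband"); return next(); }

    // Returns the task of the current link instead of awaiting it,
    // so this method itself needs no compiler-generated state machine.
    public static Task InvokeAsync(List<Func<Func<Task>, Task>> actions, int currentIndex = 0)
    {
        if (currentIndex == actions.Count)
        {
            return Task.CompletedTask; // exit condition for the recursion
        }

        Func<Func<Task>, Task> action = actions[currentIndex];
        return action(() => InvokeAsync(actions, currentIndex + 1));
    }

    public static Task RunAsync()
    {
        Log.Clear();
        Func<Task> done = () => { Output("Done"); return Task.CompletedTask; };

        return InvokeAsync(new List<Func<Func<Task>, Task>>
        {
            SonAsync,
            WifeAsync,
            HusbandAsync,
            next => done() // adapts done to the link signature; next is ignored
        });
    }

    public static void Main() => RunAsync().GetAwaiter().GetResult();
}
```

The links still run strictly one after another; the delay only means that no thread is blocked while the chain is waiting.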
00:35:42 Daniel Marbach
No. Okay. So let me briefly go back to explain what happens in a message-based architecture, for example with NServiceBus or maybe also other message-based systems like MassTransit, Rhino ESB, whatever. You usually have an architecture like this. You have a queue which contains your messages, and then you have something that we call, for example, a message pump. Well, if you don't have a reactive transport which pushes messages towards your code, then you have to poll the queue for messages. For example, with Azure Storage Queues you need to poll; on Azure Service Bus, you have a more reactive style of communication. Azure Service Bus pushes the messages to your code, but this is a generic approach which works for both designs. So we have a message pump and then this message pump calls the chain of responsibility.
00:36:40 Daniel Marbach
And chain of responsibility is these lines here. Sorry. The lines here. And in this chain of responsibility, you can have multiple elements, for example, retry on failure. So if anything happens during the message handling, for example, you have a lock on your database, you might say, "Okay, this is a transient failure. Let us retry the execution of this code because potentially the next time or the third time it's going to be successful." So this could be a filter in this pipeline or a link which then does the retry.
00:37:16 Daniel Marbach
Then we pick something, some additional information, maybe from the queuing system. Then we deserialize the message payload. And then we determine the code we need to execute. For example, in NServiceBus we call that message handlers. Then we call this code, and this code of course contains the customer code and is executed. So everything inside this chain of responsibility, including the customer code, is completely wrapped in this chain of responsibility. Any exception that is happening inside that chain is going to ripple up, and each individual element has the possibility to intercept the exception, do some logging, do retry logic and so on.
00:37:59 Daniel Marbach
So we can build, for example, a retrier pretty simply in this chain of responsibility. So let me do that together with you, again. How would we do retries? Well, since we are already in an asynchronous context, we can for example say, "Let us do a retry and let's wait for maybe a second or so before we retry again." So I have a method which returns a Task again, I call it retrier. And this method again gets a Func of Task and we call it next. And well, the simple thing we need is potentially a try/catch, right? Because we're saying, if anything happens, like an exception, we want to retry. So what more do we need for doing retries? Let's say we want to try it three times and if we still see an exception, we go and rethrow.
00:39:08 Speaker 3
Yeah. For example, I was planning to do a really, really simple implementation, just a for loop or something, right. Okay. I'm saying, okay, let's hard-code it to three times. And what I need to do is a try/catch here. And then of course I need to call the next delegate here. So I'm wrapping the next delegate with an await inside the try/catch. The compiler-generated code will make sure that any exception that is happening is wrapped and caught inside the compiler-generated code, with exception dispatching, and rethrown in this exact line so that we can wrap a try/catch around it.
00:39:56 Speaker 3
That's all done by the infrastructure. We don't need to worry about that. So we have an exception. So what we can, for example do is, we can create an exception dispatch info ourself. Let's call it info because I want to capture the exceptions. So when an exception happens, what I'm going to say is exception dispatch info dot capture. And I capture the exception of what's happening here. So what this is going to do, it's going to basically freeze the stack trace for us. Because my idea is I don't want to modify the stack trace unnecessarily by the infrastructure. So I'm freezing the stack trace so that later I can rethrow it without influencing the stack trace. So, and then for example I assign it to this local.
00:40:49 Speaker 3
And what I'm saying is: I want to retry, and at some point, though, I want to rethrow, but only if there still was an exception. So what I'm doing is right after the await call, I'm basically nulling this again, because I know that when I reach this line, no exception happened, right? And the next thing I need to do, because I don't want to retry unnecessarily, is break out of the loop. A bit nasty code, but let's keep it simple. I'm breaking out of the loop so that I don't do unnecessary retries. And at the end, what I'm saying is, well, if I still have an exception then I'm going to rethrow. And if not, I do nothing. So what I do here is info, the Elvis operator, and then I do a Throw. So the exception operator has the possibility to basically assign this.
00:41:55 Speaker 3
So I can assign this info local when I do ExceptionDispatchInfo.Capture, and of course I need to remove this throw here. And now I have a retrier function. So let's try this. Let me code an evil method again, but a bit more sophisticated one, an async one. So what I'm writing here is an evil method again, which returns a Task, gets a Func of Task passed in, and just throws an exception. Well, since we want to see if our retrier actually works, let's do a static int: static int counter equals zero. And then we just say, okay, if counter is less than two, let's throw, and let's quickly increment the counter. And then of course now, because I modified the code flow and I'm not doing anything in here which is async, I need to return Task.CompletedTask. So let's plug this evil method in somewhere in our call stack. Let's call it EvilMethodAsync so that we don't get confused.
00:43:17 Speaker 3
So I'll plug it in here. Let's execute that code. Okay. Well, I forgot to introduce the retrier. So let's introduce the retrier on the top level. Because the pattern is so extensible, I just add the retrier to the list and execute the thing again. So what we're seeing now is son, wife, husband, wife, husband, and then it's done. And if we remove the if statement around the counter, let's do this, and execute it again, what we are going to see is an InvalidOperationException raised to the caller again. Okay.
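A minimal sketch of the retrier and the evil method described above, not the code typed on stage. The class name RetryChainSketch, the public Counter field and the recursive Invoke helper are assumptions made for illustration. The talk's evil method returns Task.CompletedTask; this sketch calls next() instead so that later elements would still run.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

public static class RetryChainSketch
{
    // Hypothetical failure counter, mirroring the static int from the talk.
    public static int Counter;

    // Invokes the element at currentIndex and hands it a delegate
    // that invokes the remainder of the chain.
    public static Task Invoke(IReadOnlyList<Func<Func<Task>, Task>> chain, int currentIndex = 0)
    {
        if (currentIndex == chain.Count)
        {
            return Task.CompletedTask;
        }

        return chain[currentIndex](() => Invoke(chain, currentIndex + 1));
    }

    public static async Task Retrier(Func<Task> next)
    {
        ExceptionDispatchInfo info = null;

        for (var attempt = 0; attempt < 3; attempt++) // hard-coded to three attempts
        {
            try
            {
                await next().ConfigureAwait(false);
                info = null; // reaching this line means the attempt succeeded
                break;       // no further retries needed
            }
            catch (Exception ex)
            {
                // Capture keeps the original stack trace for a later rethrow.
                info = ExceptionDispatchInfo.Capture(ex);
            }
        }

        // Rethrows the last captured exception, or does nothing if info is null.
        info?.Throw();
    }

    public static Task EvilMethodAsync(Func<Task> next)
    {
        if (Counter < 2)
        {
            Counter++;
            throw new InvalidOperationException("Evil method failed.");
        }

        // Assumption: continue the chain instead of returning Task.CompletedTask.
        return next();
    }
}
```

Invoking the chain with the retrier first and the evil method second fails on the first two attempts and completes on the third. Removing the counter check makes every attempt fail, so the InvalidOperationException reaches the caller after the third attempt.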
00:44:13 Speaker 3
So, we now have a fully async chain of responsibility. We have a retrier, we have exception filters. So we basically have everything to build robust service buses or robust HTTP, Web API things, or OWIN pipelines. We all understand that. But I did everything with methods, right? With Action delegates, with Func delegates. And since we usually come from an object-oriented world, we might want something a bit more object-oriented. So I have an example here on the top. What we can do is declare our own link elements. An implementation of a link element would, for example, look like this: an interface called ILinkElement with a method which accepts something like a message, or whatever gets passed into that method. So that's the message which is coming from the queuing system. And then again, we're just calling a Func of Task. And then we just need an infrastructure which manages the implementations of this link element interface.
00:45:26 Speaker 3
And we can basically almost copy-paste the code we just wrote together on stage into what I call here Chain. And this chain, as you can see, is just holding an enumerable of link elements. And we have an Invoke method which applies exactly the same code we just wrote in the unit test to a more object-oriented design. We just pick the first link element, call the Invoke method on that interface, pass the necessary transport message in, and then call the inner invoke. So what you've just seen here is that the transport message itself, that's the state, right? The state we're passing into this chain of responsibility. And that state is really important because, I mean, a message handling pipeline without the transport message wouldn't make sense, right? But in a more generic context like Web API or OWIN, we actually want more than just the raw HTTP call.
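The object-oriented variant described above could look roughly like the following sketch. ILinkElement and Chain are the names used in the talk; the shape of TransportMessage, the InnerInvoke helper and RecordingElement are assumptions added so the example is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the message coming from the queuing system.
public class TransportMessage
{
    public string Id { get; set; }
}

public interface ILinkElement
{
    Task Invoke(TransportMessage message, Func<Task> next);
}

public class Chain
{
    readonly List<ILinkElement> elements;

    public Chain(IEnumerable<ILinkElement> elements)
    {
        this.elements = elements.ToList();
    }

    public Task Invoke(TransportMessage message)
    {
        return InnerInvoke(message, 0);
    }

    // Calls the element at currentIndex; its "next" delegate recurses
    // into the following element until the end of the chain is reached.
    Task InnerInvoke(TransportMessage message, int currentIndex)
    {
        if (currentIndex == elements.Count)
        {
            return Task.CompletedTask;
        }

        var element = elements[currentIndex];
        return element.Invoke(message, () => InnerInvoke(message, currentIndex + 1));
    }
}

// Example element that records when it runs relative to the rest of the chain.
public class RecordingElement : ILinkElement
{
    readonly string name;
    readonly List<string> log;

    public RecordingElement(string name, List<string> log)
    {
        this.name = name;
        this.log = log;
    }

    public async Task Invoke(TransportMessage message, Func<Task> next)
    {
        log.Add($"{name}: before {message.Id}");
        await next().ConfigureAwait(false);
        log.Add($"{name}: after {message.Id}");
    }
}
```

With two recording elements, the log shows the nesting: the outer element runs before and after the inner one, just like the delegate-based version.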
00:46:35 Speaker 3
We want, for example, access to headers. We also want that in a service bus, for example. So what we can do is introduce what most frameworks out there call a context. A context is basically a generic container which holds all the necessary state. And I'm showing you a brief implementation of how such a thing could look. So here is a full implementation of such a chain of responsibility which also contains state. It looks like the following. We have these link elements and they implement an interface called ILinkElement. And this interface now gets an input context and an output context. So we have something that is going into the element and we have something that is going out of the element. And what we say here is that this thing needs to inherit from an interface or an abstract base class.
00:47:48 Speaker 3
Here I have just the base class, which I call Context. And what we do in NServiceBus, for example, is we have a dictionary of string to object. It's not the most efficient implementation in terms of allocations, but it gives our users a really easy and simple way to put anything they need to float through the pipeline into that dictionary. And this then flows through the whole asynchronous context without us needing to use things like thread statics, which don't work with async/await, or async locals, or even per-thread scopes or other weird scopes on the users' containers. So that's a really simple approach. And here, for example, in this link element, what we do then is create things that inherit from the context. So for example, we have an incoming physical context here.
00:48:48 Speaker 3
So this incoming physical context will then contain the transport message and everything else which is important for the context. I'm briefly showing you this implementation. We have a property of type TransportMessage here, but users can then add all the necessary stuff to this incoming context. But we have a little problem here in our pipeline: when we start to generically extend this chain of responsibility with link elements, we basically have the context, which contains the state, and all these elements are going to mutate that state, right? So that's, well, not really a pure design, but we decided we were going to use it. Still, we need a way for the end users to know in which part of the pipeline a link element is placed. Because a user might say, "I want to access things on the message when the message is deserialized."
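A minimal sketch of such a context, assuming string keys and simple Set and TryGet helpers. Only the idea of a dictionary of string to object and a TransportMessage property comes from the talk; the member names and the shape of TransportMessage are hypothetical.

```csharp
using System.Collections.Generic;

// Base class holding arbitrary state that floats through the pipeline.
public abstract class Context
{
    readonly Dictionary<string, object> stash = new Dictionary<string, object>();

    public void Set(string key, object value)
    {
        stash[key] = value;
    }

    public bool TryGet<T>(string key, out T value)
    {
        if (stash.TryGetValue(key, out var found) && found is T)
        {
            value = (T)found;
            return true;
        }

        value = default(T);
        return false;
    }
}

// Hypothetical raw message as it comes off the wire.
public class TransportMessage
{
    public Dictionary<string, string> Headers { get; } = new Dictionary<string, string>();
    public byte[] Body { get; set; }
}

// Context of the physical stage: raw headers and payload only.
public class IncomingPhysicalContext : Context
{
    public IncomingPhysicalContext(TransportMessage message)
    {
        Message = message;
    }

    public TransportMessage Message { get; }
}
```

Because the context instance is passed from element to element, the state travels with the invocation itself rather than with a thread.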
00:50:01 Speaker 3
Well, how does the user know where to put the element in? The user would need a lot of knowledge about where to place himself inside the framework. So what we decided to introduce is, on a high level, two phases. We call them stages. We're saying, okay, we have the physical phase. The physical phase is when the transport message comes from the wire, or the HTTP payload. Everything executed in that physical phase only gets access to the raw message information, like headers and the raw stream payload, but not to deserialized stuff. And then we have the logical phase, and the logical phase is when the message is deserialized. And like I said, we call these stages. So now we have a strongly typed way for the end users to extend this pipeline, and here is what we are going to do.
00:51:02 Speaker 3
For example, in NServiceBus, we detect based on reflection what type of context you have, whether it's a physical context or a logical context. And then we do a topological sort of the link elements we have in the pipeline. And we put the user automatically into the right stage of the pipeline without the user needing to worry about the exact placement in the pipeline. But in order to do that, we need a thing we call a stage connector. A stage connector bridges from one stage to another, from the physical stage to the logical stage. And we can do that pretty simply. I already showed it a little bit.
00:51:49 Speaker 3
In a generic sense, we just declare a link element of type TInContext to TOutContext. So a normal element which is placed, for example, in the physical stage will have a TInContext of type physical context and a TOutContext of type physical context. In the logical stage, you will have a TInContext of type logical context and a TOutContext of type logical context, and the connector then has physical context to logical context. And we can see this here in my code example.
00:52:32 Speaker 3
Hold on a second. So, as you can see here, the physical-to-logical connector now inherits from ElementConnector of incoming physical context to incoming logical context. Normally we would, for example, call our JSON deserializer or whatever here. We would deserialize the payload and then create an incoming logical context. And we pass in the logical message, so the deserialized payload. And we pass in the parent context. And because this one is a dictionary of string to object, we can essentially merge together all the state in the pipeline into that context. And then we have an inheritance hierarchy of context information, which automatically floats through the whole pipeline.
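The connector could be sketched as follows. ElementConnector and the two context names follow the talk; the parent lookup in TryGet, the UTF-8 decoding used as a stand-in for real deserialization, and all member signatures are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

public abstract class Context
{
    readonly Context parent;
    readonly Dictionary<string, object> stash = new Dictionary<string, object>();

    protected Context(Context parent)
    {
        this.parent = parent;
    }

    public void Set(string key, object value) => stash[key] = value;

    // Looks in this context first, then walks up the parent hierarchy.
    public bool TryGet(string key, out object value) =>
        stash.TryGetValue(key, out value) || (parent != null && parent.TryGet(key, out value));
}

public class IncomingPhysicalContext : Context
{
    public IncomingPhysicalContext(byte[] body) : base(null)
    {
        Body = body;
    }

    public byte[] Body { get; }
}

public class IncomingLogicalContext : Context
{
    public IncomingLogicalContext(object message, Context parent) : base(parent)
    {
        Message = message;
    }

    public object Message { get; }
}

public interface ILinkElement<TInContext, TOutContext>
    where TInContext : Context
    where TOutContext : Context
{
    Task Invoke(TInContext context, Func<TOutContext, Task> next);
}

// A connector is a link element whose in and out contexts differ.
public abstract class ElementConnector<TInContext, TOutContext> : ILinkElement<TInContext, TOutContext>
    where TInContext : Context
    where TOutContext : Context
{
    public abstract Task Invoke(TInContext context, Func<TOutContext, Task> next);
}

public class PhysicalToLogicalConnector : ElementConnector<IncomingPhysicalContext, IncomingLogicalContext>
{
    public override Task Invoke(IncomingPhysicalContext context, Func<IncomingLogicalContext, Task> next)
    {
        // Stand-in for real deserialization of the raw payload.
        object message = Encoding.UTF8.GetString(context.Body);

        // The physical context becomes the parent, so its state stays reachable.
        return next(new IncomingLogicalContext(message, context));
    }
}
```

State set on the physical context, for example a tenant identifier, remains visible to elements in the logical stage through the parent chain.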
00:53:25 Speaker 3
And of course all the other elements just need to say, okay, I'm a link element of incoming logical context, or I'm a link element of incoming physical context. And then you will automatically be placed in the right part of the pipeline. The complexity of this thing becomes a bit apparent when you look at what the behavior chain now becomes, because we now have a lot of generics, and since we don't have as much flexibility as in F#, we need to do a little trickery with reflection. So what we have here now is an abstract notion of a link element, which I call element instance in this code example.
00:54:09 Speaker 3
And what it does is a little bit of reflection. It creates a generic thing called step invoker, which then contains the TInContext and TOutContext. And this one basically casts the step, or the link element, to the right generic type automatically during runtime. This is one of the drawbacks of this pattern and the generics, but I think it's worth it so that our users can have a better code flow.
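One way to sketch this reflection trick, under the assumption that the invoker is a generic class closed at runtime with MakeGenericType. ElementInstance and the step invoker are named in the talk; the IStepInvoker interface, the member signatures and UpperCaseElement (which uses plain strings as stand-in contexts) are hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public interface ILinkElement<TInContext, TOutContext>
{
    Task Invoke(TInContext context, Func<TOutContext, Task> next);
}

// Non-generic view so a chain can hold elements of different generic types.
public interface IStepInvoker
{
    Task Invoke(object element, object context, Func<object, Task> next);
}

public class StepInvoker<TInContext, TOutContext> : IStepInvoker
{
    public Task Invoke(object element, object context, Func<object, Task> next)
    {
        // Cast the element and the context back to their real generic types.
        var typedElement = (ILinkElement<TInContext, TOutContext>)element;
        return typedElement.Invoke((TInContext)context, outContext => next(outContext));
    }
}

public class ElementInstance
{
    readonly object element;
    readonly IStepInvoker invoker;

    public ElementInstance(object element)
    {
        this.element = element;

        // Find the closed ILinkElement<,> interface the element implements.
        var elementInterface = element.GetType().GetInterfaces()
            .First(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ILinkElement<,>));

        // Close StepInvoker<,> over the same context types and instantiate it once.
        var invokerType = typeof(StepInvoker<,>).MakeGenericType(elementInterface.GetGenericArguments());
        invoker = (IStepInvoker)Activator.CreateInstance(invokerType);
    }

    public Task Invoke(object context, Func<object, Task> next)
    {
        return invoker.Invoke(element, context, next);
    }
}

// Example element using strings as stand-in contexts.
public class UpperCaseElement : ILinkElement<string, string>
{
    public Task Invoke(string context, Func<string, Task> next)
    {
        return next(context.ToUpperInvariant());
    }
}
```

The reflection cost is paid once when the element instance is created; each invocation afterwards only performs casts.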
00:54:42 Speaker 3
And of course, it also needs to cast the context and, well, that's it. We now have a completely asynchronous pipeline in which our users can use await statements. We can call into persistence. We never block the code, and we are also strongly typed, so that our users don't need to care that much about where they are getting placed. Yeah. And of course, the NServiceBus pipeline, for example, is even a bit more complex, because, for example, we do a number of retries.
00:55:17 Speaker 3
And if we can't succeed after retrying, let's say, five times, what we're going to do is move the message to the error queue, so that that message doesn't block your other messages in the queue. And for that, we need, for example, to fork off another pipeline or another chain of responsibility, which we call the forwarding or the error pipeline. And for that we have a thing we call a fork connector. From a generic perspective, fork connectors are no different from all the other elements. The implementation internally is a bit different, but the code that invokes them is completely the same. We invoke it with this generic step invoker, but this one then has the possibility to say, "Okay, I'm bridging from FromContext to FromContext, so the stage remains the same, but then I'm forking off into a sub-chain of responsibility."
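The idea of forking into an error pipeline after repeated failures can be sketched like this. It is a simplified illustration and not the NServiceBus fork connector: MoveToErrorQueueElement, the string message and the errorPipeline delegate are all hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public class MoveToErrorQueueElement
{
    readonly int maxAttempts;
    readonly Func<string, Exception, Task> errorPipeline;

    public MoveToErrorQueueElement(int maxAttempts, Func<string, Exception, Task> errorPipeline)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        this.maxAttempts = maxAttempts;
        this.errorPipeline = errorPipeline;
    }

    public async Task Invoke(string message, Func<Task> next)
    {
        Exception lastFailure = null;

        for (var attempt = 0; attempt < maxAttempts; attempt++)
        {
            try
            {
                // Main chain: the stage and the message stay the same.
                await next().ConfigureAwait(false);
                return;
            }
            catch (Exception ex)
            {
                lastFailure = ex;
            }
        }

        // Fork: hand the message to a separate chain instead of rethrowing,
        // so the failing message no longer blocks the queue.
        await errorPipeline(message, lastFailure).ConfigureAwait(false);
    }
}
```

The main chain is unaware of the fork; only this element knows that a second chain exists.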
00:56:12 Speaker 3
And for even more complex scenarios, we have what we call stage fork connectors, which bridge from FromContext to ToContext and fork off into another pipeline. So in the end, what we have in NServiceBus is indeed not a chain of responsibility. It's actually a tree of responsibilities during runtime. And yeah, well, that can of course lead to head explosions. But like I said, that's all abstracted away from the end users in a pretty simple way. And we take care of that by using topological sorts and so on, so that before we actually execute messages, everything is in the right place and it's going to work during runtime in the most efficient way possible. A little information: NServiceBus v6 implements the pattern I just showed you. That's the next major version, which is coming out soon.
00:57:08 Speaker 3
It's async all the way. It uses the chain of responsibility pattern heavily. If you're curious, you can go to docs.particular.net/nservicebus/pipeline/customizing-v6 and you'll see how this pattern is implemented. We call them behaviors, not link elements. And of course the code is also on GitHub. And a brief recap: I think the chain of responsibility pattern, or Russian dolls as some others coming from JavaScript call it, is a really flexible and extensible pattern, ideally suited to build robust I/O-bound pipelines and domains. The pattern is used, as I showed, in many open source projects and infrastructure things. And I'll say: know it, learn it, love it. I hope that's not copyrighted by the .NET Rocks guys, but maybe it is. So, yeah, my slides and links are available on github.com/danielmarbach/asyncdolls. You can have a look at it.
00:58:07 Speaker 3
I also have other implementations there of a potential approach to reduce, for example, the nesting in the call stack. I call that partial dolls or partial chain of responsibility. You can see how that's implemented, plus a few other attempts to make it, for example, recursion-free and so on. So if you want to deep dive into that stuff, feel free to have a look at the code. And if you want a brief recap on async/await and how, for example, a message pump works, which I haven't shown here: in my webinar on goparticular.net/ndc16.async I implement a complete message pump with TPL and async, live. You can just register on this link. And yeah, if you have any questions, feel free to shoot. I will stay here until roughly 6:00 PM at the Particular booth. If you have questions afterwards just approach me, or ask right now. Any questions?
00:59:08 Daniel Marbach
Sorry. Which one?
00:59:09 Daniel Marbach
This one? Any other questions?
00:59:24 Speaker 14
I have one.
00:59:24 Daniel Marbach
Why we move it out?
00:59:25 Speaker 14
Yeah.
00:59:26 Daniel Marbach
Okay. Well, so for example, per endpoint, as we call it, which is basically a thing that consumes off a queue, we have one message pump which consumes messages. So if you have a thousand messages, let's say you consume five and then you have one message in there which is going to fail. And you retry and you retry. And let's say you have concurrency set to just one, right? You're going to retry this one indefinitely, right?
00:59:54 Daniel Marbach
So what we do instead is, at some point we say, "Okay, this one has now failed a number of times." It doesn't really make sense to retry it again because it's more of a permanent failure. So we move it out of the queue so that the processing can continue and all the other messages in the queue can be processed. And then we basically raise an event to the operators of the system saying, "Hey, there is a message in there which you need to have a look at and decide for yourself what you want to do with that message."
01:00:28 Daniel Marbach
Yes you can. Yeah. Of course. Yeah.
01:00:30 Speaker 14
If you have it inside.
01:00:30 Daniel Marbach
I have it inside. Yes. Yeah. But that's an implementation detail. Yeah. Yeah. Any other questions? Yeah.
01:00:45 Speaker 15
How do you get the concurrency?
01:00:46 Daniel Marbach
How do you get the concurrency?
01:00:54 Daniel Marbach
In parallel? Okay. So what I like to say is that it's concurrent. It's not parallel, because we are doing async/await. It can be parallel but it doesn't necessarily have to be. But what you can do is, for example, if you have a tree of responsibilities, each fork connector can decide to create multiple forks. And instead of awaiting each fork, you can basically spawn off all the forks and then do a Task.WhenAll on these forks. And then you have concurrent execution of forks.
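A small sketch of spawning forks and awaiting them together with Task.WhenAll. The ForkSketch class and the representation of a fork as a Func of Task are assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForkSketch
{
    public static Task InvokeAll(IEnumerable<Func<Task>> forks)
    {
        // ToList forces every fork to start before any of them is awaited.
        var running = forks.Select(fork => fork()).ToList();

        // Completes when all forks have completed.
        return Task.WhenAll(running);
    }
}
```

Awaiting each fork inside a loop would run them one after another; starting all of them first is what makes the execution concurrent.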
01:01:31 Daniel Marbach
But what we also do, for example... Let me briefly show you this on this picture here. So the message pump has a concurrency setting. For example, it can be 100, right? So what we do is basically pick 100 messages, and then we have 100 concurrent chains of responsibility, with potentially multiple forks inside each chain of responsibility, running on one or multiple threads. Does that answer your question?
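A concurrency-limited pump could be sketched with a SemaphoreSlim, as below. The talk does not show the pump, so this is an assumption about one possible approach; MessagePumpSketch, the string messages and the chain delegate are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class MessagePumpSketch
{
    public static async Task Pump(IEnumerable<string> messages, Func<string, Task> chain, int maxConcurrency)
    {
        using (var semaphore = new SemaphoreSlim(maxConcurrency))
        {
            var running = new List<Task>();

            foreach (var message in messages)
            {
                // Waits asynchronously until one of the slots is free.
                await semaphore.WaitAsync().ConfigureAwait(false);
                running.Add(Handle(message, chain, semaphore));
            }

            // Wait for the chains that are still in flight.
            await Task.WhenAll(running).ConfigureAwait(false);
        }
    }

    static async Task Handle(string message, Func<string, Task> chain, SemaphoreSlim semaphore)
    {
        try
        {
            await chain(message).ConfigureAwait(false);
        }
        finally
        {
            // Free the slot whether the chain succeeded or failed.
            semaphore.Release();
        }
    }
}
```

With maxConcurrency set to 100, at most 100 chain invocations are in flight at any time, and how many threads they use is left to the scheduler.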
01:02:01 Speaker 15
Yeah. I think so.
01:02:02 Daniel Marbach
Cool. Any other questions? Oh, then thanks.