Enterprise Java with Spring Boot

Multi-Channel Integration Flows

Josh Long
Broadcom

Lesson Description

The "Multi-Channel Integration Flows" Lesson is part of the full, Enterprise Java with Spring Boot course featured in this preview video. Here's what you'd learn in this lesson:

Josh adds MessageChannels to the application, which serve as the logical pipes through which messages flow. This allows the application to have multiple sources for the inbound flow. One source is the original file system flow. The other is an HTTP POST request.

Transcript from the "Multi-Channel Integration Flows" Lesson

[00:00:00]
>> Josh Long: The other thing is that in Integration world, I might want to reuse this flow. This flow is pretty silly, but it does do a thing. And I might want to make that available via multiple channels, right? So the way you do that is you create what's called a message channel.

[00:00:14]
A message channel (let's call this one inbound) is a logical concept. It's the pipe through which messages flow, right? MessageChannels.direct(). And I'm going to say that all requests going into this flow must originate in that channel. So, MessageChannel inbound. And I'm going to move all this file handling logic outside of that.

[00:00:36]
So now I'm going to have another integration flow. So this will be a file inbound flow. And the whole point of this flow is to do the file stuff that we just saw and then once it's converted it to a string, to send it into this second flow, right?

[00:00:50]
So here we go. We're gonna take that back over here. Okay. File. And then here's the directory. Put that there. Good. Directory. Good, okay? So now instead of looking at the inbound channel adapter, we're going to look at a channel itself and we're going to send the data after we've converted it into a string onward to this other flow by sending it into the channel, okay?

[00:01:24]
So, file inbound adapter. And then we're going to send it off to the channel, okay, .channel(inbound). And we can actually inject the channel here as well: MessageChannel inbound. There you go. Okay, so this thing will look at the file system, and then whenever it gets some data, it's gonna take the data, turn it into a string, and then take the string, turn it into a JSON object, and then send it off into a channel, another pipe.
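A minimal sketch of the two flows and the channel between them, assuming Spring Integration 6.x with the file module and Jackson on the classpath. The class name, the watched directory, the polling interval, and the PurchaseOrder record are hypothetical stand-ins, since the lesson's actual code is not shown in the transcript. The JSON conversion is placed in the downstream flow here, which is where the lesson ends up putting it.

```java
import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.messaging.MessageChannel;

@Configuration
class PurchaseOrderIntegrationConfiguration {

    // Hypothetical payload type; the real PurchaseOrder fields are not in the transcript.
    record PurchaseOrder(String id) {}

    // The logical pipe: every message for the purchase order flow enters here.
    @Bean
    MessageChannel inbound() {
        return MessageChannels.direct().getObject();
    }

    // Source 1: watch a directory, read each new file as a String, push it into the channel.
    @Bean
    IntegrationFlow fileInboundFlow() {
        // Assumed location; point this at whatever directory should be watched.
        File directory = new File(System.getProperty("user.home"), "purchase-orders-in");
        return IntegrationFlow
                .from(Files.inboundAdapter(directory).autoCreateDirectory(true),
                        adapter -> adapter.poller(Pollers.fixedDelay(1000)))
                .transform(Files.toStringTransformer())
                .channel(inbound())
                .get();
    }

    // Downstream flow: expects a JSON String, converts it, then processes it.
    @Bean
    IntegrationFlow purchaseOrderFlow() {
        return IntegrationFlow
                .from(inbound())
                .transform(Transformers.fromJson(PurchaseOrder.class))
                .handle((payload, headers) -> {
                    System.out.println("received " + payload);
                    return null; // returning null ends the flow here
                })
                .get();
    }
}
```

The downstream flow only prints the order in this sketch; the splitting and aggregating steps from the lesson would follow the transform.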

[00:01:52]
And that pipe we're gonna listen to over here in this other flow downstream, right? And that pipe will take the purchase order and we're gonna print it out and we're gonna split it and aggregate it and so on. What does that mean for us? Well, it means I can also send, like let's imagine I also created a controller, right.

[00:02:07]
@ResponseBody class PurchaseOrderController, right? I'm going to post some data. Right. Post, or I can say void. There you go, purchase order. And the payload is gonna be request body MultipartFile. Okay. If I want to do file uploads, this is how you do it. So now I'm going to inject the channel, which is the thing I use to start the flow, right?

[00:02:38]
I put messages in this channel here. So once I've got the file, I can do file.getBytes(), getInputStream(), I guess. getBytes(), that's probably fine, right? So var content. Let's just make sure this works. new String, okay? And I'm gonna say this.inbound.send(MessageBuilder.withPayload(content).build()), okay?

[00:03:10]
So unless I miss my mark, which is entirely possible, what you have here is a string that has the content of the file, which is JSON. I'm sending that as a message, wrapping it in this message envelope, and I'm putting that in the pipe, and then that message is being delivered to the first integration flow, or rather, the second integration flow.
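The controller described above might look roughly like this. It is a sketch under assumptions: the "/post" path and the "file" form field name are guesses based on the request made later in the lesson, and @RequestParam is used to bind the upload, which may differ from what was typed on screen. The @Qualifier is there because an application context usually holds several MessageChannel beans.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

@RestController
class PurchaseOrderController {

    private final MessageChannel inbound;

    // Select the channel bean named "inbound" explicitly.
    PurchaseOrderController(@Qualifier("inbound") MessageChannel inbound) {
        this.inbound = inbound;
    }

    // Source 2: an uploaded file whose content is the JSON purchase order.
    @PostMapping("/post") // assumed path
    void post(@RequestParam("file") MultipartFile file) throws IOException {
        String content = new String(file.getBytes(), StandardCharsets.UTF_8);
        // Wrap the JSON String in a message envelope and put it in the pipe.
        this.inbound.send(MessageBuilder.withPayload(content).build());
    }
}
```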

[00:03:29]
So I have two genesis, two origins, two things that will produce the message, right? Either the message comes from the inbound adapter looking at the file system and it gets put in the channel, or it comes from this REST controller that we just built and it gets put in the channel.

[00:03:42]
Either way, all roads lead to the purchase order integration flow, which is expecting as its input an object of type purchase order. So far, so good. So actually I need to do that conversion here. Maybe I should do that here, there. So the contract for this is now that I expect a JSON string and I'm gonna convert it to a purchase order.

[00:04:03]
Whether that JSON string comes from a file system or from a file upload, either way the result is the same. Okay, so let's try that. And did I do something smart enough to read data from the file to post? Yes, I did. Okay, so here we go. We're going to do this.

[00:04:26]
We're going to do an HTTP form POST to 8080, forward slash, what did I call it? Post. Yeah. And then the field, it's called file. So file, at, tilde, Desktop, purchase orders. I just need 1001.json. And that should do it. So that got sent into here.

[00:05:03]
So if I clear this, the whole flow happened. So we got a channel. I've used this channel to create a little bit of indirection between the two: the origin of the message and the processing of the message. There are two sources of the message. As long as I have the channel, I can send any message I want into the channel.

[00:05:27]
And so I have a controller, just a REST controller like we looked at yesterday. Could have been GraphQL, could have been gRPC, could have been whatever. And I'm doing a post. I'm using some Spring Integration to create a message. I put the message into the channel. Great. It goes off somewhere.

[00:05:42]
Or alternatively, I drop a new file in this folder and this flow looks at the new file and it turns it into a string and it puts it in this channel. Either way, messages go into the channel and they end up here in the integration flow, which has injected the inbound channel and is reading from it.

[00:05:59]
So as soon as a message enters that channel, it comes out the other side and this flow kicks into action. Yeah, so that's just basic integration, kind of 101. And from this I've been using file inbound adapters. But this could be Kafka, this could be Pulsar. It's the same.

[00:06:14]
It'll be almost the same, right? If you're using RabbitMQ, you do Amqp.inboundAdapter instead, right? It's a convention, basically. Okay, that's it. I know it's a lot, but any questions on this stuff?
>> Speaker 2: What happens if there's failures at certain points in the flow?
>> Josh Long: Yes, so by default, Spring Integration creates an error channel, right?

[00:06:39]
And you can listen for that. You can actually write an integration flow like this: IntegrationFlow errorIntegrationFlow. And you can qualify it. I think it's literally just called errorChannel: MessageChannel errorChannel. And you can respond to it from that, basically. Actually you don't even need to do this.

[00:06:58]
Looks like. Yeah, you can just use the string name of the bean if you don't have it on hand. So that will work. But if you don't want to do the string name, this would work as well, right? And then you can process that. You can redeliver, you can do whatever you want.
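As a rough sketch of the string-name variant, assuming Spring Integration 6.x: the flow below subscribes to the default error channel by its bean name, "errorChannel". The class name and the handler body are hypothetical; a real handler might redeliver, raise an alert, or write the failure somewhere durable.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
class ErrorHandlingConfiguration {

    // Listens on the error channel that Spring Integration registers by default.
    @Bean
    IntegrationFlow errorIntegrationFlow() {
        return IntegrationFlow
                .from("errorChannel")
                // The payload is typically an exception describing what failed.
                .handle(message -> System.err.println("flow failed: " + message.getPayload()))
                .get();
    }
}
```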

[00:07:11]
That's a great question. Also, these channels, they're the connective tissue of your enterprise integration flow. So you can actually have those channels backed by a database. You can actually have every message that enters a channel be written to a SQL table, for example. You can also add filters to those channels.

[00:07:29]
So, for example, if you're doing security, as we'll look at later, right, you might wanna validate that the person who sent the request has the right to do so. So you can pack a JWT token in the body of the message and then unpack that in a filter in the channel and go, okay, let me call the OAuth issuer to make sure this is valid, right?

[00:07:49]
And if it's not valid, I can refuse to process the request. Another question?
>> Speaker 3: Do you have any tips for how to test these channels?
>> Josh Long: Yeah, they're just beans. So you can inject them in any test code, but also, if you downcast them, there's a bunch of different downcastable versions.

[00:08:07]
So MessageChannel here, right? You can get the subscribable ones. And if you do that, you can actually programmatically call subscribe and affix a message handler after that and then assert whatever you want to assert in that handler. The other thing is there's a great library called Awaitility.

[00:08:29]
It's not part of Spring, it's just a really great third-party library, Awaitility. And it's just a set of tools for testing asynchronous stuff. I want to wait five seconds and then check something. This gives you a very nice, clean DSL to do that kind of stuff. And by the way, you didn't exactly ask about this, but your question about error handling and Spring Integration does remind me of another thing.
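A sketch of both testing ideas together, assuming JUnit 5, Awaitility 4.x, and Spring Integration on the test classpath. The test class and the sample JSON are hypothetical. The channel is created inside the test instead of being injected, because a DirectChannel distributes messages among its subscribers, so a test handler added to a channel that a flow already reads from would not see every message.

```java
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;

class InboundChannelTest {

    @Test
    void subscriberSeesMessageSentToChannel() {
        // Stand-alone channel, so the test handler is its only subscriber.
        SubscribableChannel channel = new DirectChannel();

        // Affix a handler that records whatever arrives.
        AtomicReference<Message<?>> received = new AtomicReference<>();
        channel.subscribe(received::set);

        // Send from another thread so the test thread has something to wait for.
        String json = "{\"id\":\"1001\"}";
        CompletableFuture.runAsync(
                () -> channel.send(MessageBuilder.withPayload(json).build()));

        // Poll for up to five seconds until the assertions pass.
        await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
            assertNotNull(received.get());
            assertEquals(json, received.get().getPayload());
        });
    }
}
```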

[00:08:53]
By default, the dispatch from one component to another in the same integration flow is synchronous. You can put an executor in there and have them be all asynchronous, right? And it's very expected that you would do that. And by definition, when you talk to Kafka or whatever, it's out of your hands.

[00:09:09]
It's no longer synchronous anyway, right? So it's fine if the code like the handoff between these different steps is all synchronous. It's all happening at the same time, which means that they're all in the same thread. So if you have transactions, if you in your controller over here started a database transaction and then you needed that all to happen, that flow would actually get dispatched in the same thread, right?
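To make the threading point concrete, here is a plain-Java model of the two hand-off styles. These are not Spring Integration classes; HandoffDemo and its methods are hypothetical names used only to show that a direct hand-off runs the next step on the caller's thread while an executor hand-off moves it to a pool thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Plain-Java model only; none of this is Spring Integration API.
class HandoffDemo {

    // Direct hand-off: the next step runs on the caller's thread.
    static String directHandoff(Function<String, String> step, String payload) {
        return step.apply(payload);
    }

    // Executor hand-off: the next step runs on a thread owned by the pool.
    static String executorHandoff(ExecutorService pool,
                                  Function<String, String> step,
                                  String payload) {
        Future<String> result = pool.submit(() -> step.apply(payload));
        try {
            return result.get(); // block until the pooled step finishes
        } catch (Exception e) {
            throw new IllegalStateException("hand-off failed", e);
        }
    }

    public static void main(String[] args) {
        // The step ignores its payload and reports the thread it ran on.
        Function<String, String> whichThread = payload -> Thread.currentThread().getName();

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("direct:   " + directHandoff(whichThread, "po-1001"));
            System.out.println("executor: " + executorHandoff(pool, whichThread, "po-1001"));
        } finally {
            pool.shutdown();
        }
    }
}
```

Anything bound to the calling thread, such as a transaction started in the controller, is visible to the step in the direct case and not in the executor case.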

[00:09:34]
And maybe that's what you want, maybe that's convenient, maybe it's not, you know. Any other questions? Do you see potential here? Okay, good, some of you at least, that's good. Not everybody's doing messaging. I would argue that you should be. I would argue that REST is actually a really bad way to build decoupled systems at scale, right?

[00:09:55]
The web is the existence proof of the contrary, obviously, right? But everybody that's ever built a system at scale moves away from synchronous, blocking, RPC-style interactions by default. They have to. This is why Google created gRPC. This is why LinkedIn created Kafka. This is why Amazon has so many other things besides this, you know.
