Building a way to handle queries and searches across multiple servers while also being able to turn those servers on
In the context of microservices, an orchestrator is the brain of the system: it distributes tasks among the microservices according to some rule. When a transaction has to happen, it is the orchestrator’s job to determine the workflow for the microservices. In this article, we’ll build a simple orchestrator using Rust.
What Are We Building?
Here we have a series of microservices that can run on different computers (each with a different database, different data, and different technologies). They allow clients to query by a certain parameter and then return the respective queried information to the client.
Each microservice has n ports open to allow for queries. These ports will be used via a round-robin scheduling algorithm to distribute the calls evenly among them.
The goal is also to make the orchestrator as parametric as possible to allow the addition of more microservices if needed.
Building Process
First steps
We need to figure out how we can remotely turn on the microservices to enable the orchestrator to activate the ports if they are down. This capability is known as resilience and can be implemented as a separate program. We have two options in mind: SSH connection or creating an HTTP service that enables on-demand activation of the microservices. I will choose the latter option for easier configuration, especially on client operating systems like Windows. From now on, I will call this system the igniter because it sounds catchy, doesn’t it?
I will use Rust with the Rocket crate to create an HTTP service that exposes the endpoint /<port>, which turns the microservice on at that particular port. We’ll use a YAML file to configure the igniter, like this:
start_command: "START COMMAND WITH THE ARG OF PORT AT THE END"
path: "PATH TO THE APPLICATION"
We will need the following dependencies in the Cargo.toml (I’ll explain their usage later):
[dependencies]
serde = {version="1.0.163", features=["derive"]}
serde_yaml = "0.9.21"
rocket = "=0.5.0-rc.3"
lazy_static = "1.4.0"
Now that we have the dependencies, we’ll make a Config struct that will host all the configurations saved into the YAML file. Here’s what that looks like:
use std::fs::File;
use std::io::prelude::*;

use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
struct Config {
    start_command: String,
    path: String,
}

/** Reads the YAML config file from a given path and returns the config as a struct */
fn read_config(path: &str) -> Config {
    // Open the file
    let mut file = File::open(path).expect("Failed to open file");
    // Read the file contents as a String
    let mut contents = String::new();
    file.read_to_string(&mut contents)
        .expect("Failed to read file");
    // Use serde_yaml to parse the String into a Config struct
    let config_result: Result<Config, serde_yaml::Error> = serde_yaml::from_str(&contents);
    // If the config couldn't be parsed, panic; otherwise declare the config
    let config: Config = match config_result {
        Ok(config) => config,
        Err(err) => panic!("Failed to parse YAML: {}", err),
    };
    // Return the parsed config
    config
}

// Declares a global variable for the config
lazy_static! {
    static ref CONFIG: Config = read_config("config.yaml");
}
The code above does the following:
- Defines a Config struct with the fields of the igniter’s configuration.
- Makes a function that, given the path of the configuration file, parses it with serde_yaml into our Config struct.
- Declares a global variable called CONFIG that hosts the configuration of the igniter.
After that, we must configure Rocket to allow our code to be a web service and also define the endpoint /<port>. I also changed the default port of the igniter to port 4444 using $Env:ROCKET_PORT=4444 in the terminal.
use rocket::{get, launch, routes};

// This endpoint turns the application on at the port passed in the request
#[get("/<port>")]
fn index(port: i32) -> String {
    // Calls the function that instantiates the application on the given port
    match start_server(port) {
        Ok(()) => format!("Server started at port {}", port),
        Err(e) => format!("Failed to start server at port {}: {}", port, e),
    }
}

// Default Rocket launch
#[launch]
fn rocket() -> _ {
    rocket::build().mount("/", routes![index])
}
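As an aside, if you’d rather not set the environment variable every time, Rocket 0.5 can also read the port from a Rocket.toml file in the project root; a minimal sketch:
# Rocket.toml
[default]
port = 4444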
The code above turns our program into a Rust web service, but we still need to implement how to actually turn the application on.
use std::error::Error;
use std::io::{Error as IoError, ErrorKind};
use std::process::{Command, Stdio};
use std::thread;
use std::time::Duration;

/** Starts the server on a given port, using the command and path of the configured application */
fn start_server(port: i32) -> Result<(), Box<dyn Error>> {
    // Creates a new cmd instance
    let mut cmd = Command::new("cmd");
    // Tells cmd to run the command in a shell and close the shell
    // when done
    cmd.arg("/C")
        .arg(format!("cd {}", CONFIG.path)) // change directory to the specified path
        .arg(format!("& {} {}", CONFIG.start_command, port)) // run the start command with the port
        .stdout(Stdio::piped()) // capture the stdout of the program
        .stderr(Stdio::piped()); // capture the stderr of the program
    // Start the command and get a handle to the child process
    let mut child = cmd.spawn().expect("failed to start");
    // Wait for one second
    thread::sleep(Duration::from_millis(1000));
    // Check whether the child process has already exited
    match child.try_wait() {
        // If the program exited, it probably failed to start
        Ok(Some(status)) => Err(Box::new(IoError::new(
            ErrorKind::Other,
            format!("process exited with status code: {}", status),
        ))),
        // If it has not exited yet, assume it started
        Ok(None) => {
            println!("SERVER SUCCESSFULLY STARTED AT PORT {}", port);
            Ok(())
        }
        // If any other error happens, return it
        Err(e) => Err(Box::new(IoError::new(ErrorKind::Other, format!("{}", e)))),
    }
}
I know it’s a lot of code, and it might not be the cleanest implementation, because it assumes that if the process hasn’t exited after a second, it’s running. But it works. Let’s go over the steps the code takes:
- Creates a cmd instance, goes to the application directory, and executes the start command.
- Spawns the cmd instance to actually run in the system.
- Waits for one second to allow for the process to start.
- If the process has not ended by then, it’s running (hopefully).
This code, which can be seen in the GitHub repository, gives us a working igniter: requesting GET /<port> on it tries to start the application on the requested port.
Building the orchestrator
It’s time to build the actual orchestrator. For this, we’ll need to read another YAML to allow for the configuration of the orchestrator, making it fully customizable. Our YAML will look like this:
servers:
  - address: "localhost"
    ports: [8001, 8002, 8003, 8004]
    ignite_port: 4444
    path: "/a/parts/search"
    company: "A"
  - address: "127.0.0.1"
    ports: [8001, 8002, 8003, 8004]
    ignite_port: 4444
    path: "/a/parts/search"
    company: "B"
Where:
- Servers are the applications that must run.
- Address is where the application is located.
- Ports are the ports that are being used to make the queries.
- The ignite port is the port on which the igniter is listening.
- The path is where the query endpoint is in the microservice.
- And the company is the name of the microservice.
This YAML is loaded just like the igniter’s; we only change the struct it is parsed into.
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::prelude::*;

#[derive(Debug, Deserialize, Serialize)]
pub struct Server {
    pub address: String,
    pub ports: Vec<i32>,
    pub ignite_port: i32,
    pub path: String,
    pub company: String,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct Config {
    // Not set in the sample config above, so default it to 0 when omitted
    #[serde(default)]
    pub check_time: i32,
    pub servers: Vec<Server>,
}

impl Config {
    pub fn new(path: &str) -> Config {
        let mut file = File::open(path).expect("Failed to open file");
        let mut contents = String::new();
        file.read_to_string(&mut contents)
            .expect("Failed to read file");
        let config_result: Result<Config, serde_yaml::Error> = serde_yaml::from_str(&contents);
        let config: Config = match config_result {
            Ok(config) => config,
            Err(err) => panic!("Failed to parse YAML: {}", err),
        };
        config
    }
}
Note: the read_config logic now lives inside the Config struct’s implementation (as Config::new), which is better practice than a free function!
The crates used in the orchestrator are as follows:
[dependencies]
serde = {version="1.0.163", features=["derive"]}
serde_yaml = "0.9.21"
rocket = "=0.5.0-rc.3"
reqwest = "0.11.18"
serde_json = "1.0"
rocket_contrib = "0.4"
lazy_static = "1.4.0"
We’ll declare the global config variable, and we’ll also build a lookup table that maps each server’s address to its ports and the number of calls made to each one. Then we initiate our Rocket web service so that clients can later query our microservices:
use std::collections::HashMap;
use std::sync::Mutex;

use lazy_static::lazy_static;
use rocket::{get, launch, routes};

lazy_static! {
    static ref CONFIG: Config = Config::new("config.yaml");
    // Global lookup table of port usage per server, used later by the
    // round-robin scheduler (its exact initialization isn't shown in the
    // article, so this is one reasonable way to set it up)
    static ref PORTS: Mutex<HashMap<String, HashMap<i32, i32>>> = Mutex::new(get_ports());
}

/**
 * Gives ports in the following format
 *
 * ports = {
 *     "address1": {
 *         20: 0,
 *         21: 0,
 *         22: 0,
 *     },
 *     "address2": {
 *         50: 0,
 *         51: 0,
 *         52: 0,
 *     },
 * }
 */
fn get_ports() -> HashMap<String, HashMap<i32, i32>> {
    // Read the configuration to get the addresses and ports
    let config = Config::new("config.yaml");
    // Generates the hashmap in the format described above
    let ports: HashMap<String, HashMap<i32, i32>> = config
        .servers
        .into_iter()
        .map(|ele| {
            let server_ports: HashMap<i32, i32> =
                ele.ports.into_iter().map(|port| (port, 0)).collect();
            (ele.address, server_ports)
        })
        .collect();
    ports
}

#[launch]
fn rocket() -> _ {
    rocket::build().mount("/", routes![index])
}
We need a way to interact with the microservices’ HTTP services and with the igniter we made, so we have the following function:
use reqwest::ClientBuilder;
use serde_json::Value;
use std::time::Duration;

/** Makes an HTTP request in the pattern http://address:port/uri/.../q */
async fn make_request(
    port: i32,
    address: &str,
    q: &str,
    uri: &str,
) -> Result<Value, Box<dyn std::error::Error>> {
    // Generates the URL as http://address:port/uri/q
    let request_url = format!("http://{}:{}{}/{}", address, port, uri, q);
    // Time that will be considered a timeout for the request
    let timeout = Duration::new(1, 0);
    // Creates an HTTP client
    let client = ClientBuilder::new().timeout(timeout).build()?;
    // Gets the response of the HTTP call to the URL
    let response = client.get(&request_url).send().await?;
    // Checks if the response is successful
    if response.status().is_success() {
        // Gets the response text
        let body = response.text().await?;
        // Parses the response text into a serde_json Value
        let json: Value = serde_json::from_str(&body)?;
        Ok(json)
    } else {
        Ok(Value::Null)
    }
}
The code above provides a basic implementation for making an HTTP GET request to a specified address, port, URI, and query string. It handles timeouts, retrieves the response, and parses the response body as JSON if the response is successful.
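As a quick sketch of how make_request is meant to be called (the query string "bolt" is purely illustrative; the address, port, and path come from the sample config), something like this inside an async context would do:
// Hypothetical usage; the query string is illustrative only
async fn example_query() -> Result<(), Box<dyn std::error::Error>> {
    let parts = make_request(8001, "localhost", "bolt", "/a/parts/search").await?;
    println!("{}", parts);
    Ok(())
}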
We also want to clarify to the client where this information came from. We will do that with the following function:
use rocket::response::content;

async fn make_request_data(
    port: i32,
    address: &str,
    q: &str,
    uri: &str,
    company: &str,
) -> content::RawJson<String> {
    // Makes an HTTP request
    let res = make_request(port, address, q, uri).await;
    // Turns the parsed JSON into a string to allow the addition of the source
    let json_str = match res {
        Ok(json) => json.to_string(),
        Err(err) => {
            eprintln!("Error: {}", err);
            Value::Null.to_string()
        }
    };
    // Returns a RawJson of the data and the source of this data
    content::RawJson(format!(
        "{{\"company\": \"{}\", \"data\": {} }}",
        company, json_str
    ))
}
With this, we combine the functionality of the make_request function with additional data (the source name) and return a JSON string containing the data received from the HTTP request and the company name.
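So for a query against company "A", the wrapped payload would look something like this (the part data itself is purely illustrative):
{ "company": "A", "data": [ { "part": "bolt", "stock": 42 } ] }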
Implementing the round-robin scheduling algorithm
To implement the round-robin algorithm, we’ll use our PORTS HashMap: for a given server, we iterate over its ports to find the one that has been used the least.
fn find_smallest_port(server: &Server) -> Option<i32> {
    // Get the ports hashmap
    let mut data_lock = PORTS.lock().unwrap();
    // Get the ports of this specific server
    let ports_usage = data_lock.get(&server.address).unwrap();
    // Iterate through the hashmap and find the port with the least usage
    let smallest_port = ports_usage
        .iter()
        .min_by_key(|(_, used)| *used)
        .map(|(port, _)| *port);
    // If we got a port, add one to its usage, because it is about to be used
    if let Some(port) = smallest_port {
        *data_lock
            .get_mut(&server.address)
            .unwrap()
            .get_mut(&port)
            .unwrap() += 1;
    }
    // Return the least used port
    smallest_port
}
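To make the selection rule concrete, here’s a tiny standalone sketch of the same min_by_key pick over a hypothetical usage map:
use std::collections::HashMap;

fn main() {
    // Hypothetical usage counts after a few requests
    let usage: HashMap<i32, i32> = HashMap::from([(8001, 2), (8002, 1), (8003, 1)]);
    // Same selection logic as find_smallest_port: pick the least used port
    let least_used = usage
        .iter()
        .min_by_key(|(_, used)| *used)
        .map(|(port, _)| *port);
    // Prints Some(8002) or Some(8003); ties are broken arbitrarily by HashMap order
    println!("{:?}", least_used);
}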
Now, let’s put it all together and make a new GET endpoint in our orchestrator that will allow us to query our microservices using a string.
#[get("/<q>")]
async fn index(q: String) -> content::RawJson<String> {
// Creates a vector of the data recovered from the servers
let mut data_recovered = Vec::new();
// Iterates through the servers and recovers their data,
// querying the least used port
for server in &CONFIG.servers {
let port;
// Needed because async function
{
// Finds the smallest port
port = find_smallest_port(server)
.expect(format!("No port found for {}", &server.address).as_str());
}
// Request the data to the server in the least used port
let res = make_request_data(port, &server.address, &q, &server.path, &server.company).await;
// Retrives the json_string
let content::RawJson(json_string) = res;
// Check if the response indicates an error
let json_value: Value = serde_json::from_str(&json_string).unwrap_or_else(|_| Value::Null);
// If an error ocurred (couldn't connect to the server) ignite!
if *json_value.get("data").unwrap() == Value::Null {
// Pings the igniter to turn the application on on this port
let _ping = make_request_data(
server.ignite_port,
&server.address,
&format!("{}", port),
"",
&server.company,
)
.await;
// Calls the server again and appends the data to our responses
data_recovered.push(
make_request_data(port, &server.address, &q, &server.path, &server.company).await,
)
} else {
// If the call was successful just append it to our responses
data_recovered.push(content::RawJson(json_string));
}
}
// Parses and serializes all the data obtained from the server calls
let values: Vec<Value> = data_recovered
.into_iter()
.map(|content::RawJson(json_string)| serde_json::from_str(&json_string))
.collect::<Result<Vec<Value>, _>>()
.expect("Failed to parse JSON");
// Gets the list of json objects as a string
let json_string = serde_json::to_string(&values).expect("Failed to serialize JSON");
// Print the port usage
print_ports();
// Send the results as a the response
content::RawJson(String::from(format!("{{"responses": {}}}", json_string)))
}
Thus, this code handles incoming requests, iterates over multiple servers, retrieves data from each one, and constructs a response containing all the collected data. It handles failed requests, ignites the server when a port is offline, and returns a combined response of JSON data from multiple servers.
After all these steps, we have a complete orchestrator that handles a query, searches across multiple servers, and is also capable of turning the servers on. We can test it out using Postman.
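For example, a query that hits both companies would come back looking roughly like this (the part data itself is purely illustrative):
{
  "responses": [
    { "company": "A", "data": [ { "part": "bolt", "stock": 42 } ] },
    { "company": "B", "data": [ { "part": "bolt", "stock": 17 } ] }
  ]
}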
Conclusion
We made an orchestrator from scratch that can query multiple microservices. This orchestrator isn’t perfect, though. It can be improved upon to do more things, like the following:
- Handling persistent failure of a port.
- Using SSH when it’s readily available on the server’s operating system.
- Securing the igniter so it doesn’t allow random calls to the HTTP service.
- Improving the code structure and reducing code smells.
- Whatever your dreams and hopes are!
The code is available on my GitHub.