twenty four merry days of Perl

Out of Order Perl

AnyEvent - 2014-12-24

This article will cover asynchronous programming with the AnyEvent library and will show use cases for managing multiple asynchronous requests in a single application. In addition, I hope to introduce good techniques for using metrics to drive technology decisions!

Is an Asynchronous Solution right for me?

New software techniques and practices are always rearing their heads in the industry. While asynchronous functionality is not new to Perl, it is not widely used where it could be, and maybe over-used where it shouldn't be. When considering whether or not to adopt some new methodology on existing software, it's important to make sure you have clearly identified what problem you are trying to solve. It's fine if you want to play around with something new, but remember that every solution has a cost. The cost of asynchronous functionality is that it can be difficult to read and debug (callback soup anyone?) and you have to consider compatibility of your current web framework as well as your current code base.

Step 1: Identify the Problem

“70% of users are unable to download the TPS Report from the website because the website times out.”
“Web site latency is reported for 75% of users.”

Step 2: Identify the cause to clarify the problem (with numbers!)

To identify the cause, you can check logs, collect metrics (a lot of folks use statsd), or attempt to reproduce the problem in a staging environment.

“The TPS report database query takes 30 seconds to run”
“Requests to HTTP services are averaging 1 second because each request is made individually and is only made once the previous request is complete.”
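
One low-tech way to get numbers like these is to wrap the suspect code in a timer. The sketch below uses the core Time::HiRes module; the `timed` helper and the fake 50 millisecond "query" are made up for illustration and are not part of the sample application.

```perl
use strict;
use warnings;
use Time::HiRes qw(gettimeofday tv_interval sleep);

# Hypothetical helper (not from the sample application): run a code
# block, report how long it took, and hand back the elapsed seconds
# along with whatever the block returned.
sub timed {
    my ($label, $code) = @_;

    my $start   = [gettimeofday];
    my @result  = $code->();
    my $elapsed = tv_interval($start);

    printf STDERR "%s took %.3f seconds\n", $label, $elapsed;
    return ($elapsed, @result);
}

# Stand-in for a slow database query or HTTP call.
my ($elapsed, $rows) = timed('fake TPS query' => sub {
    sleep 0.05;
    return 42;
});
```

Logging the elapsed time per label (or sending it to something like statsd) gives you a baseline to compare against after each change.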

Step 3: Propose a solution (based on data and facts)

Now that you've collected data and clarified your problem in terms of metrics, you can say "Asynchronous HTTP requests would reduce overall report loading time to only the amount of time it takes the longest request to return."

Implement a Solution based on your results... and use metrics to determine success

There is a fully functioning sample application! The sample application from which the code snippets in this article come is located on GitHub.

The sample application is a simple Mojolicious::Lite application that generates a “TPS” report. This report makes calls to 2 external services to collect data and then uses that data to make a complex call to the database. I'm just using simple timing metrics to show the results for each of the reports. You can run it yourself to see the results as it records them in the database.

First Improvement: Make it Async!

As we ascertained from our problem statement exercise just now, kicking off HTTP requests at the same time would drastically improve our report's performance.

Here's how you currently use LWP to make HTTP requests:



my $ua = LWP::UserAgent->new();
my $req = HTTP::Request->new( GET => $url );

# make the request, and wait until we get the results back
my $res = $ua->request($req);

my $content = $res->content;
return $content;

 

Here's how it looks in our sample application:



while (my ($service_name, $url) = (each %{$self->urls})) {
    my $req = HTTP::Request->new( GET => $url );
    my $res = $self->_user_agent->request($req);
    my $content = $res->content;
    $http_data->{$service_name} = $content;
}
  
return $http_data;

 

Notice that these requests happen one after the other, for EACH URL in our list and for EACH row in the database results, each one waiting for the previous request to return before starting. If we have 10 URLs and each one takes 3 milliseconds, our total request takes 10 x 3 milliseconds, or 30 milliseconds. If any one of them takes longer than that, it holds up processing of the others and makes our whole request take even longer.

There are a lot of asynchronous libraries in the CPAN for you to consider. I'm using AnyEvent because we're going to be talking to RabbitMQ later and AnyEvent::RabbitMQ is one of the better libraries for doing that. When considering what asynchronous library to use, you always need to think about your requirements and to determine if it's likely you'll be adding other asynchronous functionality. Swapping one out for another, however, isn't impossible as they all pretty much follow the same rules (they just call things by different names). What you'll learn here about how to use AnyEvent will also apply in concept to any other library you might want to use instead.

Here's what the asynchronous solution looks like in our sample application using AnyEvent::HTTP.



my $cv = AnyEvent->condvar;
  
my $result;

$cv->begin(sub { shift->send($result) });

while (my ($service_name, $url) = (each %{$self->urls})) {
  $cv->begin;

  my $request;
  
  $request = http_request(
    GET => $url,
    timeout => 2, # seconds
    sub {
      my ($content, $headers) = @_;
      $result->{$service_name} = $content;

      undef $request;
      $cv->end;
    }
  );
}

$cv->end;

return $cv->recv;

 

Let's add a few more comments to try and make it a little clearer what's going on:



# A condition variable. Basically a mechanism to tell if we're done
# processing all the requests or not.
my $cv = AnyEvent->condvar;

my $result;

# tell the condition variable that it's not "ready" until it sees the "end"
# call at the end of the code signifying we're done setting up all the
# requests. This is good practice to avoid accidentally completing
# while we're still setting up all the requests.
#
# Also pass the code block that will be executed at this point when the
# condition variable is "ready".
$cv->begin(sub { shift->send($result) });

# for each of our urls, start a request in parallel
while (my ($service_name, $url) = (each %{$self->urls})) {

  # add another thing to the list of things that must be completed
  # before the condition variable is "ready". i.e. let the condition variable
  # know that we must wait until the HTTP request returns and the callback
  # calls a corresponding "end" on the condition variable before we're done.
  $cv->begin;

  # schedule an HTTP request to be made asynchronously. Once it's done
  # and we've got the content, call the callback.
  my $request;
  $request = http_request(
    GET => $url,
    timeout => 2, # seconds
    sub {
      my ($content, $headers) = @_;

      # save the content for this request (i.e. what we downloaded) in our
      # result
      $result->{$service_name} = $content;

      # clean ourselves up
      undef $request;

      # and let the condition variable know that this scheduled thing
      # is done and once the last one is done it can fire.
      $cv->end;
    }
  );
}

# signify that we're done with the stage of setting up all the requests. The
# condition variable won't become "ready" until all the requests that called
# "begin" above have also completed and called a corresponding number of
# "end" calls
$cv->end;

# wait (i.e. block) until the condition variable is ready, i.e. until a
# corresponding number of "end" calls have been called for each "begin" call
return $cv->recv;

 

We'll get back to this code, explaining in more depth each section of code bit by bit once we've learned a little more about AnyEvent.

First, let's look at the advantage of using this significantly more complicated code. With an asynchronous solution, if each request takes 3 milliseconds, then we get our results back in 3 milliseconds. If one of those requests takes 1 second, then we get all of our results back in 1 second. The time it takes to get our results back takes only as long as the longest request!
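
To see that arithmetic in action without touching the network, here is a small sketch that uses AnyEvent timers as stand-ins for HTTP requests. The service names and delays are invented for the example; only the shape of the result matters.

```perl
use strict;
use warnings;
use AnyEvent;
use List::Util qw(sum max);
use Time::HiRes qw(time);

# Made-up "services" and how long each pretends to take, in seconds.
my %delay = (inventory => 0.1, pricing => 0.2, shipping => 0.3);

my $cv    = AnyEvent->condvar;
my $start = time;
my (%finished_at, %timer);

for my $name (keys %delay) {
    $cv->begin;

    # Each timer plays the part of one in-flight request.
    $timer{$name} = AnyEvent->timer(
        after => $delay{$name},
        cb    => sub {
            $finished_at{$name} = time - $start;
            delete $timer{$name};
            $cv->end;
        },
    );
}

$cv->recv;    # block here until every "request" has called end
my $total = time - $start;

printf "sequential would take about %.1fs, concurrent took %.2fs (longest: %.1fs)\n",
    sum(values %delay), $total, max(values %delay);
```

On an otherwise idle machine the concurrent total should land close to the longest delay rather than the sum of all three.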

What does it mean to be Asynchronous?

Asynchronous code is event-driven

An event loop manages code execution. If you've ever had to do any work with user interfaces, you should be familiar with the notion of an event listener. An event in a user interface would be a button click: you register a function to execute when the button is pushed. In our example, the function (callback) we've registered for the event we're interested in does not run right away. When the event occurs, the callback is pushed onto an event queue, and the event loop works through that queue of callback functions once the currently running code hands control back to it.

Results are returned in the background while your application does other things

The rest of your application proceeds ahead using a placeholder for the result until it reaches a point where it can't go any further without the actual result (for example writing that result out to a data store).

The basic formula for Async: Event Loop + Listener + Callbacks

A listener registers your interest in a particular event. A callback is a function that runs when your request is in a ready state. No matter what Async library you use, all of them follow these rules.
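
As a rough illustration of those three pieces, the sketch below registers a timer watcher (the listener), gives it a sub to run (the callback), and then hands control to the event loop by waiting on a condition variable. The 10 millisecond timer is an arbitrary stand-in for any event you might wait on.

```perl
use strict;
use warnings;
use AnyEvent;

my @order;
my $cv = AnyEvent->condvar;

# Listener: register interest in an event (here, "10 ms have passed").
# Callback: the sub that the event loop runs when that event happens.
my $watcher = AnyEvent->timer(
    after => 0.01,
    cb    => sub {
        push @order, 'callback ran';
        $cv->send;
    },
);

# Registering the watcher did not run the callback; we get here first.
push @order, 'watcher registered';

# Event loop: recv hands control to it until the condvar is ready.
$cv->recv;
push @order, 'recv returned';

print join(' -> ', @order), "\n";
# prints "watcher registered -> callback ran -> recv returned"
```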

Now we'll talk about more specifically what this looks like in the AnyEvent libraries.

AnyEvent 101

condvar

AnyEvent is just an abstraction layer on top of event loops. The condvar() method initializes the condition variable which represents a value that may or may not be available. The condition variable doesn't have a value until it is “ready”, meaning the request has completed. You use the condvar() method to interface with the event loop.

send

The send() method sets the condition variable to ready. This indicates that the event you were waiting for is complete. Any values you pass to send() are stored in the condvar and handed back by recv. In the case of multiple requests, you can collect the values retrieved by each request and pass the combined result to send() once they have all finished.

recv

This returns a value when the condvar is in a ready state. Note that this is blocking because this is called at the point where your program cannot continue until it has the value it needs. I'll show why this is important later when we introduce RabbitMQ and have to manage multiple asynchronous requests.
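
A minimal sketch of send and recv working together, using a timer in place of a real request. The `ready` method is part of the AnyEvent condvar API; the "report data" string is just a placeholder value.

```perl
use strict;
use warnings;
use AnyEvent;

my $cv = AnyEvent->condvar;

# Pretend a result arrives 10 ms from now.
my $timer = AnyEvent->timer(
    after => 0.01,
    cb    => sub { $cv->send('report data') },
);

my $ready_before = $cv->ready ? 1 : 0;   # 0: nothing has been sent yet
my $value        = $cv->recv;            # blocks until send is called
my $ready_after  = $cv->ready ? 1 : 0;   # 1: the condvar now has a value

print "before=$ready_before value=$value after=$ready_after\n";
# prints "before=0 value=report data after=1"
```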

begin and end

These methods are syntactic sugar for using a counter within an event loop. begin increments the counter and end decrements it. The event loop keeps running until the counter is back at 0. You can use these methods when you're managing multiple requests within an event loop and you want to make sure the event loop doesn't terminate until all of the requests you want have finished.

If you were to implement this yourself, you could create a loop where you create a condvar for each individual request. This syntactic sugar lets you declare just one condvar and then specify when you want to set its ready state so recv can return the result.
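
Here is a small sketch of that counter in use, with three timers standing in for three requests. The job names are hypothetical; the begin, end, send and recv calls are the ones described above.

```perl
use strict;
use warnings;
use AnyEvent;

my $cv = AnyEvent->condvar;
my (%result, %timer);

# Outer begin: its callback runs when the counter drops back to 0.
$cv->begin(sub { shift->send(\%result) });

for my $job (qw(a b c)) {        # hypothetical job names
    $cv->begin;                  # counter + 1 for each job started
    $timer{$job} = AnyEvent->timer(
        after => 0.01,
        cb    => sub {
            $result{$job} = uc $job;
            delete $timer{$job};
            $cv->end;            # counter - 1 when the job finishes
        },
    );
}

$cv->end;                        # matches the outer begin

my $all = $cv->recv;             # returns once every job has ended
print join(',', map { "$_=$all->{$_}" } sort keys %$all), "\n";
# prints "a=A,b=B,c=C"
```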

AnyEvent::HTTP Code Walkthrough

Remember that code sample from earlier? Well now that we have a little better vocabulary, we can walk through it and understand what's happening in a little more detail, and hopefully understand how we came to write the code like that.

The first step is to initialize the condvar. Remember at this point, it has no actual value!



my $cv = AnyEvent->condvar;
 

Next, you need to determine what you want to return at the ready state and assign it. This is where async programming can get a little tricky (hence the title of this article)! You need to think backwards when you start thinking async. Think from the result and work your way back. Decide at what point your application absolutely needs this data and put your recv call there.



...all the other code...
return $cv->recv;

 

Now all we need is to work out what code goes between these two statements. We need to figure out when to call the send event so we can capture our result. We're going to call it within a callback bound to a begin call. Each time we want to fetch data from a URL, we'll increment our event loop counter so that we don't exit out of the event loop before we've gotten a response for each of our URL requests. So, this means, in English: Once all the URLs are fetched, set the condvar to ready and return the result.



my $cv = AnyEvent->condvar;
$cv->begin(sub { shift->send($result) }); # <------------- we add this
...
return $cv->recv;

 

Because we've declared a begin, we need to declare an end outside of the loop to ensure that send gets called. If our list of URLs is empty (so the body of the while block never runs), nothing else would ever call end, and recv would wait forever without this call to end here. This ensures the send we registered with our begin up above gets called.



my $cv = AnyEvent->condvar;
$cv->begin(sub { shift->send($result) });
...
$cv->end; # <------------- we add this
return $cv->recv;
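
The empty-list case is easy to try out in isolation. In this sketch the `%urls` hash is deliberately empty and the request itself is elided, so the only begin and end calls are the outer pair.

```perl
use strict;
use warnings;
use AnyEvent;

my %urls   = ();     # nothing to fetch this time
my $result = {};

my $cv = AnyEvent->condvar;
$cv->begin(sub { shift->send($result) });

for my $service_name (keys %urls) {
    $cv->begin;
    # ... a request would be started here and call $cv->end when done ...
}

# With no requests in flight this end brings the counter to 0, the
# callback given to begin fires, and recv below returns immediately.
$cv->end;

my $got = $cv->recv;
print "got ", scalar(keys %$got), " results\n";
# prints "got 0 results"
```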

 

Now that we've defined our start and end points, we can focus on the actual logic around retrieving the URLs. We're going to use the AnyEvent::HTTP library, in particular the http_request method. Calling http_request means that you want the request to be made whenever it's possible (versus a procedural call which would mean stop everything and do it right now). This registers an IO watcher and tells the event loop that we are interested in this IO event.

When the request finishes, we want to invoke the callback that we've bound to the function call.

This method doesn't actually do the request; it returns right away and tells the event loop to do the request whenever it can. When this chunk of code runs, the event loop is not running because this chunk of code has control.



my $cv = AnyEvent->condvar;
my $result;
$cv->begin(sub { shift->send($result) });

while (my ($service_name, $url) = (each %{$self->urls})) {
  ...
  my $request;                                # <----
  $request = http_request(                    # <----
    GET => $url,                              # <---- we add
    timeout => 2, # seconds                   # <---- all of
    sub {                                     # <---- this
      my ($content, $headers) = @_;           # <---- code
      $result->{$service_name} = $content;    # <----
      ...
    }                                         # <----
  );                                          # <----
}

$cv->end;
return $cv->recv;

 

We need to unregister our interest in this event once we're done with it.



my $cv = AnyEvent->condvar;
my $result;
$cv->begin(sub { shift->send($result) });

while (my ($service_name, $url) = (each %{$self->urls})) {
  ...
  my $request;
  $request = http_request(
    GET => $url,
    timeout => 2, # seconds
    sub {
      my ($content, $headers) = @_;
      $result->{$service_name} = $content;

      undef $request; # <------------- we add this
      ...
    }
  );
  ...
}

$cv->end;
return $cv->recv;
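
Why hold on to `$request` at all? In AnyEvent a watcher only lives as long as something references it, and AnyEvent::HTTP documents that http_request returns a cancellation guard when called in non-void context. The sketch below shows the same lifetime rule with plain timers, which is an assumption-free way to try it without a network; the variable names are made up.

```perl
use strict;
use warnings;
use AnyEvent;

my ($dropped_fired, $kept_fired) = (0, 0);
my $cv = AnyEvent->condvar;

{
    # This watcher is destroyed at the end of the block, before the
    # event loop ever runs, so its callback is never called.
    my $dropped = AnyEvent->timer(after => 0.01, cb => sub { $dropped_fired++ });
}

{
    # This watcher is referenced from inside its own callback, which
    # keeps it alive after the block ends, until the callback lets go
    # of it with undef.
    my $kept;
    $kept = AnyEvent->timer(
        after => 0.05,
        cb    => sub {
            $kept_fired++;
            undef $kept;     # unregister our interest; we are done with it
            $cv->send;
        },
    );
}

$cv->recv;
print "dropped fired: $dropped_fired, kept fired: $kept_fired\n";
# prints "dropped fired: 0, kept fired: 1"
```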

 

The inner begin and end calls define the individual requests that we want to track in the event loop. For each begin, the AnyEvent event loop will increment a counter. For each end the AnyEvent event loop decrements the counter. Once the counter reaches 0, AnyEvent can set the condvar to ready.



my $cv = AnyEvent->condvar;
  
my $result;
   
$cv->begin(sub { shift->send($result) });

while (my ($service_name, $url) = (each %{$self->urls})) {
  $cv->begin; # <------------- we add this

  my $request;
  
  $request = http_request(
    GET => $url,
    timeout => 2, # seconds
    sub {
      my ($content, $headers) = @_;
      $result->{$service_name} = $content;
      
      undef $request;
      $cv->end; # <------------- and we add this
    }
  );
}

$cv->end;

return $cv->recv;

 

And that's it, we've got a working example!
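
For experimenting outside the sample application, the same pattern can be pulled into a standalone function. This is only a sketch: `fetch_all` is a hypothetical name, the URLs in the usage comment are placeholders, and the check on the `Status` pseudo-header (which AnyEvent::HTTP sets on every response, including internal errors) is an addition that the original code does not have.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;

# Hypothetical standalone version of the method built above.
# Takes { name => url } and returns { name => body_or_undef }.
sub fetch_all {
    my ($urls) = @_;

    my $cv     = AnyEvent->condvar;
    my $result = {};

    $cv->begin(sub { shift->send($result) });

    for my $service_name (keys %$urls) {
        $cv->begin;

        my $request;
        $request = http_request(
            GET     => $urls->{$service_name},
            timeout => 2,    # seconds
            sub {
                my ($content, $headers) = @_;

                # Treat anything outside 2xx as a failure.
                my $ok = $headers->{Status} =~ /^2/;
                $result->{$service_name} = $ok ? $content : undef;

                undef $request;
                $cv->end;
            }
        );
    }

    $cv->end;
    return $cv->recv;
}

# Example call; these URLs are placeholders, not real services.
# my $data = fetch_all({
#     inventory => 'http://localhost:3000/inventory',
#     pricing   => 'http://localhost:3000/pricing',
# });
```

Note that this version still blocks in recv, so it is only suitable for code that is not already running inside an event loop callback.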

When should I use Asynchronous Programming?

An asynchronous solution works best for problems that meet all of the following criteria:

  • You need to do many things that could take a while

  • You don't care about the order you do those things in

Given that an asynchronous solution can be hard to read and potentially hard to maintain, you should use metrics to determine if this added complexity really buys you anything.

Next Improvement: Let's create a work queue!

Work queues are good for throttling the work your application has to do and for scheduling when this work needs to happen, and they are a good solution for anything that could potentially take a long time and cause a timeout for the end user. To create a work queue, you need a message queue to which one part of your application can post messages (work to be done) and from which another part of your application, or a separate worker process, can read the messages and perform the work.

You can throttle the database load by determining how many messages your worker handles at one time. This could be as simple as a while loop that pauses in between processing. You could also spin up more workers if your message queue gets too long. If you need to do massive updates to lots of records such that you don't want to do them at a peak time when the database indexes are constantly being updated due to a large number of writes, you can schedule your worker processes to run at an off-peak time. This can be as simple as a Perl script that you schedule with a cron job.
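
The "while loop that pauses" idea can be sketched in a few lines. Everything here is hypothetical: the `drain_queue` function, its `batch_size` and `pause` options, and the in-memory array that stands in for a real message queue.

```perl
use strict;
use warnings;
use Time::HiRes qw(sleep);

# Hypothetical throttled worker loop. The "queue" is a plain array
# standing in for a real message queue.
sub drain_queue {
    my ($queue, $handler, %opt) = @_;
    my $batch_size = $opt{batch_size} // 10;
    my $pause      = $opt{pause}      // 1;     # seconds between batches

    my $handled = 0;
    while (@$queue) {
        my @batch = splice @$queue, 0, $batch_size;
        $handler->($_) for @batch;
        $handled += @batch;

        sleep $pause if @$queue && $pause > 0;  # give the database a breather
    }
    return $handled;
}

my @queue = map { +{ report_id => $_ } } 1 .. 25;
my @seen;
my $handled = drain_queue(
    \@queue,
    sub { push @seen, $_[0]{report_id} },
    batch_size => 10,
    pause      => 0.01,
);
print "handled $handled messages\n";
```

Tuning the batch size and pause is exactly the kind of decision that should be driven by the metrics you are collecting, not by guesswork.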

In our example, let's say our users all hit the database the same time every week for these reports. These reports hit the database pretty hard which causes timeouts for other users and reports to fail to complete. Again, before we jump into adding yet more complexity to our technology stack, we need to consider if this solution is the best one for our problem.

Other solutions could be to move our single database instance into a cluster, or to pre-compute the data into predictable date chunks. If we have a table that has a lengthy history we could look at windowing or partitioning of the table to limit the total results the query has to run against. Let's say in this situation, we don't have the resources to make changes to the database. Apparently that's another department and there's a lengthy red tape process we need to follow in order to get anything done in that regard. While that points to greater systemic problems at our example company, this does often reflect reality and we still need to come up with a solution.

Enter RabbitMQ. RabbitMQ runs as a separate process which the various Perl processes communicate with via AMQP (Advanced Message Queuing Protocol) and is implemented in Erlang. RabbitMQ essentially receives and forwards messages. You could also use Redis and the Redis::Requeue library for the same thing, but RMQ offers us an asynchronous option.

What does Asynchronous Programming have to do with RabbitMQ?

Sockets are traditionally a blocking connection. The actual connection attempt blocks until the connection is available. So to connect to RabbitMQ, we would need to wait for the TCP connection and we would need to be able to respond to error messages. AnyEvent is an asynchronous library and AnyEvent::RabbitMQ is the canonical Perl library for interacting with RabbitMQ.

Using an asynchronous connection with RabbitMQ is important because RabbitMQ sends a heartbeat to determine if the connection is still active. Most web framework applications are blocking by nature: code is only triggered when a specific request (usually via a URL route) is made.

Once the publisher has established a connection, RabbitMQ can create an exchange. This exchange handles all incoming updates and redistributes them to the available queues. Once the exchange is created, RabbitMQ can send messages.

If the web application does not respond to the RMQ heartbeat, RMQ will hang up on you thereby closing the connection. So unless you want to hang up and reconnect every time you want to send a message to RabbitMQ, you'll need to double check that your web framework has a way to respond to this heartbeat.

In the example code, I've chosen to use Mojolicious because Mojo::IOLoop handles multiple reactor backends.

Step 1: Replace the original “work” section with publishing a message to RabbitMQ

Some things to think about:

  • do you need to keep a record of the queue data?

  • do you need to recreate the queue if the messages are lost?

First we need to set up our RabbitMQ connection. This is in the main.pl file of the worker queue version of the sample code.



my $cv = AnyEvent->condvar;

my $ar = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
    host => 'localhost',
    port => 5672,
    user => 'guest',
    pass => 'guest',
    vhost => '/',
    on_success => sub { ... },
    ...
);

 

This essentially connects to the message queue and then calls the on_success callback. Inside this on_success callback we then open the channel:



on_success => sub {
    my $ar = shift;
    $ar->open_channel(
        on_success => sub { ... }
        ...
    );
},

 

And inside the on_success callback of the open_channel we have the bit of code that actually puts stuff on the message queue:



on_success => sub {
    my $channel = shift;

    # use a named queue "reports"
    $channel->declare_queue(
        queue => 'reports',
        auto_delete => 0,
    );

    # publish (send) to the message queue
    my %publish_args = (
        header => {
            content_type => 'application/json',
        },
        body => encode_json($params),
        routing_key => 'reports',
    );
    $channel->publish(%publish_args);

    # and update the condition variable to say we're done with the
    # queue sending part
    $cv->send("Added report request to queue");
},
...

 

Step 2: Move the work to a worker process

Now we've got something requesting work, we need a worker on the other end of the message queue actually doing the work.

You can schedule your worker as a cron job, which works with RabbitMQ because there isn't any overstepping if the previous job hasn't finished yet. You can also use an AnyEvent timer (AnyEvent->timer) on a loop if you aren't a fan of cron jobs. There are quite a few options for running your worker process on a schedule; the sample application just uses a simple Perl script with the idea that it would be a cron job.
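
If you go the timer route, a repeating AnyEvent timer looks roughly like this. The 50 millisecond interval and the three-tick limit are only there so the demo finishes quickly; a real worker would use a much longer interval and check the queue inside the callback.

```perl
use strict;
use warnings;
use AnyEvent;

my $cv    = AnyEvent->condvar;
my $ticks = 0;

# Fire right away, then again every 50 ms.
my $poller;
$poller = AnyEvent->timer(
    after    => 0,
    interval => 0.05,
    cb       => sub {
        $ticks++;
        print "tick $ticks: check for work here\n";

        if ($ticks >= 3) {      # stop after three ticks so the demo ends
            undef $poller;      # destroying the watcher cancels the timer
            $cv->send($ticks);
        }
    },
);

my $total = $cv->recv;
```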

The worker will need to take a message, process it, then grab the next one. You can have as many of these running as you want depending on your work load. To determine your work load, log metrics and keep an eye on them. Don't make assumptions that aren't based on actual data!

We'll use the consume() method to get the next message in the message queue. Once we get the message, we get the delivery tag for communication with the queue later. The payload is the queue item content. In our example we have this encoded in JSON so we need to decode it to do anything with it. This code can be found in the report_worker.pl file.



# connect to the message queue as in the previous example
$ar->open_channel(
    on_success => sub {
        my $channel = shift;
        ...

        # consume (read) messages when they arrive
        $channel->consume(
            on_consume => sub {
                my ($message) = @_;
                my $frame = $message->{deliver}->method_frame;
                my $delivery_tag = $frame->delivery_tag;
                my $params = decode_json($message->{body}->payload);

                # do work specified in $params...

                # confirm that we've done the work and the message
                # queue can drop the message
                $channel->ack(delivery_tag => $delivery_tag);
            }
        );
    },
);

 

The Gotcha

Here's what the work looks like in the do work section mentioned above. We create a ReportBuilder instance and tell it to build the report.



my $report_builder = Poo::ReportBuilder->new();
my $report_data = $report_builder->build_report($params);
my $report_json = encode_json($report_data);
my $report = Poo::Report->new(\%params)->save;

 

Now here's the gotcha, and this is an easy mistake to make when you're first dealing with asynchronous programming: Remember that our build_report function itself uses AnyEvent::HTTP to build the reports - to run more than one web request at a time - and we didn't write it in a way that plays well with the AnyEvent code we just wrote to deal with the message queue.

If you were to run this code as is, you would get the following error:

  AnyEvent::CondVar: recursive blocking wait attempted at ../lib/Poo/ReportBuilder.pm line 238

For reference, I've implemented this in the async version of the application on GitHub, with a comment explaining not to do that. Use that code to play around with adding another asynchronous call (say by adding the RabbitMQ code) and seeing what happens.

Remember, we have to call recv at the point in our code when we absolutely need the result to continue: Because our program cannot continue until this method returns something, recv() is considered a blocking function, but we don't want to block in the middle of generating a report - we need to be going back to the event loop in order to respond to those heartbeats!
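
You can reproduce the problem without RabbitMQ or HTTP at all. In the sketch below, two timers stand in for the message queue callback and the nested HTTP requests. With AnyEvent's standard condvar implementation the inner recv is expected to die with the "recursive blocking wait attempted" message shown above; other event loop integrations may behave differently.

```perl
use strict;
use warnings;
use AnyEvent;

my $outer = AnyEvent->condvar;
my $error = '';

# This callback runs inside the event loop, while $outer->recv below
# is still waiting.
my $watcher = AnyEvent->timer(
    after => 0.01,
    cb    => sub {
        my $inner = AnyEvent->condvar;
        my $timer = AnyEvent->timer(after => 0.01, cb => sub { $inner->send });

        # A second blocking wait from inside a callback: this is the
        # mistake. eval lets us capture the exception and carry on.
        eval { $inner->recv; 1 } or $error = $@;

        $outer->send;
    },
);

$outer->recv;
print "caught: $error";
```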

recv always needs to be at the top level. We need to use callbacks at the highest level, not the condvar. Anything that depends on any return value shouldn't depend on a variable, it should bind to a callback.

This is also why asynchronous programming can be very confusing and also very hard to maintain and debug. There are some techniques to improve this workflow, including Promises or Futures, but that's outside of the scope of this article. I do encourage you to research them though as they are reasonable solutions for dealing with callback soup.

Our original build_report method had this code:



sub build_report {
    ...
    my $http_data = $self->_get_data_from_urls( \%urls_list );

    # do stuff with $http_data...
}

 

We need to change this to bind to a callback so a caller can manage their own condvar and their own recv call.



sub build_report {
    ...
    $self->_get_data_from_urls(
      \%urls_list,
      sub {
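        # the condvar is passed to the callback; it is already "ready"
        # here, so recv returns at once without blocking
        my $http_data = shift->recv;

        # do stuff with $http_data...
        ...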
      }
    );
}

 

We'll alter the _get_data_from_urls() method by removing the recv call and using the condvar's cb method instead. cb registers a function to be called as soon as something is sent to the condvar, that is, when it becomes ready. The caller is responsible for providing that callback.



sub _get_data_from_urls {
  my $self = shift;
  my $urls = shift;
  my $callback = shift;

  my $cv = AnyEvent->condvar;

  my $result;

  $cv->begin(sub { shift->send($result) });

  while (my ($customerid, $service_urls) = (each %{$urls})) {
    for my $service_name (keys %{$service_urls}) {
      $cv->begin;
      my $request;

      $request = http_request(
        GET => $service_urls->{$service_name},
        timeout => 2, # seconds
        sub {
          my ($body, $hdr) = @_;

          $result->{$customerid}->{$service_name} = $body;

          # referencing $request here keeps the request alive until
          # this callback has run
          undef $request;
          $cv->end;
        }
      );
    }
  }

  $cv->end;

  # this replaces "my $http_result = $cv->recv;"           # <--
  # now the condition variable calls the callback when it  # <-- new
  # reaches the ready state                                # <-- code
  $cv->cb($callback);                                      # <--
}
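
The same callback style can be tried out with timers in place of HTTP requests. In this sketch `get_data_later` is a hypothetical stand-in for a library function like the one above: it never blocks, and only the top level of the program calls a blocking recv. It relies on the documented behaviour that cb passes the condvar to the callback and that recv on a ready condvar returns immediately.

```perl
use strict;
use warnings;
use AnyEvent;

# Hypothetical library function in the callback style: it never calls
# a blocking recv itself. Timers stand in for the HTTP requests.
sub get_data_later {
    my ($names, $callback) = @_;

    my $cv     = AnyEvent->condvar;
    my $result = {};
    $cv->begin(sub { shift->send($result) });

    for my $name (@$names) {
        $cv->begin;
        my $timer;
        $timer = AnyEvent->timer(
            after => 0.01,
            cb    => sub {
                $result->{$name} = "data for $name";
                undef $timer;
                $cv->end;
            },
        );
    }

    $cv->end;

    # cb hands the (ready) condvar to our sub; recv on a ready condvar
    # returns at once, so nothing blocks inside the event loop.
    $cv->cb(sub { $callback->(shift->recv) });
    return;
}

# Only the top level of the program blocks.
my $done = AnyEvent->condvar;
get_data_later([qw(inventory pricing)], sub { $done->send(shift) });
my $data = $done->recv;

print join(', ', map { "$_: $data->{$_}" } sort keys %$data), "\n";
# prints "inventory: data for inventory, pricing: data for pricing"
```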

 

Sample Code Available

Again, I want to reiterate, I have sample code available for you to download and play with. Please experiment with this, try setting different values and using your debugger to step through the code to see when things get called and what values they produce. Asynchronous programming isn't something you really understand until you have to use it, and it's worth understanding.

See Also

This article contributed by: Augustina Ragwitz <aragwitz@gmail.com>