Async, Cached Twitter API Proxy in F#

October 3rd, 2011 by mythz

Following on from showing how easy it was to create web services in F# with Service Stack, now is a good time to step it up a bit and do something useful to showcase some of F#’s async goodness!

The final solution is available as a stand-alone HttpListener or an ASP.NET host, which I deployed earlier on a Linux server at http://www.servicestack.net/ftweetstack/. Here are a couple of example URLs showing the stats of all my friends and followers:

http://www.servicestack.net/ftweetstack/followers/demisbellot
http://www.servicestack.net/ftweetstack/friends/demisbellot

Edit: Screenshots added

As I’m now getting rate limited by Twitter, the live services have stopped working, so I’ve included screenshots of the web service responses, reminiscent of happier times :)

[format=html]

[format=csv]

[format=json]

Back to the code editor…

The best way to show off some async code is to do some IO, and the most interesting IO people do these days is against 3rd party APIs, and the API nearly everyone is familiar with is Twitter’s, so let’s write some code that calls Twitter’s API!

Stuck with the old

We’ll first start with something simple: a basic HTTP request. Unfortunately F# doesn’t come with hot new networking libs to make life easy for us. Nope, we’re stuck with the same Async APIs .NET was born with, and .NET’s Asynchronous Programming Model (APM) of using the BeginXXX/EndXXX pattern could very well be the worst async model in use today. But what F# doesn’t fix with new cake, it fixes with sweeteners, providing some nice sugar coating we can use over the top to make these APIs more palatable.

Async.FromBeginEnd(,) takes the BeginXXX,EndXXX async method pair and returns an Asynchronous computation that wraps the Async task for use in F#’s built-in Asynchronous workflows.

Doing something new with it

Taking advantage of F#’s ability to extend existing types, we can add this helper method to the familiar WebRequest type with:

	type System.Net.WebRequest with
	    member x.GetResponseAsync() =
	    	Async.FromBeginEnd(x.BeginGetResponse, x.EndGetResponse)

With this in place we can build a high level asyncHttp API that downloads the text contents of an HTTP resource asynchronously.

	let asyncHttp (url:string) =
	    async {
	        printfn "downloading: %s" url
	        let req = System.Net.WebRequest.Create(url)
	        let! rsp = req.GetResponseAsync()
	        printfn "processing response..."
	        use stream = rsp.GetResponseStream()
	        use reader = new System.IO.StreamReader(stream)
	        return reader.ReadToEnd() }

What is this async { … } ?

What’s different about this function is that it doesn’t actually return a string, as the implementation might suggest. It returns an Async<string>, which basically says “I wrap an asynchronous computation that returns a string”. Async workflows in F# are surrounded by an async { … } block, and inside this block you will occasionally see a let! statement. let! is used when calling async APIs (above, it’s called on the GetResponseAsync helper we created earlier): it starts the async task without blocking the current thread, and when the task completes the workflow resumes and executes the remaining logic in the async block.

In code terms this looks like:

	let fetchMe = asyncHttp "http://api.twitter.com/1/users/lookup.json?screen_name=demisbellot"
	printfn "doing other stuff... la, di, da..."
	let json = fetchMe |> Async.RunSynchronously

Which when run in fsi returns:

	doing other stuff... la, di, da...
	downloading: http://api.twitter.com/1/users/lookup.json?screen_name=demisbellot
	processing response...
	val fetchMe : Async<string>
	val json : string =
	  "[{"id_str":"17575623","profile_link_color":"43594A","follower"+[1330 chars]

As illustrated, invoking the asyncHttp function doesn’t execute it immediately. Instead it returns an async workflow that can be run later. In this way it works like the deferred execution of yield iterators in C#, where the iterator is only executed when it’s enumerated, i.e. when someone calls .ToList(), or in this case Async.RunSynchronously, which runs the async task and waits for the result.
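To see the deferred execution without touching the network, here is a minimal sketch. The slowAnswer workflow and the events list are made up for illustration, with Async.Sleep standing in for the HTTP call:

```fsharp
// Hypothetical stand-in for asyncHttp: sleeps instead of hitting the network.
let events = System.Collections.Generic.List<string>()

let slowAnswer =
    async {
        events.Add "workflow started"
        do! Async.Sleep 100          // non-blocking wait, much like let! on an IO call
        return 42 }

events.Add "workflow defined"        // nothing inside the async block has run yet
let answer = slowAnswer |> Async.RunSynchronously
events.Add "workflow finished"

events |> Seq.iter (printfn "%s")
```

The recorded order is "workflow defined", "workflow started", "workflow finished": defining the workflow only describes the work, and it is Async.RunSynchronously that actually kicks it off.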

Great, we now know how to make an async HTTP request, so let’s create a service out of it. To help with error handling I’ve added a wrapper over the raw asyncHttp call that catches and logs any fail whales and returns an empty JSON array instead of throwing. I’ve also added a join function to join any lists we need to, and a short jsonTo alias to make using ServiceStack’s fast JSON Serializer easier on the eyes :)

	let asyncJson url =
	    async {
	        try
	            return! asyncHttp url
	        with
	            | ex -> printfn "Error downloading: %s => %s" url ex.Message; return "[]" }

	let joinWith delim items =
	    let sb = new System.Text.StringBuilder()
	    // `string x` works for any element type, so no type annotation is needed
	    items |> Seq.iter (fun x -> sb.Append((if sb.Length > 0 then delim else "") + string x) |> ignore)
	    sb.ToString()

	let join seq = joinWith "," seq

	let jsonTo<'a> json = ServiceStack.Text.JsonSerializer.DeserializeFromString<'a>(json)
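The try/with inside asyncJson is worth a closer look, as it works on exceptions raised anywhere in the wrapped workflow. Below is a small network-free sketch of the same pattern. The withFallback helper and the failing/working workflows are names I’ve invented for the example:

```fsharp
// Hypothetical generalisation of asyncJson: run a workflow, fall back on any exception.
let withFallback (fallback: 'a) (work: Async<'a>) =
    async {
        try
            return! work
        with ex ->
            printfn "Error: %s" ex.Message
            return fallback }

// Two made-up workflows: one that throws, one that succeeds.
let failing : Async<string> = async { return failwith "fail whale" }
let working : Async<string> = async { return "[1,2,3]" }

let muted = failing |> withFallback "[]" |> Async.RunSynchronously
let passedThrough = working |> withFallback "[]" |> Async.RunSynchronously
```

Here muted ends up as "[]" while passedThrough keeps the original "[1,2,3]". The trade-off is the same as in asyncJson: callers never see the exception, only the log line.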

We also need a model to hydrate from Twitter’s response. I’m only interested in user statistics for this example, which I can easily map into an F# Record Type, which makes defining POCOs effortless.

And as I’m always striving for DRY, readable code, I’ll wrap the async calls into more readable methods. Immediately you can start to see the benefits of F# async workflows as we’re able to compose higher level async APIs together without infecting the calling code, a common problem with many Async APIs.

    type UserStat = {
        mutable id: uint64;
        mutable screen_name: string;
        mutable name: string;
        mutable friends_count: int;
        mutable followers_count: int;
        mutable listed_count: int;
        mutable statuses_count: int }

    let asyncUsers screenNames = asyncJson("http://api.twitter.com/1/users/lookup.json?screen_name=" + (screenNames |> join))

    let usersByNames screenNames = asyncUsers screenNames |> Async.RunSynchronously |> jsonTo<UserStat[]> |> Seq.toList

With the above helpers in place, all we need to do is create the service. That is as easy as creating the UserStats Request DTO, so we know what to expect, and the actual implementation, which just passes the list of names into our usersByNames function above and returns the results as-is:

    type UserStats = { mutable ScreenNames: string; }
    type UserStatsService() =
        interface IService<UserStats> with
            member this.Execute (req:UserStats) = req.ScreenNames.Split(',') |> usersByNames :> Object

After registering the appropriate Route in our AppHost we can now call our service passing in multiple twitter user names:

http://localhost:8080/users/demisbellot,servicestack,dsyme

More Async

Although we’ve created a web service that calls Twitter’s REST API asynchronously, we’re synchronously waiting for it straight after, so it’s not doing us much good. The benefits of Async begin to show themselves when you’re performing multiple IO calls, which is what will be needed in order to gather the stats of your friends or followers using Twitter’s APIs.

Twitter only provides an API to return your followers or friends as a list of user ids. To get the user info, another API call to lookup.json is needed, which accepts at most 100 user ids at a time. Knowing this, the quickest way to fetch the user info for your friends or followers is to send the requests for user info asynchronously, in parallel.
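To get a feel for why parallel matters, here is a rough sketch that compares the two approaches. fakeLookup and timed are hypothetical helpers, and the 200 ms sleep is only a stand-in for one lookup.json round trip:

```fsharp
// Hypothetical stand-in for one lookup.json call: waits 200 ms, then returns the batch size.
let fakeLookup (batch: int list) =
    async {
        do! Async.Sleep 200
        return List.length batch }

let batches = [ [1; 2; 3]; [4; 5]; [6] ]

// Runs f and returns its result together with the elapsed milliseconds.
let timed f =
    let sw = System.Diagnostics.Stopwatch.StartNew()
    let result = f ()
    result, sw.ElapsedMilliseconds

// One request at a time: the waits add up.
let sequentialSizes, sequentialMs =
    timed (fun () -> batches |> List.map (fakeLookup >> Async.RunSynchronously))

// All requests in flight at once: the total is roughly the slowest single wait.
let parallelSizes, parallelMs =
    timed (fun () ->
        batches |> List.map fakeLookup |> Async.Parallel |> Async.RunSynchronously |> List.ofArray)

printfn "sequential: %d ms, parallel: %d ms" sequentialMs parallelMs
```

Both versions return the same results in the same order, since Async.Parallel keeps the results in input order. Only the elapsed time differs, and the exact numbers will vary by machine.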

The actual API calls, given they’re just JSON urls, are the easiest to do: just 1 line each. I’ll further wrap them in higher level followerIds and friendIds functions which deserialize the JSON ids in the response into a list of uint64s:

    let asyncFollowerIds screenName = asyncJson("http://api.twitter.com/1/followers/ids.json?screen_name=" + screenName)
    let asyncFriendIds screenName = asyncJson("http://api.twitter.com/1/friends/ids.json?screen_name=" + screenName)
    let asyncUserIds userIds = asyncJson("http://api.twitter.com/1/users/lookup.json?user_id=" + (userIds |> join))

    let followerIds screenName = asyncFollowerIds screenName |> Async.RunSynchronously |> jsonTo<uint64[]> |> Array.toList
    let friendIds screenName = asyncFriendIds screenName   |> Async.RunSynchronously |> jsonTo<uint64[]> |> Array.toList

The first piece of functionality required is batching an unbounded list of ids into manageable 100-size chunks. My first attempt at this was short and Linq-y:

	let batchesOf size (sequence: _ seq) : _ list seq =
	    seq {
	        let s = ref sequence
	        while not (!s |> Seq.isEmpty)  do
	            yield !s |> Seq.truncate size |> List.ofSeq
	            s := System.Linq.Enumerable.Skip(!s, size)
	    }

But it was later discovered to be fairly slow, given the repeated calls to Skip. Since I’m particularly sensitive to perf in my library functions, I opted to go with a more verbose version, but one that only enumerates the sequence once:

	let batchesOf size (s: seq<'v>) =
	    seq {
	        // `use` disposes the enumerator once the sequence has been consumed
	        use en = s.GetEnumerator()
	        let more = ref true
	        while !more do
	            // pull up to `size` items from the shared enumerator
	            let group = [
	                let i = ref 0
	                while !i < size && en.MoveNext () do
	                    yield en.Current
	                    i := !i + 1 ]
	            if List.isEmpty group then
	                more := false
	            else
	                yield group }
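If you’re on F# 4.0 or later, the core library’s Seq.chunkBySize does the same single-pass batching, so it can be used as a sanity check or a drop-in alternative. Note it yields arrays rather than lists. A quick sketch with a made-up list of 250 ids:

```fsharp
// 250 made-up user ids, batched the way lookup.json needs them.
let ids = [1UL .. 250UL]

let batchSizes =
    ids
    |> Seq.chunkBySize 100      // yields arrays of at most 100 ids each
    |> Seq.map Array.length
    |> List.ofSeq

printfn "%A" batchSizes         // [100; 100; 50]
```

The last batch simply holds whatever is left over, which is the behaviour we want from batchesOf too.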

As third party IO calls are amongst the most expensive things you can do, having a cache is a good idea to reduce unnecessary IO calls. We’ll need a thread-safe collection here since we’ll be reading and writing to it at runtime. .NET 4’s generic ConcurrentDictionary handles the task nicely and is easily available to F#:

    let userCache = System.Collections.Concurrent.ConcurrentDictionary<uint64,UserStat>()
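For anyone who hasn’t used ConcurrentDictionary from F#, here is a tiny sketch of the two calls we’ll rely on. The cache of names below is a simplified stand-in for userCache. Note how F# turns the out parameter of TryGetValue into a tuple:

```fsharp
open System.Collections.Concurrent

// Simplified stand-in for userCache: ids mapped to screen names instead of UserStat records.
let nameCache = ConcurrentDictionary<uint64, string>()

let added = nameCache.TryAdd(1UL, "demisbellot")        // true: the key was new
let addedAgain = nameCache.TryAdd(1UL, "someone else")  // false: the existing entry is kept

// The out parameter comes back as the second element of a tuple.
let found, name = nameCache.TryGetValue 1UL
let foundMissing, _ = nameCache.TryGetValue 2UL
```

Because TryAdd never overwrites, two threads racing to cache the same user is harmless: the first one wins and the second call just returns false.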

With the rest of what we need in place, we can focus on the most complicated piece: asynchronously downloading, in parallel, the user info for an unlimited list of user ids in batches of 100. In addition, the function should make use of the userCache, only fetching the missing entries before merging the new and cached UserStats. Surprisingly, this entire behaviour can be achieved in just a few lines of F# code:

    let usersByIds userIds =
        let cachedIds, missingIds = userIds |> List.partition userCache.ContainsKey
        missingIds |> batchesOf 100
        |> Seq.map asyncUserIds
        |> Async.Parallel |> Async.RunSynchronously
        |> Seq.map jsonTo<UserStat[]>
        |> Seq.collect (fun xs -> xs |> Seq.map (fun x -> userCache.TryAdd(x.id, x) |> ignore; x)) |> Seq.toList
        |> List.append (cachedIds |> List.map (fun x -> userCache.TryGetValue x |> snd))

As there’s a bit going on here I’ll explain what’s happening for each line:

#2: Split the entire list of user ids into the ids we have cached and the ones that are missing.
#3: Group the ids in batches of 100
#4: Create separate async API calls for each of those batches
#5: Call each of the API calls in Parallel and wait till they’re all completed
#6: Deserialize the response into lists of UserStat[] types
#7: Merge the UserStat’s together adding each entry into the cache
#8: Append the resulting entries with the previously cached UserStats
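
The same partition, batch, fetch-in-parallel, cache and merge steps can be tried out without Twitter. This is only a sketch under made-up assumptions: fakeLookup replaces the real API call, ids are plain ints, batches hold 2 ids instead of 100, and I’ve used List.chunkBySize (F# 4.0+) in place of batchesOf to keep it self-contained:

```fsharp
open System.Collections.Concurrent

let cache = ConcurrentDictionary<int, string>()
let requestedBatches = ConcurrentQueue<int list>()   // records every fake API call

// Hypothetical stand-in for asyncUserIds: returns (id, name) pairs after a short wait.
let fakeLookup (ids: int list) =
    async {
        requestedBatches.Enqueue ids
        do! Async.Sleep 50
        return ids |> List.map (fun userId -> (userId, sprintf "user%d" userId)) }

let lookupAll (ids: int list) =
    // split into ids we already have and ids we still need to fetch
    let cachedIds, missingIds = ids |> List.partition cache.ContainsKey
    let fetched =
        missingIds
        |> List.chunkBySize 2            // batches of 2 here, 100 for the real API
        |> List.map fakeLookup
        |> Async.Parallel                // all batches in flight at once
        |> Async.RunSynchronously
        |> List.ofArray
        |> List.concat
    // remember the new entries for next time
    for (userId, name) in fetched do
        cache.TryAdd(userId, name) |> ignore
    let fromCache = cachedIds |> List.map (fun userId -> cache.[userId])
    fromCache @ (fetched |> List.map snd)

let first = lookupAll [1; 2; 3]     // nothing cached: two batches are requested
let second = lookupAll [3; 4]       // 3 comes from the cache, only 4 is requested
```

After both calls requestedBatches holds three entries in total, since the second call only had to fetch the one id that wasn’t already cached.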

Neat! The hard part’s over. Now it’s just a matter of exposing them in web services and then we’re done:

    let followers screenName = followerIds screenName |> usersByIds
    let friends screenName = friendIds screenName |> Seq.toList |> usersByIds

    type FollowerStats = { mutable ScreenName: string; }
    type FollowerStatsService() =
        interface IService<FollowerStats> with
            member this.Execute (req:FollowerStats) = followers req.ScreenName :> Object

    type FriendsStats = { mutable ScreenName: string; }
    type FriendsStatsService() =
        interface IService<FriendsStats> with
            member this.Execute (req:FriendsStats) = friends req.ScreenName :> Object

After adding the routes for each service we can now see the stats for all my friends and followers at these two friendly urls:

http://localhost:8080/followers/demisbellot
http://localhost:8080/friends/demisbellot

We can test the above urls live using the deployed web service on servicestack.net:

http://www.servicestack.net/ftweetstack/followers/demisbellot
http://www.servicestack.net/ftweetstack/friends/demisbellot

Because we’re using ServiceStack, viewing these urls in a web browser (or any REST client with Accept: text/html) returns the data in a human readable HTML Report Format. Naturally, the usual XML and JSON formats are available and likewise the CSV format which will load the response data into your preferred spreadsheet program or database. All the different formats are available by appending the format to the url e.g: ?format=csv or by specifying it in the Accept HTTP Header.

http://www.servicestack.net/ftweetstack/followers/demisbellot?format=csv
http://www.servicestack.net/ftweetstack/friends/demisbellot?format=json

Should you feel like angering the REST gods, you can commit heresy by accessing Twitter’s REST-ful API over the SOAP endpoints and WSDLs at the /soap11 and /soap12 urls :)

Download in ASP.NET or HttpListener modes

The complete source code for the above is available in different hosting flavours which can be compiled and run on all Windows/OSX/Linux platforms:

Check the previous post for more information on creating ServiceStack Web Services with F#.
