Skip to content

Commit

Permalink
Add buildStream and tests (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
brinco80 authored and mweibel committed Mar 20, 2017
1 parent 565be06 commit fff36f5
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/facebook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ defmodule Facebook do
end
end


# Request access token and extract the access token from the access token
# response
defp getAccessToken(params) do
Expand Down
10 changes: 8 additions & 2 deletions lib/facebook/graph.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,19 @@ defmodule Facebook.Graph do
request(:get, url, options)
end

@doc """
HTTP generic request (GET, POST, etc) using a full URL and options
"""
@spec request(method, url, options) :: response
defp request(method, url, options) do
def request(method, url, options) do
request(method, url, <<>>, options)
end

@doc """
HTTP generic request (GET, POST, etc) using a full URL, payload and options
"""
@spec request(method, url, payload, options) :: response
defp request(method, url, payload, options) do
def request(method, url, payload, options) do
headers = []
Logger.debug fn ->
"[#{method}] #{url} #{inspect headers} #{inspect payload}"
Expand Down
89 changes: 89 additions & 0 deletions lib/facebook/stream.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
defmodule Facebook.Stream do
alias Facebook.Graph

@moduledoc """
Provides stream functionalities for the Facebook Graph API paginated responses
See: https://developers.facebook.com/docs/graph-api/using-graph-api/#reading
"""

defstruct [:current, :next, :max_retries]

@type retry :: pos_integer
@type error :: any

@doc """
Build a stream resource from a facebook paginated response with a custom
define error handler and a maximum of retries
The user defined error handler can be used to log errors, delay next retry,
raise an exception, etc.
The default error handler sleeps 1 second between retries.
## Examples
iex> stream = Facebook.pageFeed(:feed, "CocaColaMx", "<Your Token>", "id,name") |> Facebook.Stream.new
iex> stream |> Stream.filter(fn(name) -> name == "Coca Cola" end) |> Stream.take(100) |> Enum.to_list
# Custom error handler with linear backoff
iex> feed = Facebook.pageFeed(:feed, "CocaColaMx", "<Your Token>", 25, "id,name")
iex> stream = Facebook.Stream.new(feed, fn(error, retry) -> Process.sleep(retry*500) end)
iex> stream |> Stream.filter(fn(name) -> name == "Coca Cola" end) |> Stream.take(100) |> Enum.to_list
"""
@spec new(Map.t, ((error, retry) -> any), pos_integer) :: Enumerable.t
def new(paged_response,
error_handler \\ fn(_error, _retry) -> Process.sleep(1_000) end,
max_retries \\ 3) do
Stream.resource(
fn -> %__MODULE__{next: paged_response, current: :empty, max_retries: max_retries} end,
fn(feed) -> case nextPage(feed, error_handler) do
%__MODULE__{current: nil } -> {:halt, nil} # no more data
%__MODULE__{current: response} -> {getData(response), %{feed | current: response}} # normal pagination
{error, retries} -> # max retries reached
error_handler.(error, retries)
{:halt, error}
end
end,
fn (_) -> :ok
end
)
end

# Get next object using FB Graph API pagination
defp nextPage(%__MODULE__{current: :empty, next: next} = feed,
_error_handler) do
%{feed | current: next, next: :empty}
end

defp nextPage(%__MODULE__{current: current, max_retries: max_retries} = feed,
error_handler) do
Stream.cycle([1])
|> Stream.scan(0, &(&1+&2))
|> Enum.reduce_while(feed, fn i, acc ->
if i < max_retries do
case getNextPagedData(current) do
{:json, next_obj} -> {:halt, %{feed | current: next_obj}}
nil -> {:halt, %{feed | current: nil}}
{:error, reason} -> error_handler.(reason, i); {:cont, reason}
end
else
{:halt, {acc, i}}
end
end)
end

# Gets next data page
defp getNextPagedData(%{"paging" => %{"next" => next_url}}) do
Graph.request(:get, next_url, [])
end

defp getNextPagedData(%{"paging" => %{"cursors" => %{"next" => next_url}}}) do
Graph.request(:get, next_url, [])
end

defp getNextPagedData(_), do: nil

# Get data from FB Graph Object
defp getData(%{"data" => data}), do: data
defp getData(_), do: nil

end
13 changes: 13 additions & 0 deletions test/facebook_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,17 @@ defmodule FacebookTest do
assert(data["token_type"] == "bearer")
assert(data["expires"] > 0)
end

test "new stream", context do
%{access_token: access_token} = context

stream =
Facebook.pageFeed(:feed, @testPageId, access_token, 25)
|> Facebook.Stream.new

# get 150 posts
posts = stream |> Stream.take(150) |> Enum.to_list

assert(length(posts) == 150)
end
end

0 comments on commit fff36f5

Please sign in to comment.