-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathApiClient.fs
39 lines (30 loc) · 1.56 KB
/
ApiClient.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/// Provides logic for crawling of the source dataset
/// (Wrapped in a PeriodicSource to manage continual refreshing and checkpointing when a traverse has completed)
module PeriodicIngesterTemplate.Ingester.ApiClient
open FSharp.Control
open System
open System.Collections.Generic
open System.Net.Http
open System.Threading
open PeriodicIngesterTemplate.Domain
[<NoComparison; NoEquality>]
type TicketsDto = { tickets: TicketDto[] }
and TicketDto = { id: TicketId; lastUpdated: DateTimeOffset; body: string; }
type TicketsClient(client: HttpClient) =
let basePath = "api/tickets"
member _.Crawl(ct: CancellationToken): IAsyncEnumerable<struct (TimeSpan * Propulsion.Feed.SourceItem<Propulsion.Sinks.EventBody>[])> = taskSeq {
let request = HttpReq.get () |> HttpReq.withPath basePath
let ts = System.Diagnostics.Stopwatch.StartNew()
let! response = client.Send2(request, ct)
let! basePage = response |> HttpRes.deserializeOkStj<TicketsDto>
yield struct (ts.Elapsed,
[| for t in basePage.tickets ->
let data: Ingester.TicketData = { lastUpdated = t.lastUpdated; body = t.body }
Ingester.PipelineEvent.sourceItemOfTicketIdAndData (t.id, data) |])
}
type TicketsFeed(baseUri) =
let client = new HttpClient(BaseAddress = baseUri)
let tickets = TicketsClient(client)
// TODO add retries - consumer loop will abort if this throws
member _.Crawl(_trancheId): IAsyncEnumerable<struct (TimeSpan * Propulsion.Feed.SourceItem<_> array)> =
tickets.Crawl(CancellationToken.None)