Skip to content

Commit

Permalink
Merge pull request #26 from DigitalNZ/rm/sj-elastic-search-scroll-har…
Browse files Browse the repository at this point in the history
…vest

SJ ELASTIC SEARCH SCROLL HARVEST: As Emmanuel I want a SJ harvest method to work with ElasticSearch Scrolling, so that I can harvest from Te Papa's new API
  • Loading branch information
richardmatthewsdev authored Dec 19, 2018
2 parents 9b35a04 + e6c9c77 commit 1133bd4
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 8 deletions.
7 changes: 6 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ GEM
ast (2.4.0)
bson (4.3.0)
builder (3.2.3)
byebug (10.0.2)
chronic (0.10.2)
coderay (1.1.2)
concurrent-ruby (1.0.5)
Expand Down Expand Up @@ -106,6 +107,9 @@ GEM
pry (0.11.3)
coderay (~> 1.1.0)
method_source (~> 0.9.0)
pry-byebug (3.6.0)
byebug (~> 10.0)
pry (~> 0.10)
public_suffix (3.0.2)
rack (2.0.4)
rack-test (0.8.3)
Expand Down Expand Up @@ -169,6 +173,7 @@ DEPENDENCIES
mock_redis
oai (~> 0.3.1)
pry
pry-byebug
rake (< 11.0)
rspec (~> 2.11.0)
rubocop
Expand All @@ -177,4 +182,4 @@ DEPENDENCIES
webmock (~> 1.8)

BUNDLED WITH
1.16.0
1.16.1
10 changes: 8 additions & 2 deletions lib/supplejack_common/json/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ def record_selector(path)
end

def document(url)
if url =~ /^https?/
if url.include?('scroll')
self._document = SupplejackCommon::Request.scroll(url, _request_timeout, _throttle, _http_headers)
_document
elsif url =~ /^https?/
self._document = SupplejackCommon::Request.get(url, _request_timeout, _throttle, _http_headers)
_document
elsif url =~ /^file/
Expand All @@ -34,7 +37,10 @@ def total_results(total_selector)
end

# Fetches the document for +url+ and extracts the records matched by the
# configured record selector.
#
# Returns an empty array when the fetched document is a response whose status
# code is 204. A single record that comes back as a Hash is wrapped in an
# array; otherwise the first JsonPath match is returned unchanged.
def records_json(url)
  new_document = document(url)

  # Not every document is an HTTP response (the `file` branch of `document`
  # may hand back something without a status code), so only consult `code`
  # when the object actually exposes it.
  return [] if new_document.respond_to?(:code) && new_document.code == 204

  records = JsonPath.on(new_document, _record_selector).try(:first)
  records = [records] if records.is_a? Hash
  records
end
Expand Down
20 changes: 19 additions & 1 deletion lib/supplejack_common/paginated_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def each(&block)

return nil unless yield_from_records(&block)

next unless paginated?
next unless paginated? || scroll?

while more_results?
@records.clear
@records = klass.fetch_records(next_url(base_url))
Expand All @@ -50,6 +51,8 @@ def initial_url(url, joiner)
url
end

# rubocop:disable Metrics/CyclomaticComplexity
# rubocop:disable Metrics/PerceivedComplexity
def next_url(url)
if paginated?
joiner = url =~ /\?/ ? '&' : '?'
Expand All @@ -63,10 +66,19 @@ def next_url(url)
increment_page_counter!
result
end
elsif scroll?
if klass._document.present?
base_url = url.match('(?<base_url>.+\/collection)')[:base_url]
base_url + klass._document.headers[:location]
else
url
end
else
url
end
end
# rubocop:enable Metrics/CyclomaticComplexity
# rubocop:enable Metrics/PerceivedComplexity

def url_options
options = {}
Expand Down Expand Up @@ -100,6 +112,8 @@ def increment_page_counter!
end

def more_results?
return klass._document.code == 303 if scroll?

if tokenised?
return klass.next_page_token(@next_page_token_location).present?
end
Expand All @@ -114,6 +128,10 @@ def tokenised?
@type == 'token'
end

# Whether this collection was configured with the 'scroll' type.
def scroll?
  configured_type = @type
  configured_type == 'scroll'
end

def yield_from_records
while record = @records.shift
record.set_attribute_values
Expand Down
21 changes: 19 additions & 2 deletions lib/supplejack_common/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ class << self
# Convenience entry point: builds a request from the given arguments and
# immediately performs it through the instance-level #get.
def get(url, request_timeout, options = [], headers = {})
  request = new(url, request_timeout, options, headers)
  request.get
end

# Convenience entry point: builds a request from the given arguments and
# immediately performs it through the instance-level #scroll.
def scroll(url, request_timeout, options = [], headers = {})
  request = new(url, request_timeout, options, headers)
  request.scroll
end
end

attr_accessor :url, :throttling_options, :request_timeout, :headers

def initialize(url, request_timeout, options = [], headers = {})
# Prevents from escaping escaped URL
# Sifter #6439
@url = URI.escape(URI.unescape(url))

options ||= []
Expand Down Expand Up @@ -50,6 +52,21 @@ def get
end
end

# Performs a scroll request against `url` while holding the request lock and
# returns the raw response as yielded by RestClient.
#
# Redirects are not followed (max_redirects: 0); the response is handed back
# from the block as-is, whatever its status.
def scroll
  acquire_lock do
    # The Te Papa scroll API requires you to do a post for the first request
    # (the `_scroll` URL); every other URL is fetched with GET.
    verb = url.include?('_scroll') ? :post : :get

    RestClient::Request.execute(
      method: verb,
      url: url,
      timeout: request_timeout,
      headers: headers,
      max_redirects: 0
    ) { |response, _request, _result| response }
  end
end

def acquire_lock
loop do
if SupplejackCommon.redis.setnx(redis_lock_key, 0)
Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'webmock/rspec'
require 'simplecov'
require 'loofah'
require 'pry-byebug'

SimpleCov.start

Expand Down
24 changes: 22 additions & 2 deletions spec/supplejack_common/json/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
end

describe '.records_json' do
let(:json_example_1) { '{"items": [{"title": "Record1"},{"title": "Record2"},{"title": "Record3"}]}' }
let(:json_example_2) { '{"items": {"title": "Record1"}}' }
let(:json_example_1) { RestClient::Response.create('{"items": [{"title": "Record1"}, {"title": "Record2"}, {"title": "Record3"}]}', double.as_null_object, double.as_null_object) }
let(:json_example_2) { RestClient::Response.create('{"items": {"title": "Record1"}}', double.as_null_object, double.as_null_object) }

it 'returns an array of records with the parsed json' do
klass.stub(:document) { json_example_1 }
Expand Down Expand Up @@ -74,6 +74,26 @@
klass.document('file:///data/sites/data.json').should eq json
end
end

context 'scroll api' do
  # Both examples run against the same throttle, header and timeout setup.
  before do
    klass._throttle = {}
    klass.http_headers('x-api-key': 'key')
    klass._request_timeout = 60_000
  end

  it 'stores the raw json from _scroll' do
    scroll_url = 'http://google.com/_scroll'
    SupplejackCommon::Request.should_receive(:scroll).with(scroll_url, 60_000, {}, 'x-api-key': 'key') { json }
    klass.document(scroll_url)
    expect(klass._document).to eq json
  end

  it 'stores the raw json from /scroll' do
    scroll_url = 'http://google.com/scroll'
    SupplejackCommon::Request.should_receive(:scroll).with(scroll_url, 60_000, {}, 'x-api-key': 'key') { json }
    klass.document(scroll_url)
    expect(klass._document).to eq json
  end
end
end

describe '.total_results' do
Expand Down
25 changes: 25 additions & 0 deletions spec/supplejack_common/paginated_collection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,31 @@
end
end

context 'scroll API' do
  let(:params) { { type: 'scroll' } }
  let(:collection) { klass.new(SupplejackCommon::Base, params) }
  # URL the collection is asked to advance from in both scenarios below.
  let(:scroll_start_url) { 'http://google/collection/_scroll' }

  context 'when the _document is present' do
    before do
      SupplejackCommon::Base.stub(:_document) do
        response_headers = OpenStruct.new(location: '/scroll/scroll_token/pages')
        OpenStruct.new(headers: response_headers)
      end
    end

    it 'generates the next url based on the header :location in the response' do
      next_page_url = collection.send(:next_url, scroll_start_url)
      expect(next_page_url).to eq 'http://google/collection/scroll/scroll_token/pages'
    end
  end

  context 'when the _document is not present' do
    before do
      SupplejackCommon::Base.stub(:_document) { nil }
    end

    it 'uses the url that it was instantiated with' do
      next_page_url = collection.send(:next_url, scroll_start_url)
      expect(next_page_url).to eq 'http://google/collection/_scroll'
    end
  end
end

context 'with initial parameter' do
let(:params) do
{ page_parameter: 'page-parameter',
Expand Down
26 changes: 26 additions & 0 deletions spec/supplejack_common/request_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,32 @@
end
end

# Exercises the instance-level #scroll: it must take the request lock, POST to
# the initial `_scroll` URL, GET any other URL, and never follow redirects.
describe '#scroll' do
  let(:initial_request) { klass.new('http://google.com/collection/_scroll', 10_000, {}, 'x-api-key' => 'key') }
  let(:subsequent_request) { klass.new('http://google.com/collection/scroll', 10_000, {}, 'x-api-key' => 'key') }

  # NOTE(review): `request` is not defined in this block; presumably it comes
  # from an enclosing `let` - confirm.
  it 'should acquire the lock' do
    request.should_receive(:acquire_lock)
    request.scroll
  end

  it 'should do a POST request initially and NOT follow redirects' do
    RestClient::Request.should_receive(:execute).with(method: :post,
                                                      url: 'http://google.com/collection/_scroll',
                                                      timeout: 10_000,
                                                      headers: { 'x-api-key' => 'key' }, max_redirects: 0)
    initial_request.scroll
  end

  it 'should follow up with subsequent GET requests and NOT follow redirects' do
    RestClient::Request.should_receive(:execute).with(method: :get,
                                                      url: 'http://google.com/collection/scroll',
                                                      timeout: 10_000,
                                                      headers: { 'x-api-key' => 'key' }, max_redirects: 0)
    subsequent_request.scroll
  end
end

describe '#acquire_lock' do
let(:mock_redis) { double(:redis).as_null_object }

Expand Down
1 change: 1 addition & 0 deletions supplejack_common.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency 'mimemagic'
gem.add_runtime_dependency 'mongoid'
gem.add_runtime_dependency 'nokogiri'
gem.add_development_dependency 'pry-byebug'
gem.add_runtime_dependency 'redis'
gem.add_runtime_dependency 'rest-client'
gem.add_runtime_dependency 'retriable'
Expand Down

0 comments on commit 1133bd4

Please sign in to comment.