Skip to content

Commit

Permalink
feat: added the archive/json format for the extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-mesnilgrente committed Jun 19, 2024
1 parent e3bf47d commit 5b90c01
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 7 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ gem 'rqrcode'
gem 'faraday', '~> 2.7'
gem 'faraday-follow_redirects'
gem 'jsonpath'
gem 'minitar'
gem 'nokogiri'
gem 'sidekiq'
gem 'zlib'

# transformation related
gem 'webmock'
Expand Down
4 changes: 4 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ GEM
mime-types-data (~> 3.2015)
mime-types-data (3.2023.0218.1)
mini_mime (1.1.5)
minitar (0.9)
minitest (5.19.0)
multi_json (1.15.0)
mysql2 (0.5.5)
Expand Down Expand Up @@ -427,6 +428,7 @@ GEM
nokogiri (~> 1.8)
yard (0.9.34)
zeitwerk (2.6.11)
zlib (3.1.1)

PLATFORMS
aarch64-linux-musl
Expand All @@ -452,6 +454,7 @@ DEPENDENCIES
jsonpath
kaminari
letter_opener
minitar
mysql2 (~> 0.5)
nokogiri
pry-byebug
Expand All @@ -475,6 +478,7 @@ DEPENDENCIES
webdrivers
webmock
yard
zlib

RUBY VERSION
ruby 3.2.2p53
Expand Down
2 changes: 1 addition & 1 deletion app/models/extraction_definition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Used to store the information for running an extraction
#
class ExtractionDefinition < ApplicationRecord
FORMATS = %w[JSON XML HTML].freeze
FORMATS = %w[JSON XML HTML ARCHIVE_JSON].freeze

# The destination is used for Enrichment Extractions
# To know where to pull the records that are to be enriched from
Expand Down
117 changes: 117 additions & 0 deletions app/supplejack/extraction/archive_extraction.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# frozen_string_literal: true

module Extraction
class ArchiveExtraction < AbstractExtraction
def initialize(request, extraction_folder = nil, response = nil)
super()
@request = request
@extraction_folder = extraction_folder
@extraction_definition = request.extraction_definition
@response = response
end

def download_archive
extract
end

def save_entries(extraction_folder)
each_entry(extraction_folder) do |_, name|
@current_document = build_doc("#{extraction_folder}/#{name}")
save
next_page
end
ensure
clear_extracted_archive(extraction_folder)
end

private

def each_entry(extraction_folder)
@top_level_entries = []
Minitar::Input.open(StringIO.new(@document.body)) do |input|
input.each do |entry|
@top_level_entries << entry.full_name if top_level_entry?(entry.full_name)

input.extract_entry(extraction_folder, entry) do |action, name, stats|
next if action != :file_done

yield(action, name, stats)
end
end
end
end

def top_level_entry?(full_name)
return true if full_name.count('/').zero?
return true if full_name.count('/') == 1 && full_name[-1] == '/'

false
end

def clear_extracted_archive(extraction_folder)
FileUtils.rm_rf(@top_level_entries.compact_blank.map { |file| "#{extraction_folder}/#{file}" })
end

def build_doc(entry_path)
Document.new(
url: @document.url,
method: @document.method,
params: @document.params,
request_headers: @document.request_headers,
status: @document.status,
response_headers: @document.response_headers,
body: body(entry_path)
)
end

def next_page
@extraction_definition.page += 1
end

def save
raise ArgumentError, 'extraction_folder was not provided in #new' if @extraction_folder.blank?
raise '#extract must be called before #save AbstractExtraction' if @document.blank?

@current_document.save(file_path)
end

def body(extracted_file_path)
return extract_file_from_gz(extracted_file_path) if gzipped?(extracted_file_path)

File.read(extracted_file_path)
end

def gzipped?(gz_file_path)
File.open(gz_file_path, 'rb') do |file|
magic_number = file.read(2).unpack('C2')
return magic_number == [0x1F, 0x8B]
end
end

def extract_file_from_gz(gz_path)
body = Zlib::GzipReader.open(gz_path, &:read)
File.delete(gz_path)
body
end

def file_path
page_str = format('%09d', @extraction_definition.page)[-9..]
name_str = @extraction_definition.name.parameterize(separator: '_')
"#{@extraction_folder}/#{name_str}__-__#{page_str}.json"
end

def url
@request.url(@response)
end

def params
@request.query_parameters(@response)
end

def headers
return super if @request.headers.blank?

super.merge(@request.headers(@response))
end
end
end
35 changes: 29 additions & 6 deletions app/supplejack/extraction/execution.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# frozen_string_literal: true

require 'zlib'
require 'archive/tar/minitar'

module Extraction
# Performs the work as defined in the document extraction
class Execution
Expand All @@ -11,24 +14,38 @@ def initialize(job, extraction_definition)
end

def call
extract_and_save_document(@extraction_definition.requests.first)
extract(@extraction_definition.requests.first)
return if @extraction_job.is_sample? || set_number_reached?
return unless @extraction_definition.paginated?

loop do
@extraction_definition.page += 1

extract_and_save_document(@extraction_definition.requests.last)

next_page
extract(@extraction_definition.requests.last)
throttle

break if @extraction_job.reload.cancelled?
break if execution_cancelled?
break if stop_condition_met?
end
end

private

def extract(request)
if @extraction_definition.format == 'ARCHIVE_JSON'
extract_archive_and_save(request)
else
extract_and_save_document(request)
end
end

def next_page
@extraction_definition.page += 1
end

def execution_cancelled?
@extraction_job.reload.cancelled?
end

def stop_condition_met?
[set_number_reached?, extraction_failed?, duplicate_document_extracted?, custom_stop_conditions_met?].any?(true)
end
Expand Down Expand Up @@ -62,6 +79,12 @@ def throttle
sleep @extraction_definition.throttle / 1000.0
end

def extract_archive_and_save(request)
@de = ArchiveExtraction.new(request, @extraction_job.extraction_folder, @previous_request)
@de.extract
@de.save_entries(@extraction_job.extraction_folder)
end

def extract_and_save_document(request)
@de = DocumentExtraction.new(request, @extraction_job.extraction_folder, @previous_request)
@previous_request = @de.extract
Expand Down

0 comments on commit 5b90c01

Please sign in to comment.