diff --git a/Gemfile.lock b/Gemfile.lock index fe2c8ec..b791ff2 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - umbrellio-utils (1.1.0) + umbrellio-utils (1.2.0) memery (~> 1) GEM diff --git a/lib/umbrellio_utils/database.rb b/lib/umbrellio_utils/database.rb index daf747c..171e50a 100644 --- a/lib/umbrellio_utils/database.rb +++ b/lib/umbrellio_utils/database.rb @@ -25,7 +25,8 @@ def each_record(dataset, **options, &block) primary_key = primary_key_from(**options) with_temp_table(dataset, **options) do |ids| - dataset.model.where(primary_key => ids).reverse(primary_key).each(&block) + rows = ids.map { |id| row(id.is_a?(Hash) ? id.values : [id]) } + dataset.model.where(row(primary_key) => rows).reverse(row(primary_key)).each(&block) end end @@ -39,11 +40,7 @@ def with_temp_table(dataset, page_size: 1_000, sleep: nil, **options) loop do DB.transaction do - pk_expr = DB[temp_table_name].select(primary_key).reverse(primary_key).limit(page_size) - - deleted_items = DB[temp_table_name].where(primary_key => pk_expr).returning.delete - pk_set = deleted_items.map { |item| item[primary_key] } - + pk_set = pop_next_pk_batch(temp_table_name, primary_key, page_size) yield(pk_set) if pk_set.any? end @@ -61,18 +58,22 @@ def clear_lamian_logs! Lamian.logger.send(:logdevs).each { |x| x.truncate(0) && x.rewind } end - def create_temp_table(dataset, primary_key:) - model = dataset.model + def create_temp_table(dataset, **options) time = Time.current + model = dataset.model temp_table_name = "temp_#{model.table_name}_#{time.to_i}_#{time.nsec}".to_sym - type = model.db_schema[primary_key][:db_type] + primary_key = primary_key_from(**options) - DB.drop_table?(temp_table_name) DB.create_table(temp_table_name, unlogged: true) do - column primary_key, type, primary_key: true + primary_key.each do |field| + type = model.db_schema[field][:db_type] + column field, type + end + + primary_key primary_key end - insert_ds = dataset.select(Sequel[model.table_name][primary_key]) + insert_ds = dataset.select(*qualified_pk(model.table_name, primary_key)) DB[temp_table_name].disable_insert_returning.insert(insert_ds) temp_table_name @@ -80,8 +81,16 @@ def create_temp_table(dataset, primary_key:) private + def row(values) + Sequel.function(:row, *values) + end + def primary_key_from(**options) - options.fetch(:primary_key, :id) + Array(options.fetch(:primary_key, :id)) + end + + def qualified_pk(table_name, primary_key) + primary_key.map { |f| Sequel[table_name][f] } end def sleep_interval_from(sleep) @@ -94,5 +103,15 @@ def sleep_interval_from(sleep) defined?(Rails) && Rails.env.production? ? 1 : 0 end end + + def pop_next_pk_batch(temp_table_name, primary_key, batch_size) + row = row(primary_key) + pk_expr = DB[temp_table_name].select(*primary_key).reverse(row).limit(batch_size) + deleted_items = DB[temp_table_name].where(row => pk_expr).returning.delete + deleted_items.map do |item| + next item if primary_key.size > 1 + item[primary_key.first] + end + end end end diff --git a/lib/umbrellio_utils/version.rb b/lib/umbrellio_utils/version.rb index 4a80661..a899907 100644 --- a/lib/umbrellio_utils/version.rb +++ b/lib/umbrellio_utils/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module UmbrellioUtils - VERSION = "1.1.0" + VERSION = "1.2.0" end diff --git a/spec/support/database.rb b/spec/support/database.rb index 213a7b2..ad1f80e 100644 --- a/spec/support/database.rb +++ b/spec/support/database.rb @@ -23,8 +23,22 @@ column :email, :text end +DB.drop_table? :complex_users +DB.create_table :complex_users do + column :geo, :text + column :nick, :text + + primary_key %i[geo nick] +end + class User < Sequel::Model(:users) def skip_table_sync? false end end + +class ComplexUser < Sequel::Model(:complex_users) + def skip_table_sync? + false + end +end diff --git a/spec/umbrellio_utils/database_spec.rb b/spec/umbrellio_utils/database_spec.rb index 3c312e8..09fb354 100644 --- a/spec/umbrellio_utils/database_spec.rb +++ b/spec/umbrellio_utils/database_spec.rb @@ -56,6 +56,31 @@ expect(sleep_calls).to eq([]) end + context "with complex primary key" do + before { ComplexUser.multi_insert(complex_users_data) } + + let(:complex_users_data) do + Array.new(10) { |index| Hash[geo: "Europe #{index + 1}", nick: "user#{index + 1}"] } + end + + let(:nicks) { complex_users_data.pluck(:nick) } + + subject(:result_nicks) do + users = [] + + described_class.each_record(ComplexUser.dataset, primary_key: %i[geo nick]) do |user| + users << user + end + + users.map(&:nick) + end + + it "yields all records" do + expect(result_nicks).to match_array(nicks) + expect(sleep_calls).to eq([]) + end + end + context "smaller page_size and numeric sleep value" do let(:options) { Hash[page_size: 3, sleep: 10] }