diff --git a/.gitignore b/.gitignore index 0badafd..d046319 100644 --- a/.gitignore +++ b/.gitignore @@ -17,7 +17,6 @@ test/tmp test/version_tmp tmp spec/internal/config/database.yml -spec/internal/config/sphinx.yml combustion .ruby-version Makefile.local diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..3f1d129 --- /dev/null +++ b/.rspec @@ -0,0 +1,4 @@ +--color +--tty +--format progress +--order random diff --git a/README.md b/README.md index 69d9071..0b1939d 100644 --- a/README.md +++ b/README.md @@ -13,15 +13,15 @@ ## Rake tasks Если в параметрах не передавать node, то по умолчанию комманда будет выполнена на всех нодах -rake sphinx:start[node] -rake sphinx:stop[node] -rake sphinx:restart[node] -rake sphinx:index[node,offline] -rake sphinx:rebuild[node] +rake sphinx:start[host] +rake sphinx:stop[host] +rake sphinx:restart[host] +rake sphinx:index[host,offline] +rake sphinx:rebuild[host] rake sphinx:conf -rake sphinx:copy_conf[node] -rake sphinx:rm_indexes[node] -rake sphinx:rm_binlog[node] +rake sphinx:copy_conf[host] +rake sphinx:rm_indexes[host] +rake sphinx:rm_binlog[host] *Внимание* при офлайн индексации rt индексы не очищаются. Рекоммендуется в этом случае использовать rebuild @@ -36,7 +36,6 @@ development: address: localhost port: 10300 mysql41: 9300 - listen_all_interfaces: true max_matches: 5000 version: 2.0.3 @@ -61,7 +60,6 @@ production: address: index port: 10300 mysql41: 9300 - listen_all_interfaces: true max_matches: 5000 version: 2.0.3 @@ -75,17 +73,16 @@ production: dist_threads: 2 binlog_max_log_size: 1024M rt_flush_period: 86400 + log_level: fatal + mysql_connect_timeout: 2 + mysql_read_timeout: 5 user: sphinx - remote_path: /home/index query_log_file: /dev/null - searchd_log_file: logs/searchd.log - pid_file: pid/searchd.pid - searchd_file_path: data - binlog_path: binlog - log_level: warn - mysql_connect_timeout: 2 - mysql_read_timeout: 5 + searchd_log_file: /absolute/path/to/logs/searchd.log + pid_file: /absolute/path/to/pid/searchd.pid + searchd_file_path: /absolute/path/to/data + binlog_path: /absolute/path/to/binlog ``` ### production with replication @@ -93,12 +90,13 @@ production: ```yml production: remote: true - replication: true - address: index + address: + - index-slave1 + - index-slave2 port: 10300 mysql41: 9300 - listen_all_interfaces: false + ssh_port: 22123 max_matches: 5000 version: 2.0.3 @@ -112,33 +110,16 @@ production: dist_threads: 2 binlog_max_log_size: 1024M rt_flush_period: 86400 - agent_connect_timeout: 50 - ha_strategy: nodeads - log_level: warn + log_level: fatal mysql_connect_timeout: 2 mysql_read_timeout: 5 user: sphinx - remote_path: /home/index/master query_log_file: /dev/null - searchd_log_file: logs/searchd.log - pid_file: pid/searchd.pid - searchd_file_path: data - binlog_path: binlog - - agents: - slave1: - address: index - port: 10301 - mysql41: 9301 - listen_all_interfaces: true - remote_path: /home/index/slave - slave2: - address: index2 - port: 10302 - mysql41: 9302 - listen_all_interfaces: true - remote_path: /home/index/slave + searchd_log_file: /absolute/path/to/logs/searchd.log + pid_file: /absolute/path/to/pid/searchd.pid + searchd_file_path: /absolute/path/to/data + binlog_path: /absolute/path/to/binlog ``` @@ -160,9 +141,9 @@ Workflow: Когда запускается очередная полная индексация: + начинает наполнятся core индекс сфинксовым индексатором + но в этот момент данные могут обновляться, записываться в rt индекс они будут, но потом всё равно удаляться после завершения полной индексации -+ для того, чтобы не потерять обновления данные начинают попадать в дополнительный delta_rt индекс ++ для того, чтобы не потерять обновления данные начинают попадать в дополнительный rt индекс + после завершения полной индексации, очищается основной rt индекс -+ и в него перетекают данные из delta rt индекса ++ а дополнительный rt индекс становится основным ## Дополнительные возможности конфигурировани индекса diff --git a/lib/sphinx/integration.rb b/lib/sphinx/integration.rb index 581202e..9a4325b 100644 --- a/lib/sphinx/integration.rb +++ b/lib/sphinx/integration.rb @@ -3,10 +3,13 @@ module Integration autoload :Helper, 'sphinx/integration/helper' autoload :Mysql, 'sphinx/integration/mysql' autoload :Searchd, 'sphinx/integration/searchd' + autoload :Decaying, 'sphinx/integration/decaying' autoload :Transmitter, 'sphinx/integration/transmitter' autoload :FastFacet, 'sphinx/integration/fast_facet' autoload :RecentRt, 'sphinx/integration/recent_rt' autoload :WasteRecords, 'sphinx/integration/waste_records' + autoload :ServerPool, 'sphinx/integration/server_pool' + autoload :Server, 'sphinx/integration/server' end end diff --git a/lib/sphinx/integration/decaying.rb b/lib/sphinx/integration/decaying.rb new file mode 100644 index 0000000..c2f5bc7 --- /dev/null +++ b/lib/sphinx/integration/decaying.rb @@ -0,0 +1,36 @@ +module Sphinx + module Integration + # A float value which decays exponentially toward 0 over time. + class Decaying + attr_accessor :e + attr_accessor :p + + # opts - Hash + # :p - Float (0.0) The initial value + # :e - Float (Math::E) Exponent base + # :r - Float (Math.log(0.5) / 10) Timescale factor - defaulting to decay 50% every 10 seconds + def initialize(opts = {}) + @p = opts[:p] || 0.0 + @e = opts[:e] || Math::E + @r = opts[:r] || Math.log(0.5) / 10 + @t0 = Time.now + end + + # Add to current value + # + # d - Float value to add + def <<(d) + @p = value + d + end + + # Returns Float the current value (adjusted for the time decay) + def value + return 0.0 unless @p > 0 + now = Time.now + dt = now - @t0 + @t0 = now + @p *= @e**(@r * dt) + end + end + end +end diff --git a/lib/sphinx/integration/extensions/riddle/client.rb b/lib/sphinx/integration/extensions/riddle/client.rb index 97e6d3f..44aea70 100644 --- a/lib/sphinx/integration/extensions/riddle/client.rb +++ b/lib/sphinx/integration/extensions/riddle/client.rb @@ -9,14 +9,32 @@ module Client extend ActiveSupport::Concern included do + alias_method_chain :initialize, :pooling alias_method_chain :connect, :pooling alias_method_chain :request, :pooling end + module ClassMethods + def server_pool + @server_pool + end + + def init_server_pool(servers, host) + @server_pool = ::Sphinx::Integration::ServerPool.new(servers, host, mysql: false) + end + end + + def initialize_with_pooling(*args) + initialize_without_pooling(*args) + + self.class.init_server_pool(@servers, @port) unless self.class.server_pool + end + def connect_with_pooling - options = {server: servers.sample, port: @port} - ::Sphinx::Integration::Searchd::ConnectionPool.take(options) do |socket| - yield socket + self.class.server_pool.take do |server| + server.take do |connection| + yield connection.socket + end end end diff --git a/lib/sphinx/integration/extensions/riddle/configuration.rb b/lib/sphinx/integration/extensions/riddle/configuration.rb index fbb42bc..8bf534f 100644 --- a/lib/sphinx/integration/extensions/riddle/configuration.rb +++ b/lib/sphinx/integration/extensions/riddle/configuration.rb @@ -2,35 +2,23 @@ module Sphinx::Integration::Extensions module Riddle module Configuration - autoload :Searchd, 'sphinx/integration/extensions/riddle/configuration/searchd' - autoload :DistributedIndex, 'sphinx/integration/extensions/riddle/configuration/distributed_index' - extend ActiveSupport::Concern included do alias_method_chain :render, :integration end - def render_with_integration(config_type, agent) - if config_type == :slave - searchd = @searchd.dup - searchd.address = agent[:address] - searchd.port = agent[:port] - searchd.mysql41 = agent[:mysql41] - searchd.listen_all_interfaces = agent.fetch(:listen_all_interfaces, true) - searchd.remote_path = Pathname.new(agent[:remote_path]) if agent.key?(:remote_path) - else - searchd = @searchd - end + def render_with_integration + searchd = @searchd - listen_ip = searchd.listen_all_interfaces ? '0.0.0.0' : searchd.address + listen_ip = "0.0.0.0" searchd.listen = [ "#{listen_ip}:#{searchd.port}", "#{listen_ip}:#{searchd.mysql41.is_a?(TrueClass) ? '9306' : searchd.mysql41}:mysql41", ] ( - [config_type == :master ? nil : @indexer.render, searchd.render].compact + + [@indexer.render, searchd.render].compact + @sources.collect { |source| source.render } + @indices.collect { |index| index.render } ).join("\n") diff --git a/lib/sphinx/integration/extensions/riddle/configuration/distributed_index.rb b/lib/sphinx/integration/extensions/riddle/configuration/distributed_index.rb deleted file mode 100644 index 323e09c..0000000 --- a/lib/sphinx/integration/extensions/riddle/configuration/distributed_index.rb +++ /dev/null @@ -1,63 +0,0 @@ -# coding: utf-8 -module Sphinx::Integration::Extensions - module Riddle - module Configuration - module DistributedIndex - extend ActiveSupport::Concern - - included do - attr_accessor :mirror_indices - attr_accessor :persistent - attr_writer :ha_strategy - alias_method_chain :initialize, :integration - alias_method_chain :valid?, :integration - alias_method_chain :agent, :integration - - class << self - alias_method_chain :settings, :integration - end - end - - def initialize_with_integration(name) - initialize_without_integration(name) - @mirror_indices = [] - end - - def valid_with_integration? - local_indices.any? || remote_indices.any? || mirror_indices.any? - end - - def ha_strategy - @ha_strategy if mirror_indices.any? - end - - def agents - agents_list = agent_without_integration - - mirror_indices.each do |cluster| - agents_list << cluster.map { |agent| "#{agent.remote}:#{agent.name}" }.join('|') - end - - agents_list - end - - def agent_with_integration - return if persistent - agents - end - - def agent_persistent - return unless persistent - agents - end - - module ClassMethods - def settings_with_integration - settings_without_integration + [:ha_strategy, :agent_persistent] - end - end - - end - end - end -end diff --git a/lib/sphinx/integration/extensions/riddle/configuration/searchd.rb b/lib/sphinx/integration/extensions/riddle/configuration/searchd.rb deleted file mode 100644 index 7e3fd1b..0000000 --- a/lib/sphinx/integration/extensions/riddle/configuration/searchd.rb +++ /dev/null @@ -1,38 +0,0 @@ -# coding: utf-8 -module Sphinx::Integration::Extensions - module Riddle - module Configuration - module Searchd - extend ActiveSupport::Concern - - included do - attr_accessor :listen_all_interfaces - attr_reader :remote_path - - # Базовый путь для удалённого сфинкса - # имеется ввиду тот путь, по которому лежат подпапки logs pid data binlog - def remote_path=(value) - @remote_path = Pathname.new(value.to_s) - end - - def log(with_remote = true) - remote_path && with_remote ? remote_path.join(@log) : @log - end - - def query_log(with_remote = true) - with_remote = false if @query_log.to_s == '/dev/null' - remote_path && with_remote ? remote_path.join(@query_log) : @query_log - end - - def pid_file(with_remote = true) - remote_path && with_remote ? remote_path.join(@pid_file) : @pid_file - end - - def binlog_path(with_remote = true) - remote_path && with_remote ? remote_path.join(@binlog_path) : @binlog_path - end - end - end - end - end -end \ No newline at end of file diff --git a/lib/sphinx/integration/extensions/thinking_sphinx.rb b/lib/sphinx/integration/extensions/thinking_sphinx.rb index d25224d..8372b9f 100644 --- a/lib/sphinx/integration/extensions/thinking_sphinx.rb +++ b/lib/sphinx/integration/extensions/thinking_sphinx.rb @@ -12,8 +12,6 @@ module Sphinx::Integration::Extensions::ThinkingSphinx autoload :Source, 'sphinx/integration/extensions/thinking_sphinx/source' autoload :Configuration, 'sphinx/integration/extensions/thinking_sphinx/configuration' autoload :LastIndexingTime, 'sphinx/integration/extensions/thinking_sphinx/last_indexing_time' - autoload :Statements, 'sphinx/integration/extensions/thinking_sphinx/statements' - autoload :Context, 'sphinx/integration/extensions/thinking_sphinx/context' extend ActiveSupport::Concern @@ -21,7 +19,10 @@ module Sphinx::Integration::Extensions::ThinkingSphinx DEFAULT_MATCH = :extended2 include Sphinx::Integration::FastFacet include LastIndexingTime - extend Sphinx::Integration::Extensions::ThinkingSphinx::Statements + + class << self + attr_writer :logger + end end module ClassMethods @@ -35,10 +36,6 @@ def reset_indexed_models end end - def replication? - ThinkingSphinx::Configuration.instance.replication? - end - def error(exception_or_message, severity = ::Logger::ERROR) if exception_or_message.is_a?(::Exception) log(exception_or_message.message, severity) @@ -61,7 +58,16 @@ def warn(message) end def log(message, severity = ::Logger::INFO) - logger.add(severity, message) + message.to_s.split("\n").each { |m| logger.add(severity, m) if m.present? } + + return unless block_given? + + begin + yield + rescue Exception => exception + fatal(exception) + raise + end end alias_method :info, :log @@ -72,28 +78,5 @@ def logger logger.level = ::Logger.const_get(::ThinkingSphinx::Configuration.instance.log_level.upcase) end end - - # Посылает sql запрос в Sphinx - # - # query - String - # - # Returns Mysql2::Result|NilClass - def execute(query, options = {}) - result = nil - ::ThinkingSphinx::Search.log(query) do - take_connection(options) do |connection| - result = connection.execute(query) - end - end - result - end - - def take_connection(options = {}) - method = options[:on_slaves] ? :take_slaves : :take - - ::Sphinx::Integration::Mysql::ConnectionPool.send(method) do |connection| - yield connection - end - end end end diff --git a/lib/sphinx/integration/extensions/thinking_sphinx/active_record.rb b/lib/sphinx/integration/extensions/thinking_sphinx/active_record.rb index 80392a3..3c65638 100644 --- a/lib/sphinx/integration/extensions/thinking_sphinx/active_record.rb +++ b/lib/sphinx/integration/extensions/thinking_sphinx/active_record.rb @@ -41,9 +41,8 @@ def transmitter_destroy end module ClassMethods - def max_matches - @ts_max_matches ||= ThinkingSphinx::Configuration.instance.configuration.searchd.max_matches || 5000 + ThinkingSphinx::Configuration.instance.configuration.searchd.max_matches || 5000 end def define_secondary_index(*args, &block) @@ -69,21 +68,6 @@ def rt_indexed_by_sphinx? def add_sphinx_callbacks_and_extend(*args) include Sphinx::Integration::Extensions::ThinkingSphinx::ActiveRecord::TransmitterCallbacks end - - # Индексы для конфига - # - # config_type - Symbol - # - # Returns Array - def to_riddle(config_type) - define_indexes - sphinx_database_adapter.setup - - local_sphinx_indexes.collect { |index| - index.to_riddle(sphinx_offset, config_type) - }.flatten - end - end # Находится ли запись в сфинксе diff --git a/lib/sphinx/integration/extensions/thinking_sphinx/configuration.rb b/lib/sphinx/integration/extensions/thinking_sphinx/configuration.rb index 97103d6..acf97b5 100644 --- a/lib/sphinx/integration/extensions/thinking_sphinx/configuration.rb +++ b/lib/sphinx/integration/extensions/thinking_sphinx/configuration.rb @@ -3,17 +3,28 @@ module Sphinx::Integration::Extensions::ThinkingSphinx::Configuration extend ActiveSupport::Concern included do - attr_accessor :remote, :replication, :agent_connect_timeout, :agent_query_timeout, - :ha_strategy, :user, :password, :exclude, :log_level, :mysql_read_timeout, :mysql_connect_timeout + attr_accessor :remote, :user, :password, :exclude, :ssh_port, + :log_level, :mysql_read_timeout, :mysql_connect_timeout - attr_reader :agents - - alias_method_chain :build, :integration - alias_method_chain :generate, :integration - alias_method_chain :config_file, :integration + alias_method_chain :shuffled_addresses, :integration alias_method_chain :reset, :integration - alias_method_chain :searchd_file_path, :integration alias_method_chain :enforce_common_attribute_types, :rt + + def initial_model_directories + [] + end + end + + def mysql_client + return @mysql_client if @mysql_client + + port = configuration.searchd.mysql41 + port = 9306 if port.is_a?(TrueClass) + @mysql_client = Sphinx::Integration::Mysql::Client.new(shuffled_addresses, port) + end + + def shuffled_addresses_with_integration + Array.wrap(address) end # Находится ли sphinx на другой машине @@ -23,118 +34,24 @@ def remote? !!remote end - # Включена ли репликация - # - # Returns boolean - def replication? - !!replication - end - def reset_with_integration(custom_app_root = nil) self.remote = false - self.replication = false - self.agents = [] - self.agent_connect_timeout = 50 - self.agent_query_timeout = 5000 - self.ha_strategy = 'nodeads' self.user = 'sphinx' self.exclude = [] - self.log_level = "warn" + self.log_level = "fatal" self.mysql_connect_timeout = 2 self.mysql_read_timeout = 5 - @configuration.searchd.listen_all_interfaces = true + self.ssh_port = 22 + @mysql_client = nil reset_without_integration(custom_app_root) - end - - def agents=(value) - @agents = {} - value.each do |name, agent| - @agents[name] = agent.merge(:name => name).with_indifferent_access - @agents[name][:remote_path] = Pathname.new(@agents[name][:remote_path]) if @agents[name].key?(:remote_path) - end - end - - # Построение конфигурационных файлов для сфинкса - # - # Returns nothing - def build_with_integration - if replication? - generate(:master) - - agents.each do |_, agent| - generate(:slave, agent) - end - else - generate(:single) - end - end - - # Генерация конкретного конфига по типу - # - # config_type - Symbol any of [:single, :master, :slave] - # agent - Hash (default: nil) - # - # Returns nothing - def generate_with_integration(config_type, agent = nil) - @configuration.indices.clear - - ThinkingSphinx.context.indexed_models.each do |model| - model = model.constantize - model.define_indexes - if agent - prev_remote_path = @configuration.searchd.remote_path - @configuration.searchd.remote_path = Pathname.new(agent[:remote_path]) - end - - @configuration.indices.concat(model.to_riddle(config_type)) - - @configuration.searchd.remote_path = prev_remote_path if agent - - enforce_common_attribute_types - end - - write_config(config_type, agent) - end - - # Путь для файла конфигурации - # - # config_type - Symbol - # agent - Hash - # - # Returns String - def config_file_with_integration(config_type, agent = nil) - case config_type - when :single - "#{app_root}/config/#{environment}.sphinx.conf" - when :master - "#{app_root}/config/#{environment}.sphinx.master.conf" - when :slave - "#{app_root}/config/#{environment}.sphinx.slave-#{agent[:name]}.conf" - else - ArgumentError - end - end - # Запись конфига в файл - # - # config_type - Symbol - # agent - Hash - # - # Returns nothing - def write_config(config_type, agent) - file_path = config_file(config_type, agent) - open(file_path, "w") do |file| - file.write @configuration.render(config_type, agent) - end + return if @configuration.searchd.binlog_path + @configuration.searchd.binlog_path = "#{app_root}/db/sphinx/#{environment}" end - def searchd_file_path_with_integration(with_remote = true) - if @configuration.searchd.remote_path && with_remote - @configuration.searchd.remote_path.join(@searchd_file_path) - else - @searchd_file_path - end + def generated_config_file + Rails.root.join("config", "#{Rails.env}.sphinx.conf").to_s end # Не проверям на валидность RT индексы diff --git a/lib/sphinx/integration/extensions/thinking_sphinx/context.rb b/lib/sphinx/integration/extensions/thinking_sphinx/context.rb deleted file mode 100644 index 13d59f3..0000000 --- a/lib/sphinx/integration/extensions/thinking_sphinx/context.rb +++ /dev/null @@ -1,17 +0,0 @@ -module Sphinx - module Integration - module Extensions - module ThinkingSphinx - module Context - extend ActiveSupport::Concern - - included do - def load_models - # nope - end - end - end - end - end - end -end diff --git a/lib/sphinx/integration/extensions/thinking_sphinx/index.rb b/lib/sphinx/integration/extensions/thinking_sphinx/index.rb index 1c43112..b62b548 100644 --- a/lib/sphinx/integration/extensions/thinking_sphinx/index.rb +++ b/lib/sphinx/integration/extensions/thinking_sphinx/index.rb @@ -6,8 +6,9 @@ module Sphinx::Integration::Extensions::ThinkingSphinx::Index included do attr_accessor :merged_with_core, :is_core_index, :mva_sources + alias_method_chain :initialize, :mutex - alias_method_chain :to_riddle, :replication + alias_method_chain :to_riddle, :integration alias_method_chain :to_riddle_for_distributed, :merged alias_method_chain :to_riddle_for_core, :integration alias_method_chain :all_names, :rt @@ -24,29 +25,9 @@ def initialize_with_mutex(model, &block) # config_type - Symbol # # Returns Array - def to_riddle_with_replication(offset, config_type) + def to_riddle_with_integration(offset) return [] if merged_with_core? - if config_type == :master - to_master_riddle - else - to_slave_riddle(offset) - end - end - - # Сформировать набор индесов для мастера - # - # Returns nothing - def to_master_riddle - build_read_indexes + build_write_indexes - end - - # Сформировать набор индексов для слейва - # - # offset - Integer - # - # Returns nothing - def to_slave_riddle(offset) indexes = [to_riddle_for_core(offset)] indexes << to_riddle_for_delta(offset) if delta? @@ -65,61 +46,6 @@ def to_slave_riddle(offset) indexes end - # Построить distributed индексы на чтение - # - # Returns Array - def build_read_indexes - indexes = [] - all_index_names.each do |index_name| - indexes << (index = build_master_distributed(index_name)) - cluster = [] - config.agents.each do |name, agent| - cluster << build_master_remote(index_name, agent) - end - index.mirror_indices << cluster - end - - indexes - end - - # Построить distributed индексы на запись - # - # Returns Array - def build_write_indexes - indexes = [] - all_index_names.each do |index_name| - indexes << (index = build_master_distributed("#{index_name}_w")) - config.agents.each do |name, agent| - index.remote_indices << build_master_remote(index_name, agent) - end - end - - indexes - end - - # Создать объект distributed индекс для мастера - # - # index_name - String - # - # Returns Riddle::Configuration::DistributedIndex - def build_master_distributed(index_name) - index = Riddle::Configuration::DistributedIndex.new(index_name) - index.agent_connect_timeout = config.agent_connect_timeout - index.agent_query_timeout = config.agent_query_timeout - index.ha_strategy = 'nodeads' - index.persistent = true - index - end - - # Создать объект remote индекс для мастера - # - # index_name - String - # - # Returns Riddle::Configuration::RemoteIndex - def build_master_remote(index_name, agent) - Riddle::Configuration::RemoteIndex.new(agent['address'], agent['port'], index_name) - end - # Имена всех индексов # # Returns Array @@ -193,15 +119,7 @@ def to_riddle_for_core_with_integration(offset) # # Returns nothing def truncate(index_name) - ThinkingSphinx.execute("TRUNCATE RTINDEX #{index_name}", :on_slaves => config.replication?) - end - - def name_w - @name_w ||= config.replication? ? "#{name}_w" : name - end - - def core_name_w - @core_name_w ||= config.replication? ? "#{core_name}_w" : core_name + ThinkingSphinx::Configuration.instance.mysql_client.write("TRUNCATE RTINDEX #{index_name}") end def rt_name(partition) @@ -209,11 +127,6 @@ def rt_name(partition) @rt_name[partition] ||= "#{name}_rt#{partition}" end - def rt_name_w(partition) - @rt_name_w ||= {} - @rt_name_w[partition] ||= config.replication? ? "#{rt_name(partition)}_w" : rt_name(partition) - end - def rt? !!@options[:rt] end diff --git a/lib/sphinx/integration/extensions/thinking_sphinx/statements.rb b/lib/sphinx/integration/extensions/thinking_sphinx/statements.rb deleted file mode 100644 index 5d0fcc0..0000000 --- a/lib/sphinx/integration/extensions/thinking_sphinx/statements.rb +++ /dev/null @@ -1,70 +0,0 @@ -module Sphinx - module Integration - module Extensions - module ThinkingSphinx - module Statements - def replace(index_name, data) - query = ::Riddle::Query::Insert.new(index_name, data.keys, data.values).replace!.to_sql - execute(query, on_slaves: replication?) - end - - def update(index_name, data, where) - query = ::Sphinx::Integration::Extensions::Riddle::Query::Update.new(index_name, data, where).to_sql - execute(query) - end - - def delete(index_name, document_id) - query = ::Riddle::Query::Delete.new(index_name, document_id).to_sql - execute(query) - end - - def soft_delete(index_name, document_id) - update(index_name, {sphinx_deleted: 1}, {id: document_id}) - end - - def select(values, index_name, where, limit = nil) - query = ::Riddle::Query::Select. - new. - reset_values. - values(values). - from(index_name). - where(where). - limit(limit). - to_sql - execute(query).to_a - end - - def find_in_batches(index_name, options = {}) - primary_key = options.fetch(:primary_key, "sphinx_internal_id").to_s.freeze - batch_size = options.fetch(:batch_size, 1_000) - batch_order = "#{primary_key} ASC" - where = options.fetch(:where, {}) - where[primary_key.to_sym] = -> { "> 0" } - - query = ::Riddle::Query::Select.new.reset_values. - values(primary_key). - from(index_name). - where(where). - order_by(batch_order). - limit(batch_size). - matching(options[:matching]) - - records = execute(query.to_sql).to_a - while records.any? - primary_key_offset = records.last[primary_key].to_i - - records.map! { |record| record[primary_key].to_i } - yield records - - break if records.size < batch_size - - where[primary_key.to_sym] = -> { "> #{primary_key_offset}" } - query.where(where) - records = execute(query.to_sql).to_a - end - end - end - end - end - end -end diff --git a/lib/sphinx/integration/helper.rb b/lib/sphinx/integration/helper.rb index a3b7818..9846bc6 100644 --- a/lib/sphinx/integration/helper.rb +++ b/lib/sphinx/integration/helper.rb @@ -1,19 +1,29 @@ # coding: utf-8 -require 'rye' -require 'redis-mutex' +require "redis-mutex" +require "logger" module Sphinx::Integration - class Helper - MAX_FULL_REINDEX_LOCK_TIME = 6.hours - CONFIG_PATH = 'conf/sphinx.conf'.freeze - REINDEX_CONFIG_PATH = 'conf/sphinx.reindex.conf'.freeze + module HelperAdapters + autoload :Base, 'sphinx/integration/helper_adapters/base' + autoload :Local, 'sphinx/integration/helper_adapters/local' + autoload :Remote, 'sphinx/integration/helper_adapters/remote' + end - attr_reader :node - attr_reader :master - attr_reader :agents - attr_reader :nodes + class Helper + attr_reader :sphinx delegate :recent_rt, to: 'self.class' + delegate :log, to: 'ThinkingSphinx' + + [:running?, :stop, :start, :remove_indexes, :remove_binlog, :copy_config, :reload].each do |method_name| + class_eval <<-EORUBY, __FILE__, __LINE__ + 1 + def #{method_name} + log "#{method_name.capitalize}" do + sphinx.#{method_name} + end + end + EORUBY + end def self.full_reindex? Redis::Mutex.new(:full_reindex).locked? @@ -23,186 +33,47 @@ def self.recent_rt @recent_rt ||= Sphinx::Integration::RecentRt.new end - def initialize(node = nil) + def initialize(options = {}) ThinkingSphinx.context.define_indexes - node = node.presence || 'all' - - if config.replication? && !config.remote? - raise 'Support for replication only remote!' + ThinkingSphinx.logger = ::Logger.new(Rails.root.join("log", "index.log")).tap do |logger| + logger.formatter = ::Logger::Formatter.new + logger.level = ::Logger::INFO end - if config.remote? && config.configuration.searchd.remote_path.nil? - raise 'Config remote_path should be spicified!' - end - - @node = ActiveSupport::StringInquirer.new(node) - @logger = ::Logger.new(STDOUT) - @logger.formatter = ::Logger::Formatter.new - log "Sphinx Helper initialized with node: #{node}" - - init_ssh if config.remote? + @sphinx = config.remote? ? HelperAdapters::Remote.new(options) : HelperAdapters::Local.new(options) end - # Запущен ли сфинкс? - # - # Returns boolean - def sphinx_running? - if config.remote? - begin - nodes.searchd('--status') - rescue Rye::Err - false - end - else - ThinkingSphinx.sphinx_running? - end - end - - # Остновить сфинкс - # - # Returns nothing - def stop - log "Stop sphinx" - if config.remote? - nodes.searchd('--stopwait') - else - local_searchd('--stopwait') - end - end - - # Запустить сфинкс - # - # Returns nothing - def start - log "Start sphinx" - if config.remote? - nodes.searchd - else - local_searchd - end - end - - # Перезапустить сфинкс - # - # Returns nothing def restart stop - sleep 1 start end - # Построить конфиги для сфинкса - # - # Returns nothing def configure - log "Configure sphinx" - config.build - end - - # Уничтожить все индексы - # - # Returns nothing - def remove_indexes - log "Remove indexes" - - if config.remote? - nodes.remove_indexes - else - files = Dir.glob("#{config.searchd_file_path}/*.*") - log files.join("\n") - FileUtils.rm(files) - end - end - - # Уничтожить бинарный лог - # - # Returns nothing - def remove_binlog - if (binlog_path = config.configuration.searchd.binlog_path).present? - log "Remove binlog" - - if config.remote? - nodes.remove_binlog - else - files = Dir.glob("#{config.searchd_file_path}/*.*") - log files.join("\n") - FileUtils.rm(Dir.glob("#{binlog_path}/*.*")) - end - end - end - - # Скопировать конфиги на сервер(ы) - # - # Returns nothing - def copy_config - return unless config.remote? - - master_remote_path = config.configuration.searchd.remote_path - - if config.replication? - if node.all? || node.master? - config.agents.each do |_, agent| - remote_config_path = agent.fetch(:remote_path, master_remote_path).join(CONFIG_PATH) - agent[:box].file_upload(config.config_file(:slave, agent), remote_config_path) - end if node.all? - - master.file_upload(config.config_file(:master), master_remote_path.join(CONFIG_PATH)) - else - nodes.boxes.each do |box| - _, agent = config.agents.detect { |_, agent| agent[:box] == box } - remote_config_path = agent.fetch(:remote_path, master_remote_path).join(CONFIG_PATH) - box.file_upload(config.config_file(:slave, agent), remote_config_path) - end - end - else - master.file_upload(config.config_file(:single), master_remote_path.join(CONFIG_PATH)) + log "Configure sphinx" do + config.build(config.generated_config_file) end end - # Переиндексация сфинкса - # - # online - boolean (default: true) означает, что в момент индексации все апдейты будут писаться в дельту def index(online = true) - raise 'Мастер ноду нельзя индексировать' if node.master? - reset_waste_records - log "Start indexing" - indexer_args = [] - indexer_args << '--rotate' if online - - if config.remote? - indexer_args << "--config %REMOTE_PATH%/#{CONFIG_PATH}" - if config.replication? - if node.all? - full_reindex_with_replication(online) - else - with_index_lock { nodes.indexer(indexer_args) } - truncate_rt_indexes(recent_rt.prev) - end - else - with_index_lock do - master.indexer(indexer_args) - recent_rt.switch - end - truncate_rt_indexes(recent_rt.prev) if online - end - else + log "Index sphinx" do with_index_lock do - local_indexer(indexer_args) - recent_rt.switch + @sphinx.index(online) end - truncate_rt_indexes(recent_rt.prev) if online end ThinkingSphinx.set_last_indexing_finish_time + + return unless online + + truncate_rt_indexes(recent_rt.prev) + cleanup_waste_records + recent_rt.switch end alias_method :reindex, :index - # Полное перестроение с нуля - # - # Returns nothing def rebuild with_updates_lock do stop @@ -222,127 +93,26 @@ def truncate_rt_indexes(partition = nil) log "Truncate rt indexes" rt_indexes do |index| - begin - log "- #{index.name}" + log "- #{index.name}" do if partition index.truncate(index.rt_name(partition)) else index.truncate(index.rt_name(0)) index.truncate(index.rt_name(1)) end - rescue ::Sphinx::Integration::SphinxError => error - log "sphinx error: #{error.message}" end end - - cleanup_waste_records end private - def log(message) - @logger.info(message) - end - - # Инициализация Rye - which run SSH commands on a bunch of machines at the same time - # - # Returns nothing - def init_ssh - add_commands - - @master = Rye::Box.new(config.address, ssh_options) - @master.pre_command_hook = proc do |cmd, *_| - cmd.gsub!('%REMOTE_PATH%', config.configuration.searchd.remote_path.cleanpath.to_s) - ::Kernel.puts "$ #{cmd}" - end - @master.stdout_hook = proc { |data| ::Kernel.puts data.to_s } - - @nodes = Rye::Set.new('nodes', ssh_options.merge(:parallel => true)) - - if config.replication? - @agents = Rye::Set.new('agents', ssh_options) - - config.agents.each do |name, agent| - agent[:box] = Rye::Box.new(agent[:address], ssh_options) - agent[:box].pre_command_hook = proc do |cmd, *_| - remote_path = agent.fetch(:remote_path, config.configuration.searchd.remote_path) - cmd.gsub!('%REMOTE_PATH%', remote_path.cleanpath.to_s) - ::Kernel.puts "$ #{cmd}" - end - agent[:box].stdout_hook = proc { |data| ::Kernel.puts data.to_s } - agents.add_box(agent[:box]) - nodes.add_box(agent[:box]) if node.all? || node == name - end - end - - nodes.add_box(master) if node.all? || node.master? - end - - # Опции подключения для Rye - # - # Returns Hash - def ssh_options - { - :quiet => false, - :info => true, - :safe => false, - :user => config.user, - :debug => false - } - end - - # Добавление комманд Rye, которые будут запускаться удалённо - # - # Returns nothing - def add_commands - Rye::Cmd.add_command :searchd, 'searchd', "--config %REMOTE_PATH%/#{CONFIG_PATH}" - Rye::Cmd.add_command :indexer, 'indexer', "--all" - Rye::Cmd.add_command :remove_indexes, 'rm', "-f %REMOTE_PATH%/#{config.searchd_file_path(false)}/*" - Rye::Cmd.add_command :remove_binlog, 'rm', "-f %REMOTE_PATH%/#{config.configuration.searchd.binlog_path(false)}/*" - end - - # Полня переиндексация всего кластера - # - # online - boolean - # - # Returns nothing - def full_reindex_with_replication(online = true) - with_index_lock do - main_box = agents.boxes.shift - main_agent = config.agents.detect { |_, x| x[:box] == main_box }.last - - if online - # При онлайн индексации нужно сгенерировать индексные файлы, с опцией --rotate они сразу подменятся - # и потом скопировать их на другие слевый будет невозможно (так пробовал, но другие слейвы падали в сегфолт). - # Поэтому делаем новую конфигу для индексатора, в которой пути для индексных файлов будут иметь суффикс .new. - # которые мы потом копируем на все слейвы и посылаем SIGHUP всем сфинксам - # По этому сигналу сфинкс ищет индексные файлы с таким префиксом и переключается на них, переименовывая их. - main_box.execute("cat %REMOTE_PATH%/#{CONFIG_PATH} | sed -e '/data\\//s/_core/_core.new/g' > %REMOTE_PATH%/#{REINDEX_CONFIG_PATH}") - main_box.indexer("--config %REMOTE_PATH%/#{REINDEX_CONFIG_PATH}") - main_box.rm("%REMOTE_PATH%/#{REINDEX_CONFIG_PATH}") - else - main_box.indexer("--config %REMOTE_PATH%/#{CONFIG_PATH}") - end - - data_path = Pathname.new(config.searchd_file_path(false)).cleanpath.to_s - main_agent_remote_path = main_agent.fetch(:remote_path, config.configuration.searchd.remote_path) - source_path = main_agent_remote_path.join(data_path, "*#{'.new.*' if online}") - main_ssh_credentials = "#{main_box.user}@#{main_box.host}" - agents.execute("rsync -lptv #{main_ssh_credentials}:#{source_path} %REMOTE_PATH%/#{data_path}/") - agents.boxes.unshift(main_box) - agents.kill("-SIGHUP `cat %REMOTE_PATH%/#{config.configuration.searchd.pid_file(false)}`") if online - - recent_rt.switch - end - - truncate_rt_indexes(recent_rt.prev) if online - end - def reset_waste_records log "Reset waste records" + rt_indexes do |index| - log "- #{index.name}" - Sphinx::Integration::WasteRecords.for(index).reset + log "- #{index.name}" do + Sphinx::Integration::WasteRecords.for(index).reset + end end end @@ -355,12 +125,9 @@ def cleanup_waste_records end rt_indexes do |index| - begin - waste_records = Sphinx::Integration::WasteRecords.for(index) - log "- #{index.name} (#{waste_records.size} records)" + waste_records = Sphinx::Integration::WasteRecords.for(index) + log "- #{index.name} (#{waste_records.size} records)" do waste_records.cleanup - rescue ::Sphinx::Integration::SphinxError => error - log "sphinx error: #{error.message}" end end end @@ -369,28 +136,17 @@ def config ThinkingSphinx::Configuration.instance end - def local_searchd(*params) - puts Rye.shell(:searchd, "--config #{config.config_file(:single)}", *params).inspect - end - - def local_indexer(*params) - puts Rye.shell(:indexer, "--config #{config.config_file(:single)} --all", *params).inspect - end - - # Установить блокировку на индексацию + # Установить блокировку на изменение данных в приложении # # Returns nothing - def with_index_lock - Redis::Mutex.with_lock(:full_reindex, :expire => MAX_FULL_REINDEX_LOCK_TIME) do + def with_updates_lock + Redis::Mutex.with_lock(:updates, expire: 10.hours) do yield end end - # Установить блокировку на изменение данных в приложении - # - # Returns nothing - def with_updates_lock - Redis::Mutex.with_lock(:updates, :expire => MAX_FULL_REINDEX_LOCK_TIME) do + def with_index_lock + Redis::Mutex.with_lock(:full_reindex, expire: 10.hours) do yield end end diff --git a/lib/sphinx/integration/helper_adapters/base.rb b/lib/sphinx/integration/helper_adapters/base.rb new file mode 100644 index 0000000..4e7ae41 --- /dev/null +++ b/lib/sphinx/integration/helper_adapters/base.rb @@ -0,0 +1,19 @@ +module Sphinx + module Integration + module HelperAdapters + class Base + delegate :log, to: 'ThinkingSphinx' + + def initialize(options = {}) + @options = options + end + + private + + def config + ThinkingSphinx::Configuration.instance + end + end + end + end +end diff --git a/lib/sphinx/integration/helper_adapters/local.rb b/lib/sphinx/integration/helper_adapters/local.rb new file mode 100644 index 0000000..b1ef332 --- /dev/null +++ b/lib/sphinx/integration/helper_adapters/local.rb @@ -0,0 +1,58 @@ +require "rye" + +module Sphinx + module Integration + module HelperAdapters + class Local < Base + def running? + ThinkingSphinx.sphinx_running? + end + + def stop + searchd("--stopwait") + end + + def start + searchd + end + + def remove_indexes + remove_files("#{config.searchd_file_path}/*_{core,rt0,rt1}.*") + end + + def remove_binlog + return unless config.configuration.searchd.binlog_path.present? + remove_files("#{config.configuration.searchd.binlog_path}/binlog.*") + end + + def copy_config + # no-op + end + + def index(online) + online ? indexer("--rotate") : indexer + end + + def reload + Rye.shell(:kill, "-SIGHUP `#{config.configuration.searchd.pid_file}`") + end + + private + + def searchd(*params) + log Rye.shell(:searchd, "--config #{config.config_file}", *params).inspect + end + + def indexer(*params) + log Rye.shell(:indexer, "--config #{config.config_file} --all", *params).inspect + end + + def remove_files(pattern) + files = Dir.glob(pattern) + log files.join("\n") + FileUtils.rm(files) + end + end + end + end +end diff --git a/lib/sphinx/integration/helper_adapters/remote.rb b/lib/sphinx/integration/helper_adapters/remote.rb new file mode 100644 index 0000000..541f4fe --- /dev/null +++ b/lib/sphinx/integration/helper_adapters/remote.rb @@ -0,0 +1,146 @@ +require "rye" + +module Sphinx + module Integration + module HelperAdapters + class SshProxy + DEFAULT_SSH_OPTIONS = { + quiet: false, + info: true, + safe: false, + debug: false, + forward_agent: true + }.freeze + + delegate :file_upload, to: "@servers" + + # options - Hash + # :hosts - Array of String (required) + # :port - Integer ssh port (default: 22) + # :user - String (default: sphinx) + def initialize(options = {}) + options.reverse_merge!(user: "sphinx", port: 22) + + @servers = Rye::Set.new("servers", parallel: true) + + Array.wrap(options.fetch(:hosts)).each do |host| + server = Rye::Box.new(host, DEFAULT_SSH_OPTIONS.merge(options.slice(:user, :port))) + server.stdout_hook = proc { |data| ::ThinkingSphinx.info(data) } + server.pre_command_hook = proc { |cmd, *| ::ThinkingSphinx.info(cmd) } + @servers.add_box(server) + end + end + + def within(host) + removed_servers = [] + @servers.boxes.keep_if { |server| server.host == host || (removed_servers << server && false) } + yield + @servers.boxes.concat(removed_servers) + end + + def without(host) + index = @servers.boxes.index { |x| x.host == host } + server = @servers.boxes.delete_at(index) + yield server + @servers.boxes.insert(index, server) + end + + def execute(*args) + options = args.extract_options! + exit_status = Array.wrap(options.fetch(:exit_status, 0)) + raps = @servers.execute(*args) + has_errors = false + + raps.each do |rap| + real_status = rap[0].is_a?(::Rye::Err) ? rap[0].exit_status : rap.exit_status + unless exit_status.include?(real_status) + ThinkingSphinx.fatal(rap.inspect) + has_errors ||= true + end + end + raise "Error in executing #{args.inspect}" if has_errors + end + end + + class Remote < Base + def initialize(*) + super + + @ssh = SshProxy.new(hosts: hosts, port: config.ssh_port, user: config.user) + end + + def running? + !!@ssh.execute("searchd", "--config #{config.config_file}", "--status") + rescue Rye::Err + false + end + + def stop + @ssh.execute("searchd", "--config #{config.config_file}", "--stopwait") + end + + def start + @ssh.execute("searchd", "--config #{config.config_file}") + end + + def remove_indexes + remove_files("#{config.searchd_file_path}/*.*") + end + + def remove_binlog + return unless config.configuration.searchd.binlog_path.present? + remove_files("#{config.configuration.searchd.binlog_path}/binlog.*") + end + + def copy_config + @ssh.file_upload(config.generated_config_file, config.config_file) + end + + def index(online) + indexer_args = ["--all", "--config #{config.config_file}"] + indexer_args << "--rotate" if online + + if hosts.one? + @ssh.execute("indexer", *indexer_args) + return + end + + @ssh.within(reindex_host) do + indexer_args << "--nohup" if online + @ssh.execute("indexer", *indexer_args, exit_status: [0, 2]) + @ssh.execute("rename -f 's/\\.tmp\\./\\.new\\./' #{config.searchd_file_path}/*_core.tmp.*") if online + end + + files = "#{config.searchd_file_path}/*_core#{".new" if online}.*" + @ssh.without(reindex_host) do |server| + @ssh.execute("rsync", "-ptzv", "-e 'ssh -p #{server.opts[:port]}'", + "#{server.user}@#{server.host}:#{files} #{config.searchd_file_path}") + end + + reload if online + end + + def reload + @ssh.execute("kill", "-SIGHUP `cat #{config.configuration.searchd.pid_file}`") + end + + private + + def hosts + return @hosts if @hosts + @hosts = Array.wrap(config.address) + @hosts = @hosts.select { |host| @options[:host] == host } if @options[:host].presence + @hosts + end + + def remove_files(pattern) + @ssh.execute("rm", "-f", pattern) + end + + def reindex_host + @reindex_host ||= hosts.first + end + end + end + end +end diff --git a/lib/sphinx/integration/mysql.rb b/lib/sphinx/integration/mysql.rb index 949b2fe..e21133e 100644 --- a/lib/sphinx/integration/mysql.rb +++ b/lib/sphinx/integration/mysql.rb @@ -1,5 +1,9 @@ -# coding: utf-8 -module Sphinx::Integration::Mysql - autoload :ConnectionPool, 'sphinx/integration/mysql/connection_pool' - autoload :Connection, 'sphinx/integration/mysql/connection' -end \ No newline at end of file +module Sphinx + module Integration + module Mysql + autoload :Client, 'sphinx/integration/mysql/client' + autoload :ConnectionPool, 'sphinx/integration/mysql/connection_pool' + autoload :Connection, 'sphinx/integration/mysql/connection' + end + end +end diff --git a/lib/sphinx/integration/mysql/client.rb b/lib/sphinx/integration/mysql/client.rb new file mode 100644 index 0000000..cf195be --- /dev/null +++ b/lib/sphinx/integration/mysql/client.rb @@ -0,0 +1,93 @@ +module Sphinx + module Integration + module Mysql + class Client + def initialize(hosts, port) + @server_pool = ServerPool.new(hosts, port) + end + + def read(sql) + execute(sql, all: false) + end + + def write(sql) + execute(sql, all: true) + end + + def replace(index_name, data) + write ::Riddle::Query::Insert.new(index_name, data.keys, data.values).replace!.to_sql + end + + def update(index_name, data, where) + write ::Sphinx::Integration::Extensions::Riddle::Query::Update.new(index_name, data, where).to_sql + end + + def delete(index_name, document_id) + write ::Riddle::Query::Delete.new(index_name, document_id).to_sql + end + + def soft_delete(index_name, document_id) + update(index_name, {sphinx_deleted: 1}, id: document_id) + end + + def select(values, index_name, where, limit = nil) + sql = ::Riddle::Query::Select. + new. + reset_values. + values(values). + from(index_name). + where(where). + limit(limit). + to_sql + + read(sql).to_a + end + + def find_in_batches(index_name, options = {}) + primary_key = options.fetch(:primary_key, "sphinx_internal_id").to_s.freeze + batch_size = options.fetch(:batch_size, 1_000) + batch_order = "#{primary_key} ASC" + where = options.fetch(:where, {}) + where[primary_key.to_sym] = -> { "> 0" } + + query = ::Riddle::Query::Select.new.reset_values. + values(primary_key). + from(index_name). + where(where). + order_by(batch_order). + limit(batch_size). + matching(options[:matching]) + + records = read(query.to_sql).to_a + while records.any? + primary_key_offset = records.last[primary_key].to_i + + records.map! { |record| record[primary_key].to_i } + yield records + + break if records.size < batch_size + + where[primary_key.to_sym] = -> { "> #{primary_key_offset}" } + query.where(where) + records = read(query.to_sql).to_a + end + end + + private + + def execute(sql, options = {}) + result = nil + ::ThinkingSphinx::Search.log(sql) do + @server_pool.send(options[:all] ? :take_all : :take) do |server| + server.take do |connection| + result = connection.execute(sql) + end + end + end + + result + end + end + end + end +end diff --git a/lib/sphinx/integration/mysql/connection.rb b/lib/sphinx/integration/mysql/connection.rb index 3b9c4ad..ac1ab8c 100644 --- a/lib/sphinx/integration/mysql/connection.rb +++ b/lib/sphinx/integration/mysql/connection.rb @@ -1,52 +1,65 @@ -# coding: utf-8 -require 'mysql2' +require "mysql2" -class Sphinx::Integration::Mysql::Connection - def initialize(options) - options[:flags] ||= Mysql2::Client::MULTI_STATEMENTS - @client_options = options - end +module Sphinx + module Integration + module Mysql + class Connection + def initialize(host, port) + config = ::ThinkingSphinx::Configuration.instance - def close - client.close - rescue - # silence close - end + @client_options = { + host: host, + port: port, + flags: Mysql2::Client::MULTI_STATEMENTS, + reconnect: true, + read_timeout: config.mysql_read_timeout, + connect_timeout: config.mysql_connect_timeout + } + end - def execute(statement) - query(statement).first - end + def close + client.close + rescue + # silence close + end - def query_all(*statements) - query(*statements) - end + def execute(statement) + query(statement).first + end - private + def query_all(*statements) + query(*statements) + end - def client - @client ||= ::Mysql2::Client.new(@client_options) - rescue Mysql2::Error => error - raise ::Sphinx::Integration::QueryExecutionError.new(error) - end + private - def close_and_clear - close - @client = nil - end + def client + @client ||= ::Mysql2::Client.new(@client_options) + rescue Mysql2::Error => error + raise ::Sphinx::Integration::QueryExecutionError.new(error) + end - def query(*statements) - results_for(*statements) - rescue => error - human_statements = statements.join('; ') - message = "#{error.message} - #{human_statements}" - wrapper = ::Sphinx::Integration::QueryExecutionError.new message - wrapper.statement = human_statements - raise wrapper - end + def close_and_clear + close + @client = nil + end + + def query(*statements) + results_for(*statements) + rescue => error + human_statements = statements.join('; ') + message = "#{error.message} - #{human_statements}" + wrapper = ::Sphinx::Integration::QueryExecutionError.new message + wrapper.statement = human_statements + raise wrapper + end - def results_for(*statements) - results = [client.query(statements.join('; '))] - results << client.store_result while client.next_result - results + def results_for(*statements) + results = [client.query(statements.join('; '))] + results << client.store_result while client.next_result + results + end + end + end end end diff --git a/lib/sphinx/integration/mysql/connection_pool.rb b/lib/sphinx/integration/mysql/connection_pool.rb index d19f968..f122e80 100644 --- a/lib/sphinx/integration/mysql/connection_pool.rb +++ b/lib/sphinx/integration/mysql/connection_pool.rb @@ -1,126 +1,62 @@ # coding: utf-8 -require 'innertube' +require "innertube" -module Sphinx::Integration::Mysql::ConnectionPool - MAXIMUM_RETRIES = 2 +module Sphinx + module Integration + module Mysql + class ConnectionPool + MAXIMUM_RETRIES = 2 - @slaves_pool = {} - @master_lock = Mutex.new - @slaves_lock = Mutex.new + def initialize(server) + @server = server - class << self - def take(pool = nil) - retries = 0 - original = nil - begin - (pool || master_pool).take do |connection| - begin - yield connection - # Если ошибка сфинкса нам не ведома - удаляем ресурс - rescue Mysql2::Error => error - original = error - raise Innertube::Pool::BadResource - rescue ::Sphinx::Integration::QueryExecutionError => error - original = ::Sphinx::Integration::SphinxError.new_from_mysql(error) - - case original - # Если сфинкс недосутпен - удаляем ресурс - when ::Sphinx::Integration::ConnectionError - raise Innertube::Pool::BadResource - # Если ошибка в запросе, переповторять запрос не будем - when ::Sphinx::Integration::QueryError - retries += MAXIMUM_RETRIES - end - - raise ::Sphinx::Integration::Retry - end - end - rescue Innertube::Pool::BadResource, ::Sphinx::Integration::Retry - retries += 1 - - if retries >= MAXIMUM_RETRIES - ::ThinkingSphinx.error(original) - raise original - else - ::ThinkingSphinx.info "Retrying. #{original.message}" - retry + @pool = Innertube::Pool.new( + proc { Connection.new(server.host, server.port) }, + proc { |connection| connection.close } + ) end - end - end - def take_slaves(&block) - @agents ||= ThinkingSphinx::Configuration.instance.agents - errors = [] + def take + retries = 0 + original = nil - @agents.each do |agent_name, _| - begin - take(slaves_pool(agent_name), &block) - rescue ::Innertube::Pool::BadResource, - ::Mysql2::Error, - ::Sphinx::Integration::QueryExecutionError, - ::Sphinx::Integration::SphinxError => error - errors << error + begin + @pool.take do |connection| + begin + yield connection + # Если ошибка сфинкса нам не ведома - удаляем ресурс + rescue Mysql2::Error => error + original = error + raise Innertube::Pool::BadResource + rescue ::Sphinx::Integration::QueryExecutionError => error + original = ::Sphinx::Integration::SphinxError.new_from_mysql(error) + + case original + # Если сфинкс недосутпен - удаляем ресурс + when ::Sphinx::Integration::ConnectionError + raise Innertube::Pool::BadResource + # Если ошибка в запросе, переповторять запрос не будем + when ::Sphinx::Integration::QueryError + retries += MAXIMUM_RETRIES + end + + raise ::Sphinx::Integration::Retry + end + end + rescue Innertube::Pool::BadResource, ::Sphinx::Integration::Retry + retries += 1 + + if retries >= MAXIMUM_RETRIES + @server.error_rate << 1 + ::ThinkingSphinx.error(original) + raise original + else + ::ThinkingSphinx.info "Retrying. #{original.message}" + retry + end + end end end - - # Райзим ошибку, только если она получаена от обоих реплик - return if errors.size < @agents.size - raise errors[0] - end - - def master_connection - configuration = ThinkingSphinx::Configuration.instance.configuration - options = options_for_connection(configuration.searchd.address, - configuration.searchd.mysql41) - Sphinx::Integration::Mysql::Connection.new(options) - end - - def slave_connection(agent_name) - agent = ThinkingSphinx::Configuration.instance.agents[agent_name] - options = options_for_connection(agent[:address], agent[:mysql41]) - Sphinx::Integration::Mysql::Connection.new(options) - end - - private - - def host_prepared(host) - host ||= '127.0.0.1' - # If you use localhost, MySQL insists on a socket connection, but Sphinx - # requires a TCP connection. Using 127.0.0.1 fixes that. - host = '127.0.0.1' if host == 'localhost' - host - end - - def port_prepared(port) - port.is_a?(TrueClass) ? 9306 : port - end - - def options_for_connection(host, port) - { - host: host_prepared(host), - port: port_prepared(port), - reconnect: true, - read_timeout: ::ThinkingSphinx::Configuration.instance.mysql_read_timeout, - connect_timeout: ::ThinkingSphinx::Configuration.instance.mysql_connect_timeout - } - end - - def master_pool - @master_lock.synchronize do - @master_pool ||= Innertube::Pool.new( - proc { ::Sphinx::Integration::Mysql::ConnectionPool.master_connection }, - proc { |connection| connection.close } - ) - end - end - - def slaves_pool(agent_name) - @slaves_lock.synchronize do - @slaves_pool[agent_name] ||= Innertube::Pool.new( - proc { ::Sphinx::Integration::Mysql::ConnectionPool.slave_connection(agent_name) }, - proc { |connection| connection.close } - ) - end end end end diff --git a/lib/sphinx/integration/railtie.rb b/lib/sphinx/integration/railtie.rb index db31419..6b4d49b 100644 --- a/lib/sphinx/integration/railtie.rb +++ b/lib/sphinx/integration/railtie.rb @@ -1,5 +1,4 @@ # coding: utf-8 - require 'rails' require 'thinking-sphinx' require 'sphinx-integration' @@ -9,7 +8,6 @@ class Railtie < Rails::Railtie initializer 'sphinx_integration.configuration', :before => 'thinking_sphinx.set_app_root' do ThinkingSphinx::Configuration.send :include, Sphinx::Integration::Extensions::ThinkingSphinx::Configuration - Riddle::Configuration::Searchd.send :include, Sphinx::Integration::Extensions::Riddle::Configuration::Searchd ThinkingSphinx.database_adapter = :postgresql end @@ -18,10 +16,8 @@ class Railtie < Rails::Railtie Riddle::Query::Insert, Riddle::Query::Select, Riddle::Configuration, - Riddle::Configuration::DistributedIndex, Riddle::Client, ThinkingSphinx, - ThinkingSphinx::Context, ThinkingSphinx::Configuration, ThinkingSphinx::Attribute, ThinkingSphinx::Source, @@ -79,6 +75,10 @@ class Railtie < Rails::Railtie end end + config.after_initialize do + ThinkingSphinx.context.define_indexes + end + rake_tasks do load 'sphinx/integration/tasks.rake' end diff --git a/lib/sphinx/integration/searchd.rb b/lib/sphinx/integration/searchd.rb index d017590..67da673 100644 --- a/lib/sphinx/integration/searchd.rb +++ b/lib/sphinx/integration/searchd.rb @@ -2,6 +2,7 @@ module Sphinx module Integration module Searchd autoload :ConnectionPool, "sphinx/integration/searchd/connection_pool" + autoload :Connection, "sphinx/integration/searchd/connection" end end end diff --git a/lib/sphinx/integration/searchd/connection.rb b/lib/sphinx/integration/searchd/connection.rb new file mode 100644 index 0000000..ebf2c80 --- /dev/null +++ b/lib/sphinx/integration/searchd/connection.rb @@ -0,0 +1,34 @@ +module Sphinx + module Integration + module Searchd + class Connection + def initialize(host, port) + @host = host + @port = port + end + + def socket + return @socket if @socket + + @socket = TCPSocket.new(@host, @port) + check_version + @socket.send [1].pack('N'), 0 + core_header = [::Riddle::Client::Commands[:persist], 0, 4].pack("nnN") + @socket.send core_header + [1].pack('N'), 0 + + @socket + end + + private + + def check_version + version = @socket.recv(4).unpack('N*').first + return unless version < 1 + @socket.close + raise ::Riddle::VersionError, + "Can only connect to searchd version 1.0 or better, not version #{version}" + end + end + end + end +end diff --git a/lib/sphinx/integration/searchd/connection_pool.rb b/lib/sphinx/integration/searchd/connection_pool.rb index 279460e..1ae57d0 100644 --- a/lib/sphinx/integration/searchd/connection_pool.rb +++ b/lib/sphinx/integration/searchd/connection_pool.rb @@ -7,65 +7,38 @@ module Searchd class ConnectionPool MAXIMUM_RETRIES = 3 - @pool = {} - @lock = Mutex.new + def initialize(server) + @server = server - class << self - # Open or reuse socket connection - # - # options - Hash - # :server - String (required) - # :port - Integer (required) - def take(options) - retries = 0 - original = nil - begin - pool(options).take do |socket| - begin - yield socket - rescue Exception => error - original = error - raise Innertube::Pool::BadResource - end - end - rescue Innertube::Pool::BadResource - retries += 1 + @pool = Innertube::Pool.new( + proc { Connection.new(server.host, server.port) }, + proc { |connection| connection.socket.close rescue nil } + ) + end - if retries >= MAXIMUM_RETRIES - ::ThinkingSphinx.error(original) - raise ::Riddle::ConnectionError, "Connection to #{options.inspect} failed. #{original.message}" - else - ::ThinkingSphinx.info "Retrying. #{original.message}" - retry + def take + retries = 0 + original = nil + + begin + @pool.take do |connection| + begin + yield connection + rescue Exception => error + original = error + raise Innertube::Pool::BadResource end end - end - - def socket(options) - socket = TCPSocket.new(options.fetch(:server), options.fetch(:port)) - - version = socket.recv(4).unpack('N*').first - if version < 1 - socket.close - raise ::Riddle::VersionError, - "Can only connect to searchd version 1.0 or better, not version #{version}" - end - socket.send [1].pack('N'), 0 - - core_header = [::Riddle::Client::Commands[:persist], 0, 4].pack("nnN") - socket.send core_header + [1].pack('N'), 0 - - socket - end - - private - - def pool(options) - @lock.synchronize do - @pool[options.fetch(:server)] ||= Innertube::Pool.new( - proc { ::Sphinx::Integration::Searchd::ConnectionPool.socket(options) }, - proc { |socket| socket.close rescue nil } - ) + rescue Innertube::Pool::BadResource + retries += 1 + + if retries >= MAXIMUM_RETRIES + @server.error_rate << 1 + ::ThinkingSphinx.error(original) + raise original + else + ::ThinkingSphinx.info "Retrying. #{original.message}" + retry end end end diff --git a/lib/sphinx/integration/server.rb b/lib/sphinx/integration/server.rb new file mode 100644 index 0000000..1fddc1e --- /dev/null +++ b/lib/sphinx/integration/server.rb @@ -0,0 +1,27 @@ +module Sphinx + module Integration + class Server + attr_reader :host, :port, :error_rate, :pool + + def initialize(host, port, options = {}) + @host = host + @port = port + @error_rate = ::Sphinx::Integration::Decaying.new + + @pool = if options.fetch(:mysql, true) + Mysql::ConnectionPool.new(self) + else + Searchd::ConnectionPool.new(self) + end + end + + def take + pool.take { |connection| yield connection } + end + + def to_s + "" + end + end + end +end diff --git a/lib/sphinx/integration/server_pool.rb b/lib/sphinx/integration/server_pool.rb new file mode 100644 index 0000000..5de4ccd --- /dev/null +++ b/lib/sphinx/integration/server_pool.rb @@ -0,0 +1,66 @@ +module Sphinx + module Integration + class ServerPool + def initialize(hosts, port, options = {}) + @servers = Array.wrap(hosts).map { |host| Server.new(host, port, options) } + end + + def take + skip_servers = Set.new + server = choose(skip_servers) + + begin + yield server + rescue Exception + skip_servers << server + + if skip_servers.size >= @servers.size + ::ThinkingSphinx.fatal("Error on servers: #{skip_servers.map(&:to_s).join(", ")}") + raise + else + server = choose(skip_servers) + ::ThinkingSphinx.info("Retrying with next server #{server}") + retry + end + end + end + + def take_all + skip_servers = Set.new + + @servers.each do |server| + begin + yield server + rescue Exception + skip_servers << server + + if skip_servers.size >= @servers.size + ::ThinkingSphinx.fatal("Error on servers: #{skip_servers.map(&:to_s).join(", ")}") + raise + else + ::ThinkingSphinx.info("Error on server #{server}") + end + end + end + end + + private + + def choose(skip_servers) + if skip_servers.any? + servers = @servers.select { |server| !skip_servers.include?(server) } + else + servers = @servers + end + + best_servers = servers.select { |server| server.error_rate.value < 0.1 } + + if best_servers.empty? + @servers.min_by { |server| server.error_rate.value } + else + best_servers.sample + end + end + end + end +end diff --git a/lib/sphinx/integration/tasks.rake b/lib/sphinx/integration/tasks.rake index e817b70..6c7f8e8 100644 --- a/lib/sphinx/integration/tasks.rake +++ b/lib/sphinx/integration/tasks.rake @@ -36,34 +36,33 @@ namespace :thinking_sphinx do end namespace :sphinx do - desc 'Start Sphinx' - task :start, [:node] => :environment do |_, args| - Sphinx::Integration::Helper.new(args[:node]).start + task :start, [:host] => :environment do |_, args| + Sphinx::Integration::Helper.new(args).start end desc 'Stop Sphinx' - task :stop, [:node] => :environment do |_, args| - Sphinx::Integration::Helper.new(args[:node]).stop + task :stop, [:host] => :environment do |_, args| + Sphinx::Integration::Helper.new(args).stop end desc 'Restart Sphinx' - task :restart, [:node] => :environment do |_, args| - Sphinx::Integration::Helper.new(args[:node]).restart + task :restart, [:host] => :environment do |_, args| + Sphinx::Integration::Helper.new(args).restart end desc 'Index Sphinx' - task :index, [:node, :offline] => :environment do |_, args| + task :index, [:host, :offline] => :environment do |_, args| Rails.application.eager_load! - is_offline = args[:offline].present? && %w(true yes y da offline).include?(args[:offline]) - Sphinx::Integration::Helper.new(args[:node]).index(!is_offline) + is_offline = (offline = args.delete(:offline)).present? && %w(true yes y da offline).include?(offline) + Sphinx::Integration::Helper.new(args).index(!is_offline) end desc 'Rebuild Sphinx' - task :rebuild, [:node] => :environment do |_, args| + task :rebuild => :environment do Rails.application.eager_load! - Sphinx::Integration::Helper.new(args[:node]).rebuild + Sphinx::Integration::Helper.new.rebuild end desc 'Generate configuration files' @@ -78,18 +77,22 @@ namespace :sphinx do end desc 'Copy configuration files' - task :copy_conf, [:node] => :environment do |_, args| - Sphinx::Integration::Helper.new(args[:node]).copy_config + task :copy_conf, [:host] => :environment do |_, args| + Sphinx::Integration::Helper.new(args).copy_config end desc 'Remove indexes files' - task :rm_indexes, [:node] => :environment do |_, args| - Sphinx::Integration::Helper.new(args[:node]).remove_indexes + task :rm_indexes, [:host] => :environment do |_, args| + Sphinx::Integration::Helper.new(args).remove_indexes end desc 'Remove binlog files' - task :rm_binlog, [:node] => :environment do |_, args| - Sphinx::Integration::Helper.new(args[:node]).remove_binlog + task :rm_binlog, [:host] => :environment do |_, args| + Sphinx::Integration::Helper.new(args).remove_binlog end + desc 'Reload config or rotate indexes' + task :reload, [:host] => :environment do |_, args| + Sphinx::Integration::Helper.new(args).reload + end end diff --git a/lib/sphinx/integration/transmitter.rb b/lib/sphinx/integration/transmitter.rb index d4266f3..82ab572 100644 --- a/lib/sphinx/integration/transmitter.rb +++ b/lib/sphinx/integration/transmitter.rb @@ -8,6 +8,7 @@ class Transmitter class_attribute :write_disabled delegate :full_reindex?, to: :'Sphinx::Integration::Helper' + delegate :mysql_client, to: :"ThinkingSphinx::Configuration.instance" def initialize(klass) @klass = klass @@ -37,12 +38,8 @@ def delete(record) return false if write_disabled? rt_indexes do |index| - partitions { |partition| ::ThinkingSphinx.delete(index.rt_name_w(partition), record.sphinx_document_id) } - - if record.exists_in_sphinx?(index.core_name) - ::ThinkingSphinx.soft_delete(index.core_name_w, record.sphinx_document_id) - end - + partitions { |partition| mysql_client.delete(index.rt_name(partition), record.sphinx_document_id) } + mysql_client.soft_delete(index.core_name, record.sphinx_document_id) Sphinx::Integration::WasteRecords.for(index).add(record.sphinx_document_id) if full_reindex? end @@ -75,17 +72,17 @@ def update_fields(fields, where) rt_indexes do |index| if full_reindex? # вначале обновим всё что уже есть в rt индексе - partitions { |i| ThinkingSphinx.update(index.rt_name_w(i), fields, where) } + partitions { |partition| mysql_client.update(index.rt_name(partition), fields, where) } # и зареплейсим всё что осталось в core # TODO: implement sphinx transactions batch_options = {where: where, matching: matching} - ThinkingSphinx.find_in_batches(index.core_name, batch_options) do |ids| + mysql_client.find_in_batches(index.core_name, batch_options) do |ids| klass.where(id: ids).each { |record| transmit(index, record) } sleep 1 # empirical number end else - ThinkingSphinx.update(index.name_w, fields, where) + mysql_client.update(index.name, fields, where) end end end @@ -102,9 +99,9 @@ def transmit(index, record) data = transmitted_data(index, record) return unless data - partitions { |i| ThinkingSphinx.replace(index.rt_name(i), data) } + partitions { |partition| mysql_client.replace(index.rt_name(partition), data) } - ThinkingSphinx.soft_delete(index.core_name_w, record.sphinx_document_id) + mysql_client.soft_delete(index.core_name, record.sphinx_document_id) Sphinx::Integration::WasteRecords.for(index).add(record.sphinx_document_id) if full_reindex? end diff --git a/lib/sphinx/integration/waste_records.rb b/lib/sphinx/integration/waste_records.rb index ab53c49..a276162 100644 --- a/lib/sphinx/integration/waste_records.rb +++ b/lib/sphinx/integration/waste_records.rb @@ -3,6 +3,8 @@ module Integration # This class stores records which updated when full reindex running. # Records has been removed from core index after ending reindex. class WasteRecords + delegate :mysql_client, to: :"ThinkingSphinx::Configuration.instance" + def self.for(index) @instances ||= {} @instances[index.name] ||= new(index) @@ -23,11 +25,12 @@ def cleanup ids = Redis.current.smembers(redis_key) return if ids.blank? - index_core_name = @index.core_name_w + index_core_name = @index.core_name + mysql_client ids.each_slice(3_000) do |slice| slice = slice.map!(&:to_i) - ThinkingSphinx.soft_delete(index_core_name, slice) + mysql_client.soft_delete(index_core_name, slice) end reset diff --git a/spec/internal/config/sphinx.yml b/spec/internal/config/sphinx.yml new file mode 100644 index 0000000..a09c1f3 --- /dev/null +++ b/spec/internal/config/sphinx.yml @@ -0,0 +1,2 @@ +test: + version: 2.0.3 diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9ec0a56..c1cd8cb 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,7 +1,9 @@ # coding: utf-8 - -require 'rubygems' require 'bundler' + +require 'simplecov' +SimpleCov.start { minimum_coverage 85 } + require 'pry-debugger' Bundler.require :default, :development @@ -15,13 +17,16 @@ require 'mock_redis' require 'redis-classy' +Redis.current = MockRedis.new +Redis::Classy.db = Redis.current + +require "support/helpers/sphinx_conf" + RSpec.configure do |config| - config.backtrace_exclusion_patterns = [/lib\/rspec\/(core|expectations|matchers|mocks)/] - config.color_enabled = true - config.order = 'random' + include SphinxConf config.before(:each) do - Redis.current = MockRedis.new - Redis::Classy.db = Redis.current + Redis.current.flushdb + ThinkingSphinx::Configuration.instance.reset end end diff --git a/spec/sphinx/integration/extensions/thinking_sphinx/active_record_spec.rb b/spec/sphinx/integration/extensions/thinking_sphinx/active_record_spec.rb index 32a8467..c082e00 100644 --- a/spec/sphinx/integration/extensions/thinking_sphinx/active_record_spec.rb +++ b/spec/sphinx/integration/extensions/thinking_sphinx/active_record_spec.rb @@ -2,34 +2,36 @@ require 'spec_helper' describe ActiveRecord::Base do + describe '.max_matches' do + context "when defined in config" do + before { stub_sphinx_conf(max_matches: 3_000) } - before(:all){ ThinkingSphinx.context.define_indexes } + it { expect(ActiveRecord::Base.max_matches).to eq 3_000 } + end - describe '.max_matches' do - subject { ActiveRecord::Base.max_matches } - it { should be_a(Integer) } - it { should eq 5000 } + context "when defined in config" do + it { expect(ActiveRecord::Base.max_matches).to eq 5_000 } + end end describe '.define_secondary_index' do - subject { ModelWithSecondDisk.sphinx_indexes.detect{ |x| x.name == 'model_with_second_disk_delta' } } - it { should_not be_nil } - its(:merged_with_core) { should be_true } + let(:index) { ModelWithSecondDisk.sphinx_indexes.detect { |x| x.name == 'model_with_second_disk_delta' } } + + it do + expect(index.merged_with_core).to be true + end end describe '.reset_indexes' do - before { ModelWithDisk.reset_indexes } - after { ModelWithDisk.define_indexes } - - subject { ModelWithDisk } - its(:sphinx_index_blocks) { should be_empty } - its(:sphinx_indexes) { should be_empty } - its(:sphinx_facets) { should be_empty } - its(:defined_indexes?) { should be_false } + it do + ModelWithDisk.reset_indexes + expect(ModelWithDisk.sphinx_index_blocks).to be_empty + expect(ModelWithDisk.sphinx_indexes).to be_empty + expect(ModelWithDisk.sphinx_facets).to be_empty + end end describe '.rt_indexed_by_sphinx?' do - it { ModelWithRt.rt_indexed_by_sphinx?.should be_true } + it { expect(ModelWithRt.rt_indexed_by_sphinx?).to be true } end - -end \ No newline at end of file +end diff --git a/spec/sphinx/integration/extensions/thinking_sphinx/configuration_spec.rb b/spec/sphinx/integration/extensions/thinking_sphinx/configuration_spec.rb index 90f0a08..f3fde90 100644 --- a/spec/sphinx/integration/extensions/thinking_sphinx/configuration_spec.rb +++ b/spec/sphinx/integration/extensions/thinking_sphinx/configuration_spec.rb @@ -2,84 +2,18 @@ require 'spec_helper' describe ThinkingSphinx::Configuration do - let(:config) { ThinkingSphinx::Configuration.instance } - let(:base_options) do - {'test' => {}} - end - let(:spec_options) { {} } - - before do - config_path = "#{config.app_root}/config/sphinx.yml" - File.stub(:exists?).with(config_path).and_return(true) - IO.stub(:read).with(config_path).and_return(base_options.deep_merge(spec_options).to_yaml) - config.reset - end describe '`remote` option' do context 'when default' do - it { expect(config.remote?).to be_false } - end - - context 'when enabled' do - let(:spec_options){ {'test' => {'remote' => true}} } - it { expect(config.remote?).to be_true } - end - end - - describe '`replication` option' do - context 'when default' do - it { expect(config.replication?).to be_false } + it { expect(config.remote?).to be false } end context 'when enabled' do - let(:spec_options){ {'test' => {'replication' => true}} } - it { expect(config.replication?).to be_true } - end - end - - describe '`agents` option' do - context 'when default' do - it { expect(config.agents).to be_empty } - end - - context 'when specified' do - let(:spec_options){ {'test' => {'agents' => {'slave' => {'address' => 'index', 'port' => 123, 'mysql41' => 321, 'name' => 'slave'}}}} } - it { expect(config.agents).to have(1).item } - it { expect(config.agents).to eq spec_options['test']['agents'] } - end - end - - describe '`agent_connect_timeout` option' do - context 'when default' do - it { expect(config.agent_connect_timeout).to eq 50 } - end - - context 'when specified' do - let(:spec_options){ {'test' => {'agent_connect_timeout' => 100}} } - it { expect(config.agent_connect_timeout).to eq 100 } - end - end - - describe '`agent_query_timeout` option' do - context 'when default' do - it { expect(config.agent_query_timeout).to eq 5000 } - end - - context 'when specified' do - let(:spec_options){ {'test' => {'agent_query_timeout' => 100}} } - it { expect(config.agent_query_timeout).to eq 100 } - end - end - - describe '`ha_strategy` option' do - context 'when default' do - it { expect(config.ha_strategy).to eq 'nodeads' } - end - - context 'when specified' do - let(:spec_options){ {'test' => {'ha_strategy' => 'roundrobin'}} } - it { expect(config.ha_strategy).to eq 'roundrobin' } + it do + stub_sphinx_conf(remote: true) + expect(config.remote?).to be true + end end end @@ -89,11 +23,28 @@ end context 'when file have `exclude` section' do - let(:spec_options) do - {'test' => {'exclude' => ['apress/product_denormalization/sphinx_index']}} + it do + stub_sphinx_conf(exclude: ['apress/product_denormalization/sphinx_index']) + expect(config.exclude).to eq ['apress/product_denormalization/sphinx_index'] + end + end + end + + describe "#mysql_client" do + context "when one address" do + it do + stub_sphinx_conf(address: "s1", mysql41: true) + expect(Sphinx::Integration::Mysql::Client).to receive(:new).with(%w(s1), 9306) + config.mysql_client end + end - it { expect(config.exclude).to eq ['apress/product_denormalization/sphinx_index'] } + context "when many addresses" do + it do + stub_sphinx_conf(address: %w(s1 s2), mysql41: 9300) + expect(Sphinx::Integration::Mysql::Client).to receive(:new).with(%w(s1 s2), 9300) + config.mysql_client + end end end end diff --git a/spec/sphinx/integration/extensions/thinking_sphinx/index/builder_spec.rb b/spec/sphinx/integration/extensions/thinking_sphinx/index/builder_spec.rb index 15010f3..7b0f3b0 100644 --- a/spec/sphinx/integration/extensions/thinking_sphinx/index/builder_spec.rb +++ b/spec/sphinx/integration/extensions/thinking_sphinx/index/builder_spec.rb @@ -12,11 +12,10 @@ it { index.local_options[:source_joins].should have(1).item } it { index.local_options[:source_joins][:rubrics].should be_present } - - subject { index.local_options[:source_joins][:rubrics] } - its([:type]) { should eq :left } - its([:on]) { should eq 'rubrics.id = model.rubric_id' } - its([:as]) { should eq :rubs } + it do + join = index.local_options[:source_joins][:rubrics] + expect(join).to include(type: :left, on: "rubrics.id = model.rubric_id", as: :rubs) + end end describe 'inner_join' do @@ -28,11 +27,11 @@ it { index.local_options[:source_joins].should have(1).item } it { index.local_options[:source_joins][:rubrics].should be_present } - - subject { index.local_options[:source_joins][:rubrics] } - its([:type]) { should eq :inner } - its([:on]) { should eq 'rubrics.id = model.rubric_id' } - its([:as]) { should eq :rubs } + it do + index.local_options[:source_joins][:rubrics] + join = index.local_options[:source_joins][:rubrics] + expect(join).to include(type: :inner, on: "rubrics.id = model.rubric_id", as: :rubs) + end end describe 'delete_joins' do @@ -113,4 +112,4 @@ it { index.local_options[:source_cte].should have(1).item } it { index.local_options[:source_cte][:_rubrics2].should == "select id from rubrics2" } end -end \ No newline at end of file +end diff --git a/spec/sphinx/integration/extensions/thinking_sphinx/index_spec.rb b/spec/sphinx/integration/extensions/thinking_sphinx/index_spec.rb index 868ec7a..a6ae2f2 100644 --- a/spec/sphinx/integration/extensions/thinking_sphinx/index_spec.rb +++ b/spec/sphinx/integration/extensions/thinking_sphinx/index_spec.rb @@ -11,37 +11,24 @@ end describe '#to_riddle_with_merged' do - context 'when single or slave mode' do - let(:result) { index.to_riddle(0, :single) } + let(:result) { index.to_riddle(0) } - it 'generate core index' do - result. - select{ |x| x.is_a?(Riddle::Configuration::Index) }. - should have(1).items - end - - it 'generate rt index' do - result. - select{ |x| x.is_a?(Riddle::Configuration::RealtimeIndex) }. - should have(2).items - end - - it 'generate distributed index' do - result. - select{ |x| x.is_a?(Riddle::Configuration::DistributedIndex) }. - should have(1).item - end + it 'generate core index' do + result. + select { |x| x.is_a?(Riddle::Configuration::Index) }. + should have(1).items end - context 'when master mode' do - let(:result) { index.to_riddle(0, :master) } - let(:agents) { {'slave' => {'address' => 'slave0', 'port' => 10, 'mysql41' => 100}} } - - before { index.send(:config).stub(:agents).and_return(agents) } + it 'generate rt index' do + result. + select { |x| x.is_a?(Riddle::Configuration::RealtimeIndex) }. + should have(2).items + end - it 'all distributed' do - result.all? { |x| x.is_a?(Riddle::Configuration::DistributedIndex) } - end + it 'generate distributed index' do + result. + select { |x| x.is_a?(Riddle::Configuration::DistributedIndex) }. + should have(1).item end end diff --git a/spec/sphinx/integration/extensions/thinking_sphinx/statements_spec.rb b/spec/sphinx/integration/extensions/thinking_sphinx/statements_spec.rb deleted file mode 100644 index 6d212f9..0000000 --- a/spec/sphinx/integration/extensions/thinking_sphinx/statements_spec.rb +++ /dev/null @@ -1,77 +0,0 @@ -require "spec_helper" - -describe ThinkingSphinx do - describe ".replace" do - it do - expect(ThinkingSphinx). - to receive(:execute).with("REPLACE INTO product (`company_id`) VALUES (1)", on_slaves: false) - ThinkingSphinx.replace("product", company_id: 1) - end - end - - describe ".update" do - it do - expect(ThinkingSphinx). - to receive(:execute).with("UPDATE product SET company_id = 1 WHERE `id` = 1") - - ThinkingSphinx.update("product", {company_id: 1}, id: 1) - end - end - - describe ".delete" do - it do - expect(ThinkingSphinx). - to receive(:execute).with("DELETE FROM product WHERE id = 1") - - ThinkingSphinx.delete("product", 1) - end - end - - describe ".soft_delete" do - it do - expect(ThinkingSphinx). - to receive(:execute).with("UPDATE product SET sphinx_deleted = 1 WHERE `id` = 1") - - ThinkingSphinx.soft_delete("product", 1) - end - end - - describe ".select" do - it do - expect(ThinkingSphinx). - to receive(:execute).with("SELECT company_id FROM product WHERE `id` = 1") - - ThinkingSphinx.select("company_id", "product", id: 1) - end - end - - describe ".find_in_batches" do - it do - expect(ThinkingSphinx).to( - receive(:execute). - with("SELECT sphinx_internal_id " + - "FROM product " + - "WHERE MATCH('@company_id_idx 1') " + - "AND `company_id` = 1 AND `sphinx_internal_id` > 0 " + - "ORDER BY `sphinx_internal_id` ASC LIMIT 1"). - and_return([{"sphinx_internal_id" => "1"}]) - ) - - expect(ThinkingSphinx).to( - receive(:execute). - with("SELECT sphinx_internal_id " + - "FROM product " + - "WHERE MATCH('@company_id_idx 1') " + - "AND `company_id` = 1 AND `sphinx_internal_id` > 1 " + - "ORDER BY `sphinx_internal_id` ASC LIMIT 1"). - and_return([]) - ) - - result = [] - ThinkingSphinx.find_in_batches( - "product", - where: {company_id: 1}, matching: "@company_id_idx 1", batch_size: 1) { |ids| result += ids } - expect(result).to eq [1] - end - end -end diff --git a/spec/sphinx/integration/helper_adapters/local_spec.rb b/spec/sphinx/integration/helper_adapters/local_spec.rb new file mode 100644 index 0000000..9ecf655 --- /dev/null +++ b/spec/sphinx/integration/helper_adapters/local_spec.rb @@ -0,0 +1,61 @@ +require "spec_helper" +require "sphinx/integration/helper" + +describe Sphinx::Integration::HelperAdapters::Local do + let!(:rye) { class_double("Rye").as_stubbed_const } + let(:adapter) { described_class.new } + + describe "#stop" do + it do + expect(rye).to receive(:shell).with(:searchd, /--config/, "--stopwait") + adapter.stop + end + end + + describe "#start" do + it do + expect(rye).to receive(:shell).with(:searchd, /--config/) + adapter.start + end + end + + describe "#remove_indexes" do + it do + expect(adapter).to receive(:remove_files).with(%r{db/sphinx/test}) + adapter.remove_indexes + end + end + + describe "#remove_binlog" do + context "when path is empty" do + it do + allow(ThinkingSphinx::Configuration.instance.configuration.searchd).to receive(:binlog_path).and_return("") + expect(adapter).to_not receive(:remove_files) + adapter.remove_binlog + end + end + + context "when path is present" do + it do + expect(adapter).to receive(:remove_files).with(%r{db/sphinx/test}) + adapter.remove_binlog + end + end + end + + describe "#index" do + context "when is online" do + it do + expect(rye).to receive(:shell).with(:indexer, /--config/, "--rotate") + adapter.index(true) + end + end + + context "when is offline" do + it do + expect(rye).to receive(:shell).with(:indexer, /--config/) + adapter.index(false) + end + end + end +end diff --git a/spec/sphinx/integration/helper_adapters/remote_spec.rb b/spec/sphinx/integration/helper_adapters/remote_spec.rb new file mode 100644 index 0000000..59d8e7c --- /dev/null +++ b/spec/sphinx/integration/helper_adapters/remote_spec.rb @@ -0,0 +1,94 @@ +require "spec_helper" +require "sphinx/integration/helper" + +describe Sphinx::Integration::HelperAdapters::Remote do + let!(:ssh) { instance_double("Sphinx::Integration::HelperAdapters::SshProxy") } + let(:adapter) { described_class.new } + + before do + class_double("Sphinx::Integration::HelperAdapters::SshProxy", new: ssh).as_stubbed_const + end + + describe "#running?" do + it do + expect(ssh).to receive(:execute).with("searchd", /--config/, "--status") + adapter.running? + end + end + + describe "#stop" do + it do + expect(ssh).to receive(:execute).with("searchd", /--config/, "--stopwait") + adapter.stop + end + end + + describe "#start" do + it do + expect(ssh).to receive(:execute).with("searchd", /--config/) + adapter.start + end + end + + describe "#remove_indexes" do + it do + expect(adapter).to receive(:remove_files).with(%r{db/sphinx/test}) + adapter.remove_indexes + end + end + + describe "#remove_binlog" do + context "when path is empty" do + it do + allow(ThinkingSphinx::Configuration.instance.configuration.searchd).to receive(:binlog_path).and_return("") + expect(adapter).to_not receive(:remove_files) + adapter.remove_binlog + end + end + + context "when path is present" do + it do + expect(adapter).to receive(:remove_files).with(%r{db/sphinx/test}) + adapter.remove_binlog + end + end + end + + describe "#index" do + before { stub_sphinx_conf(config_file: "/path/sphinx.conf") } + + context "when is online" do + context "when one host" do + it do + expect(ssh).to receive(:execute).with("indexer", "--all", "--config /path/sphinx.conf", "--rotate") + adapter.index(true) + end + end + + context "when many hosts" do + it do + expect(ThinkingSphinx::Configuration.instance).to receive(:address).and_return(%w(s1.dev s2.dev)) + expect(ssh).to receive(:within).with("s1.dev").and_yield + expect(ssh).to receive(:execute). + with("indexer", "--all", "--config /path/sphinx.conf", "--rotate", "--nohup", exit_status: [0, 2]) + server = double("server", opts: {port: 22}, user: "sphinx", host: "s1.dev") + expect(ssh).to receive(:without).with("s1.dev").and_yield(server) + expect(ssh).to receive(:execute).with("rsync", any_args) + expect(ssh).to receive(:execute).with(/^rename/) + expect(ssh).to receive(:execute).with("kill", /SIGHUP/) + + adapter.index(true) + end + end + end + + context "when is offline" do + context "when one host" do + it do + expect(ssh).to receive(:execute).with("indexer", "--all", "--config /path/sphinx.conf") + adapter.index(false) + end + end + end + end +end diff --git a/spec/sphinx/integration/helper_spec.rb b/spec/sphinx/integration/helper_spec.rb index d0ad707..01da135 100644 --- a/spec/sphinx/integration/helper_spec.rb +++ b/spec/sphinx/integration/helper_spec.rb @@ -2,297 +2,60 @@ require 'spec_helper' describe Sphinx::Integration::Helper do - - let(:config) { ThinkingSphinx::Configuration.instance } - let(:current_node) { nil } - let(:helper) { described_class.new(current_node) } - let(:base_options) { {'test' => {}} } - let(:agents) do - { - 'slave1' => { - 'address' => 'sphinx1', - 'port' => 10301, - 'mysql41' => 9301 - }, - 'slave2' => { - 'address' => 'sphinx2', - 'port' => 10302, - 'mysql41' => 9302 - } - } - end - let(:replication_options) { {'replication' => true, 'agents' => agents} } + let(:adapter) { instance_double("Sphinx::Integration::HelperAdapters::Local") } + let(:mysql_client) { instance_double("Sphinx::Integration::Mysql::Client") } + let(:helper) { described_class.new } before do - config_path = "#{config.app_root}/config/sphinx.yml" - File.stub(:exists?).with(config_path).and_return(true) - IO.stub(:read).with(config_path).and_return(base_options.deep_merge('test' => spec_options).to_yaml) - config.reset - - helper.stub( - :master => double('master'), - :agents => double('agents'), - :nodes => double('nodes') - ) + class_double("Sphinx::Integration::Mysql::Client", new: mysql_client).as_stubbed_const + class_double("Sphinx::Integration::HelperAdapters::Local", new: adapter).as_stubbed_const end - describe '#initialize' do - let(:spec_options) { {} } - - it 'define indexes after initialize' do - helper - - expect(ModelWithRt.sphinx_indexes).not_to be_empty + describe "#restart" do + it do + expect(helper).to receive(:stop) + expect(helper).to receive(:start) + helper.restart end end - context 'when local' do - let(:spec_options) { {'remote' => false} } - - describe '#sphinx_running?' do - it { expect(ThinkingSphinx).to receive(:sphinx_running?) } - after { helper.sphinx_running? } - end - - describe '#stop' do - it { expect(helper).to receive(:local_searchd).with('--stopwait') } - after { helper.stop } - end - - describe '#start' do - it { expect(helper).to receive(:local_searchd).with(no_args) } - after { helper.start } + describe "#configure" do + it do + expect(ThinkingSphinx::Configuration.instance).to receive(:build).with(/test\.sphinx\.conf/) + helper.configure end + end - describe '#restart' do + describe "#index" do + context "when online indexing" do it do - expect(helper).to receive(:stop).ordered - expect(helper).to receive(:start).ordered - end - after { helper.restart } - end - - describe '#configure' do - it { expect(helper.send(:config)).to receive(:build) } - after { helper.configure } - end - - describe '#remove_indexes' do - before { Dir.stub(:glob => ['/path/to/data']) } - it { expect(FileUtils).to receive(:rm) } - after { helper.remove_indexes } - end - - describe '#remove_binlog' do - let(:spec_options) { super().merge('binlog_path' => '/path/to/binlog') } - before { Dir.stub(:glob => ['/path/to/binlog']) } - it { expect(FileUtils).to receive(:rm) } - after { helper.remove_binlog } - end - - describe '#copy_config' do - it { expect(helper.send(:config)).to_not receive(:replication?) } - after { helper.copy_config } - end - - describe '#index' do - context 'when offline' do - it do - expect(helper).to receive(:local_indexer).with([]) - expect(helper).to_not receive(:truncate_rt_indexes) - end - after { helper.index(false) } - end - - context 'when online' do - it do - expect(helper).to receive(:local_indexer).with(['--rotate']) - expect(helper).to receive(:truncate_rt_indexes) - end - after { helper.index } + expect(adapter).to receive(:index).with(true) + expect(mysql_client).to receive(:write).with(/TRUNCATE RTINDEX model_with_rt_rt1/) + helper.index(true) + expect(helper.recent_rt.prev).to eq 0 end end - describe '#rebuild' do + context "when offline indexing" do it do - expect(helper).to receive(:stop).ordered - expect(helper).to receive(:configure).ordered - expect(helper).to receive(:copy_config).ordered - expect(helper).to receive(:remove_indexes).ordered - expect(helper).to receive(:remove_binlog).ordered - expect(helper).to receive(:index).with(false).ordered - expect(helper).to receive(:start).ordered + expect(adapter).to receive(:index).with(false) + expect(mysql_client).to_not receive(:write) + helper.index(false) + expect(helper.recent_rt.prev).to eq 1 end - after { helper.rebuild } end end - context 'when remote' do - let(:spec_options) { {'remote' => true, 'remote_path' => '/path/on/remote'} } - - describe '#sphinx_running?' do - it { expect(helper.nodes).to receive(:searchd).with('--status') } - after { helper.sphinx_running? } - end - - describe '#stop' do - it { expect(helper.nodes).to receive(:searchd).with('--stopwait') } - after { helper.stop } - end - - describe '#start' do - it { expect(helper.nodes).to receive(:searchd).with(no_args) } - after { helper.start } - end - - describe '#remove_indexes' do - it { expect(helper.nodes).to receive(:remove_indexes) } - after { helper.remove_indexes } - end - - describe '#remove_indexes' do - let(:spec_options) { super().merge('binlog_path' => '/path/to/binlog') } - it { expect(helper.nodes).to receive(:remove_binlog) } - after { helper.remove_binlog } - end - - describe '#copy_config' do - context 'when single' do - it { expect(helper.master).to receive(:file_upload) } - after { helper.copy_config } - end - - context 'when replication' do - let(:spec_options) { super().merge(replication_options) } - - context 'when work with all' do - it do - expect(helper.send(:config).agents['slave1'][:box]).to receive(:file_upload) - expect(helper.send(:config).agents['slave2'][:box]).to receive(:file_upload) - expect(helper.master).to receive(:file_upload) - end - after { helper.copy_config } - end - - context 'when work with master' do - let(:current_node) { 'master' } - it { expect(helper.master).to receive(:file_upload) } - after { helper.copy_config } - end - - context 'when work with slave2' do - let(:current_node) { 'slave2' } - before { helper.nodes.stub(:boxes => [helper.send(:config).agents['slave2'][:box]] ) } - it do - expect(helper.send(:config).agents['slave1'][:box]).to_not receive(:file_upload) - expect(helper.send(:config).agents['slave2'][:box]).to receive(:file_upload) - expect(helper.master).to_not receive(:file_upload) - end - after { helper.copy_config } - end - end - end - - describe '#index' do - context 'when single' do - before { expect(ThinkingSphinx).to receive(:set_last_indexing_finish_time) } - - context 'when online' do - it do - expect(helper.master).to receive(:indexer) - expect(helper).to receive(:truncate_rt_indexes) - end - after { helper.index } - end - - context 'when offline' do - it do - expect(helper.master).to receive(:indexer) - expect(helper).to_not receive(:truncate_rt_indexes) - end - after { helper.index(false) } - end - end - - context 'when replication' do - let(:spec_options) { super().merge(replication_options) } - - context 'when work with master' do - let(:current_node) { 'master' } - - before { expect(ThinkingSphinx).to_not receive(:set_last_indexing_finish_time) } - - it { expect { helper.index }.to raise_error } - end - - context 'when work with all' do - before { expect(ThinkingSphinx).to receive(:set_last_indexing_finish_time) } - - it { expect(helper).to receive(:full_reindex_with_replication) } - after { helper.index } - end - - context 'when work with slave2' do - let(:current_node) { 'slave2' } - - before { expect(ThinkingSphinx).to receive(:set_last_indexing_finish_time) } - - context 'when online' do - it do - expect(helper.nodes).to receive(:indexer).with(['--rotate', "--config %REMOTE_PATH%/conf/sphinx.conf"]) - expect(helper).to receive(:truncate_rt_indexes) - end - after { helper.index } - end - - context 'when offline' do - it do - expect(helper.nodes).to receive(:indexer).with(["--config %REMOTE_PATH%/conf/sphinx.conf"]) - expect(helper).to receive(:truncate_rt_indexes).with(helper.recent_rt.prev) - end - after { helper.index(false) } - end - end - end - end - - describe '#full_reindex_with_replication' do - let(:spec_options) { super().merge(replication_options) } - let(:slave1) { double('slave1') } - let(:slave2) { double('slave2') } - - before do - helper.agents.stub(:boxes => [slave1, slave2]) - helper.send(:config).agents['slave1'][:box] = slave1 - helper.send(:config).agents['slave2'][:box] = slave2 - end - - context 'when online' do - it do - expect(slave1).to receive(:execute).with(/sed/) - expect(slave1).to receive(:indexer) - expect(slave1).to receive(:rm) - - expect(slave1).to receive(:user) - expect(slave1).to receive(:host) - expect(helper.agents).to receive(:execute).with(/rsync/) - expect(helper.agents).to receive(:kill).with(/SIGHUP/) - expect(helper).to receive(:truncate_rt_indexes) - end - after { helper.send(:full_reindex_with_replication) } - end - - context 'when offline' do - it do - expect(slave1).to receive(:indexer) - - expect(slave1).to receive(:user) - expect(slave1).to receive(:host) - expect(helper.agents).to receive(:execute).with(/rsync/) - expect(helper).to_not receive(:truncate_rt_indexes) - end - after { helper.send(:full_reindex_with_replication, false) } - end + describe "#rebuild" do + it do + expect(helper).to receive(:stop) + expect(helper).to receive(:configure) + expect(helper).to receive(:copy_config) + expect(helper).to receive(:remove_indexes) + expect(helper).to receive(:remove_binlog) + expect(helper).to receive(:index).with(false) + expect(helper).to receive(:start) + helper.rebuild end end end diff --git a/spec/sphinx/integration/mysql/client_spec.rb b/spec/sphinx/integration/mysql/client_spec.rb new file mode 100644 index 0000000..7d54901 --- /dev/null +++ b/spec/sphinx/integration/mysql/client_spec.rb @@ -0,0 +1,85 @@ +require "spec_helper" + +describe Sphinx::Integration::Mysql::Client do + let(:client) { described_class.new(%w(s1.dev s2.dev), 9300) } + let(:connection) { instance_double(Sphinx::Integration::Mysql::Connection) } + + before do + class_double(Sphinx::Integration::Mysql::Connection, new: connection).as_stubbed_const + end + + describe "#read" do + it do + expect(connection).to receive(:execute).with("select 1").once + client.read("select 1") + end + end + + describe "#write" do + it do + expect(connection).to receive(:execute).with("update index set v = 1").twice + client.write("update index set v = 1") + end + end + + describe "#replace" do + it do + expect(connection).to receive(:execute).with("REPLACE INTO product (`company_id`) VALUES (1)").twice + client.replace("product", company_id: 1) + end + end + + describe "#update" do + it do + expect(connection).to receive(:execute).with("UPDATE product SET company_id = 1 WHERE `id` = 1").twice + client.update("product", {company_id: 1}, id: 1) + end + end + + describe "#delete" do + it do + expect(connection).to receive(:execute).with("DELETE FROM product WHERE id = 1").twice + client.delete("product", 1) + end + end + + describe "#soft_delete" do + it do + expect(connection).to receive(:execute).with("UPDATE product SET sphinx_deleted = 1 WHERE `id` = 1").twice + client.soft_delete("product", 1) + end + end + + describe "#select" do + it do + expect(connection).to receive(:execute).with("SELECT company_id FROM product WHERE `id` = 1").once + client.select("company_id", "product", id: 1) + end + end + + describe ".find_in_batches" do + it do + expect(connection).to receive(:execute).with( + "SELECT sphinx_internal_id " + + "FROM product " + + "WHERE MATCH('@company_id_idx 1') " + + "AND `company_id` = 1 AND `sphinx_internal_id` > 0 " + + "ORDER BY `sphinx_internal_id` ASC LIMIT 1" + ).and_return([{"sphinx_internal_id" => "1"}]) + + expect(connection).to receive(:execute).with( + "SELECT sphinx_internal_id " + + "FROM product " + + "WHERE MATCH('@company_id_idx 1') " + + "AND `company_id` = 1 AND `sphinx_internal_id` > 1 " + + "ORDER BY `sphinx_internal_id` ASC LIMIT 1" + ).and_return([]) + + result = [] + client.find_in_batches( + "product", + where: {company_id: 1}, matching: "@company_id_idx 1", batch_size: 1) { |ids| result += ids } + expect(result).to eq [1] + end + end +end diff --git a/spec/sphinx/integration/server_pool_spec.rb b/spec/sphinx/integration/server_pool_spec.rb new file mode 100644 index 0000000..68c2a4f --- /dev/null +++ b/spec/sphinx/integration/server_pool_spec.rb @@ -0,0 +1,49 @@ +require "spec_helper" + +describe Sphinx::Integration::ServerPool do + let(:pool) { described_class.new(%w(s1 s2), 9306) } + + describe "#take" do + it "take one random server from list" do + expect { |b| pool.take(&b) }.to yield_control.once + end + + it "call second server if first server dies" do + calls_count = 0 + + pool.take do |_| + calls_count += 1 + raise if calls_count == 1 + end + + expect(calls_count).to eq 2 + end + + it "fails if all servers dies" do + expect { pool.take { |_| raise } }.to raise_error + end + end + + describe "#take_all" do + it "iterates by all servers" do + expect { |b| pool.take_all(&b) }.to yield_control.twice + end + + it "doesnt fail if only one server die" do + calls_count = 0 + + expect do + pool.take_all do |_| + calls_count += 1 + raise if calls_count == 1 + end + end.to_not raise_error + + expect(calls_count).to eq 2 + end + + it "fails if all servers dies" do + expect { pool.take_all { |_| raise } }.to raise_error + end + end +end diff --git a/spec/sphinx/integration/transmitter_spec.rb b/spec/sphinx/integration/transmitter_spec.rb index ae373f7..21aa9b5 100644 --- a/spec/sphinx/integration/transmitter_spec.rb +++ b/spec/sphinx/integration/transmitter_spec.rb @@ -4,6 +4,11 @@ describe Sphinx::Integration::Transmitter do let(:transmitter) { described_class.new(ModelWithRt) } let(:record) { mock_model ModelWithRt } + let(:mysql_client) do + client = double("mysql client") + allow(ThinkingSphinx::Configuration.instance).to receive(:mysql_client).and_return(client) + client + end before(:all){ ThinkingSphinx.context.define_indexes } @@ -17,25 +22,28 @@ end describe '#replace' do - it do + it "send valid quries to sphinx" do expect(transmitter).to receive(:transmitted_data).and_return(field: 123) - expect(ThinkingSphinx).to receive(:replace).with('model_with_rt_rt0', field: 123) - expect(ThinkingSphinx).to receive(:soft_delete) + expect(mysql_client).to receive(:replace).with('model_with_rt_rt0', field: 123) + expect(mysql_client).to receive(:soft_delete) + + transmitter.replace(record) end - after { transmitter.replace(record) } end describe '#delete' do - it do - expect(ThinkingSphinx).to receive(:delete).with('model_with_rt_rt0', 1) - expect(ThinkingSphinx).to receive(:soft_delete) + it "send valid quries to sphinx" do + expect(mysql_client).to receive(:delete).with('model_with_rt_rt0', 1) + expect(mysql_client).to receive(:soft_delete) + transmitter.delete(record) end - after { transmitter.delete(record) } end describe '#update' do - it { expect(transmitter).to receive(:update_fields) } - after { transmitter.update(record, :field => 123) } + it "send valid quries to sphinx" do + expect(transmitter).to receive(:update_fields) + transmitter.update(record, field: 123) + end end describe '#update_fields' do @@ -43,23 +51,22 @@ before { transmitter.stub(:full_reindex? => true) } it do - expect(ThinkingSphinx).to receive(:update).with("model_with_rt_rt0", {field: 123}, id: 1) - expect(ThinkingSphinx).to receive(:update).with("model_with_rt_rt1", {field: 123}, id: 1) - expect(ThinkingSphinx). + expect(mysql_client).to receive(:update).with("model_with_rt_rt0", {field: 123}, id: 1) + expect(mysql_client).to receive(:update).with("model_with_rt_rt1", {field: 123}, id: 1) + expect(mysql_client). to receive(:find_in_batches). with("model_with_rt_core", where: {id: 1}, matching: "@id_idx 1"). and_yield([1]) - end - after { transmitter.update_fields({field: 123}, id: 1, matching: "@id_idx 1") } + transmitter.update_fields({field: 123}, id: 1, matching: "@id_idx 1") + end end context 'when no full reindex' do it do - expect(ThinkingSphinx).to receive(:update) + expect(mysql_client).to receive(:update) + transmitter.update_fields({field: 123}, id: 1) end - - after { transmitter.update_fields({:field => 123}, {:id => 1}) } end end end diff --git a/spec/sphinx/integration/waste_records_spec.rb b/spec/sphinx/integration/waste_records_spec.rb index 11e438d..f9f2bd9 100644 --- a/spec/sphinx/integration/waste_records_spec.rb +++ b/spec/sphinx/integration/waste_records_spec.rb @@ -1,7 +1,12 @@ require "spec_helper" describe Sphinx::Integration::WasteRecords do - let(:index) { double("index", name: "product", core_name_w: "product_core_w") } + let(:index) { double("index", name: "product", core_name: "product_core") } + let(:mysql_client) do + client = double("mysql client") + allow(ThinkingSphinx::Configuration.instance).to receive(:mysql_client).and_return(client) + client + end describe "#add" do it "add record to a queue set" do @@ -16,7 +21,7 @@ waste_records = described_class.new(index) waste_records.add(1) - expect(ThinkingSphinx).to receive(:soft_delete).with("product_core_w", [1]) + expect(mysql_client).to receive(:soft_delete).with("product_core", [1]) waste_records.cleanup expect(waste_records.size).to eq 0 end diff --git a/spec/support/helpers/sphinx_conf.rb b/spec/support/helpers/sphinx_conf.rb new file mode 100644 index 0000000..250d0d6 --- /dev/null +++ b/spec/support/helpers/sphinx_conf.rb @@ -0,0 +1,9 @@ +module SphinxConf + def stub_sphinx_conf(options) + expect(YAML).to receive(:load). + with("test:\n version: 2.0.3\n"). + and_return({test: options}.with_indifferent_access) + + ThinkingSphinx::Configuration.instance.reset + end +end diff --git a/sphinx-integration.gemspec b/sphinx-integration.gemspec index 6711e67..79e59c0 100644 --- a/sphinx-integration.gemspec +++ b/sphinx-integration.gemspec @@ -33,11 +33,15 @@ Gem::Specification.new do |gem| gem.add_development_dependency 'rake' gem.add_development_dependency 'bundler' - gem.add_development_dependency 'rspec', '~> 2.14.1' - gem.add_development_dependency 'rspec-rails', '~> 2.14.0' + gem.add_development_dependency 'rspec', '~> 3.3' + gem.add_development_dependency 'rspec-rails' gem.add_development_dependency 'rails', '~> 3.1.12' gem.add_development_dependency 'combustion' gem.add_development_dependency 'mock_redis' gem.add_development_dependency 'database_cleaner' gem.add_development_dependency 'pry-debugger' + gem.add_development_dependency 'rspec-collection_matchers' + gem.add_development_dependency 'rspec-its' + gem.add_development_dependency 'rspec-activemodel-mocks' + gem.add_development_dependency 'simplecov' end