From f59acd725dbf9e68bce5c31404d7983454bef43c Mon Sep 17 00:00:00 2001
From: Niels Vandekeybus
Date: Fri, 8 Nov 2024 11:18:11 +0100
Subject: [PATCH 1/5] replace custom calls with elasticsearch client gem

rationale: Moving to the elasticsearch client removes complexity from our own
client (less maintenance) and allows us to benefit from features offered by
the client. Specifically:

- full elasticsearch api support, which should allow us to implement new
  features more quickly (like metrics, updating index settings without a
  complete rebuild, ...)
- the ability to set up tracing and logging of http calls
- the ability to use connection pooling and persistent connections
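As an illustrative sketch (not part of this change; we keep the plain
host/port setup for now), the transport options the client gem exposes
look like this:

    Elasticsearch::Client.new(
      host: "elasticsearch",
      port: 9200,
      log: true,            # log requests and responses
      trace: true,          # trace calls in curl format
      retry_on_failure: 3   # retry failed requests up to 3 times
    )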
---
 Gemfile                  |   2 +
 Gemfile.lock             |  57 +++---
 framework/elastic.rb     | 372 ---------------------------------------
 lib/mu_search/elastic.rb | 263 +++++++++++++++++++++++++++
 web.rb                   |   4 +-
 5 files changed, 301 insertions(+), 397 deletions(-)
 delete mode 100644 framework/elastic.rb
 create mode 100644 lib/mu_search/elastic.rb

diff --git a/Gemfile b/Gemfile
index 1350537..bae0575 100644
--- a/Gemfile
+++ b/Gemfile
@@ -2,4 +2,6 @@ source 'https://rubygems.org' do
   gem 'listen', '~> 3.0'
   gem 'parallel', '~> 1.17.0'
   gem "concurrent-ruby", "~> 1.1"
+  # matching our current backend setup
+  gem "elasticsearch", "~> 7.17"
 end

diff --git a/Gemfile.lock b/Gemfile.lock
index 1d868bd..c08a730 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -1,38 +1,49 @@
+GEM
+  specs:
+
 GEM
   remote: https://rubygems.org/
   specs:
-    concurrent-ruby (1.1.6)
-    connection_pool (2.2.3)
-    elasticsearch (6.8.1)
-      elasticsearch-api (= 6.8.1)
-      elasticsearch-transport (= 6.8.1)
-    elasticsearch-api (6.8.1)
+    base64 (0.2.0)
+    concurrent-ruby (1.3.4)
+    elasticsearch (7.17.11)
+      elasticsearch-api (= 7.17.11)
+      elasticsearch-transport (= 7.17.11)
+    elasticsearch-api (7.17.11)
       multi_json
-    elasticsearch-transport (6.8.1)
-      faraday (>= 0.14, < 1)
+    elasticsearch-transport (7.17.11)
+      base64
+      faraday (>= 1, < 3)
       multi_json
-    faraday (0.17.3)
-      multipart-post (>= 1.2, < 3)
-    ffi (1.13.1-java)
-    listen (3.2.1)
+    faraday (2.12.0)
+      faraday-net_http (>= 2.0, < 3.4)
+      json
+      logger
+    faraday-net_http (3.3.0)
+      net-http
+    ffi (1.17.0-java)
+    json (2.8.1-java)
+    listen (3.9.0)
       rb-fsevent (~> 0.10, >= 0.10.3)
       rb-inotify (~> 0.9, >= 0.9.10)
-    multi_json (1.14.1)
-    multipart-post (2.1.1)
+    logger (1.6.1)
+    multi_json (1.15.0)
+    net-http (0.5.0)
+      uri
     parallel (1.17.0)
-    rb-fsevent (0.10.4)
-    rb-inotify (0.10.1)
+    rb-fsevent (0.11.2)
+    rb-inotify (0.11.1)
       ffi (~> 1.0)
+    uri (1.0.1)

 PLATFORMS
-  java
+  universal-java-1.8

 DEPENDENCIES
-  concurrent-ruby (~> 1.1)
-  connection_pool (~> 2.2)
-  elasticsearch (~> 6.1)
-  listen (~> 3.0)
-  parallel (~> 1.17.0)
+  concurrent-ruby (~> 1.1)!
+  elasticsearch (~> 7.17)!
+  listen (~> 3.0)!
+  parallel (~> 1.17.0)!

 BUNDLED WITH
-  2.1.0
+  2.5.5

diff --git a/framework/elastic.rb b/framework/elastic.rb
deleted file mode 100644
index 2fe04e3..0000000
--- a/framework/elastic.rb
+++ /dev/null
@@ -1,372 +0,0 @@
-# A quick as-needed Elastic API, for use until
-# the conflict with Sinatra::Utils is resolved
-# * does not follow the standard API
-# * see: https://github.com/mu-semtech/mu-ruby-template/issues/16
-class Elastic
-  # Sets up the ElasticSearch instance
-  def initialize(host: "localhost", port: 9200, logger:)
-    @host = host
-    @port = port
-    @port_s = port.to_s
-    @logger = logger
-  end
-
-  # Checks whether or not ElasticSearch is up
-  #
-  # Executes a health check and accepts either "green" or "yellow".
-  def up?
-    uri = URI("http://#{@host}:#{@port_s}/_cluster/health")
-    req = Net::HTTP::Get.new(uri)
-    begin
-      resp = run(uri, req)
-      if resp.is_a? Net::HTTPSuccess
-        health = JSON.parse resp.body
-        health["status"] == "yellow" or health["status"] == "green"
-      else
-        false
-      end
-    rescue
-      false
-    end
-  end
-
-  # Checks whether or not the supplied index exists.
-  # - index: string name of the index
-  #
-  # Executes a HEAD request. If that succeeds we can assume the index
-  # exists.
-  def index_exists?(index)
-    uri = URI("http://#{@host}:#{@port_s}/#{index}")
-    req = Net::HTTP::Head.new(uri)
-
-    begin
-      resp = run(uri, req)
-      resp.is_a?(Net::HTTPSuccess) ? true : false
-    rescue StandardError => e
-      @logger.warn("ELASTICSEARCH") { "Failed to detect whether index #{index} exists. Assuming it doesn't." }
-      @logger.warn("ELASTICSEARCH") { e.full_message }
-      false
-    end
-  end
-
-  # Creates an index in Elasticsearch
-  # - index: Index to be created
-  # - mappings: Optional pre-defined document mappings for the index,
-  #   JSON object passed directly to Elasticsearch.
-  # - settings: Optional JSON object passed directly to Elasticsearch
-  def create_index(index, mappings = nil, settings = nil)
-    uri = URI("http://#{@host}:#{@port_s}/#{index}")
-    req = Net::HTTP::Put.new(uri)
-    req_body = {
-      mappings: mappings,
-      settings: settings
-    }.to_json
-    req.body = req_body
-    resp = run(uri, req)
-
-    if resp.is_a? Net::HTTPSuccess
-      @logger.debug("ELASTICSEARCH") { "Successfully created index #{index}" }
-      resp
-    else
-      @logger.error("ELASTICSEARCH") { "Failed to create index #{index}.\nPUT #{uri}\nRequest: #{req_body}\nResponse: #{resp.code} #{resp.msg}\n#{resp.body}" }
-      if resp.is_a? Net::HTTPClientError # 4xx status code
-        raise ArgumentError, "Failed to create index #{index}: #{req_body}"
-      else
-        raise "Failed to create index #{index}"
-      end
-    end
-  end
-
-  # Deletes an index from ElasticSearch
-  # - index: Name of the index to be removed
-  #
-  # Returns true when the index existed and is succesfully deleted.
-  # Otherwise false.
-  # Throws an error if the index exists but fails to be deleted.
-  def delete_index(index)
-    uri = URI("http://#{@host}:#{@port_s}/#{index}")
-    req = Net::HTTP::Delete.new(uri)
-    resp = run(uri, req)
-
-    if resp.is_a? Net::HTTPNotFound
-      @logger.debug("ELASTICSEARCH") { "Index #{index} doesn't exist and cannot be deleted." }
-      false
-    elsif resp.is_a? Net::HTTPSuccess
-      @logger.debug("ELASTICSEARCH") { "Successfully deleted index #{index}" }
-      true
-    else
-      @logger.error("ELASTICSEARCH") { "Failed to delete index #{index}.\nDELETE #{uri}\nResponse: #{resp.code} #{resp.msg}\n#{resp.body}" }
-      raise "Failed to delete index #{index}"
-    end
-  end
-
-  # Refreshes an ElasticSearch index, making documents available for
-  # search.
-  # - index: Name of the index which will be refreshed.
-  #
-  # Returns whether the refresh succeeded
-  #
-  # When we store documents in ElasticSearch, they are not necessarily
-  # available immediately. It requires a refresh of the index. This
-  # operation happens once every second. When we build an index to
-  # query it immediately, we should ensure to refresh the index before
-  # querying.
-  def refresh_index(index)
-    uri = URI("http://#{@host}:#{@port_s}/#{index}/_refresh")
-    req = Net::HTTP::Post.new(uri)
-    resp = run(uri, req)
-
-    if resp.is_a? Net::HTTPSuccess
-      @logger.debug("ELASTICSEARCH") { "Successfully refreshed index #{index}" }
-      true
-    else
-      @logger.warn("ELASTICSEARCH") { "Failed to refresh index #{index}.\nPOST #{uri}\nResponse: #{resp.code} #{resp.msg}\n#{resp.body}" }
-      false
-    end
-  end
-
-  # Clear a given index by deleting all documents in the Elasticsearch index
-  # - index: Index name to clear
-  # Note: this operation does not delete the index in Elasticsearch
-  def clear_index(index)
-    if index_exists? index
-      resp = delete_documents_by_query index, { query: { match_all: {} } }
-      if resp.is_a? Net::HTTPSuccess
-        @logger.debug("ELASTICSEARCH") { "Successfully cleared index #{index}" }
-        resp
-      else
-        @logger.error("ELASTICSEARCH") { "Failed to clear index #{index}.\nResponse: #{resp.code} #{resp.msg}\n#{resp.body}" }
-        raise "Failed to clear index #{index}"
-      end
-    end
-  end
-
-  # Gets a single document from an index by its ElasticSearch id.
-  # Returns nil if the document cannot be found.
-  # - index: Index to retrieve the document from
-  # - id: ElasticSearch ID of the document
-  def get_document(index, id)
-    uri = URI("http://#{@host}:#{@port_s}/#{index}/_doc/#{CGI::escape(id)}")
-    req = Net::HTTP::Get.new(uri)
-    resp = run(uri, req)
-    if resp.is_a? Net::HTTPNotFound
-      @logger.debug("ELASTICSEARCH") { "Document #{id} not found in index #{index}" }
-      nil
-    elsif resp.is_a? Net::HTTPSuccess
-      JSON.parse resp.body
-    else
-      @logger.error("ELASTICSEARCH") { "Failed to get document #{id} from index #{index}.\nGET #{uri}\nResponse: #{resp.code} #{resp.msg}\n#{resp.body}" }
-      raise "Failed to get document #{id} from index #{index}"
-    end
-  end
-
-  # Inserts a new document in an Elasticsearch index
-  # - index: Index to store the document in.
-  # - id: Elasticsearch identifier to store the document under.
-  # - document: document contents to index (as a ruby json object)
-  # Returns the inserted document
-  # Raises an error on failure.
-  def insert_document(index, id, document)
-    uri = URI("http://#{@host}:#{@port_s}/#{index}/_doc/#{CGI::escape(id)}")
-    req = Net::HTTP::Put.new(uri)
-    req_body = document.to_json
-    req.body = req_body
-    resp = run(uri, req)
-
-    if resp.is_a? Net::HTTPSuccess
-      @logger.debug("ELASTICSEARCH") { "Inserted document #{id} in index #{index}" }
-      JSON.parse resp.body
-    else
-      @logger.error("ELASTICSEARCH") { "Failed to insert document #{id} in index #{index}.\nPUT #{uri}\nRequest: #{req_body}\nResponse: #{resp.code} #{resp.msg}\n#{resp.body}" }
-      raise "Failed to insert document #{id} in index #{index}"
-    end
-  end
-
-  # Partially updates an existing document in Elasticsearch index
-  # - index: Index to update the document in
-  # - id: ElasticSearch identifier of the document
-  # - document: New document contents
-  # Returns the updated document or nil if the document cannot be found.
-  # Otherwise, raises an error.
-  def update_document(index, id, document)
-    uri = URI("http://#{@host}:#{@port_s}/#{index}/_doc/#{CGI::escape(id)}/_update")
-    req = Net::HTTP::Post.new(uri)
-    req_body = { "doc": document }.to_json
-    req.body = req_body
-    resp = run(uri, req)
-
-    if resp.is_a? Net::HTTPNotFound
-      @logger.info("ELASTICSEARCH") { "Cannot update document #{id} in index #{index} because it doesn't exist" }
-      nil
-    elsif resp.is_a? Net::HTTPSuccess
-      @logger.debug("ELASTICSEARCH") { "Updated document #{id} in index #{index}" }
-      JSON.parse resp.body
-    else
-      @logger.error("ELASTICSEARCH") { "Failed to update document #{id} in index #{index}.\nPOST #{uri}\nRequest: #{req_body}\nResponse: #{resp.code} #{resp.msg}\n#{resp.body}" }
-      raise "Failed to update document #{id} in index #{index}"
-    end
-  end
-
-  # Updates the document with the given id in the given index.
-  # Inserts the document if it doesn't exist yet
-  # - index: index to store document in
-  # - id: elastic identifier to store the document under
-  # - document: document contents (as a ruby json object)
-  def upsert_document(index, id, document)
-    @logger.debug("ELASTICSEARCH") { "Trying to update document with id #{id}" }
-    updated_document = update_document index, id, document
-    if updated_document.nil?
-      @logger.debug("ELASTICSEARCH") { "Document #{id} does not exist yet, trying to insert new document" }
-      insert_document index, id, document
-    else
-      updated_document
-    end
-  end
-
-  # Deletes a document from an Elasticsearch index
-  # - index: Index to remove the document from
-  # - id: ElasticSearch identifier of the document
-  # Returns true when the document existed and is succesfully deleted.
-  # Otherwise false.
-  # Throws an error if the document exists but fails to be deleted.
-  def delete_document(index, id)
-    uri = URI("http://#{@host}:#{@port_s}/#{index}/_doc/#{CGI::escape(id)}")
-    req = Net::HTTP::Delete.new(uri)
-    resp = run(uri, req)
-
-    if resp.is_a? Net::HTTPNotFound
-      @logger.debug("ELASTICSEARCH") { "Document #{id} doesn't exist in index #{index} and cannot be deleted." }
-      false
-    elsif resp.is_a? Net::HTTPSuccess
-      @logger.debug("ELASTICSEARCH") { "Successfully deleted document #{id} in index #{index}" }
-      true
-    else
-      @logger.error("ELASTICSEARCH") { "Failed to delete document #{id} in index #{index}.\nDELETE #{uri}\nResponse: #{resp.code} #{resp.msg}\n#{resp.body}" }
-      raise "Failed to delete document #{id} in index #{index}"
-    end
-  end
-
-  # Searches for documents in the given indexes
-  # - indexes: Array of indexes to be searched
-  # - query: Elasticsearch query JSON object in ruby format
-  def search_documents(indexes:, query: nil)
-    indexes_s = indexes.join(',')
-    uri = URI("http://#{@host}:#{@port_s}/#{indexes_s}/_search")
-    req_body = query.to_json
-    @logger.debug("SEARCH") { "Searching Elasticsearch index(es) #{indexes_s} with body #{req_body}" }
-    req = Net::HTTP::Post.new(uri)
-    req.body = req_body
-    resp = run(uri, req)
-
-    if resp.is_a? Net::HTTPSuccess
-      JSON.parse resp.body
-    else
-      @logger.error("SEARCH") { "Searching documents in index(es) #{indexes_s} failed.\nPOST #{uri}\nRequest: #{req_body}\nResponse: #{resp.code} #{resp.msg}\n#{resp.body}" }
-      if resp.is_a? Net::HTTPClientError # 4xx status code
-        raise ArgumentError, "Invalid search query #{req_body}"
-      else
-        raise "Something went wrong while searching documents in index(es) #{indexes_s}"
-      end
-    end
-  end
-
-  # Counts search results for documents in the given indexex
-  # - indexes: Array of indexes to be searched
-  # - query: Elasticsearch query JSON object in ruby format
-  def count_documents(indexes:, query: nil)
-    indexes_s = indexes.join(',')
-    uri = URI("http://#{@host}:#{@port_s}/#{indexes_s}/_doc/_count")
-    req_body = query.to_json
-    @logger.debug("SEARCH") { "Count search results in index(es) #{indexes_s} for body #{req_body}" }
-    req = Net::HTTP::Get.new(uri)
-    req.body = req_body
-    resp = run(uri, req)
-
-    if resp.is_a? Net::HTTPSuccess
-      data = JSON.parse resp.body
-      data["count"]
-    else
-      @logger.error("SEARCH") { "Counting search results in index(es) #{indexes_s} failed.\nPOST #{uri}\nRequest: #{req_body}\nResponse: #{resp.code} #{resp.msg}\n#{resp.body}" }
-      if resp.is_a? Net::HTTPClientError # 4xx status code
-        raise ArgumentError, "Invalid search query #{req_body}"
-      else
-        raise "Something went wrong while counting search results in index(es) #{indexes_s}"
-      end
-    end
-  end
-
-  private
-
-  # Sends a raw request to Elasticsearch
-  # - uri: URI instance representing the elasticSearch host
-  # - req: The request object
-  # - retries: Max number of retries
-  #
-  # Returns the HTTP response.
-  #
-  # Note: the method only logs limited info on purpose.
-  # Additional logging about the error that occurred
-  # is the responsibility of the consumer.
-  def run(uri, req, retries = 6)
-    def run_rescue(uri, req, retries, result = nil)
-      if retries == 0
-        if result.is_a? Exception
-          @logger.warn("ELASTICSEARCH") { "Failed to run request #{uri}. Max number of retries reached." }
-          raise result
-        else
-          @logger.info("ELASTICSEARCH") { "Failed to run request #{uri}. Max number of retries reached." }
-          result
-        end
-      else
-        @logger.info("ELASTICSEARCH") { "Failed to run request #{uri}. Request will be retried (#{retries} left)." }
-        next_retries = retries - 1
-        backoff = (6 - next_retries)**2
-        sleep backoff
-        run(uri, req, next_retries)
-      end
-    end
-
-    req["content-type"] = "application/json"
-
-    res = Net::HTTP.start(uri.hostname, uri.port) do |http|
-      http.read_timeout = ENV["ELASTIC_READ_TIMEOUT"].to_i
-      begin
-        http.request(req)
-      rescue Exception => e
-        run_rescue(uri, req, retries, e)
-      end
-    end
-
-    case res
-    when Net::HTTPSuccess, Net::HTTPRedirection # response code 2xx or 3xx
-      # Ruby doesn't use the encoding specified in HTTP headers (https://bugs.ruby-lang.org/issues/2567#note-3)
-      content_type = res["CONTENT-TYPE"]
-      if res.body && content_type && content_type.downcase.include?("charset=utf-8")
-        res.body.force_encoding("utf-8")
-      end
-      res
-    when Net::HTTPTooManyRequests
-      run_rescue(uri, req, retries, res)
-    else
-      @logger.info("ELASTICSEARCH") { "Failed to run request #{uri}" }
-      @logger.debug("ELASTICSEARCH") { "Request body (trimmed): #{req.body.to_s[0...1024]}" }
-      @logger.debug("ELASTICSEARCH") { "Response: #{res.inspect}" }
-      res
-    end
-  end
-
-  # Deletes all documents which match a certain query
-  # - index: Index to delete the documents from
-  # - query: ElasticSearch query used for selecting documents
-  # - conflicts_proceed: boolean indicating whether to proceed deletion if
-  #   other operations are currently occurring on the same document or not.
-  #
-  # For the query formal, see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
-  def delete_documents_by_query(index, query, conflicts_proceed: true)
-    conflicts = conflicts_proceed ? 'conflicts=proceed' : ''
-    uri = URI("http://#{@host}:#{@port_s}/#{index}/_doc/_delete_by_query?#{conflicts}")
-    req = Net::HTTP::Post.new(uri)
-    req.body = query.to_json
-    run(uri, req)
-  end
-end

diff --git a/lib/mu_search/elastic.rb b/lib/mu_search/elastic.rb
new file mode 100644
index 0000000..f70340a
--- /dev/null
+++ b/lib/mu_search/elastic.rb
@@ -0,0 +1,263 @@
+require 'elasticsearch'
+# monkeypatch the "authentic product check" in the client
+module Elasticsearch
+  class Client
+    alias original_verify_with_version_or_header verify_with_version_or_header
+
+    def verify_with_version_or_header(...)
+      original_verify_with_version_or_header(...)
+    rescue Elasticsearch::UnsupportedProductError
+      # silently ignore this error
+    end
+  end
+end
+
+# A wrapper around the elasticsearch client for backwards compatibility
+# see https://rubydoc.info/gems/elasticsearch-api/Elasticsearch
+# and https://www.elastic.co/guide/en/elasticsearch/client/ruby-api/current/examples.html
+# for docs on the client api
+##
+module MuSearch
+  class Elastic
+    attr_reader :client
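+    # NOTE: besides host and port, the underlying client accepts transport
+    # options such as log:, trace:, retry_on_failure: and adapter:; we rely
+    # on the library defaults for those here.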
+    # Sets up the ElasticSearch instance
+    def initialize(host: "localhost", port: 9200, logger:)
+      @logger = logger
+      @client = Elasticsearch::Client.new(host: host, port: port)
+    end
+
+    # Checks whether or not ElasticSearch is up
+    #
+    # Executes a health check and accepts either "green" or "yellow".
+    def up?
+      begin
+        health = client.cluster.health
+        health["status"] == "yellow" or health["status"] == "green"
+      rescue
+        false
+      end
+    end
+
+    # Checks whether or not the supplied index exists.
+    # - index: string name of the index
+    #
+    # Executes a HEAD request. If that succeeds we can assume the index
+    # exists.
+    def index_exists?(index)
+      begin
+        client.indices.exists?(index: index)
+      rescue StandardError => e
+        @logger.warn("ELASTICSEARCH") { "Error while checking if index #{index} exists. Assuming it doesn't." }
+        @logger.warn("ELASTICSEARCH") { e.full_message }
+        false
+      end
+    end
+
+    # Creates an index in Elasticsearch
+    # - index: Index to be created
+    # - mappings: Optional pre-defined document mappings for the index,
+    #   JSON object passed directly to Elasticsearch.
+    # - settings: Optional JSON object passed directly to Elasticsearch
+    def create_index(index, mappings = nil, settings = nil)
+      begin
+        client.indices.create(index: index, body: { settings: settings, mappings: mappings })
+      rescue Elasticsearch::Transport::Transport::Errors::BadRequest => e
+        error_message = e.message
+        if error_message.include?("resource_already_exists_exception")
+          @logger.warn("ELASTICSEARCH") { "Failed to create index #{index} because it already exists" }
+        else
+          @logger.error("ELASTICSEARCH") { "Failed to create index #{index}, error: #{error_message}" }
+          raise e
+        end
+      rescue StandardError => e
+        @logger.error("ELASTICSEARCH") { "Failed to create index #{index}.\n Error: #{e.full_message}" }
+        raise e
+      end
+    end
+
+    # Deletes an index from ElasticSearch
+    # - index: Name of the index to be removed
+    #
+    # Returns true when the index existed and is successfully deleted.
+    # Otherwise false.
+    # Throws an error if the index exists but fails to be deleted.
+    def delete_index(index)
+      begin
+        client.indices.delete(index: index)
+        @logger.debug("ELASTICSEARCH") { "Successfully deleted index #{index}" }
+        true
+      rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
+        @logger.debug("ELASTICSEARCH") { "Index #{index} doesn't exist and cannot be deleted." }
+        false
+      rescue StandardError => e
+        @logger.error("ELASTICSEARCH") { "Failed to delete index #{index}. Error: #{e.message}" }
+        raise "Failed to delete index #{index}: #{e.message}"
+      end
+    end
+
+    # Refreshes an ElasticSearch index, making documents available for
+    # search.
+    # - index: Name of the index which will be refreshed.
+    #
+    # Returns whether the refresh succeeded
+    #
+    # When we store documents in ElasticSearch, they are not necessarily
+    # available immediately. It requires a refresh of the index. This
+    # operation happens once every second. When we build an index to
+    # query it immediately, we should ensure to refresh the index before
+    # querying.
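+    #
+    # A minimal sketch of that flow (hypothetical index name and query):
+    #   elastic.insert_document("documents", id, document)
+    #   elastic.refresh_index("documents")
+    #   elastic.search_documents(indexes: ["documents"], query: query)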
+    def refresh_index(index)
+      begin
+        client.indices.refresh(index: index)
+        @logger.debug("ELASTICSEARCH") { "Successfully refreshed index #{index}" }
+        true
+      rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
+        @logger.warn("ELASTICSEARCH") { "Index #{index} does not exist, cannot refresh." }
+        false
+      rescue StandardError => e
+        @logger.error("ELASTICSEARCH") { "Failed to refresh index #{index}. Error: #{e.full_message}" }
+        false
+      end
+    end
+
+    # Clear a given index by deleting all documents in the Elasticsearch index
+    # - index: Index name to clear
+    # Note: this operation does not delete the index in Elasticsearch
+    def clear_index(index)
+      begin
+        # The `query: { match_all: {} }` deletes all documents in the index.
+        client.delete_by_query(index: index, body: { query: { match_all: {} } })
+        @logger.debug("ELASTICSEARCH") { "Successfully cleared all documents from index #{index}" }
+        true
+      rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
+        @logger.warn("ELASTICSEARCH") { "Index #{index} does not exist, cannot clear documents." }
+        false
+      rescue StandardError => e
+        @logger.error("ELASTICSEARCH") { "Failed to clear documents in index #{index}. Error: #{e.message}" }
+        raise e
+      end
+    end
+
+    # Gets a single document from an index by its ElasticSearch id.
+    # Returns nil if the document cannot be found.
+    # - index: Index to retrieve the document from
+    # - id: ElasticSearch ID of the document
+    def get_document(index, id)
+      begin
+        client.get(index: index, id: id)
+      rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
+        @logger.debug("ELASTICSEARCH") { "Document #{id} not found in index #{index}" }
+        nil
+      rescue StandardError => e
+        @logger.error("ELASTICSEARCH") { "Failed to get document #{id} from index #{index}.\n Error: #{e.full_message}" }
+        raise e
+      end
+    end
+
+    # Inserts a new document in an Elasticsearch index
+    # - index: Index to store the document in.
+    # - id: Elasticsearch identifier to store the document under.
+    # - document: document contents to index (as a ruby json object)
+    # Returns the inserted document
+    # Raises an error on failure.
+    def insert_document(index, id, document)
+      begin
+        body = client.index(index: index, id: id, body: document)
+        @logger.debug("ELASTICSEARCH") { "Inserted document #{id} in index #{index}" }
+        body
+      rescue StandardError => e
+        @logger.error("ELASTICSEARCH") { "Failed to insert document #{id} in index #{index}.\n Error: #{e.full_message}" }
+        raise e
+      end
+    end
+
+    # Partially updates an existing document in Elasticsearch index
+    # - index: Index to update the document in
+    # - id: ElasticSearch identifier of the document
+    # - document: New document contents
+    # Returns the updated document, or nil if the document cannot be found.
+    # Raises an error on other failures.
+    def update_document(index, id, document)
+      begin
+        body = client.update(index: index, id: id, body: { doc: document })
+        @logger.debug("ELASTICSEARCH") { "Updated document #{id} in index #{index}" }
+        body
+      rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
+        @logger.info("ELASTICSEARCH") { "Cannot update document #{id} in index #{index} because it doesn't exist" }
+        nil
+      rescue StandardError => e
+        @logger.error("ELASTICSEARCH") { "Failed to update document #{id} in index #{index}.\n Error: #{e.full_message}" }
+        raise e
+      end
+    end
+
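+    # Example of the update-or-insert flow implemented below
+    # (hypothetical index name and id):
+    #   elastic.upsert_document("documents", id, { "title" => "..." })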
+    # Updates the document with the given id in the given index.
+    # Inserts the document if it doesn't exist yet
+    # - index: index to store document in
+    # - id: elastic identifier to store the document under
+    # - document: document contents (as a ruby json object)
+    def upsert_document(index, id, document)
+      @logger.debug("ELASTICSEARCH") { "Trying to update document with id #{id}" }
+      updated_document = update_document index, id, document
+      if updated_document.nil?
+        @logger.debug("ELASTICSEARCH") { "Document #{id} does not exist yet, trying to insert new document" }
+        insert_document index, id, document
+      else
+        updated_document
+      end
+    end
+
+    # Deletes a document from an Elasticsearch index
+    # - index: Index to remove the document from
+    # - id: ElasticSearch identifier of the document
+    # Returns true when the document existed and is successfully deleted.
+    # Otherwise false.
+    # Throws an error if the document exists but fails to be deleted.
+    def delete_document(index, id)
+      begin
+        client.delete(index: index, id: id)
+        @logger.debug("ELASTICSEARCH") { "Successfully deleted document #{id} in index #{index}" }
+        true
+      rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
+        @logger.debug("ELASTICSEARCH") { "Document #{id} doesn't exist in index #{index} and cannot be deleted." }
+        false
+      rescue StandardError => e
+        @logger.error("ELASTICSEARCH") { "Failed to delete document #{id} in index #{index}.\n Error: #{e.full_message}" }
+        raise e
+      end
+    end
+
+    # Searches for documents in the given indexes
+    # - indexes: Array of indexes to be searched
+    # - query: Elasticsearch query JSON object in ruby format
+    def search_documents(indexes:, query: nil)
+      begin
+        @logger.debug("SEARCH") { "Searching Elasticsearch index(es) #{indexes} with body #{query.to_json}" }
+        client.search(index: indexes, body: query)
+      rescue Elasticsearch::Transport::Transport::Errors::BadRequest => e
+        raise ArgumentError, "Invalid search query #{query}"
+      rescue StandardError => e
+        @logger.error("SEARCH") { "Searching documents in index(es) #{indexes} failed.\n Error: #{e.full_message}" }
+        raise e
+      end
+    end
+
+    # Counts search results for documents in the given indexes
+    # - indexes: Array of indexes to be searched
+    # - query: Elasticsearch query JSON object in ruby format
+    def count_documents(indexes:, query: nil)
+      begin
+        @logger.debug("SEARCH") { "Count search results in index(es) #{indexes} for body #{query.to_json}" }
+        # count accepts the same query body as search
+        response = client.count(index: indexes, body: query)
+        response["count"]
+      rescue Elasticsearch::Transport::Transport::Errors::BadRequest => e
+        @logger.error("SEARCH") { "Counting search results in index(es) #{indexes} failed.\n Error: #{e.full_message}" }
+        raise ArgumentError, "Invalid count query #{query}"
+      rescue StandardError => e
+        @logger.error("SEARCH") { "Counting search results in index(es) #{indexes} failed.\n Error: #{e.full_message}" }
+        raise e
+      end
+    end
+  end
+end

diff --git a/web.rb b/web.rb
index f1efa30..d9475db 100644
--- a/web.rb
+++ b/web.rb
@@ -20,7 +20,7 @@ require_relative 'lib/mu_search/index_builder.rb'
 require_relative 'lib/mu_search/search_index.rb'
 require_relative 'lib/mu_search/index_manager.rb'

-require_relative 'framework/elastic.rb'
+require_relative 'lib/mu_search/elastic.rb'
 require_relative 'framework/elastic_query_builder.rb'
 require_relative 'framework/tika.rb'
 require_relative 'framework/jsonapi.rb'
@@ -112,7 +112,7 @@ def setup_delta_handling(index_manager, elasticsearch, tika, sparql_connection_p
     logger: Mu::log
   )

-  elasticsearch = Elastic.new(
+  elasticsearch = MuSearch::Elastic.new(
     host: 'elasticsearch',
     port: 9200,
     logger: Mu::log

From d1ad5dddce054a94c36a511aa085d765240ea13d Mon Sep 17 00:00:00 2001
From: Niels Vandekeybus
Date: Fri, 8 Nov 2024 11:57:16 +0100
Subject: [PATCH 2/5] update default faraday adapter to typhoeus

faraday can use several http backends for its requests; quick research
indicated typhoeus to be the most widely used modern one. It should be easy
to switch to one of the other adapters (net http persistent or patron) if
this one is not performant enough.

sources:
- https://apisyouwonthate.com/blog/ruby-users-be-wary-of-net-http/
- https://www.elastic.co/guide/en/elasticsearch/client/ruby-api/current/troubleshooting.html#ruby-ts-adapter
- https://ruby.libhunt.com/compare-patron-vs-typhoeus
- https://github.com/elastic/elasticsearch-ruby/issues/437
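As a sketch: with the adapter gem required, the transport detects it
automatically; it could also be selected explicitly via the client's
adapter option:

    require 'faraday/typhoeus'
    Elasticsearch::Client.new(host: host, port: port, adapter: :typhoeus)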
---
 Gemfile                  | 2 ++
 Gemfile.lock             | 8 ++++++++
 lib/mu_search/elastic.rb | 2 ++
 3 files changed, 12 insertions(+)

diff --git a/Gemfile b/Gemfile
index bae0575..e195b87 100644
--- a/Gemfile
+++ b/Gemfile
@@ -5,3 +5,5 @@ source 'https://rubygems.org' do
   # matching our current backend setup
   gem "elasticsearch", "~> 7.17"
 end
+
+gem "faraday-typhoeus", "~> 1.1"

diff --git a/Gemfile.lock b/Gemfile.lock
index c08a730..1267f17 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -1,5 +1,12 @@
 GEM
   specs:
+    ethon (0.16.0)
+      ffi (>= 1.15.0)
+    faraday-typhoeus (1.1.0)
+      faraday (~> 2.0)
+      typhoeus (~> 1.4)
+    typhoeus (1.4.1)
+      ethon (>= 0.9.0)

 GEM
   remote: https://rubygems.org/
@@ -42,6 +49,7 @@ PLATFORMS
 DEPENDENCIES
   concurrent-ruby (~> 1.1)!
   elasticsearch (~> 7.17)!
+  faraday-typhoeus (~> 1.1)
   listen (~> 3.0)!
   parallel (~> 1.17.0)!

diff --git a/lib/mu_search/elastic.rb b/lib/mu_search/elastic.rb
index f70340a..0ca6469 100644
--- a/lib/mu_search/elastic.rb
+++ b/lib/mu_search/elastic.rb
@@ -1,4 +1,6 @@
 require 'elasticsearch'
+require 'faraday/typhoeus'
+
 # monkeypatch the "authentic product check" in the client
 module Elasticsearch
   class Client

From ed3794f922b1ca098c6725d536d4bb79c8505f6e Mon Sep 17 00:00:00 2001
From: Niels Vandekeybus
Date: Fri, 8 Nov 2024 13:39:28 +0100
Subject: [PATCH 3/5] cleanup unused requires

---
 lib/mu_search/sparql.rb | 1 -
 web.rb                  | 8 --------
 2 files changed, 9 deletions(-)

diff --git a/lib/mu_search/sparql.rb b/lib/mu_search/sparql.rb
index 0b70aa9..db87753 100644
--- a/lib/mu_search/sparql.rb
+++ b/lib/mu_search/sparql.rb
@@ -1,4 +1,3 @@
-require 'request_store'
 module MuSearch
   module SPARQL
     class ClientWrapper

diff --git a/web.rb b/web.rb
index d9475db..a72b939 100644
--- a/web.rb
+++ b/web.rb
@@ -1,11 +1,3 @@
-require 'net/http'
-require 'digest'
-require 'set'
-require 'request_store'
-require 'listen'
-require 'singleton'
-require 'base64'
-require 'open3'
 require 'webrick'

 require_relative 'lib/logger.rb'

From 1996d56c12db753aa20eb61e8a14d2bce99ad5eb Mon Sep 17 00:00:00 2001
From: Niels Vandekeybus
Date: Fri, 8 Nov 2024 15:16:34 +0100
Subject: [PATCH 4/5] temporary fix for `bad URI` error

use the latest template, which has a newer webrick version

---
 Dockerfile | 2 +-
 Gemfile    | 5 ++---
 web.rb     | 1 +
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index cfbb76d..fb635b6 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM semtech/mu-jruby-template:3.1.0
+FROM semtech/mu-jruby-template
 LABEL maintainer="redpencil "

 # 200MB

diff --git a/Gemfile b/Gemfile
index e195b87..5df6b20 100644
--- a/Gemfile
+++ b/Gemfile
@@ -1,9 +1,8 @@
 source 'https://rubygems.org' do
-  gem 'listen', '~> 3.0'
   gem 'parallel', '~> 1.17.0'
   gem "concurrent-ruby", "~> 1.1"
   # matching our current backend setup
   gem "elasticsearch", "~> 7.17"
+  gem "faraday-typhoeus", "~> 1.1"
+  gem "pry-debugger-jruby", "~> 2.1"
 end
-
-gem "faraday-typhoeus", "~> 1.1"

diff --git a/web.rb b/web.rb
index a72b939..4da1cf3 100644
--- a/web.rb
+++ b/web.rb
@@ -17,6 +17,7 @@ require_relative 'framework/tika.rb'
 require_relative 'framework/jsonapi.rb'

+
 ##
 # WEBrick setup
 ##

From 68b69b9c20c4578a2632efc5057a9e27be61d18d Mon Sep 17 00:00:00 2001
From: Niels Vandekeybus
Date: Fri, 8 Nov 2024 16:26:12 +0100
Subject: [PATCH 5/5] removed the lock file as the template ignores this anyway

it just causes extra confusion when debugging dependencies

---
 Gemfile.lock | 57 ----------------------------------------------------
 1 file changed, 57 deletions(-)
 delete mode 100644 Gemfile.lock

diff --git a/Gemfile.lock b/Gemfile.lock
deleted file mode 100644
index 1267f17..0000000
--- a/Gemfile.lock
+++ /dev/null
@@ -1,57 +0,0 @@
-GEM
-  specs:
-    ethon (0.16.0)
-      ffi (>= 1.15.0)
-    faraday-typhoeus (1.1.0)
-      faraday (~> 2.0)
-      typhoeus (~> 1.4)
-    typhoeus (1.4.1)
-      ethon (>= 0.9.0)
-
-GEM
-  remote: https://rubygems.org/
-  specs:
-    base64 (0.2.0)
-    concurrent-ruby (1.3.4)
-    elasticsearch (7.17.11)
-      elasticsearch-api (= 7.17.11)
-      elasticsearch-transport (= 7.17.11)
-    elasticsearch-api (7.17.11)
-      multi_json
-    elasticsearch-transport (7.17.11)
-      base64
-      faraday (>= 1, < 3)
-      multi_json
-    faraday (2.12.0)
-      faraday-net_http (>= 2.0, < 3.4)
-      json
-      logger
-    faraday-net_http (3.3.0)
-      net-http
-    ffi (1.17.0-java)
-    json (2.8.1-java)
-    listen (3.9.0)
-      rb-fsevent (~> 0.10, >= 0.10.3)
-      rb-inotify (~> 0.9, >= 0.9.10)
-    logger (1.6.1)
-    multi_json (1.15.0)
-    net-http (0.5.0)
-      uri
-    parallel (1.17.0)
-    rb-fsevent (0.11.2)
-    rb-inotify (0.11.1)
-      ffi (~> 1.0)
-    uri (1.0.1)
-
-PLATFORMS
-  universal-java-1.8
-
-DEPENDENCIES
-  concurrent-ruby (~> 1.1)!
-  elasticsearch (~> 7.17)!
-  faraday-typhoeus (~> 1.1)
-  listen (~> 3.0)!
-  parallel (~> 1.17.0)!
-
-BUNDLED WITH
-  2.5.5