diff --git a/README.md b/README.md index ad2f742..5270e03 100644 --- a/README.md +++ b/README.md @@ -2,4 +2,6 @@ This repository contains plugins to both read from and write to Vespa by using Logstash. -Check out the README.md files in the respective plugin directories for more information. \ No newline at end of file +Check out the README.md files in the respective plugin directories for more information. + +This repository doesn't have issues enabled, but feel free to open related issues at https://github.com/vespa-engine/vespa diff --git a/logstash-input-vespa/README.md b/logstash-input-vespa/README.md index a226166..916ee2d 100644 --- a/logstash-input-vespa/README.md +++ b/logstash-input-vespa/README.md @@ -20,6 +20,45 @@ input { } } +output { + stdout {} +} +``` + +With all the options: +``` +input { + vespa { + # Vespa endpoint + vespa_url => "http://localhost:8080" + + # cluster name from services.xml + cluster => "test_cluster" + + # mTLS certificate and key + client_cert => "/Users/myuser/.vespa/mytenant.myapp.default/data-plane-public-cert.pem" + client_key => "/Users/myuser/.vespa/mytenant.myapp.default/data-plane-private-key.pem" + + # page size + page_size => 100 + + # Backend concurrency + backend_concurrency => 1 + + # Selection statement + selection => "doc AND id.namespace == 'open'" + + # HTTP request timeout + timeout => 180 + + # lower timestamp bound (microseconds since epoch) + from_timestamp => 1600000000000000 + + # upper timestamp bound (microseconds since epoch) + to_timestamp => 1800000000000000 + } +} + output { stdout {} } diff --git a/logstash-input-vespa/docs/index.asciidoc b/logstash-input-vespa/docs/index.asciidoc index dd1c45a..656574e 100644 --- a/logstash-input-vespa/docs/index.asciidoc +++ b/logstash-input-vespa/docs/index.asciidoc @@ -49,6 +49,70 @@ URL to the Vespa instance. 
The cluster name to be used while visiting: https://docs.vespa.ai/en/reference/document-v1-api-reference.html#visit +[id="plugins-{type}s-{plugin}-client_cert"] +===== `client_cert` + +* Value type is <<path,path>> +* Default value is `nil` + +Path to the client certificate file for mTLS. + +[id="plugins-{type}s-{plugin}-client_key"] +===== `client_key` + +* Value type is <<path,path>> +* Default value is `nil` + +Path to the client key file for mTLS. + +[id="plugins-{type}s-{plugin}-page_size"] +===== `page_size` + +* Value type is <<number,number>> +* Default value is `100` + +Desired page size for the visit request, i.e. the wantedDocumentCount parameter. + +[id="plugins-{type}s-{plugin}-backend_concurrency"] +===== `backend_concurrency` + +* Value type is <<number,number>> +* Default value is `1` + +Backend concurrency for the visit request, i.e. the concurrency parameter. + +[id="plugins-{type}s-{plugin}-selection"] +===== `selection` + +* Value type is <<string,string>> +* Default value is `nil` + +Selection. A query in Vespa selector language. + +[id="plugins-{type}s-{plugin}-timeout"] +===== `timeout` + +* Value type is <<number,number>> +* Default value is `180` + +Timeout for each HTTP request. + +[id="plugins-{type}s-{plugin}-from_timestamp"] +===== `from_timestamp` + +* Value type is <<number,number>> +* Default value is `nil` + +Lower timestamp limit for the visit request, i.e. the fromTimestamp parameter. +Microseconds since epoch. + +[id="plugins-{type}s-{plugin}-to_timestamp"] +===== `to_timestamp` + +* Value type is <<number,number>> +* Default value is `nil` + +Upper timestamp limit for the visit request, i.e. the toTimestamp parameter. Microseconds since epoch. 
[id="plugins-{type}s-{plugin}-common-options"] include::{include_path}/{type}.asciidoc[] diff --git a/logstash-input-vespa/lib/logstash/inputs/vespa.rb b/logstash-input-vespa/lib/logstash/inputs/vespa.rb index 1ce746e..d458e4e 100644 --- a/logstash-input-vespa/lib/logstash/inputs/vespa.rb +++ b/logstash-input-vespa/lib/logstash/inputs/vespa.rb @@ -4,6 +4,7 @@ require "net/http" require "uri" require "json" +require "openssl" # This is the logstash vespa input plugin. It is used to read from Vespa # via Visit : https://docs.vespa.ai/en/reference/document-v1-api-reference.html#visit @@ -21,28 +22,71 @@ class LogStash::Inputs::Vespa < LogStash::Inputs::Base # The cluster parameter to use in the request. config :cluster, :validate => :string, :required => true + # Path to the client certificate file for mTLS. + config :client_cert, :validate => :path + + # Path to the client key file for mTLS. + config :client_key, :validate => :path + + # desired page size for the visit request, i.e. the wantedDocumentCount parameter + config :page_size, :validate => :number, :default => 100 + + # backend concurrency for the visit request, i.e. the concurrency parameter + config :backend_concurrency, :validate => :number, :default => 1 + + # selection. A query in Vespa selector language + config :selection, :validate => :string + + # timeout for each HTTP request + config :timeout, :validate => :number, :default => 180 + + # lower timestamp limit for the visit request, i.e. the fromTimestamp parameter + # microseconds since epoch + config :from_timestamp, :validate => :number + + # upper timestamp limit for the visit request, i.e. the toTimestamp parameter + config :to_timestamp, :validate => :number + public def register - # nothing to do here + if @client_cert != nil + @cert = OpenSSL::X509::Certificate.new(File.read(@client_cert)) + end + if @client_key != nil + @key = OpenSSL::PKey::RSA.new(File.read(@client_key)) + end + + if @client_cert.nil? ^ @client_key.nil? 
+ raise LogStash::ConfigurationError, "Both client_cert and client_key must be set, you can't have just one" + end + + @uri_params = { + :cluster => @cluster, + :wantedDocumentCount => @page_size, + :concurrency => @backend_concurrency, + :timeout => @timeout + } + + if @selection != nil + @uri_params[:selection] = @selection + end + + if @from_timestamp != nil + @uri_params[:fromTimestamp] = @from_timestamp + end + + if @to_timestamp != nil + @uri_params[:toTimestamp] = @to_timestamp + end end # def register def run(queue) - uri = URI.parse("#{@vespa_url}/document/v1/?cluster=#{@cluster}") + uri = URI.parse("#{@vespa_url}/document/v1/") + uri.query = URI.encode_www_form(@uri_params) continuation = nil loop do - - if continuation != nil - uri.query = URI.encode_www_form({:cluster => @cluster, :continuation => continuation}) - end - - # TODO we'll want to retry here - begin - response = Net::HTTP.get_response(uri) - rescue => e - @logger.error("Failed to make HTTP request to Vespa", :error => e.message) - break - end + response = fetch_documents_from_vespa(uri) # response should look like: # { # "pathId":"/document/v1/","documents":[ @@ -52,30 +96,31 @@ def run(queue) # } if response.is_a?(Net::HTTPSuccess) - begin - response_parsed = JSON.parse(response.body) - rescue JSON::ParserError => e - @logger.error("Failed to parse JSON response", :error => e.message) - break - end + response_parsed = parse_response(response) + break unless response_parsed document_count = response_parsed["documentCount"] # record the continuation token for the next request (if it exists) continuation = response_parsed["continuation"] documents = response_parsed["documents"] - documents.each do |document| - # TODO we need to also get the document ID, namespace and doctype - event = LogStash::Event.new(document["fields"]) - decorate(event) - queue << event - end # documents.each + process_documents(documents, queue) # Exit the loop if there are no more documents to process - break if 
document_count == 0 + if continuation != nil + uri.query = URI.encode_www_form(@uri_params.merge({:continuation => continuation})) + else + @logger.info("No continuation ID => no more documents to fetch from Vespa") + break + end + + if @stopping + @logger.info("Stopping Vespa input") + break + end else - @logger.error("Failed to fetch documents from Vespa", + @logger.error("Failed to fetch documents from Vespa", :request => uri.to_s, :response_code => response.code, :response_message => response.message) break # TODO retry? Only on certain codes? end # if response.is_a?(Net::HTTPSuccess) @@ -83,11 +128,38 @@ def run(queue) end # loop do end # def run + def fetch_documents_from_vespa(uri) + http = Net::HTTP.new(uri.host, uri.port) + if uri.scheme == "https" + http.use_ssl = true + http.cert = @cert + http.key = @key + http.verify_mode = OpenSSL::SSL::VERIFY_PEER + end + + request = Net::HTTP::Get.new(uri.request_uri) + http.request(request) + rescue => e + @logger.error("Failed to make HTTP request to Vespa", :error => e.message) + nil + end # def fetch_documents_from_vespa + + def parse_response(response) + JSON.parse(response.body) + rescue JSON::ParserError => e + @logger.error("Failed to parse JSON response", :error => e.message) + nil + end # def parse_response + + def process_documents(documents, queue) + documents.each do |document| + event = LogStash::Event.new(document) + decorate(event) + queue << event + end + end # def process_documents + def stop - # TODO - # examples of common "stop" tasks: - # * close sockets (unblocking blocking reads/accepts) - # * cleanup temporary files - # * terminate spawned threads + @stopping = true end end # class LogStash::Inputs::Vespa diff --git a/logstash-input-vespa/logstash-input-vespa.gemspec b/logstash-input-vespa/logstash-input-vespa.gemspec index c162ec9..8a9b489 100644 --- a/logstash-input-vespa/logstash-input-vespa.gemspec +++ b/logstash-input-vespa/logstash-input-vespa.gemspec @@ -1,6 +1,6 @@ 
Gem::Specification.new do |s| s.name = 'logstash-input-vespa' - s.version = '0.1.0' + s.version = '0.2.0' s.licenses = ['Apache-2.0'] s.summary = "Logstash input plugin reading from Vespa" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"