input plugin MVP #2

Open. Wants to merge 1 commit into base: main.
4 changes: 3 additions & 1 deletion README.md
@@ -2,4 +2,6 @@

This repository contains plugins to both read from and write to Vespa by using Logstash.

Check out the README.md files in the respective plugin directories for more information.

This repository doesn't have issues enabled, but feel free to open related issues at https://github.com/vespa-engine/vespa
39 changes: 39 additions & 0 deletions logstash-input-vespa/README.md
@@ -20,6 +20,45 @@ input {
}
}

output {
stdout {}
}
```

With all the options:
```
input {
vespa {
# Vespa endpoint
vespa_url => "http://localhost:8080"

# cluster name from services.xml
cluster => "test_cluster"

# mTLS certificate and key
client_cert => "/Users/myuser/.vespa/mytenant.myapp.default/data-plane-public-cert.pem"
client_key => "/Users/myuser/.vespa/mytenant.myapp.default/data-plane-private-key.pem"

# page size
page_size => 100

# Backend concurrency
backend_concurrency => 1

# Selection statement
selection => "doc AND id.namespace == 'open'"

# timeout for each visit request, in seconds
timeout => 180

# lower timestamp bound (microseconds since epoch)
from_timestamp => 1600000000000000

# upper timestamp bound (microseconds since epoch)
to_timestamp => 1800000000000000
}
}

output {
stdout {}
}
64 changes: 64 additions & 0 deletions logstash-input-vespa/docs/index.asciidoc
@@ -49,6 +49,70 @@ URL to the Vespa instance.

The cluster name to be used while visiting: https://docs.vespa.ai/en/reference/document-v1-api-reference.html#visit

[id="plugins-{type}s-{plugin}-client_cert"]
===== `client_cert`

* Value type is <<path,path>>
* Default value is `nil`

Path to the client certificate file for mTLS.

[id="plugins-{type}s-{plugin}-client_key"]
===== `client_key`

* Value type is <<path,path>>
* Default value is `nil`

Path to the client key file for mTLS.
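The two mTLS options only make sense as a pair. A minimal sketch of that rule and of type-agnostic key loading follows. The helper names `mtls_configured?` and `read_private_key` are hypothetical and not part of the plugin, and the key is a throwaway generated in memory rather than a real data-plane key.

```ruby
require "openssl"

# Hypothetical helper, not part of the plugin: true when mTLS is configured,
# false when it is not, and an error when only one of the two paths is given.
def mtls_configured?(client_cert, client_key)
  # XOR is true when exactly one of the two settings is nil
  if client_cert.nil? ^ client_key.nil?
    raise ArgumentError, "client_cert and client_key must be set together"
  end
  !client_cert.nil?
end

# Hypothetical helper: parse a PEM private key without assuming its type.
def read_private_key(pem)
  OpenSSL::PKey.read(pem)
end

# Usage with a throwaway EC key generated in memory
pem = OpenSSL::PKey::EC.generate("prime256v1").to_pem
key = read_private_key(pem)
puts key.class
puts mtls_configured?(nil, nil)               # false
puts mtls_configured?("cert.pem", "key.pem")  # true
```

Checking the pair before touching the files keeps the error message about configuration rather than about a missing or unreadable file.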

[id="plugins-{type}s-{plugin}-page_size"]
===== `page_size`

* Value type is <<number,number>>
* Default value is `100`

Desired page size for the visit request, i.e. the wantedDocumentCount parameter.

[id="plugins-{type}s-{plugin}-backend_concurrency"]
===== `backend_concurrency`

* Value type is <<number,number>>
* Default value is `1`

Backend concurrency for the visit request, i.e. the concurrency parameter.

[id="plugins-{type}s-{plugin}-selection"]
===== `selection`

* Value type is <<string,string>>
* Default value is `nil`

Document selection expression for the visit request, i.e. the selection parameter, written in the Vespa document selector language.

[id="plugins-{type}s-{plugin}-timeout"]
===== `timeout`

* Value type is <<number,number>>
* Default value is `180`

Timeout for each visit request, i.e. the timeout parameter. In seconds.

[id="plugins-{type}s-{plugin}-from_timestamp"]
===== `from_timestamp`

* Value type is <<number,number>>
* Default value is `nil`

Lower timestamp limit for the visit request, i.e. the fromTimestamp parameter.
Microseconds since epoch.
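Since the timestamp options are in microseconds rather than seconds, a small conversion sketch may help when computing a bound from a wall-clock time. The helper name `to_epoch_micros` is an assumption made for illustration.

```ruby
# Hypothetical helper: convert a Time to microseconds since the Unix epoch,
# the unit used by from_timestamp and to_timestamp.
def to_epoch_micros(time)
  # Integer arithmetic avoids floating-point rounding
  time.to_i * 1_000_000 + time.usec
end

from = to_epoch_micros(Time.utc(2020, 9, 13, 12, 26, 40))
puts from # 1600000000000000
```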

[id="plugins-{type}s-{plugin}-to_timestamp"]
===== `to_timestamp`

* Value type is <<number,number>>
* Default value is `nil`

Upper timestamp limit for the visit request, i.e. the toTimestamp parameter.
Microseconds since epoch.
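To illustrate how the options above map to visit request parameters, here is a minimal sketch. The helper `visit_query` is hypothetical; the parameter names are the ones named in the option descriptions, and unset options are simply left out of the query string.

```ruby
require "uri"

# Hypothetical helper, not part of the plugin: builds the query string for a
# /document/v1/ visit request from plugin-style options.
def visit_query(cluster:, page_size: 100, backend_concurrency: 1, timeout: 180,
                selection: nil, from_timestamp: nil, to_timestamp: nil,
                continuation: nil)
  params = {
    cluster: cluster,
    wantedDocumentCount: page_size,
    concurrency: backend_concurrency,
    timeout: timeout,
    selection: selection,
    fromTimestamp: from_timestamp,
    toTimestamp: to_timestamp,
    continuation: continuation
  }
  # Drop unset options so they are not sent as empty parameters
  URI.encode_www_form(params.compact)
end

query = visit_query(cluster: "test_cluster",
                    selection: "doc AND id.namespace == 'open'")
puts query
```

`URI.encode_www_form` also takes care of escaping the spaces, quotes and `==` in the selection expression, so the value can be written in the config as plain text.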

[id="plugins-{type}s-{plugin}-common-options"]
include::{include_path}/{type}.asciidoc[]
138 changes: 105 additions & 33 deletions logstash-input-vespa/lib/logstash/inputs/vespa.rb
@@ -4,6 +4,7 @@
require "net/http"
require "uri"
require "json"
require "openssl"

# This is the logstash vespa input plugin. It is used to read from Vespa
# via Visit : https://docs.vespa.ai/en/reference/document-v1-api-reference.html#visit
@@ -21,28 +22,71 @@ class LogStash::Inputs::Vespa < LogStash::Inputs::Base
# The cluster parameter to use in the request.
config :cluster, :validate => :string, :required => true

# Path to the client certificate file for mTLS.
config :client_cert, :validate => :path

# Path to the client key file for mTLS.
config :client_key, :validate => :path

# desired page size for the visit request, i.e. the wantedDocumentCount parameter
config :page_size, :validate => :number, :default => 100

# backend concurrency for the visit request, i.e. the concurrency parameter
config :backend_concurrency, :validate => :number, :default => 1

# selection. A query in Vespa selector language
config :selection, :validate => :string

# timeout for each HTTP request
config :timeout, :validate => :number, :default => 180

# lower timestamp limit for the visit request, i.e. the fromTimestamp parameter
# microseconds since epoch
config :from_timestamp, :validate => :number

# upper timestamp limit for the visit request, i.e. the toTimestamp parameter
config :to_timestamp, :validate => :number

public
def register
# nothing to do here
# fail fast if only one of the two mTLS settings is present
if @client_cert.nil? ^ @client_key.nil?
raise LogStash::ConfigurationError, "client_cert and client_key must be set together"
end

unless @client_cert.nil?
@cert = OpenSSL::X509::Certificate.new(File.read(@client_cert))
# OpenSSL::PKey.read detects the key type (RSA, EC, ...) rather than assuming RSA
@key = OpenSSL::PKey.read(File.read(@client_key))
end

@uri_params = {
:cluster => @cluster,
:wantedDocumentCount => @page_size,
:concurrency => @backend_concurrency,
:timeout => @timeout
}

if @selection != nil
@uri_params[:selection] = @selection
end

if @from_timestamp != nil
@uri_params[:fromTimestamp] = @from_timestamp
end

if @to_timestamp != nil
@uri_params[:toTimestamp] = @to_timestamp
end
end # def register

def run(queue)
uri = URI.parse("#{@vespa_url}/document/v1/?cluster=#{@cluster}")
uri = URI.parse("#{@vespa_url}/document/v1/")
uri.query = URI.encode_www_form(@uri_params)
continuation = nil

loop do

if continuation != nil
uri.query = URI.encode_www_form({:cluster => @cluster, :continuation => continuation})
end

# TODO we'll want to retry here
begin
response = Net::HTTP.get_response(uri)
rescue => e
@logger.error("Failed to make HTTP request to Vespa", :error => e.message)
break
end
response = fetch_documents_from_vespa(uri)
# fetch_documents_from_vespa logs the error and returns nil on failure
break if response.nil?
# response should look like:
# {
# "pathId":"/document/v1/","documents":[
@@ -52,42 +96,70 @@ def run(queue)
# }

if response.is_a?(Net::HTTPSuccess)
begin
response_parsed = JSON.parse(response.body)
rescue JSON::ParserError => e
@logger.error("Failed to parse JSON response", :error => e.message)
break
end
response_parsed = parse_response(response)
break unless response_parsed

document_count = response_parsed["documentCount"]
# record the continuation token for the next request (if it exists)
continuation = response_parsed["continuation"]
documents = response_parsed["documents"]

documents.each do |document|
# TODO we need to also get the document ID, namespace and doctype
event = LogStash::Event.new(document["fields"])
decorate(event)
queue << event
end # documents.each
process_documents(documents, queue)

# Exit the loop if there are no more documents to process
break if document_count == 0
if continuation != nil
uri.query = URI.encode_www_form(@uri_params.merge({:continuation => continuation}))
else
@logger.info("No continuation ID => no more documents to fetch from Vespa")
break
end

if @stopping
@logger.info("Stopping Vespa input")
break
end

else
@logger.error("Failed to fetch documents from Vespa",
@logger.error("Failed to fetch documents from Vespa", :request => uri.to_s,
:response_code => response.code, :response_message => response.message)
break # TODO retry? Only on certain codes?
end # if response.is_a?(Net::HTTPSuccess)

end # loop do
end # def run

def fetch_documents_from_vespa(uri)
http = Net::HTTP.new(uri.host, uri.port)
if uri.scheme == "https"
http.use_ssl = true
http.cert = @cert
http.key = @key
http.verify_mode = OpenSSL::SSL::VERIFY_PEER
end

request = Net::HTTP::Get.new(uri.request_uri)
http.request(request)
rescue => e
@logger.error("Failed to make HTTP request to Vespa", :error => e.message)
nil
end # def fetch_documents_from_vespa

def parse_response(response)
JSON.parse(response.body)
rescue JSON::ParserError => e
@logger.error("Failed to parse JSON response", :error => e.message)
nil
end # def parse_response

def process_documents(documents, queue)
documents.each do |document|
event = LogStash::Event.new(document)
decorate(event)
queue << event
end
end # def process_documents

def stop
# TODO
# examples of common "stop" tasks:
# * close sockets (unblocking blocking reads/accepts)
# * cleanup temporary files
# * terminate spawned threads
@stopping = true
end
end # class LogStash::Inputs::Vespa
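The continuation handling in `run` can be sketched in isolation as follows. This is a simplified illustration, not the plugin's code: `visit_all`, the `PAGES` table and the document IDs are made up, and a lambda stands in for the HTTP request and JSON parsing.

```ruby
# Hypothetical stand-in for the HTTP call: maps a continuation token
# (nil for the first page) to an already parsed visit response.
PAGES = {
  nil   => { "documents" => [{ "id" => "id:open:doc::1" }], "continuation" => "AAA" },
  "AAA" => { "documents" => [{ "id" => "id:open:doc::2" }], "continuation" => "BBB" },
  "BBB" => { "documents" => [] } # no continuation: the visit is complete
}

# Collects documents page by page until no continuation token is returned.
def visit_all(fetch_page)
  documents = []
  continuation = nil
  loop do
    page = fetch_page.call(continuation)
    break if page.nil? # request or parsing failed
    documents.concat(page["documents"] || [])
    continuation = page["continuation"]
    break if continuation.nil? # last page reached
  end
  documents
end

docs = visit_all(->(token) { PAGES[token] })
puts docs.map { |d| d["id"] }.inspect # ["id:open:doc::1", "id:open:doc::2"]
```

Stopping on a missing continuation token rather than on an empty page matters because a page can be empty while the visit is still in progress, as the last entry of `PAGES` hints.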
2 changes: 1 addition & 1 deletion logstash-input-vespa/logstash-input-vespa.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-vespa'
s.version = '0.1.0'
s.version = '0.2.0'
s.licenses = ['Apache-2.0']
s.summary = "Logstash input plugin reading from Vespa"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"