Skip to content

Commit

Permalink
Added support for DLQ
Browse files Browse the repository at this point in the history
1. Used the same logic as elasticsearch output plugin to find out if dlq is enabled or not
[execution_context? && execution_context.dlq_writer? && execution_context.dlq_writer is not a dummy writer?]

2. if dlq is enabled, send it to dlq_writer or log and drop the the events otherwise.

Fixes logstash-plugins#109

Signed-off-by: RashmiRam <[email protected]>
  • Loading branch information
RashmiRam committed Mar 23, 2020
1 parent 2cca9dc commit 93542f9
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
28 changes: 27 additions & 1 deletion lib/logstash/outputs/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base
# If encountered as response codes this plugin will retry these requests
config :retryable_codes, :validate => :number, :list => true, :default => [429, 500, 502, 503, 504]

# If encountered as response codes, this plugin will write these events to DLQ
config :dlq_retryable_codes, :validate => :number, :list => true, :default => [400, 403, 404, 401]

# If you would like to consider some non-2xx codes to be successes
# enumerate them here. Responses returning these codes will be considered successes
config :ignorable_codes, :validate => :number, :list => true
Expand Down Expand Up @@ -97,7 +100,7 @@ def register
# tokens must be added back by the client on success
@request_tokens = SizedQueue.new(@pool_max)
@pool_max.times {|t| @request_tokens << true }

@dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil
@requests = Array.new

if @content_type.nil?
Expand Down Expand Up @@ -154,6 +157,15 @@ def log_error_response(response, url, event)
)
end

def write_to_dlq(url, event, response)
# To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior)
if @dlq_writer
@dlq_writer.write(event, "Sending #{response.code} erred HTTP request to DLQ, url: #{url}, response: #{response}")
else
log_error_response(response, url, event)
end
end

def send_events(events)
successes = java.util.concurrent.atomic.AtomicInteger.new(0)
failures = java.util.concurrent.atomic.AtomicInteger.new(0)
Expand Down Expand Up @@ -242,6 +254,9 @@ def send_event(event, attempt)
if retryable_response?(response)
log_retryable_response(response)
return :retry, event, attempt
elsif dlq_retryable_response?(response)
write_to_dlq(url, event, response)
return :failure, event, attempt
else
log_error_response(response, url, event)
return :failure, event, attempt
Expand Down Expand Up @@ -287,6 +302,10 @@ def retryable_response?(response)
@retryable_codes && @retryable_codes.include?(response.code)
end

def dlq_retryable_response?(response)
@dlq_retryable_codes && @dlq_retryable_codes.include?(response.code)
end

def retryable_exception?(exception)
RETRYABLE_MANTICORE_EXCEPTIONS.any? {|me| exception.is_a?(me) }
end
Expand Down Expand Up @@ -379,4 +398,11 @@ def validate_format!
end
end
end

def dlq_enabled?
# this is the only way to determine if current logstash is supporting a dlq and dlq is also enabled
# Reference: https://github.com/elastic/logstash/issues/8064
respond_to?(:execution_context) && execution_context.respond_to?(:dlq_writer) &&
!execution_context.dlq_writer.inner_writer.is_a?(::LogStash::Util::DummyDeadLetterQueueWriter)
end
end
57 changes: 55 additions & 2 deletions spec/outputs/http_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def self.retry_fail_count()
end

multiroute(%w(get post put patch delete), "/bad") do
self.class.last_request = request
[415, "YUP"]
end

multiroute(%w(get post put patch delete), "/dlq_bad") do
self.class.last_request = request
[400, "YUP"]
end
Expand Down Expand Up @@ -117,14 +122,23 @@ def sinatra_run_wait(app, opts)

let(:expected_method) { method.clone.to_sym }
let(:client) { subject.client }
let(:dlq_enabled) { false }
let(:dlq_writer) { double("dlq_writer") }
let(:execution_context) { double("execution_context") }

before do
allow(subject).to receive(:dlq_enabled?).with(any_args).and_return(dlq_enabled)
allow(subject).to receive(:execution_context).with(any_args).and_return(execution_context)
allow(execution_context).to receive(:dlq_writer).with(any_args).and_return(dlq_writer)
allow(dlq_writer).to receive(:write).with(any_args)

subject.register
allow(client).to receive(:send).
with(expected_method, url, anything).
and_call_original
allow(subject).to receive(:log_failure).with(any_args)
allow(subject).to receive(:log_retryable_response).with(any_args)
allow(subject).to receive(:write_to_dlq).with(any_args).and_call_original
end

context 'sending no events' do
Expand Down Expand Up @@ -165,21 +179,60 @@ def sinatra_run_wait(app, opts)
it "should log a failure" do
expect(subject).to have_received(:log_failure).with(any_args)
end

it "should not be sent to dlq" do
expect(subject).not_to have_received(:write_to_dlq).with(any_args)
end
end

context "with ignorable failing requests" do
let(:url) { "http://localhost:#{port}/bad"}
let(:verb_behavior_config) { super.merge("ignorable_codes" => [400]) }
let(:verb_behavior_config) { super.merge("ignorable_codes" => [415]) }

before do
subject.multi_receive([event])
end

it "should log a failure" do
it "should not log a failure" do
expect(subject).not_to have_received(:log_failure).with(any_args)
end

it "should not be sent to dlq" do
expect(subject).not_to have_received(:write_to_dlq).with(any_args)
end
end

context "with DLQ qualified failing requests" do
let(:url) { "http://localhost:#{port}/dlq_bad"}
let(:dlq_enabled) { true }
let(:verb_behavior_config) { super.merge("dlq_retryable_codes" => [400]) }

before do
subject.multi_receive([event])
end

it "should write to dlq" do
expect(subject).to have_received(:write_to_dlq).with(any_args)
end

it "should not log a failure" do
expect(subject).not_to have_received(:log_failure).with(any_args)
end
end

context "when DLQ is not enabled" do
let(:url) { "http://localhost:#{port}/dlq_bad"}
let(:verb_behavior_config) { super.merge("dlq_retryable_codes" => [400]) }

before do
subject.multi_receive([event])
end

it "should not send the event to the DLQ instead, instead log" do
expect(subject).to have_received(:log_failure).with(any_args)
end
end

context "with retryable failing requests" do
let(:url) { "http://localhost:#{port}/retry"}

Expand Down

0 comments on commit 93542f9

Please sign in to comment.