Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Persist inbound stripe webhooks #2972

Merged
merged 19 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions app/controllers/webhooks_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@

class WebhooksController < ApplicationController
def stripe
result = PaymentProviders::Stripe::HandleIncomingWebhookService.call(
result = InboundWebhooks::CreateService.call(
organization_id: params[:organization_id],
webhook_source: :stripe,
code: params[:code].presence,
body: request.body.read,
signature: request.headers['HTTP_STRIPE_SIGNATURE']
payload: request.body.read,
signature: request.headers["HTTP_STRIPE_SIGNATURE"],
event_type: params[:type]
)

unless result.success?
if result.error.is_a?(BaseService::ServiceFailure) && result.error.code == 'webhook_error'
return head(:bad_request)
end

result.raise_if_error!
end
return head(:bad_request) unless result.success?

head(:ok)
end
Expand Down
13 changes: 13 additions & 0 deletions app/jobs/clock/inbound_webhooks_cleanup_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

module Clock
class InboundWebhooksCleanupJob < ApplicationJob
include SentryCronConcern

queue_as "clock"

def perform
InboundWebhook.where("updated_at < ?", 90.days.ago).destroy_all
end
end
end
15 changes: 15 additions & 0 deletions app/jobs/clock/inbound_webhooks_retry_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

module Clock
class InboundWebhooksRetryJob < ApplicationJob
include SentryCronConcern

queue_as "clock"

def perform
InboundWebhook.retriable.find_each do |inbound_webhook|
InboundWebhooks::ProcessJob.perform_later(inbound_webhook:)
end
end
end
end
11 changes: 11 additions & 0 deletions app/jobs/inbound_webhooks/process_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module InboundWebhooks
class ProcessJob < ApplicationJob
queue_as :default

def perform(inbound_webhook:)
InboundWebhooks::ProcessService.call!(inbound_webhook:)
end
end
end
53 changes: 53 additions & 0 deletions app/models/inbound_webhook.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

class InboundWebhook < ApplicationRecord
WEBHOOK_PROCESSING_WINDOW = 2.hours

belongs_to :organization

validates :event_type, :payload, :source, :status, presence: true

STATUSES = {
pending: "pending",
processing: "processing",
succeeded: "succeeded",
failed: "failed"
}

enum :status, STATUSES

scope :retriable, -> { reprocessable.or(old_pending) }
scope :reprocessable, -> { processing.where("processing_at <= ?", WEBHOOK_PROCESSING_WINDOW.ago) }
scope :old_pending, -> { pending.where("created_at <= ?", WEBHOOK_PROCESSING_WINDOW.ago) }
ancorcruz marked this conversation as resolved.
Show resolved Hide resolved

def processing!
update!(status: :processing, processing_at: Time.zone.now)
end
end

# == Schema Information
#
# Table name: inbound_webhooks
#
# id :uuid not null, primary key
# code :string
# event_type :string not null
# payload :jsonb not null
# processing_at :datetime
# signature :string
# source :string not null
# status :enum default("pending"), not null
# created_at :datetime not null
# updated_at :datetime not null
# organization_id :uuid not null
#
# Indexes
#
# index_inbound_webhooks_on_organization_id (organization_id)
# index_inbound_webhooks_on_status_and_created_at (status,created_at) WHERE (status = 'pending'::inbound_webhook_status)
# index_inbound_webhooks_on_status_and_processing_at (status,processing_at) WHERE (status = 'processing'::inbound_webhook_status)
#
# Foreign Keys
#
# fk_rails_... (organization_id => organizations.id)
#
50 changes: 50 additions & 0 deletions app/services/inbound_webhooks/create_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# frozen_string_literal: true

module InboundWebhooks
class CreateService < BaseService
def initialize(organization_id:, webhook_source:, payload:, event_type:, code: nil, signature: nil)
@organization_id = organization_id
@webhook_source = webhook_source
@code = code
@payload = payload
@signature = signature
@event_type = event_type

super
end

def call
return validate_payload_result unless validate_payload_result.success?

inbound_webhook = InboundWebhook.create!(
organization_id:,
source: webhook_source,
code:,
payload:,
signature:,
event_type:
)

InboundWebhooks::ProcessJob.perform_later(inbound_webhook:)

result.inbound_webhook = inbound_webhook
result
rescue ActiveRecord::RecordInvalid => e
result.record_validation_failure!(record: e.record)
end

private

attr_reader :organization_id, :webhook_source, :code, :payload, :signature, :event_type

def validate_payload_result
@validate_payload_result ||= InboundWebhooks::ValidatePayloadService.call(
organization_id:,
code:,
payload:,
signature:,
webhook_source:
)
end
end
end
56 changes: 56 additions & 0 deletions app/services/inbound_webhooks/process_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# frozen_string_literal: true

module InboundWebhooks
class ProcessService < BaseService
WEBHOOK_HANDLER_SERVICES = {
stripe: PaymentProviders::Stripe::HandleIncomingWebhookService
}

def initialize(inbound_webhook:)
@inbound_webhook = inbound_webhook

super
end

def call
return result if within_processing_window?
return result if inbound_webhook.failed?
return result if inbound_webhook.succeeded?

inbound_webhook.processing!
ancorcruz marked this conversation as resolved.
Show resolved Hide resolved

handler_result = handler_service_klass.call(inbound_webhook:)

unless handler_result.success?
inbound_webhook.failed!
return handler_result
end

inbound_webhook.succeeded!

result.inbound_webhook = inbound_webhook
result
rescue
inbound_webhook.failed!
raise
end

private

attr_reader :inbound_webhook

def handler_service_klass
WEBHOOK_HANDLER_SERVICES.fetch(webhook_source) do
raise NameError, "Invalid inbound webhook source: #{webhook_source}"
end
end

def webhook_source
inbound_webhook.source.to_sym
end

def within_processing_window?
inbound_webhook.processing? && inbound_webhook.processing_at > InboundWebhook::WEBHOOK_PROCESSING_WINDOW.ago
end
end
end
54 changes: 54 additions & 0 deletions app/services/inbound_webhooks/validate_payload_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

module InboundWebhooks
class ValidatePayloadService < BaseService
WEBHOOK_SOURCES = {
stripe: PaymentProviders::Stripe::ValidateIncomingWebhookService
}

def initialize(organization_id:, code:, payload:, webhook_source:, signature:)
@organization_id = organization_id
@code = code
@payload = payload
@signature = signature
@webhook_source = webhook_source&.to_sym

super
end

def call
return result.service_failure!(code: "webhook_error", message: "Invalid webhook source") unless webhook_source_valid?
return payment_provider_result unless payment_provider_result.success?

validate_webhook_payload_result
end

private

attr_reader :organization_id, :code, :payload, :signature, :webhook_source

def webhook_source_valid?
WEBHOOK_SOURCES.include?(webhook_source)
end

def validate_webhook_payload_result
WEBHOOK_SOURCES[webhook_source].call(
payload:,
signature:,
payment_provider:
)
end

def payment_provider
payment_provider_result.payment_provider
end

def payment_provider_result
@payment_provider_result ||= PaymentProviders::FindService.call(
organization_id:,
code:,
payment_provider_type: webhook_source.to_s
)
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,37 @@
module PaymentProviders
module Stripe
class HandleIncomingWebhookService < BaseService
def initialize(organization_id:, body:, signature:, code: nil)
@organization_id = organization_id
@body = body
@signature = signature
@code = code
extend Forwardable

def initialize(inbound_webhook:)
@inbound_webhook = inbound_webhook

super
end

def call
payment_provider_result = PaymentProviders::FindService.call(
organization_id:,
code:,
payment_provider_type: "stripe"
)

return payment_provider_result unless payment_provider_result.success?

event = ::Stripe::Webhook.construct_event(
body,
signature,
payment_provider_result.payment_provider&.webhook_secret
)

PaymentProviders::Stripe::HandleEventJob.perform_later(
organization: payment_provider_result.payment_provider.organization,
event: event.to_json
organization:,
event: stripe_event.to_json
)

result.event = event
result.event = stripe_event
result
rescue JSON::ParserError
result.service_failure!(code: "webhook_error", message: "Invalid payload")
rescue ::Stripe::SignatureVerificationError
result.service_failure!(code: "webhook_error", message: "Invalid signature")
end

private

attr_reader :organization_id, :body, :signature, :code
def_delegators :@inbound_webhook, :organization, :payload

def stripe_event
@stripe_event ||= ::Stripe::Event.construct_from(json_payload)
end

def json_payload
JSON.parse(payload, symbolize_names: true)
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

module PaymentProviders
module Stripe
class ValidateIncomingWebhookService < BaseService
def initialize(payload:, signature:, payment_provider:)
@payload = payload
@signature = signature
@provider = payment_provider

super
end

def call
::Stripe::Webhook::Signature.verify_header(
payload,
signature,
webhook_secret,
tolerance: ::Stripe::Webhook::DEFAULT_TOLERANCE
)

result
rescue ::Stripe::SignatureVerificationError
result.service_failure!(code: "webhook_error", message: "Invalid signature")
end

private

attr_reader :payload, :signature, :provider

def webhook_secret
provider.webhook_secret
end
end
end
end
Loading
Loading