diff --git a/lib/nsq/connection.rb b/lib/nsq/connection.rb index e142f6c..4d6e77f 100644 --- a/lib/nsq/connection.rb +++ b/lib/nsq/connection.rb @@ -72,6 +72,7 @@ def initialize(opts = {}) def connected? + return @write_loop_thread.alive? && @connected if @write_loop_thread # for producers @connected end diff --git a/lib/nsq/producer.rb b/lib/nsq/producer.rb index fa3e471..823587e 100644 --- a/lib/nsq/producer.rb +++ b/lib/nsq/producer.rb @@ -11,22 +11,24 @@ def initialize(opts = {}) @ssl_context = opts[:ssl_context] @tls_options = opts[:tls_options] @tls_v1 = opts[:tls_v1] + @mx = Mutex.new - nsqlookupds = [] + @nsqlookupds = [] if opts[:nsqlookupd] - nsqlookupds = [opts[:nsqlookupd]].flatten + @nsqlookupds = [opts[:nsqlookupd]].flatten discover_repeatedly( - nsqlookupds: nsqlookupds, + nsqlookupds: @nsqlookupds, interval: @discovery_interval ) elsif opts[:nsqd] - nsqds = [opts[:nsqd]].flatten - nsqds.each{|d| add_connection(d)} + @nsqds = [opts[:nsqd]].flatten + @nsqds.each{|d| add_connection(d)} else add_connection('127.0.0.1:4150') end + update_fork_pid end def write(*raw_messages) @@ -75,8 +77,35 @@ def deferred_write_to_topic(topic, delay, *raw_messages) end end + def reconnect_on_fork + if @nsqlookupds.any? + if @discovery_thread && !@discovery_thread.alive? + @mx.synchronize do + break if @discovery_thread.alive? + debug "Discovery thread is dead; terminating connections and restarting discovery loop" + terminate + discover_repeatedly( + nsqlookupds: @nsqlookupds, + interval: @discovery_interval + ) + update_fork_pid + end + end + elsif forked? + @mx.synchronize do + break unless forked? + debug "Fork detected - recreating connections" + terminate + @nsqds.each{|d| add_connection(d)} + update_fork_pid + end + end + end + private def connection_for_write + reconnect_on_fork + # Choose a random Connection that's currently connected # Or, if there's nothing connected, just take any random one connections_currently_connected = connections.select{|_,c| c.connected?} @@ -90,5 +119,12 @@ def connection_for_write connection end + def forked? + Process.pid != @fork_pid + end + + def update_fork_pid + @fork_pid = Process.pid + end end end