Skip to content

Commit

Permalink
Support closing stream (sferik#877)
Browse files Browse the repository at this point in the history
* Support closing stream

In this version, we can close stream as following:

```ruby
client = Twitter::Streaming::Client.new do |config|
  # Set up configuration
end

t = Thread.new do
  client.filter(track: keyword) do |object|
    # process object
  end
end

Signal.trap(:TERM) do
  # terminate process silently
  client.close
  exit true
end
```

* Add Twitter::Streaming::Connection#stream spec

* Wait for connection

* Suppress rubocop warning

    lib/twitter/streaming/connection.rb:17:7: C: Method has too many lines. [13/10]
          def stream(request, response) ...
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* Use port number 8443 instead of 11443

Because rubocop reports following warnings:

    spec/twitter/streaming/connection_spec.rb:88:48: C: Use underscores(_) as decimal mark and separate every 3 digits with them.
        let(:client) {  TCPSocket.new('127.0.0.1', 11443) }
                                                   ^^^^^
    spec/twitter/streaming/connection_spec.rb:94:44: C: Use underscores(_) as decimal mark and separate every 3 digits with them.
          @server = TCPServer.new('127.0.0.1', 11443)
                                               ^^^^^
  • Loading branch information
okkez authored and sferik committed Jun 12, 2018
1 parent 91c037c commit 89e3543
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 3 deletions.
4 changes: 4 additions & 0 deletions lib/twitter/streaming/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def before_request(&block)
end
end

def close
@connection.close
end

private

def request(method, uri, params)
Expand Down
19 changes: 16 additions & 3 deletions lib/twitter/streaming/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,23 @@ def initialize(options = {})
@tcp_socket_class = options.fetch(:tcp_socket_class) { TCPSocket }
@ssl_socket_class = options.fetch(:ssl_socket_class) { OpenSSL::SSL::SSLSocket }
@using_ssl = options.fetch(:using_ssl) { false }
@write_pipe = nil
end

def stream(request, response)
def stream(request, response) # rubocop:disable MethodLength
client = connect(request)
request.stream(client)
while body = client.readpartial(1024) # rubocop:disable AssignmentInCondition
response << body
read_pipe, @write_pipe = IO.pipe
loop do
read_ios, _write_ios, _exception_ios = IO.select([read_pipe, client])
case read_ios.first
when client
response << client.readpartial(1024)
when read_pipe
break
end
end
client.close
end

def connect(request)
Expand All @@ -30,6 +39,10 @@ def connect(request)
ssl_client.connect
end

def close
@write_pipe.write('q') if @write_pipe
end

private

def new_tcp_socket(host, port)
Expand Down
37 changes: 37 additions & 0 deletions spec/twitter/streaming/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,41 @@ def <<(data); end
end
end
end

describe 'stream' do
subject(:connection) do
Twitter::Streaming::Connection.new(tcp_socket_class: DummyTCPSocket, ssl_socket_class: DummySSLSocket)
end

let(:method) { :get }
let(:uri) { 'https://stream.twitter.com:443/1.1/statuses/sample.json' }
let(:client) { TCPSocket.new('127.0.0.1', 8443) }

let(:request) { HTTP::Request.new(verb: method, uri: uri) }
let(:response) { DummyResponse.new {} }

before do
@server = TCPServer.new('127.0.0.1', 8443)
end

after do
@server.close
end

it 'close stream' do
expect(connection).to receive(:connect).with(request).and_return(client)
expect(request).to receive(:stream).with(client)

stream_closed = false
t = Thread.start do
connection.stream(request, response)
stream_closed = true
end
expect(stream_closed).to be false
sleep 1
connection.close
t.join
expect(stream_closed).to be true
end
end
end

0 comments on commit 89e3543

Please sign in to comment.