Use new twitter gem streaming support
Made more complicated by the fact that this is not inherently eventmachine-based, unlike tweetstream
This commit is contained in:
parent
b73594b89d
commit
e646e24744
1 changed files with 96 additions and 40 deletions
|
@ -1,8 +1,49 @@
|
||||||
#!/usr/bin/env ruby
|
#!/usr/bin/env ruby
|
||||||
# encoding: utf-8
|
# encoding: utf-8
|
||||||
require 'twitter'
|
require 'twitter'
|
||||||
require 'tweetstream'
|
|
||||||
require 'rufus/scheduler'
|
require 'rufus/scheduler'
|
||||||
|
require 'eventmachine'
|
||||||
|
|
||||||
|
# Wrap SSLSocket so that readpartial yields the fiber instead of
|
||||||
|
# blocking when there is no data
|
||||||
|
#
|
||||||
|
# We hand this to the twitter library so we can select on the sockets
|
||||||
|
# and thus run multiple streams without them blocking
|
||||||
|
class FiberSSLSocket
|
||||||
|
def initialize(*args)
|
||||||
|
@socket = OpenSSL::SSL::SSLSocket.new(*args)
|
||||||
|
end
|
||||||
|
|
||||||
|
def readpartial(maxlen)
|
||||||
|
data = ""
|
||||||
|
|
||||||
|
loop do
|
||||||
|
begin
|
||||||
|
data = @socket.read_nonblock(maxlen)
|
||||||
|
rescue IO::WaitReadable
|
||||||
|
end
|
||||||
|
break if data.length > 0
|
||||||
|
Fiber.yield(@socket)
|
||||||
|
end
|
||||||
|
|
||||||
|
data
|
||||||
|
end
|
||||||
|
|
||||||
|
def method_missing(m, *args)
|
||||||
|
@socket.send(m, *args)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# An EventMachine handler which resumes a fiber on incoming data
|
||||||
|
class FiberSocketHandler < EventMachine::Connection
|
||||||
|
def initialize(fiber)
|
||||||
|
@fiber = fiber
|
||||||
|
end
|
||||||
|
|
||||||
|
def notify_readable
|
||||||
|
@fiber.resume
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
module Ebooks
|
module Ebooks
|
||||||
class Bot
|
class Bot
|
||||||
|
@ -36,58 +77,51 @@ module Ebooks
|
||||||
end
|
end
|
||||||
|
|
||||||
def configure
|
def configure
|
||||||
TweetStream.configure do |config|
|
|
||||||
config.consumer_key = @consumer_key
|
|
||||||
config.consumer_secret = @consumer_secret
|
|
||||||
config.oauth_token = @oauth_token
|
|
||||||
config.oauth_token_secret = @oauth_token_secret
|
|
||||||
end
|
|
||||||
|
|
||||||
@twitter = Twitter::REST::Client.new do |config|
|
@twitter = Twitter::REST::Client.new do |config|
|
||||||
config.consumer_key = @consumer_key
|
config.consumer_key = @consumer_key
|
||||||
config.consumer_secret = @consumer_secret
|
config.consumer_secret = @consumer_secret
|
||||||
config.oauth_token = @oauth_token
|
config.access_token = @oauth_token
|
||||||
config.oauth_token_secret = @oauth_token_secret
|
config.access_token_secret = @oauth_token_secret
|
||||||
end
|
end
|
||||||
|
|
||||||
needs_stream = [@on_follow, @on_message, @on_mention, @on_timeline].any? {|e| !e.nil?}
|
needs_stream = [@on_follow, @on_message, @on_mention, @on_timeline].any? {|e| !e.nil?}
|
||||||
|
|
||||||
@stream = TweetStream::Client.new if needs_stream
|
if needs_stream
|
||||||
end
|
@stream = Twitter::Streaming::Client.new(
|
||||||
|
ssl_socket_class: FiberSSLSocket
|
||||||
# Connects to tweetstream and opens event handlers for this bot
|
) do |config|
|
||||||
def start
|
config.consumer_key = @consumer_key
|
||||||
configure
|
config.consumer_secret = @consumer_secret
|
||||||
|
config.access_token = @oauth_token
|
||||||
@on_startup.call if @on_startup
|
config.access_token_secret = @oauth_token_secret
|
||||||
|
end
|
||||||
if not @stream
|
end
|
||||||
log "not bothering with stream for #@username"
|
|
||||||
return
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def start_stream
|
||||||
log "starting stream for #@username"
|
log "starting stream for #@username"
|
||||||
@stream.on_error do |msg|
|
@stream.before_request do
|
||||||
log "ERROR: #{msg}"
|
|
||||||
end
|
|
||||||
|
|
||||||
@stream.on_inited do
|
|
||||||
log "Online!"
|
log "Online!"
|
||||||
end
|
end
|
||||||
|
|
||||||
@stream.on_event(:follow) do |event|
|
@stream.user do |ev|
|
||||||
next if event[:source][:screen_name] == @username
|
p ev
|
||||||
log "Followed by #{event[:source][:screen_name]}"
|
|
||||||
@on_follow.call(event[:source]) if @on_follow
|
if ev.is_a? Twitter::DirectMessage
|
||||||
|
next if ev.sender.screen_name == @username # Don't reply to self
|
||||||
|
log "DM from @#{ev.sender.screen_name}: #{ev.text}"
|
||||||
|
@on_message.call(ev) if @on_message
|
||||||
end
|
end
|
||||||
|
|
||||||
@stream.on_direct_message do |dm|
|
next unless ev.respond_to? :name
|
||||||
next if dm[:sender][:screen_name] == @username # Don't reply to self
|
|
||||||
log "DM from @#{dm[:sender][:screen_name]}: #{dm[:text]}"
|
if ev.name == :follow
|
||||||
@on_message.call(dm) if @on_message
|
next if ev.source.screen_name == @username
|
||||||
|
log "Followed by #{ev.source.screen_name}"
|
||||||
|
@on_follow.call(ev.source) if @on_follow
|
||||||
end
|
end
|
||||||
|
|
||||||
@stream.userstream do |ev|
|
|
||||||
next unless ev.text # If it's not a text-containing tweet, ignore it
|
next unless ev.text # If it's not a text-containing tweet, ignore it
|
||||||
next if ev.user.screen_name == @username # Ignore our own tweets
|
next if ev.user.screen_name == @username # Ignore our own tweets
|
||||||
|
|
||||||
|
@ -108,7 +142,7 @@ module Ebooks
|
||||||
end
|
end
|
||||||
rescue Exception
|
rescue Exception
|
||||||
p ev.attrs[:entities][:user_mentions]
|
p ev.attrs[:entities][:user_mentions]
|
||||||
p ev[:text]
|
p ev.text
|
||||||
raise
|
raise
|
||||||
end
|
end
|
||||||
meta[:mentionless] = mless
|
meta[:mentionless] = mless
|
||||||
|
@ -126,6 +160,27 @@ module Ebooks
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Connects to tweetstream and opens event handlers for this bot
|
||||||
|
def start
|
||||||
|
configure
|
||||||
|
|
||||||
|
@on_startup.call if @on_startup
|
||||||
|
|
||||||
|
if not @stream
|
||||||
|
log "not bothering with stream for #@username"
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
fiber = Fiber.new do
|
||||||
|
start_stream
|
||||||
|
end
|
||||||
|
|
||||||
|
socket = fiber.resume
|
||||||
|
|
||||||
|
conn = EM.watch socket.io, FiberSocketHandler, fiber
|
||||||
|
conn.notify_readable = true
|
||||||
|
end
|
||||||
|
|
||||||
# Wrapper for EM.add_timer
|
# Wrapper for EM.add_timer
|
||||||
# Delays add a greater sense of humanity to bot behaviour
|
# Delays add a greater sense of humanity to bot behaviour
|
||||||
def delay(time, &b)
|
def delay(time, &b)
|
||||||
|
@ -136,11 +191,12 @@ module Ebooks
|
||||||
# Reply to a tweet or a DM.
|
# Reply to a tweet or a DM.
|
||||||
# Applies configurable @reply_delay range
|
# Applies configurable @reply_delay range
|
||||||
def reply(ev, text, opts={})
|
def reply(ev, text, opts={})
|
||||||
|
p "reply???"
|
||||||
opts = opts.clone
|
opts = opts.clone
|
||||||
|
|
||||||
if ev.is_a? Twitter::DirectMessage
|
if ev.is_a? Twitter::DirectMessage
|
||||||
log "Sending DM to @#{ev[:sender][:screen_name]}: #{text}"
|
log "Sending DM to @#{ev.sender.screen_name}: #{text}"
|
||||||
@twitter.direct_message_create(ev[:sender][:screen_name], text, opts)
|
@twitter.create_direct_message(ev.sender.screen_name, text, opts)
|
||||||
elsif ev.is_a? Twitter::Tweet
|
elsif ev.is_a? Twitter::Tweet
|
||||||
log "Replying to @#{ev.user.screen_name} with: #{text}"
|
log "Replying to @#{ev.user.screen_name} with: #{text}"
|
||||||
@twitter.update(text, in_reply_to_status_id: ev.id)
|
@twitter.update(text, in_reply_to_status_id: ev.id)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue