Threading!

Jaiden Mispy 2014-11-18 13:24:59 +11:00
parent 29beb23502
commit b72a6db0e1
3 changed files with 31 additions and 62 deletions

View file

@@ -1,7 +1,7 @@
 $debug = false
 
 def log(*args)
-  STDERR.puts args.map(&:to_s).join(' ')
+  STDERR.print args.map(&:to_s).join(' ') + "\n"
   STDERR.flush
 end
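
The switch from puts to a single print of an already-joined line reads as a thread-safety tweak: puts can issue the string and the trailing newline as separate writes, so lines from concurrent bot threads may interleave, while one print of a pre-built line is a single write. A minimal sketch of the difference follows; the interleaving rationale is my reading of the change, not something the commit states.

# Hypothetical demonstration; not part of the commit.
threads = 3.times.map do |i|
  Thread.new do
    100.times do
      # Single write per line: lines stay intact across threads.
      STDERR.print "@bot#{i}: tick\n"
      # Two writes per line: output from other threads can slip in between.
      # STDERR.puts "@bot#{i}: tick"
    end
  end
end
threads.each(&:join)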

View file

@@ -1,51 +1,8 @@
-#!/usr/bin/env ruby
 # encoding: utf-8
 
 require 'twitter'
 require 'rufus/scheduler'
-require 'eventmachine'
 
 module Ebooks
-  # Wrap SSLSocket so that readpartial yields the fiber instead of
-  # blocking when there is no data
-  #
-  # We hand this to the twitter library so we can select on the sockets
-  # and thus run multiple streams without them blocking
-  class FiberSSLSocket
-    def initialize(*args)
-      @socket = OpenSSL::SSL::SSLSocket.new(*args)
-    end
-
-    def readpartial(maxlen)
-      data = ""
-      loop do
-        begin
-          data = @socket.read_nonblock(maxlen)
-        rescue IO::WaitReadable
-        end
-
-        break if data.length > 0
-        Fiber.yield(@socket)
-      end
-      data
-    end
-
-    def method_missing(m, *args)
-      @socket.send(m, *args)
-    end
-  end
-
-  # An EventMachine handler which resumes a fiber on incoming data
-  class FiberSocketHandler < EventMachine::Connection
-    def initialize(fiber)
-      @fiber = fiber
-    end
-
-    def notify_readable
-      @fiber.resume
-    end
-  end
-
   class ConfigurationError < Exception
   end
@@ -106,7 +63,7 @@ module Ebooks
     attr_accessor :consumer_key, :consumer_secret,
                   :access_token, :access_token_secret
-    attr_reader :twitter, :stream
+    attr_reader :twitter, :stream, :thread
 
     # Configuration
     attr_accessor :username, :delay_range, :blacklist
@@ -119,7 +76,7 @@
     end
 
     def log(*args)
-      STDOUT.puts "@#{@username}: " + args.map(&:to_s).join(' ')
+      STDOUT.print "@#{@username}: " + args.map(&:to_s).join(' ') + "\n"
       STDOUT.flush
     end
@@ -154,9 +111,7 @@ module Ebooks
         config.access_token_secret = @access_token_secret
       end
 
-      @stream = Twitter::Streaming::Client.new(
-        ssl_socket_class: FiberSSLSocket
-      ) do |config|
+      @stream = Twitter::Streaming::Client.new do |config|
         config.consumer_key = @consumer_key
         config.consumer_secret = @consumer_secret
         config.access_token = @access_token
@@ -239,14 +194,13 @@ module Ebooks
     end
 
     def start_stream
-      log "starting stream for #@username"
+      log "starting tweet stream"
       @stream.user do |ev|
         receive_event ev
       end
     end
 
-    # Connects to tweetstream and opens event handlers for this bot
-    def start
+    def prepare
       # Sanity check
       if @username.nil?
         raise ConfigurationError, "bot.username cannot be nil"
@@ -254,15 +208,11 @@
       make_client
       fire(:startup)
+    end
 
-      fiber = Fiber.new do
-        start_stream
-      end
-
-      socket = fiber.resume
-      conn = EM.watch socket.io, FiberSocketHandler, fiber
-      conn.notify_readable = true
+    # Connects to tweetstream and opens event handlers for this bot
+    def start
+      start_stream
     end
 
     # Fire an event
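
With EventMachine and the fiber plumbing gone, prepare now does the one-time setup (config check, clients, the :startup event) and start simply blocks in Twitter::Streaming::Client#user. A plausible way to run several bots concurrently after this change is one plain thread per bot, sketched below; the run_all helper and the bots list are hypothetical, and the commit only hints at this arrangement through the new :thread reader.

# Hypothetical runner; only Bot#prepare and Bot#start come from this commit.
def run_all(bots)
  bots.each(&:prepare)            # sanity-check config, build clients, fire :startup
  threads = bots.map do |bot|
    Thread.new { bot.start }      # each streaming connection blocks in its own thread
  end
  threads.each(&:join)            # keep the process alive while the streams run
end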

View file

@@ -4,14 +4,33 @@
 require 'json'
 require 'set'
 require 'digest/md5'
+require 'fileutils'
 require 'csv'
 
 module Ebooks
   class Model
     attr_accessor :hash, :tokens, :sentences, :mentions, :keywords
 
-    def self.consume(txtpath)
-      Model.new.consume(txtpath)
+    # Consume a corpus file to create a model
+    # @param corpus_path Path to a json, text or csv file to consume
+    # @param cache Optional path to a directory to store cached models
+    def self.consume(corpus_path, cache: nil)
+      if cache
+        FileUtils::mkdir_p cache
+        cache_path = File.join(cache, Digest::MD5.file(corpus_path).to_s)
+        if File.exists?(cache_path)
+          log "Reading model from cache at #{cache_path}"
+          return Model.load(cache_path)
+        end
+      end
+
+      model = Model.new.consume(corpus_path)
+
+      if cache
+        log "Caching model at #{cache_path}"
+        model.save(cache_path)
+      end
     end
 
     def self.consume_all(paths)
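
For completeness, the new keyword argument would be used roughly as below; the corpus path and cache directory are invented examples. The cache key is the MD5 digest of the corpus file, so editing the corpus produces a fresh model while an unchanged corpus loads straight from disk.

# Hypothetical usage of the new cache: option.
model = Ebooks::Model.consume("corpus/example_user.json", cache: "model_cache")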