Added an asynchronous processing queue 🎉

This commit is contained in:
Chris Watson 2019-07-01 09:05:34 -07:00
parent f63fc41dd4
commit 4db9350336
No known key found for this signature in database
GPG Key ID: 37DAEF5F446370A4
4 changed files with 71 additions and 143 deletions

View File

@ -192,6 +192,7 @@ Arachnid has a ton of configuration options which can be passed to the methods l
- **referer** - Referer to use - **referer** - Referer to use
- **fetch_delay** - Delay in between fetching resources - **fetch_delay** - Delay in between fetching resources
- **queue** - Preload the queue with urls - **queue** - Preload the queue with urls
- **fibers** - Maximum number of fibers to spin up for asynchronous processing
- **history** - Links that should not be visited - **history** - Links that should not be visited
- **limit** - Maximum number of resources to visit - **limit** - Maximum number of resources to visit
- **max_depth** - Maximum crawl depth - **max_depth** - Maximum crawl depth

View File

@ -3,6 +3,7 @@ require "./agent/filters"
require "./agent/events" require "./agent/events"
require "./agent/actions" require "./agent/actions"
require "./agent/robots" require "./agent/robots"
require "./agent/queue"
require "./resource" require "./resource"
require "./session_cache" require "./session_cache"
require "./cookie_jar" require "./cookie_jar"
@ -44,7 +45,7 @@ module Arachnid
getter failures : Set(URI) getter failures : Set(URI)
# Queue of URLs to visit. # Queue of URLs to visit.
getter queue : Hash(String, URI) getter queue : Queue(URI)
# The session cache. # The session cache.
property sessions : SessionCache property sessions : SessionCache
@ -74,7 +75,8 @@ module Arachnid
user_agent : String? = nil, user_agent : String? = nil,
referer : String? = nil, referer : String? = nil,
fetch_delay : (Int32 | Time::Span)? = nil, fetch_delay : (Int32 | Time::Span)? = nil,
queue : Hash(String, URI)? = nil, queue : Array(URI)? = nil,
fibers : Int32? = nil,
history : Set(URI)? = nil, history : Set(URI)? = nil,
limit : Int32? = nil, limit : Int32? = nil,
max_depth : Int32? = nil, max_depth : Int32? = nil,
@ -94,7 +96,7 @@ module Arachnid
@fetch_delay = fetch_delay || 0 @fetch_delay = fetch_delay || 0
@history = history || Set(URI).new @history = history || Set(URI).new
@failures = Set(URI).new @failures = Set(URI).new
@queue = queue || {} of String => URI @queue = Queue(URI).new(queue, fibers)
@limit = limit @limit = limit
@levels = {} of URI => Int32 @levels = {} of URI => Int32
@ -163,47 +165,22 @@ module Arachnid
self self
end end
# Start spidering at a given URL.
# def start_at(url, &block : Resource ->)
# enqueue(url)
# run(&block)
# end
# Start spidering at a given URL. # Start spidering at a given URL.
def start_at(url, force = false) def start_at(url, force = false)
enqueue(url, force: force) enqueue(url, force: force)
return run return run
end end
# Start spidering until the queue becomes empty or the
# agent is paused.
# def run(&block : Resource ->)
# @running = true
# until @queue.empty? || paused? || limit_reached?
# begin
# visit_resource(dequeue, &block)
# rescue Actions::Paused
# return self
# rescue Actions::Action
# end
# end
# @running = false
# @sessions.clear
# self
# end
# Start spidering until the queue becomes empty or the # Start spidering until the queue becomes empty or the
# agent is paused. # agent is paused.
def run def run
@running = true @running = true
until @queue.empty? || paused? || limit_reached? || !running? @queue.run do |uri|
begin begin
visit_resource(dequeue) visit_resource(uri)
rescue Actions::Paused rescue Actions::Paused
return self @queue.pause!
rescue Actions::Action rescue Actions::Action
end end
end end
@ -259,11 +236,11 @@ module Arachnid
# Sets the queue of URLs to visit. # Sets the queue of URLs to visit.
# Sets the list of failed URLs. # Sets the list of failed URLs.
def queue=(new_queue) def queue=(new_queue : Array(URI))
@queue.clear @queue.clear
new_queue.each do |url| new_queue.each do |url|
@queue[queue_key(url)] = url @queue.enqueue(url)
end end
@queue @queue
@ -271,7 +248,7 @@ module Arachnid
# Determines whether the given URL has been queued for visiting. # Determines whether the given URL has been queued for visiting.
def queued?(key) def queued?(key)
@queue.has_key?(key) @queue.queued?(key)
end end
# Enqueues a given URL for visiting, only if it passes all # Enqueues a given URL for visiting, only if it passes all
@ -306,7 +283,7 @@ module Arachnid
rescue Actions::Action rescue Actions::Action
end end
@queue[queue_key(url)] = url @queue.enqueue(url)
@levels[url] = level @levels[url] = level
true true
end end
@ -317,8 +294,8 @@ module Arachnid
def get_resource(url, &block) def get_resource(url, &block)
url = url.is_a?(URI) ? url : URI.parse(url) url = url.is_a?(URI) ? url : URI.parse(url)
prepare_request(url) do |session, path, handlers| prepare_request(url) do |session, path, headers|
new_resource = Resource.new(url, session.get(path, headers: handlers)) new_resource = Resource.new(url, session.get(path, headers: headers))
# save any new cookies # save any new cookies
@cookies.from_resource(new_resource) @cookies.from_resource(new_resource)
@ -332,8 +309,8 @@ module Arachnid
def get_resource(url) def get_resource(url)
url = url.is_a?(URI) ? url : URI.parse(url) url = url.is_a?(URI) ? url : URI.parse(url)
prepare_request(url) do |session, path, handlers| prepare_request(url) do |session, path, headers|
new_resource = Resource.new(url, session.get(path, handlers)) new_resource = Resource.new(url, session.get(path, headers: headers))
# save any new cookies # save any new cookies
@cookies.from_resource(new_resource) @cookies.from_resource(new_resource)
@ -347,8 +324,8 @@ module Arachnid
def post_resource(url, post_data = "", &block) def post_resource(url, post_data = "", &block)
url = url.is_a?(URI) ? url : URI.parse(url) url = url.is_a?(URI) ? url : URI.parse(url)
prepare_request(url) do |session, path, handlers| prepare_request(url) do |session, path, headers|
new_resource = Resource.new(url, session.post(path, post_data, handlers)) new_resource = Resource.new(url, session.post(path, post_data, headers: headers))
# save any new cookies # save any new cookies
@cookies.from_resource(new_resource) @cookies.from_resource(new_resource)
@ -362,8 +339,8 @@ module Arachnid
def post_resource(url, post_data = "") def post_resource(url, post_data = "")
url = url.is_a?(URI) ? url : URI.parse(url) url = url.is_a?(URI) ? url : URI.parse(url)
prepare_request(url) do |session, path, handlers| prepare_request(url) do |session, path, headers|
new_resource = Resource.new(url, session.post(path, post_data, handlers)) new_resource = Resource.new(url, session.post(path, post_data, headers: headers))
# save any new cookies # save any new cookies
@cookies.from_resource(new_resource) @cookies.from_resource(new_resource)
@ -372,44 +349,6 @@ module Arachnid
end end
end end
# Visits a given URL and enqueues the links recovered
# from the resource to be visited later.
# def visit_resource(url, &block : Resource ->)
# url = sanitize_url(url)
# get_resource(url) do |resource|
# @history << resource.url
# begin
# @every_resource_blocks.each { |resource_block| resource_block.call(resource) }
# yield resource
# rescue action : Actions::Paused
# raise(action)
# rescue Actions::SkipResource
# return Nil
# rescue Actions::Action
# end
# resource.each_url do |next_url|
# begin
# @every_link_blocks.each do |link_block|
# link_block.call(resource.url, next_url)
# end
# rescue action : Actions::Paused
# raise(action)
# rescue Actions::SkipLink
# next
# rescue Actions::Action
# end
# if @max_depth.nil? || @max_depth.not_nil! > (@levels[url]? || 0)
# @levels[url] ||= 0
# enqueue(next_url, @levels[url] + 1)
# end
# end
# end
# end
# Visits a given URL and enqueues the links recovered # Visits a given URL and enqueues the links recovered
# from the resource to be visited later. # from the resource to be visited later.
def visit_resource(url) def visit_resource(url)
@ -507,7 +446,7 @@ module Arachnid
# Dequeues a URL that will later be visited. # Dequeues a URL that will later be visited.
def dequeue def dequeue
@queue.shift[1] @queue.dequeue
end end
# Determines if the maximum limit has been reached. # Determines if the maximum limit has been reached.
@ -536,9 +475,5 @@ module Arachnid
@every_failed_url_blocks.each { |fail_block| fail_block.call(url) } @every_failed_url_blocks.each { |fail_block| fail_block.call(url) }
true true
end end
private def queue_key(url)
url.to_s
end
end end
end end

View File

@ -19,9 +19,9 @@ module Arachnid
end end
# Continue spidering # Continue spidering
def continue!(&block) def continue!
@paused = false @paused = false
run(&block) @queue.resume
end end
# Sets the pause state of the agent. # Sets the pause state of the agent.

View File

@ -1,84 +1,76 @@
require "uri"
require "./actions"
require "benchmark"
module Arachnid module Arachnid
class Agent class Agent
class Queue # An asynchronous data queue using a pool of
# `Concurrent::Future` to allow for async
# fetching of multiple pages at once.
class Queue(T)
@queue : Array(URI) @queue : Array(T)
@pool_size : Int32 @max_pool_size : Int32
@exceptions : Array(Exception) @pool : Array(Concurrent::Future(Nil))
property mutex : Mutex @paused : Bool
def self.new(array = nil, pool_size = nil) @block : Proc(T, Void)?
array ||= [] of URI
pool_size ||= 10 delegate :clear, :empty?, to: @queue
new(array, pool_size, nil)
end # Create a new Queue
def initialize(queue : Array(T)? = nil, max_pool_size : Int32? = nil)
private def initialize(@queue : Array(URI), @pool_size : Int32, dummy) @queue = queue || [] of T
@mutex = Mutex.new @max_pool_size = max_pool_size || 10
@exceptions = [] of Exception @pool = [] of Concurrent::Future(Nil)
@paused = false
@block = nil
end end
# Add an item to the queue
def enqueue(item) def enqueue(item)
@queue << item @queue << item
end end
def clear private def dequeue
@queue.clear @queue.shift
end end
# See if an item is currently queued
def queued?(url) def queued?(url)
@queue.includes?(url) @queue.includes?(url)
end end
private def worker(item : URI, &block : URI ->) def pause!
signal_channel = Channel::Unbuffered(Actions::Action).new @paused = true
spawn do
begin
block.call(item)
rescue ex
signal_channel.send(Actions::SkipLink.new)
else
signal_channel.send(Actions::Action.new)
end
end end
signal_channel.receive_select_action def paused?
@paused
end end
def run(&block : URI ->) def resume!
pool_counter = 0 @paused = false
worker_channels = [] of Channel::ReceiveAction(Channel::Unbuffered(Actions::Action)) run(@block)
queue = @queue.each end
more_pools = true
# Run the queue, calling `block` for every item.
# Returns when the queue is empty.
def run(&block : T ->)
# Keep a reference to the block so we can resume
# after pausing.
@block = block
@paused = false
loop do loop do
break if !more_pools && worker_channels.empty? fut = future { block.call(dequeue) }
while pool_counter < @pool_size && more_pools if @pool.size < @max_pool_size
item = queue.next @pool << fut
else
if item.is_a?(Iterator::Stop::INSTANCE) @pool.shift.get
more_pools = false
break
end end
pool_counter += 1 break if @queue.empty? || @paused
worker_channels << worker(item.as(URI), &block)
end
index, signal_exception = Channel.select(worker_channels)
worker_channels.delete_at(index)
pool_counter -= 1
@exceptions << signal_exception if signal_exception && signal_exception.is_a?(Actions::SkipLink)
end end
end end
end end