From 4db9350336e21fb7807540b4d8c2c6db32100dde Mon Sep 17 00:00:00 2001
From: Chris Watson
Date: Mon, 1 Jul 2019 09:05:34 -0700
Subject: [PATCH] Added an asynchronous processing queue :tada:

---
 README.md                     |   1 +
 src/arachnid/agent.cr         | 107 +++++++---------------------
 src/arachnid/agent/actions.cr |   4 +-
 src/arachnid/agent/queue.cr   | 102 +++++++++++++++-----------------
 4 files changed, 71 insertions(+), 143 deletions(-)

diff --git a/README.md b/README.md
index 45fcd9c..d80ba45 100644
--- a/README.md
+++ b/README.md
@@ -192,6 +192,7 @@ Arachnid has a ton of configuration options which can be passed to the methods l
 - **referer** - Referer to use
 - **fetch_delay** - Delay in between fetching resources
 - **queue** - Preload the queue with urls
+- **fibers** - Maximum number of fibers to spin up for asynchronous processing
 - **history** - Links that should not be visited
 - **limit** - Maximum number of resources to visit
 - **max_depth** - Maximum crawl depth
diff --git a/src/arachnid/agent.cr b/src/arachnid/agent.cr
index 51a0fab..bf4ded2 100644
--- a/src/arachnid/agent.cr
+++ b/src/arachnid/agent.cr
@@ -3,6 +3,7 @@ require "./agent/filters"
 require "./agent/events"
 require "./agent/actions"
 require "./agent/robots"
+require "./agent/queue"
 require "./resource"
 require "./session_cache"
 require "./cookie_jar"
@@ -44,7 +45,7 @@ module Arachnid
     getter failures : Set(URI)
 
     # Queue of URLs to visit.
-    getter queue : Hash(String, URI)
+    getter queue : Queue(URI)
 
     # The session cache.
     property sessions : SessionCache
@@ -74,7 +75,8 @@ module Arachnid
       user_agent : String? = nil,
       referer : String? = nil,
       fetch_delay : (Int32 | Time::Span)? = nil,
-      queue : Hash(String, URI)? = nil,
+      queue : Array(URI)? = nil,
+      fibers : Int32? = nil,
       history : Set(URI)? = nil,
       limit : Int32? = nil,
       max_depth : Int32? = nil,
@@ -94,7 +96,7 @@
       @fetch_delay = fetch_delay || 0
       @history = history || Set(URI).new
       @failures = Set(URI).new
-      @queue = queue || {} of String => URI
+      @queue = Queue(URI).new(queue, fibers)
 
       @limit = limit
       @levels = {} of URI => Int32
@@ -163,47 +165,22 @@ module Arachnid
       self
     end
 
-    # Start spidering at a given URL.
-    # def start_at(url, &block : Resource ->)
-    #   enqueue(url)
-    #   run(&block)
-    # end
-
     # Start spidering at a given URL.
     def start_at(url, force = false)
       enqueue(url, force: force)
       return run
     end
 
-    # Start spidering until the queue becomes empty or the
-    # agent is paused.
-    # def run(&block : Resource ->)
-    #   @running = true
-
-    #   until @queue.empty? || paused? || limit_reached?
-    #     begin
-    #       visit_resource(dequeue, &block)
-    #     rescue Actions::Paused
-    #       return self
-    #     rescue Actions::Action
-    #     end
-    #   end
-
-    #   @running = false
-    #   @sessions.clear
-    #   self
-    # end
-
     # Start spidering until the queue becomes empty or the
     # agent is paused.
     def run
       @running = true
 
-      until @queue.empty? || paused? || limit_reached? || !running?
+      @queue.run do |uri|
         begin
-          visit_resource(dequeue)
+          visit_resource(uri)
         rescue Actions::Paused
-          return self
+          @queue.pause!
         rescue Actions::Action
         end
       end
@@ -259,11 +236,11 @@ module Arachnid
 
     # Sets the queue of URLs to visit.
     # Sets the list of failed URLs.
-    def queue=(new_queue)
+    def queue=(new_queue : Array(URI))
       @queue.clear
 
       new_queue.each do |url|
-        @queue[queue_key(url)] = url
+        @queue.enqueue(url)
       end
 
       @queue
@@ -271,7 +248,7 @@ module Arachnid
     end
 
     # Determines whether the given URL has been queued for visiting.
     def queued?(key)
-      @queue.has_key?(key)
+      @queue.queued?(key)
     end
 
     # Enqueues a given URL for visiting, only if it passes all
@@ -306,7 +283,7 @@ module Arachnid
       rescue Actions::Action
       end
 
-      @queue[queue_key(url)] = url
+      @queue.enqueue(url)
       @levels[url] = level
       true
     end
@@ -317,8 +294,8 @@ module Arachnid
     def get_resource(url, &block)
       url = url.is_a?(URI) ? url : URI.parse(url)
 
-      prepare_request(url) do |session, path, handlers|
-        new_resource = Resource.new(url, session.get(path, headers: handlers))
+      prepare_request(url) do |session, path, headers|
+        new_resource = Resource.new(url, session.get(path, headers: headers))
 
         # save any new cookies
         @cookies.from_resource(new_resource)
@@ -332,8 +309,8 @@ module Arachnid
     def get_resource(url)
       url = url.is_a?(URI) ? url : URI.parse(url)
 
-      prepare_request(url) do |session, path, handlers|
-        new_resource = Resource.new(url, session.get(path, handlers))
+      prepare_request(url) do |session, path, headers|
+        new_resource = Resource.new(url, session.get(path, headers: headers))
 
         # save any new cookies
         @cookies.from_resource(new_resource)
@@ -347,8 +324,8 @@ module Arachnid
     def post_resource(url, post_data = "", &block)
       url = url.is_a?(URI) ? url : URI.parse(url)
 
-      prepare_request(url) do |session, path, handlers|
-        new_resource = Resource.new(url, session.post(path, post_data, handlers))
+      prepare_request(url) do |session, path, headers|
+        new_resource = Resource.new(url, session.post(path, post_data, headers: headers))
 
         # save any new cookies
         @cookies.from_resource(new_resource)
@@ -362,8 +339,8 @@ module Arachnid
     def post_resource(url, post_data = "")
       url = url.is_a?(URI) ? url : URI.parse(url)
 
-      prepare_request(url) do |session, path, handlers|
-        new_resource = Resource.new(url, session.post(path, post_data, handlers))
+      prepare_request(url) do |session, path, headers|
+        new_resource = Resource.new(url, session.post(path, post_data, headers: headers))
 
         # save any new cookies
         @cookies.from_resource(new_resource)
@@ -372,44 +349,6 @@ module Arachnid
       end
     end
 
-    # Visits a given URL and enqueues the links recovered
-    # from the resource to be visited later.
-    # def visit_resource(url, &block : Resource ->)
-    #   url = sanitize_url(url)
-
-    #   get_resource(url) do |resource|
-    #     @history << resource.url
-
-    #     begin
-    #       @every_resource_blocks.each { |resource_block| resource_block.call(resource) }
-    #       yield resource
-    #     rescue action : Actions::Paused
-    #       raise(action)
-    #     rescue Actions::SkipResource
-    #       return Nil
-    #     rescue Actions::Action
-    #     end
-
-    #     resource.each_url do |next_url|
-    #       begin
-    #         @every_link_blocks.each do |link_block|
-    #           link_block.call(resource.url, next_url)
-    #         end
-    #       rescue action : Actions::Paused
-    #         raise(action)
-    #       rescue Actions::SkipLink
-    #         next
-    #       rescue Actions::Action
-    #       end
-
-    #       if @max_depth.nil? || @max_depth.not_nil! > (@levels[url]? || 0)
-    #         @levels[url] ||= 0
-    #         enqueue(next_url, @levels[url] + 1)
-    #       end
-    #     end
-    #   end
-    # end
-
     # Visits a given URL and enqueues the links recovered
     # from the resource to be visited later.
     def visit_resource(url)
@@ -507,7 +446,7 @@ module Arachnid
 
     # Dequeues a URL that will later be visited.
     def dequeue
-      @queue.shift[1]
+      @queue.dequeue
     end
 
     # Determines if the maximum limit has been reached.
@@ -536,9 +475,5 @@ module Arachnid
       @every_failed_url_blocks.each { |fail_block| fail_block.call(url) }
       true
     end
-
-    private def queue_key(url)
-      url.to_s
-    end
   end
 end
diff --git a/src/arachnid/agent/actions.cr b/src/arachnid/agent/actions.cr
index bba9ad2..ee38918 100644
--- a/src/arachnid/agent/actions.cr
+++ b/src/arachnid/agent/actions.cr
@@ -19,9 +19,9 @@ module Arachnid
     end
 
     # Continue spidering
-    def continue!(&block)
+    def continue!
       @paused = false
-      run(&block)
+      @queue.resume!
     end
 
     # Sets the pause state of the agent.
diff --git a/src/arachnid/agent/queue.cr b/src/arachnid/agent/queue.cr
index 10954c6..879aa1b 100644
--- a/src/arachnid/agent/queue.cr
+++ b/src/arachnid/agent/queue.cr
@@ -1,84 +1,87 @@
-require "uri"
-require "./actions"
-require "benchmark"
-
 module Arachnid
   class Agent
-    class Queue
+    # An asynchronous data queue using a pool of
+    # `Concurrent::Future` to allow for async
+    # fetching of multiple pages at once.
+    class Queue(T)
 
-      @queue : Array(URI)
+      @queue : Array(T)
 
-      @pool_size : Int32
+      @max_pool_size : Int32
 
-      @exceptions : Array(Exception)
+      @pool : Array(Concurrent::Future(Nil))
 
-      property mutex : Mutex
+      @paused : Bool
 
-      def self.new(array = nil, pool_size = nil)
-        array ||= [] of URI
-        pool_size ||= 10
-        new(array, pool_size, nil)
-      end
-
-      private def initialize(@queue : Array(URI), @pool_size : Int32, dummy)
-        @mutex = Mutex.new
-        @exceptions = [] of Exception
+      @block : Proc(T, Void)?
+
+      delegate :clear, :empty?, to: @queue
+
+      # Create a new Queue
+      def initialize(queue : Array(T)? = nil, max_pool_size : Int32? = nil)
+        @queue = queue || [] of T
+        @max_pool_size = max_pool_size || 10
+        @pool = [] of Concurrent::Future(Nil)
+        @paused = false
+        @block = nil
       end
 
+      # Add an item to the queue
      def enqueue(item)
        @queue << item
      end
 
-      def clear
-        @queue.clear
+      private def dequeue
+        @queue.shift
       end
 
+      # See if an item is currently queued
       def queued?(url)
         @queue.includes?(url)
       end
 
-      private def worker(item : URI, &block : URI ->)
-        signal_channel = Channel::Unbuffered(Actions::Action).new
-
-        spawn do
-          begin
-            block.call(item)
-          rescue ex
-            signal_channel.send(Actions::SkipLink.new)
-          else
-            signal_channel.send(Actions::Action.new)
-          end
-        end
-
-        signal_channel.receive_select_action
-      end
-
-      def run(&block : URI ->)
-        pool_counter = 0
-        worker_channels = [] of Channel::ReceiveAction(Channel::Unbuffered(Actions::Action))
-        queue = @queue.each
-        more_pools = true
-
-        loop do
-          break if !more_pools && worker_channels.empty?
-
-          while pool_counter < @pool_size && more_pools
-            item = queue.next
-
-            if item.is_a?(Iterator::Stop::INSTANCE)
-              more_pools = false
-              break
-            end
-
-            pool_counter += 1
-            worker_channels << worker(item.as(URI), &block)
-          end
-
-          index, signal_exception = Channel.select(worker_channels)
-          worker_channels.delete_at(index)
-          pool_counter -= 1
-
-          @exceptions << signal_exception if signal_exception && signal_exception.is_a?(Actions::SkipLink)
-        end
-      end
+      def pause!
+        @paused = true
+      end
+
+      def paused?
+        @paused
+      end
+
+      def resume!
+        @paused = false
+
+        # Re-run with the block captured by the last call
+        # to `run`, if there was one.
+        if blk = @block
+          run(&blk)
+        end
+      end
+
+      # Run the queue, calling `block` for every item.
+      # Returns once the queue is empty or `pause!` is called.
+      def run(&block : T ->)
+        # Keep a reference to the block so we can resume
+        # after pausing.
+        @block = block
+        @paused = false
+
+        loop do
+          break if @queue.empty? || @paused
+
+          # Dequeue on the current fiber so the emptiness
+          # check above stays accurate, then process the
+          # item in a pooled future.
+          item = dequeue
+          @pool << future { block.call(item) }
+
+          # Once the pool is full, wait on the oldest
+          # future before spawning another.
+          @pool.shift.get if @pool.size >= @max_pool_size
+        end
+
+        # Wait for any futures still in flight before returning.
+        @pool.each(&.get)
+        @pool.clear
+      end
     end
   end
 end
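
A quick usage sketch for reviewers, illustrative only and not part of
the patch: it assumes the shard is required as "arachnid", drives the
new Queue directly with made-up example URLs, and picks an arbitrary
pool cap of 4. In normal use the queue stays internal; callers pass
the new `fibers` option, which the Agent constructor forwards via
Queue(URI).new(queue, fibers) as shown in the agent.cr hunk above.

    require "uri"
    require "arachnid"

    # Hypothetical standalone driver for the new queue.
    # `max_pool_size` caps the number of in-flight futures
    # (surfaced as the `fibers` option at the Agent level).
    queue = Arachnid::Agent::Queue(URI).new(max_pool_size: 4)
    queue.enqueue(URI.parse("https://example.com/a"))
    queue.enqueue(URI.parse("https://example.com/b"))

    # `run` returns once the queue drains (or after `pause!`),
    # processing up to `max_pool_size` items concurrently.
    queue.run do |uri|
      puts "processing #{uri}"
    end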