如何用Sbroker在Elixir中构建一个反压队列系统

栏目: 编程语言 · 发布时间: 5年前

内容简介:在PSPDFKit,我们使用sbroker这个Erlang库提供了用于创建池和(或)负载调节器的构建模块。 它使用让我们看一个如何在Elixir应用程序中使用sbroker库的简单示例。

在PSPDFKit,我们使用 Elixir 构建可靠且高性能的分布式系统。 在这样的系统中,我们经常需要调用外部服务,同时异步消息传递允许客户端进行这些调用而不必等待响应。 无法立即处理的消息将排队并稍后处理,但是当队列过载时会发生什么呢? 由于我们不希望系统崩溃,因此我们必须使用反压机制,以防止队列无限增长。 此文介绍了如何使用 sbroker 库将反压机制应用于Elixir应用程序。

在Elixir应用中使用sbroker的Erlang库

sbroker这个Erlang库提供了用于创建池和(或)负载调节器的构建模块。 它使用 broker模式 ,其中与服务worker的通信由负责worker与其调用之间协调的broker处理。

一个简单例子

让我们看一个如何在Elixir应用程序中使用sbroker库的简单示例。

首先我们在命令行运行如下命令:

mix new example

它将在当前目录下创建一个样例工程,名字就叫做 “example” 。在该示例中,我们将模拟在一个worker中对外部服务的调用并由broker处理相关通信。为此,我们编辑example/mix.exs文件,以便将sbroker库添加到我们的应用程序中:

defmodule Example.Mixfile do
  use Mix.Project

  def project do
    [
      app: :example,
      version: "0.1.0",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps: deps()
    ]
  end

  def application do
    [
      applications: [:sbroker],
      extra_applications: [:logger],
      mod: {Example, []}
    ]
  end

  defp deps do
    [{:sbroker, "~> 1.0-beta"}]
  end
end

我们将sbroker库添加到上面的依赖项和应用程序中。我们也在18行指定了我们应用的模块 mod: {Example, []},这个模块我们稍后创建。现在我们准备添加一个broker,所以我们在example/lib/example/broker.ex中创建我们对broker模块:

defmodule Example.Broker do
  @behaviour :sbroker

  def start_link() do
    start_link(timeout: 10000)
  end

  def start_link(opts) do
    :sbroker.start_link({:local, __MODULE__}, __MODULE__, opts, [])
  end

  def init(opts) do
    # See `DBConnection.Sojourn.Broker`.

    # 使broker的“左”侧成为一个FIFO队列,在超时后丢弃请求。
    client_queue =
      {:sbroker_timeout_queue,
       %{
         out: :out,
         timeout: opts[:timeout],
         drop: :drop,
         min: 0,
         max: 128
       }}

    # 使broker对“右”侧成为一个FIFO队列,这个队列没有超时。
    worker_queue =
      {:sbroker_drop_queue,
       %{
         out: :out_r,
         drop: :drop,
         timeout: :infinity
       }}

    {:ok, {client_queue, worker_queue, []}}
  end
end

上面的模块实现了sbroker行为。在第9行我们启动sbroker,并且在选项里设置超时为10秒。这个超时的意思是,当这些调用在队列里等候worker的时间超过10秒,它们将被丢弃。在init/1函数里,我们给broker定义了客户端和worker的队列。我们定义了broker模块后,我们需要定义worker模块,它负责定义worker,并且向broker请求任务。我们在 example/lib/example/worker.ex里定义worker模块:

defmodule Example.Worker do
  use GenServer

  alias Example.{Broker}

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  #
  # GenServer 的回调函数
  #

  def init([]) do
    state =
      ask(%{
        tag: make_ref()
      })

    {:ok, state}
  end

  def handle_info({tag, {:go, ref, {pid, {:fetch, [params]}}, _, _}}, %{tag: tag} = s) do
    send(pid, {ref, fetch_from_external_resource(params)})
    {:noreply, ask(s)}
  end

  # 当sborker发现有匹配的任务,它将给我们发送 {tag, {:go, ref, req, _, _}} 消息
  defp ask(%{tag: tag} = s) do
    {:await, ^tag, _} = :sbroker.async_ask_r(Broker, self(), {self(), tag})
    s
  end

  defp fetch_from_external_resource(params) do
    # 模拟处理工作
    Process.sleep(1000)
    {:ok, "External service called with#{inspect(params)}"}
  end
end

fetchfrom_external_resource/1 函数是一个简单的模拟函数,它将使得进程等待一秒,然后返回 {:ok, “External service called with #{inspect(params)}”} 。当worker这个GenServer进程收到 {tag, {:go, ref, {pid, {:fetch, [params]}}, , }} 这样的消息时,这个函数将被调用。这个元组中的tag变量是一个唯一标识符,它被用来标识worker并且被存储在GenServer进程的状态中。

在worker获得所需数据后,它会向broker请求新的任务。我们已经定义了broker和worker模块,因此我们现在可以定义一个监督者,它将启动broker和一个worker池。监督者在 example/lib/example/supervisor.ex 里定义:

defmodule Example.Supervisor do
  use Supervisor

  alias Example.{Broker, Worker}

  def start_link() do
    Supervisor.start_link(__MODULE__, [])
  end

  def init(_args) do
    pool_size = 5
    broker = worker(Broker, [], id: :broker)

    workers =
      for id <- 1..pool_size do
        worker(Worker, [], id: id)
      end

    worker_sup_opts = [strategy: :one_for_one, max_restarts: pool_size]
    worker_sup = supervisor(Supervisor, [workers, worker_sup_opts], id: :workers)

    supervise([broker, worker_sup], strategy: :one_for_one)
  end
end

在这个例子中,我们的worker池有5个worker。我们基本完成工作了,还剩下要创建一个application模块,这个模块定义在 example/lib/example.ex 中:

defmodule Example do
  use Application
  alias Example.{Broker}

  def start(_type, _args) do
    Example.Supervisor.start_link()
  end

  def fetch_from_external_resource(params) do
    perform({:fetch, [params]})
    |> inspect()
    |> IO.puts()
  end

  defp perform({action, args} = params) do
    case :sbroker.ask(Broker, {self(), params}) do
      {:go, ref, worker, _, _queue_time} ->
        monitor = Process.monitor(worker)

        receive do
          {^ref, result} ->
            Process.demonitor(monitor, [:flush])
            result

          {:DOWN, ^monitor, _, _, reason} ->
            exit({reason, {__MODULE__, action, args}})
        end

      {:drop, _time} ->
        {:error, :overload}
    end
  end
end

这个模块启动监督者,并且它有一个函数:fetch_from_external_resource/1,这个函数向broker请求一个worker,当broker能够为我们的请求分配一个worker的时候,会向这个worker发送了消息 {:fetch, [params]} 。当broker不能分配给我们worker的时候,返回的是 {:drop, time} 消息,这样的话,我们的私有函数 perform/1,将返回 {:error, :overload} 。函数 fetch_from_external_resource/1 将打印worker的返回值,或者因为broker丢弃了我们的请求而打印 {:error, :overload} 。

我们现在可以在iex里测试这个例子:

iex -S mix run

然后我们可以运行如下语句来从外部资源获取数据:

Example.fetch_from_external_resource("test")

一秒后在iex里会输出如下信息:

{:ok, "External service called with \"test\""}
:ok

为了模拟和测试更多的调用,我们可以通过运行以下语句多次并行地调用 Example.fetch_from_external_resource(“test”) :

Enum.each(1..500, fn _ ->
    Task.start(fn ->
      Example.fetch_from_external_resource("test")
    end)
end)

这将打印相同的行并一次打印五行,因为我们的示例worker池包含五个worker。我们也将得到 {:error, :overload} 响应,因为broker不能分配worker并且任务在队列等待太长时间。 {:error, :overload} 响应是用于防止外部服务过载的反压力示例。例如,我们的系统现在可以通过HTTP / 1.1 429 Too Many Requests回复请求服务的客户端,并且它不会因为过载而崩溃。


以上所述就是小编给大家介绍的《如何用Sbroker在Elixir中构建一个反压队列系统》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

深入浅出强化学习:原理入门

深入浅出强化学习:原理入门

郭宪、方勇纯 / 电子工业出版社 / 2018-1 / 79

《深入浅出强化学习:原理入门》用通俗易懂的语言深入浅出地介绍了强化学习的基本原理,覆盖了传统的强化学习基本方法和当前炙手可热的深度强化学习方法。开篇从最基本的马尔科夫决策过程入手,将强化学习问题纳入到严谨的数学框架中,接着阐述了解决此类问题最基本的方法——动态规划方法,并从中总结出解决强化学习问题的基本思路:交互迭代策略评估和策略改善。基于这个思路,分别介绍了基于值函数的强化学习方法和基于直接策略......一起来看看 《深入浅出强化学习:原理入门》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具