[Хд] logo

Очереди на Beanstalk

По своей сути Beanstalkd — упрощенная и легкая система очередей, которая разрабатывалась под нужны Causes. Представляется как менеджер заданий распределенного приложения, который собирает отложенные задачи (отправка почты, различные запросы рода). Beanstalkd scheme

Определенные процессы размещают задачи в очередь, а воркеры получают и выполняют задачи из очереди.

Среди основных возможностей beanstalkd:

  • приоритизация;
  • распределенность — несколько серверов beanstalkd работают по принципу Memcached (можно масштабировать);
  • отложенное выполнение (т.з. bury) для последующего выполнения;
  • внешние библиотеки (PHP, Python и многие другие), веб-консоли;
  • таймаут для автоматического помещения в очередь.

Термины и команды beanstalkd

Терминология инструмента отличается от привычных серверов сообщений, того же Gearman, к примеру:

  • jobs — то же, что и сообщения;
  • tubes — очереди, в которые помещаются сообщений для передачи работникам (workers);
  • producers — приложения, которые создают задания (или сообщения);
  • consumers — приложения, которые получают задания из очереди для обработки.

Основных команд у beanstalkd тоже немного:

  • put — добавить новое задание в очередь (в том числе отложенно);
  • reserve — получить задание из очереди;
  • delete — удалить задание после выполнения;
  • bury — отложить задание после завершения;
  • kick — вытащить задание из статуса bury и поместить в очередь;
  • release — поставить статус “готово” для задачи после выполнения, сохраняется id и приоритет.
beanstalkd queue

Больше команд и опций на официальной странице системы.

Установка и настройка

Beanstalkd входит в систему пакетов aptitude:

sudo aptitude install beanstalkd

# Также собирается из исходников с GitHub

А для запуска достаточно выполнить:

beanstalkd

# По умолчанию выполняется локально используя порт 11300

После установки демоном beanstalkd также можно управлять как сервисом ОС. Он выполняется в RAM, а для обеспечения fault tolerance запускается с дополнительной опцией:

beanstalkd -b ~/beanstore &

# Пишет данные очереди в каталог beanstore, выполнение в фоне

Если перезапустить beanstalkd с такими же параметрами, то он в первую очередь проверит лог и продолжит выполнение с места остановки.

Демона beanstalkd можно запускать с опциями:

-b DIR   директория для валидации
 -f MS    fsync каждое значение миллисекунд ( -f0 для "always fsync")
 -F       без fsync (по умолчанию)
 -l ADDR  слушать адрес (0.0.0.0 по умолчанию)
 -p PORT  слушать порт (11300 по умолчанию)
 -u USER  стать пользователем и группой
 -z BYTES максимальный размер задания в байтах (65535 по умолчанию)
 -s BYTES размер каждого файла валидации (10485760 по умолчанию)
            (будут округлены до значения, кратного 512 байт)
 -c       уменьшить binlog (по умолчанию)
 -n       не уменьшать binlog
 -v       показать версию
 -V       улучшенный вывод
 -h       показать справку

# Опции можно комбинировать

Ко всему прочему beanstalkd поддерживает клиенты для всех популярных языков и обладает API для создания проприетарных клиентов.

Пример использования Ruby

Для первого примера используем клиент Beaneater для Ruby. Размещение задачи (сообщения) в очередь будет выглядеть так:

require 'beaneater'
require 'json'

beanstalk = Beaneater::Pool.new(['localhost:11300'])
tube = beanstalkd.tubes['my-tube']
job = {some: 'key', value: 'object'}.to_json

tube.put job

# Задает адрес и порт, имя очереди и само сообщение, в формате JSON

После этого нужно создать скрипт для вылавливания заданий из очереди:

beanstalkd.tubes.watch!('my-tube')
loop do
  job = beanstalk.tubes.reserve
  begin
    # обработка задачи
    job.delete
  rescue Exception => e
    job.bury
  end
end

# Дожидается готовности задачи, обрабатывает ее и повторяет весь процесс

Для управления приложениями-потребителями логично использовать контроллеры процессов, как supervisord.

Если обрабатывается большая, тяжелая задача, то верхний пример можно представить как:

beanstalkd.jobs.register('my-tube') do |job|
  # ... обработка задачи
end

beanstalkd.jobs.process!

# “Оборачивает” скрипт, резервируя, обрабатывая, а затем удаляя или откладывая задачу, основываясь на результате

Пример использования PHP

Во втором примере реализована отправка почты при помощи Mandrill. Для удобства установки и использования клиента Pheanstalk для PHP рекомендуется использовать Composer:

composer require mandrill/mandrill pda/pheanstalk

# Установка зависимостей

Потребуется всего два скрипта: поставщика задачи в очередь и потребителя, который будет обрабатывать задания.

Код поставщика будет иметь вид:

<?php

require_once __DIR__ . '/vendor/autoload.php';

$email = array(
    'to' => 'xyz@example.com',
    'from' => 'abc@example.com',
    'subject' => 'Subject',
    'body' => 'Some text'
);

$pheanstalk = new \Pheanstalk\Pheanstalk('127.0.0.1');


# Добавляет JSON для задачи "email_queue"
$pheanstalk
    ->useTube('email_queue')
    ->put(json_encode($email));

# Создает локальную задачу email_queue — строку JSON массива с данными

А воркер будет выглядеть так:

<?php

require_once __DIR__ . '/vendor/autoload.php';

$pheanstalk = new \Pheanstalk\Pheanstalk('127.0.0.1');


# Чтение очереди beanstalkd
while (true) {
    # Проверка соединения
    if (!$pheanstalk->getConnection()->isServiceListening()) {
        echo "Ошибка соединения, подождите... \n";

        # Ждет 5 с
        sleep(5);

        # Запуск следующей итерации
        continue;
    }

    # Получить задачу из очереди, если она готова
    $job = $pheanstalk
        ->watch('email_queue')
        ->ignore('default')
        ->reserve();

    $email = json_decode($job->getData(), true);

    try {
         # Отправка почты с использованием Mandrill API
        $mandrill = new Mandrill('[MANDRILL-API-KEY');

        $message = array(
            'html' => $email['body'],
            'subject' => $email['subject'],
            'from_email' => $email['from'],
            'to' => array(
                array(
                    'email' => $email['to'],
                )
            )
        );

        # Непосредственная отправка письма
        $result = $mandrill->messages->send($message);

       # Удалить задачу из очереди
        $pheanstalk->delete($job);

        echo "Письмо отправлено \n";

    } catch (Exception $e) {
        echo "Ошибка отправки - {$e->getMessage()} \n";
    }
}

# Не забудьте использовать свой уникальный API

Затем можно запускать команду:

nohup php email_worker.php > path/to/logfile.txt &

# Игнорирует сигнал SIGHUP, записывает данные в текстовый файл, выполняется в фоне

Самое главное

Beanstalkd подходит для реализации системы очередей любой сложности на любом популярном языке программирования. А распределенное выполнение позволит использовать инструмент в высоко-нагруженных приложениях.

  read in english
[Хд]

Подписывайтесь на отборные материалы по продвинутой разработке

Google Email

Esc, чтобы подписаться позже