LE Blog

Инженер от души

20.10.2010 firtree_right Определение, запущен ли процесс

Пролог

Ого! Уже три месяца я ничего не писал в этот блог! Лето выдалось жаркое не только на погоду. Поскольку летом погода лучше, а световой день длиннее, было много работы. Причём работы связанной с поддержкой того, что уже и так нормально функционировало в прошлом сезоне. Ничего серьёзно нового не писалось активно, а значит и захватывающих сюжетов для статей не находилось.

Но теперь у меня появилась возможность писать кое-что новое. Поэтому есть, что рассказать.

to feed or not to feed

Введение

Если вы любите процессы-демоны, как люблю их я, то, возможно, перед вами уже возникала задача определить, запущен ли уже такой демон, перед тем как создавать дочерний процесс. Об этом и будет сегодняшняя статья.

Баш в помощь

Предположим, что у нас есть простейший демон. Хорошо бы имя у него было уникальное, чтобы можно его потом было отыскать. Файл uniq_name_simple_daemon:

#!/usr/bin/env ruby

pid = fork do
  begin
    running = true
    Signal.trap("TERM") do
      running = false
    end
    while running
      sleep 0.01
    end
  rescue Exception => e
    puts e.to_s
    puts e.backtrace.join "\n"
  ensure
    exit!
  end
end

Мы всегда можем запускать с помощью другого скрипта, например на баше (simple_daemon_runner.sh):

#!/bin/bash

if ps ax | grep uniq_name_simple_daemon | grep -vq grep
then
  echo "uniq_name_simple_daemon is already running"
else
  echo "starting uniq_name_simple_daemon"
  ./uniq_name_simple_daemon
fi

На подобной команде будут базироваться все наши последующие методы. Тут, если кто не понял, мы фильтруем вывод ps ax сначала ища там имя нашего скрипта, а затем исключая из списка сам процесс поиска (команду grep). Ключ q позволяет нам получить код выхода, не выводя ничего на экран. То есть если строчка найдена, то запускаем первый блок, если нет, то второй.

Можно сделать такой же скрипт для остановки процесса (simple_daemon_stopper.sh):

#!/bin/bash

pid=$(ps ax | grep uniq_name_simple_daemon | grep -v grep | awk '{ print $1; }')

if [[ -n $pid ]]
then
  echo "stopping uniq_name_simple_daemon"
  kill -TERM $pid
else
  echo "nothing to stop"
fi

Конечно же, при таком раскладе всегда есть возможность запустить нашего демона без помощи скриптов. И тогда проверка делаться не будет. В таком случае полезно проверять, запущен ли процесс уже внутри самого руби, перед тем, как отпочковать дочерний процесс.

Сам себе хозяин

В данном случае задача сводится к проверке наличия в памяти ещё одного процесса с таким же именем кроме текущего. Так же нужно уметь останавливать процесс с помощью того же файла. Вот, какое решение получилось у меня (uniq_name_auto_daemon):

#!/usr/bin/env ruby

ps_ax = `ps ax | grep #{File.basename(__FILE__)} | grep -v grep`.split("\n").map{ |l| l.strip.split(/\s+/) }.reject{ |l| l[0].to_i == Process.pid }

if ps_ax.any?
  case ARGV[0]
    when /stop/i
      ps_ax.each do |l|
        system "kill -TERM #{l[0]}"
      end
    when /kill/i
      ps_ax.each do |l|
        system "kill -KILL #{l[0]}"
      end
    else
      puts "#{File.basename(__FILE__)} is already running. If you want to stop it, run './#{File.basename(__FILE__)} stop|kill'"
  end
else
  pid = fork do
    begin
      running = true
      Signal.trap("TERM") do
        running = false
      end
      while running
        sleep 0.01
      end
    rescue Exception => e
      puts e.to_s
      puts e.backtrace.join "\n"
    ensure
      exit!
    end
  end
end

Во-первых, обходимся одним файлом, который никак иначе не запустить. Во-вторых, нигде не нужно хардкодить его имя. По-моему, очень удобно.

Оффтопик

С одной стороны, когда я пишу текст, то мне удобнее писать все термины по-русски и склонять их: «демоны», «руби», «баш», но с другой стороны это не поможет тому, кто будет искать решение похожей задачи.

Внутри примеров кода — наоборот, удобнее писать комментарии и тексты по-английски, чтобы не переключать раскладку, но как-то это не очень соответствует русскоязычном блогу.

Что же делать? :)

Материалы для самостоятельного изучения

Полный код статьи на гитхабе.

28.10.2009 firtree_right Удалённые вызовы через систему распределённых объектов в руби (dRuby)

Введение

Некоторое время назад я писал о создании подпроцессов на руби. В числе прочего один из вопросов был об общении между собой демона и родительского процесса. Об одном из методов пойдёт речь сегодня

Постановка задачи

Не только программисты знают, что важна цель коммуникации. :) Если цель общения между основным процессом и демоном в том, чтобы вызывать методы на объектах друг друга, до давайте на этом и сосредоточимся.

Решение: DRb

Для удалённого обращения с объектами существует стандартная руби-библиотека dRuby, в которой находится модуль DRb, который мы и будем использовать. Ничего устанавливать не нужно. Согласно документации, совершенно прозрачным образом можно вызвать методы на удалённом объекте даже на другой машине. Объекты и ссылки на них передаются в формате Marshal.

Ну, довольно теории! Перейдём к практике. Для эмуляции параллельных процессов (возможно на разных машинах (!)) мы будем использовать два окна терминала. В одном запустим server.rb:

# coding: utf-8
$KCODE = "utf-8" if RUBY_VERSION < "1.9.0"
require "drb/drb"

class RemoteObject
  def remote_method_with_param(param)
    puts "вызван метод на сервере с параметром #{param.inspect}"
    case param.class.to_s
    when "String"
      puts "параметр типа строка"
      param.reverse!
    when "Array"
      puts "параметр типа массив"
      param.shift
    else
      puts "параметр оставшегося типа"
      param.do_smth
    end
  end
end

$SAFE = 1 # Запретить eval() и eval-оподобные вызовы

DRb.start_service("druby://localhost:45678", RemoteObject.new)
DRb.thread.join

Здесь мы используем банальный Thread#join, чтобы при необходимости просто прервать выполнение. Но те, кто читал предыдущую статью, знают, что в это время можно делать что угодно и следить за потоком dRuby отдельно.

В другом терминале запустим клиентский код client.rb:

# coding: utf-8
$KCODE = "utf-8" if RUBY_VERSION < "1.9.0"
require "drb/drb"

class MyString
  def initialize(str)
    @string = str
  end

  def do_smth
    @string.reverse!
  end

  def inspect
    "<#{@string}>"
  end
end

rem_o = DRbObject.new_with_uri("druby://localhost:45678")

["строка", ["котик", "пёсик", "слоник"], MyString.new("суперстрока")].each do |obj|
  puts "Вызов метода вернул: #{rem_o.remote_method_with_param(obj).inspect}"
  puts "Параметр после вызова: #{obj.inspect}"
end

Вывод в терминалы будет следующий (я использую вывод для версии руби 1.9.1, потому что он нормально переворачивает кириллическую строку без колдовства) для сервера:

вызван метод на сервере с параметром "строка"
параметр типа строка
вызван метод на сервере с параметром ["котик", "пёсик", "слоник"]
параметр типа массив
вызван метод на сервере с параметром #<DRb::DRbUnknown:0x00000001248910 @name="MyString", @buf="\x04\bo:\rMyString\x06:\f@stringI\"\e\xD1\x81\xD1\x83\xD0\xBF\xD0\xB5\xD1\x80\xD1\x81\xD1\x82\xD1\x80\xD0\xBE\xD0\xBA\xD0\xB0\x06:\rencoding\"\nUTF-8">
параметр оставшегося типа

Клиент же упадёт с ошибкой:

Вызов метода вернул: "акортс"
Параметр после вызова: "строка"
Вызов метода вернул: "котик"
Параметр после вызова: ["котик", "пёсик", "слоник"]
(druby://localhost:45678) server.rb:17:in `remote_method_with_param': undefined method `do_smth' for #<DRb::DRbUnknown:0x00000001248910> (NoMethodError)
     .....

Что, безусловно, прекрасно. Прекрасно, что упал не сервер. :) Понятно, что он не знает ничего про этот объект и не знает, как с ним обращаться.

Как видно из вывода, объекты передаются в виде копий. Нашим же третьим, самодельным объектом, мы можем исследовать две возможности: таки передавать копию объекта или передавать лишь ссылку на него, чтобы вызовы выполнялись на клиентской копии. Для первой возможности достаточно вынести определение класса в общедоступное для клиента и сервера место -- common.rb:

# coding: utf-8
$KCODE = "utf-8" if RUBY_VERSION < "1.9.0"
require "drb/drb"

REM_URI = "druby://localhost:45678"

class MyStringCopied
  def initialize(str)
    @string = str
  end

  def do_smth
    @string.reverse!
    self
  end

  def inspect
    "<<#{@string}>>"
  end
end

class MyStringSingle
  include DRb::DRbUndumped # это ключ :)
  def initialize(str)
    @string = str
  end

  def do_smth
    @string.reverse!
    self
  end

  def inspect
    "<#{@string}>"
  end
end

Добавим require "common.rb" в серверный код, а клиентский преобразится до такого:

# coding: utf-8
require "common"

rem_o = DRbObject.new_with_uri(REM_URI)

DRb.start_service # Это нужно для объекта, который не копируется при передаче

["строка",
  ["котик", "пёсик", "слоник"],
  MyStringCopied.new("суперстрока"),
  MyStringSingle.new("суперстрока без копий")].each do |obj|
  puts "Вызов метода вернул: #{rem_o.remote_method_with_param(obj).inspect}"
  puts "Параметр после вызова: #{obj.inspect}"
end

Как видно, мы сразу позаботились и о второй возможности, создав для неё ещё один класс. Секрет заключается во включении модуля DRb::DRbUndumped и старте ещё одного серверного процесса на клиенте (для вызовов методов объектов клиента удалённо) Клиентский вывод теперь выглядит так:

Вызов метода вернул: "акортс"
Параметр после вызова: "строка"
Вызов метода вернул: "котик"
Параметр после вызова: ["котик", "пёсик", "слоник"]
Вызов метода вернул: <<акортсрепус>>
Параметр после вызова: <<суперстрока>>
Вызов метода вернул: #<DRb::DRbObject:0x000000012588c8 @uri="druby://127.0.1.1:43998", @ref=9631244>
Параметр после вызова: <йипок зеб акортсрепус>

Если немножко почитать, и разобраться, какие объекты можно и нужно «маршализировать», а какие нельзя или не нужно, то получается вполне себе прекрасный инструмент. Который, повторюсь, входит в стандартную библиотеку и не требует никаких внешних зависимостей.

Материалы для самостоятельного изучения

  1. Полный код статьи на github
  2. Документация по DRb (rdoc)
  3. Документация по Marshal

22.10.2009 firtree_right Работа с потоками (Thread) в руби

Введение

Сначала я расскажу, почему на сегодняшний день я не очень много работаю с подпроцессами на базе Thread, предпочитая им Kernel.fork. А потом покажу простой способ следить за потоками при работе приложения.

На текущий момент, основная проблема потоков -- это «ненастоящее» распределение ресурсов. Все потоки руби на самом деле находятся в одном системном потоке, который по очереди передаёт им управление. Это влечёт за собой полтора следствия.

Зависание

Когда имеешь дело с внешним оборудованием, сторонними библиотеками и серийными портами, зависание потока может случиться на самом низком уровне. Это можно симулировать небольшой программой на си -- block_thread.c:

#include <ruby.h>

VALUE rb_mBlockThread;

/*
 * call-seq:
 *   BlockThread::cycle(interval=5)
 *
 * Блокирует текущий поток на <code>interval</code> секунд.
 *
 */

VALUE bt_cycle(int argc, VALUE *argv, VALUE self) {
  int i, max;
  max = 5;

  if (argc == 1) {
    max = FIX2INT(argv[0]);
  } else if (argc > 1) {
    rb_raise(rb_eArgError, "Неправильное количество аргументов (%d вместо 0 или 1)");
  }

  for (i=0; i<max; i++) {
    sleep(1);
  }
  return Qnil;
}

void Init_block_thread() {
  /*
   * Модуль содержит методы для демонстрации работы потока
   */
  rb_mBlockThread = rb_define_module("BlockThread");
  rb_define_module_function(rb_mBlockThread, "cycle", bt_cycle, -1);
}

Если вы никогда не расширяли руби с помощью си, поясню, что в этой программе мы создаём модуль BlockTread, в котором создаём метода класса cycle, который указанное число раз (по умолчанию 5) в цикле ждёт одну секунду. Напишем extconf.rb:

require "mkmf"
create_makefile("block_thread")

И программу на руби, в которой будут два потока, один из которых мы заблокируем на низком уровне block_threads.rb:

# coding: utf-8
require "block_thread.so"

t1 = Thread.new do
  10.times { |i| puts i; sleep 0.1 }
end

t2 = Thread.new do
  puts "Блокируем"
  BlockThread.cycle
  puts "Разблокируем"
end

t1.join
t2.join

Скомпилируем и запустим:

ruby extconf.rb
make
ruby block_threads.rb

И что же мы видим? Мы видим, как все потоки, включая основной, блокируются на пять секунд (или любое число секунд, которое мы укажем) И даже ctrl + c не в силах нам помочь. Помогает только ctrl + z и потом killall ...

В случае же с Kernel.fork, процессы действительно равномерно делят между собой ресурсы, и один подпроцесс не способен заблокировать всё.

Синхронизация

Я говорил про полторы проблемы. Об одной уже рассказал, а вторая известна давно -- попробуйте выполнить следующий код:

# coding: utf-8
$cnt = 0

t1 = Thread.new do
  100000.times { $cnt += 1 }
end

t2 = Thread.new do
  100000.times { $cnt += 1 }
end

t1.join
t2.join

puts "Without sync: #{$cnt}"

Если вы не используете руби 1.9, то вы получите неожиданный и каждый раз разный результат. Всё дело в том, что переключение между потоками происходит между элементарными операциями, а += состоит из трёх элементарных операций: достать значение, прибавить к нему число, записать значение. Чтобы этого не произошло, нужно либо использовать синхронизацию с помощью Mutex, либо руби 1.9. Ссылка на полный код для этой статьи в конце, т.к. я спешу перейти к более интересной части. :)

Слежение за потоками с помощью менеджера ThreadsWait

Совершенно недавно открыл для себя интересный способ следить за статусом пакетов в блокирующей и неблокирующей манере:

# coding: utf-8
require "thwait"

t1 = Thread.new do
  10.times { |i| puts "поток 1 тик #{i}"; sleep 0.5 }
end

t2 = Thread.new do
  10.times { |i| puts "поток 2 тик #{i}"; sleep 0.7 }
end

tw = ThreadsWait.new t1, t2

t3 = Thread.new do
  10.times { |i| puts "поток 3 тик #{i}"; sleep 0.3 }
end

run = true
tw.join_nowait t3

while run do
  begin
    # Неблокирующее ожидание
    puts "Закончил работу #{tw.next_wait(true).inspect }"
    run = false
  rescue ThreadsWait::ErrNoFinishedThread
    puts "Ожидаем окончания работы одного из потоков"
    sleep 0.5
  end
end

# Блокирующее ожидание
tw.all_waits do |t|
  puts "Закончил работу #{t.inspect}"
end

По-моему, весьма удобно, если вам нужно не просто ожидать окончания работы потоков, но ещё и делать что-то при этом.

Материалы для самостоятельного изучения

  1. Полный код статьи на github
  2. Документация ThreadsWait
  3. Толковая статья о многопотоковости и процессах в руби

10.06.2009 firtree_right Ruby daemon, или как сделать демона на руби

Задача

Иногда в процессе работы с разными сторонними библиотеками, которые содержат тяжелые блокирующие методы, не обойтись обычным Thread. В таком случае на помощь приходят демоны :) Вот несколько приемов, которые я освоил в работе с нимим.

Базовый механизм

Запустить демона можно с помощью метода Kernel.fork, передав ему блок. Метод возвращает pid процесса, который можно записать в файл и просто использовать в дальнейшем.

pid = fork do
  puts "from daemon"
  exit!(1)
end

Определение статуса

Я не нашёл удобного способа определения методами руби, запущен ли процесс. Есть возможность фильтровать вывод ps ax, ища в нём pid процесса. Но есть метод Process.waitpid, который можно использовать хитрым образом. Так же для будущих задач, упакуем наш код в класс:

require 'timeout'

class Daemon
  class << self
    def start
      @pid = fork do
        puts "from daemon"
        sleep 1
        exit!(1)
      end
    end

    def running?
      if @pid
        begin
          Timeout::timeout(0.01) do
            Process.waitpid(@pid)
            if $?.exited?
              return false
            end
          end
        rescue Timeout::Error
        end
        return true
      else
        return false
      end
    end
  end
end

Daemon.start

puts "running: #{Daemon.running?}"
sleep 1.5

puts "running: #{Daemon.running?}"

Сообщение об ошибках

Предположим, что у нашего демона есть некоторый процесс инициализации, и мы хотим знать, завершился ли он или произошла ошибка до того, как покинем метод Daemon.start.

Для передачи подобных сообщений хорошо подходит IO.pipe. Для двустороннего общения нужно создавать два канала, но нам хватит и одного:

require 'timeout'

class Daemon
  def initialize
    puts "from daemon: initializing"
  end

  class << self
    def start
      @rd, @wr = IO.pipe
      @pid = fork do
        @rd.close
        begin
          dmn = new
          @wr.write "ok"
          @wr.close
          sleep 1
        rescue Exception => e
          @wr.write e.to_s
          @wr.close
        ensure
          exit!(1)
        end
      end
      @wr.close
      str = @rd.read
      if str == "ok"
        puts "daemon started ok"
      else
        puts "error while initializing daemon: #{str}"
      end
      @rd.close
    end

    def running?
      if @pid
        begin
          Timeout::timeout(0.01) do
            Process.waitpid(@pid)
            if $?.exited?
              return false
            end
          end
        rescue Timeout::Error
        end
        return true
      else
        return false
      end
    end
  end
end

Daemon.start

puts "running: #{Daemon.running?}"
sleep 1.5

puts "running: #{Daemon.running?}"

Остановка

У каждого демона (для того они обычно и создаются) есть циклическая часть. Нам хотелось бы запускать и останавливать процесс тогда, когда нам нужно. Если с запуском всё понятно, то для остановки потребуется ещё одна вещь. Ведь в тот момент, когда мы создали демона, он создает копии всех переменных и дальнейшее их изменение внутри и снаружи демона становится независимым. Ещё один способ общения с демоном — сигнал:

require 'timeout'

class Daemon
  def initialize
    puts "from daemon: initializing"
    @cnt = 0
  end

  def main_loop
    @cnt += 1
    puts "from daemon: running loop ##{@cnt}"
    sleep 0.1
  end

  class << self
    def start
      @rd, @wr = IO.pipe
      @pid = fork do
        @rd.close
        running = true
        Signal.trap("TERM") do
          running = false
        end
        begin
          dmn = new
          @wr.write "ok"
          @wr.close
          while running
            dmn.main_loop
          end
        rescue Exception => e
          @wr.write e.to_s
          @wr.close
        ensure
          exit!(1)
        end
      end
      @wr.close
      str = @rd.read
      if str == "ok"
        puts "daemon started ok"
      else
        puts "error while initializing daemon: #{str}"
      end
      @rd.close
    end

    def stop
      unless @pid.nil?
        Process.kill("TERM", @pid)
        @pid = nil
      end
    end

    def running?
      if @pid
        begin
          Timeout::timeout(0.01) do
            Process.waitpid(@pid)
            if $?.exited?
              return false
            end
          end
        rescue Timeout::Error
        end
        return true
      else
        return false
      end
    end
  end
end

Daemon.start

puts "running: #{Daemon.running?}"
sleep 1
Daemon.stop
puts "running: #{Daemon.running?}"

Теперь, если наследовать от этого класса свой класс и переопределить методы initialize и main_loop, получится вполне себе демон :)

Для самостоятельного изучения

Есть, конечно, ещё недостатки. Например:

  1. Если ошибка возникает в main_loop, то канал вывода уже закрыт. А если не закрывать канал, то метод IO#read не позволит нам выйти из метода start. Что делать?
  2. Если нужно периодически общаться с демоном общирными объемами информации, что делать? (я в своей задаче использовал TCPSocket, но это, ведь, не панацея)
  3. Хорошо бы хранить pid в pid-файле на случай нашествия зомби. И, соответственно, обрабатывать возникающие зомбо-проблемы.

Но это я оставлю на самостоятельное решение пытливым читателям.