28 октября 2009, 13:03

Удалённые вызовы через систему распределённых объектов в руби (dRuby)

Темы: ruby, daemon, ruby1.9

Введение

Некоторое время назад я писал о создании подпроцессов на руби. В числе прочего один из вопросов был об общении между собой демона и родительского процесса. Об одном из методов пойдёт речь сегодня

Постановка задачи

Не только программисты знают, что важна цель коммуникации. :) Если цель общения между основным процессом и демоном в том, чтобы вызывать методы на объектах друг друга, до давайте на этом и сосредоточимся.

Решение: DRb

Для удалённого обращения с объектами существует стандартная руби-библиотека dRuby, в которой находится модуль DRb, который мы и будем использовать. Ничего устанавливать не нужно. Согласно документации, совершенно прозрачным образом можно вызвать методы на удалённом объекте даже на другой машине. Объекты и ссылки на них передаются в формате Marshal.

Ну, довольно теории! Перейдём к практике. Для эмуляции параллельных процессов (возможно на разных машинах (!)) мы будем использовать два окна терминала. В одном запустим server.rb:

# coding: utf-8
$KCODE = "utf-8" if RUBY_VERSION < "1.9.0"
require "drb/drb"

class RemoteObject
  def remote_method_with_param(param)
    puts "вызван метод на сервере с параметром #{param.inspect}"
    case param.class.to_s
    when "String"
      puts "параметр типа строка"
      param.reverse!
    when "Array"
      puts "параметр типа массив"
      param.shift
    else
      puts "параметр оставшегося типа"
      param.do_smth
    end
  end
end

$SAFE = 1 # Запретить eval() и eval-оподобные вызовы

DRb.start_service("druby://localhost:45678", RemoteObject.new)
DRb.thread.join

Здесь мы используем банальный Thread#join, чтобы при необходимости просто прервать выполнение. Но те, кто читал предыдущую статью, знают, что в это время можно делать что угодно и следить за потоком dRuby отдельно.

В другом терминале запустим клиентский код client.rb:

# coding: utf-8
$KCODE = "utf-8" if RUBY_VERSION < "1.9.0"
require "drb/drb"

class MyString
  def initialize(str)
    @string = str
  end

  def do_smth
    @string.reverse!
  end

  def inspect
    "<#{@string}>"
  end
end

rem_o = DRbObject.new_with_uri("druby://localhost:45678")

["строка", ["котик", "пёсик", "слоник"], MyString.new("суперстрока")].each do |obj|
  puts "Вызов метода вернул: #{rem_o.remote_method_with_param(obj).inspect}"
  puts "Параметр после вызова: #{obj.inspect}"
end

Вывод в терминалы будет следующий (я использую вывод для версии руби 1.9.1, потому что он нормально переворачивает кириллическую строку без колдовства) для сервера:

вызван метод на сервере с параметром "строка"
параметр типа строка
вызван метод на сервере с параметром ["котик", "пёсик", "слоник"]
параметр типа массив
вызван метод на сервере с параметром #<DRb::DRbUnknown:0x00000001248910 @name="MyString", @buf="\x04\bo:\rMyString\x06:\f@stringI\"\e\xD1\x81\xD1\x83\xD0\xBF\xD0\xB5\xD1\x80\xD1\x81\xD1\x82\xD1\x80\xD0\xBE\xD0\xBA\xD0\xB0\x06:\rencoding\"\nUTF-8">
параметр оставшегося типа

Клиент же упадёт с ошибкой:

Вызов метода вернул: "акортс"
Параметр после вызова: "строка"
Вызов метода вернул: "котик"
Параметр после вызова: ["котик", "пёсик", "слоник"]
(druby://localhost:45678) server.rb:17:in `remote_method_with_param': undefined method `do_smth' for #<DRb::DRbUnknown:0x00000001248910> (NoMethodError)
     .....

Что, безусловно, прекрасно. Прекрасно, что упал не сервер. :) Понятно, что он не знает ничего про этот объект и не знает, как с ним обращаться.

Как видно из вывода, объекты передаются в виде копий. Нашим же третьим, самодельным объектом, мы можем исследовать две возможности: таки передавать копию объекта или передавать лишь ссылку на него, чтобы вызовы выполнялись на клиентской копии. Для первой возможности достаточно вынести определение класса в общедоступное для клиента и сервера место — common.rb:

# coding: utf-8
$KCODE = "utf-8" if RUBY_VERSION < "1.9.0"
require "drb/drb"

REM_URI = "druby://localhost:45678"

class MyStringCopied
  def initialize(str)
    @string = str
  end

  def do_smth
    @string.reverse!
    self
  end

  def inspect
    "<<#{@string}>>"
  end
end

class MyStringSingle
  include DRb::DRbUndumped # это ключ :)
  def initialize(str)
    @string = str
  end

  def do_smth
    @string.reverse!
    self
  end

  def inspect
    "<#{@string}>"
  end
end

Добавим require "common.rb" в серверный код, а клиентский преобразится до такого:

# coding: utf-8
require "common"

rem_o = DRbObject.new_with_uri(REM_URI)

DRb.start_service # Это нужно для объекта, который не копируется при передаче

["строка",
  ["котик", "пёсик", "слоник"],
  MyStringCopied.new("суперстрока"),
  MyStringSingle.new("суперстрока без копий")].each do |obj|
  puts "Вызов метода вернул: #{rem_o.remote_method_with_param(obj).inspect}"
  puts "Параметр после вызова: #{obj.inspect}"
end

Как видно, мы сразу позаботились и о второй возможности, создав для неё ещё один класс. Секрет заключается во включении модуля DRb::DRbUndumped и старте ещё одного серверного процесса на клиенте (для вызовов методов объектов клиента удалённо) Клиентский вывод теперь выглядит так:

Вызов метода вернул: "акортс"
Параметр после вызова: "строка"
Вызов метода вернул: "котик"
Параметр после вызова: ["котик", "пёсик", "слоник"]
Вызов метода вернул: <<акортсрепус>>
Параметр после вызова: <<суперстрока>>
Вызов метода вернул: #<DRb::DRbObject:0x000000012588c8 @uri="druby://127.0.1.1:43998", @ref=9631244>
Параметр после вызова: <йипок зеб акортсрепус>

Если немножко почитать, и разобраться, какие объекты можно и нужно «маршализировать», а какие нельзя или не нужно, то получается вполне себе прекрасный инструмент. Который, повторюсь, входит в стандартную библиотеку и не требует никаких внешних зависимостей.

Материалы для самостоятельного изучения

  1. Полный код статьи на github
  2. Документация по DRb (rdoc)
  3. Документация по Marshal

Комментарии 2 >>

22 октября 2009, 00:48

Работа с потоками (Thread) в руби

Темы: ruby, extension, syntax, daemon, ruby1.9

Введение

Сначала я расскажу, почему на сегодняшний день я не очень много работаю с подпроцессами на базе Thread, предпочитая им Kernel.fork. А потом покажу простой способ следить за потоками при работе приложения.

На текущий момент, основная проблема потоков — это «ненастоящее» распределение ресурсов. Все потоки руби на самом деле находятся в одном системном потоке, который по очереди передаёт им управление. Это влечёт за собой полтора следствия.

Зависание

Когда имеешь дело с внешним оборудованием, сторонними библиотеками и серийными портами, зависание потока может случиться на самом низком уровне. Это можно симулировать небольшой программой на си — block_thread.c:

#include <ruby.h>

VALUE rb_mBlockThread;

/*
 * call-seq:
 *   BlockThread::cycle(interval=5)
 *
 * Блокирует текущий поток на <code>interval</code> секунд.
 *
 */

VALUE bt_cycle(int argc, VALUE *argv, VALUE self) {
  int i, max;
  max = 5;

  if (argc == 1) {
    max = FIX2INT(argv[0]);
  } else if (argc > 1) {
    rb_raise(rb_eArgError, "Неправильное количество аргументов (%d вместо 0 или 1)");
  }

  for (i=0; i<max; i++) {
    sleep(1);
  }
  return Qnil;
}

void Init_block_thread() {
  /*
   * Модуль содержит методы для демонстрации работы потока
   */
  rb_mBlockThread = rb_define_module("BlockThread");
  rb_define_module_function(rb_mBlockThread, "cycle", bt_cycle, -1);
}

Если вы никогда не расширяли руби с помощью си, поясню, что в этой программе мы создаём модуль BlockTread, в котором создаём метода класса cycle, который указанное число раз (по умолчанию 5) в цикле ждёт одну секунду. Напишем extconf.rb:

require "mkmf"
create_makefile("block_thread")

И программу на руби, в которой будут два потока, один из которых мы заблокируем на низком уровне block_threads.rb:

# coding: utf-8
require "block_thread.so"

t1 = Thread.new do
  10.times { |i| puts i; sleep 0.1 }
end

t2 = Thread.new do
  puts "Блокируем"
  BlockThread.cycle
  puts "Разблокируем"
end

t1.join
t2.join

Скомпилируем и запустим:

ruby extconf.rb
make
ruby block_threads.rb

И что же мы видим? Мы видим, как все потоки, включая основной, блокируются на пять секунд (или любое число секунд, которое мы укажем) И даже ctrl + c не в силах нам помочь. Помогает только ctrl + z и потом killall ...

В случае же с Kernel.fork, процессы действительно равномерно делят между собой ресурсы, и один подпроцесс не способен заблокировать всё.

Синхронизация

Я говорил про полторы проблемы. Об одной уже рассказал, а вторая известна давно — попробуйте выполнить следующий код:

# coding: utf-8
$cnt = 0

t1 = Thread.new do
  100000.times { $cnt += 1 }
end

t2 = Thread.new do
  100000.times { $cnt += 1 }
end

t1.join
t2.join

puts "Without sync: #{$cnt}"

Если вы не используете руби 1.9, то вы получите неожиданный и каждый раз разный результат. Всё дело в том, что переключение между потоками происходит между элементарными операциями, а += состоит из трёх элементарных операций: достать значение, прибавить к нему число, записать значение. Чтобы этого не произошло, нужно либо использовать синхронизацию с помощью Mutex, либо руби 1.9. Ссылка на полный код для этой статьи в конце, т.к. я спешу перейти к более интересной части. :)

Слежение за потоками с помощью менеджера ThreadsWait

Совершенно недавно открыл для себя интересный способ следить за статусом пакетов в блокирующей и неблокирующей манере:

# coding: utf-8
require "thwait"

t1 = Thread.new do
  10.times { |i| puts "поток 1 тик #{i}"; sleep 0.5 }
end

t2 = Thread.new do
  10.times { |i| puts "поток 2 тик #{i}"; sleep 0.7 }
end

tw = ThreadsWait.new t1, t2

t3 = Thread.new do
  10.times { |i| puts "поток 3 тик #{i}"; sleep 0.3 }
end

run = true
tw.join_nowait t3

while run do
  begin
    # Неблокирующее ожидание
    puts "Закончил работу #{tw.next_wait(true).inspect }"
    run = false
  rescue ThreadsWait::ErrNoFinishedThread
    puts "Ожидаем окончания работы одного из потоков"
    sleep 0.5
  end
end

# Блокирующее ожидание
tw.all_waits do |t|
  puts "Закончил работу #{t.inspect}"
end

По-моему, весьма удобно, если вам нужно не просто ожидать окончания работы потоков, но ещё и делать что-то при этом.

Материалы для самостоятельного изучения

  1. Полный код статьи на github
  2. Документация ThreadsWait
  3. Толковая статья о многопотоковости и процессах в руби

Комментарии 0 >>

07 октября 2009, 21:33

Некоторые тонкости стыковки ruby и bash

Темы: ruby, bash

Введение

Последнее время больше занимаюсь работой системного администратора нежели программиста. Прошлую неделю даже пропустил написание статьи. Это, конечно, не означает, что совсем нечего рассказать.

Поскольку для быстроты я обычно пишу большинство скриптов на руби, необходимо чтобы они следовали некоторым тонкостям работы с командной строкой.

Условное выполнение

В bash кроме разделителей команд «&» и «;», существует ещё и условное выполнение списка команд с помощью «&&» и «||». Их работа зависит от кода, с которым произошёл выход.
exit0:

#!/usr/bin/env ruby
puts "Выход с кодом 0"
exit 0

exit1:

#!/usr/bin/env ruby
puts "Выход с кодом 1"
exit 1

Теперь если запустить скрипты в следующем сочетании:

./exit0 && ./exit0 && ./exit1 && ./exit0

То вывод будет следующий:

Выход с кодом 0
Выход с кодом 0
Выход с кодом 1

А если запустить скрипты в следующем сочетании:

./exit1 || ./exit1 || ./exit0 || ./exit1

То вывод будет следующий:

Выход с кодом 1
Выход с кодом 1
Выход с кодом 0

То есть, «&&» выполняет следующую команду, если предыдущая вышла с кодом 0, а «||» выполняет следующую команду, если предыдущая вернула ненулевой код. Условно говоря, первый список выполняется пока всё срабатывает, а второй — пока не срабатывает.

Перенаправление вывода с помощью pipeline

Тут всё просто. Если в bash команды разделены с помощью «|», то вывод первой команды будет перенаправлен на вход второй.

Простая демонстрация из трёх файлов.
show:

#!/usr/bin/env ruby
$KCODE = "utf8"
$stdout.puts "stdin содержит: #{$stdin.read.inspect}"

out:

#!/usr/bin/env ruby
$stdout.puts "Привет из stdout!"

err:

#!/usr/bin/env ruby
$stderr.puts "[*stderr*] Привет из stderr!"
$stdout.puts "Привет из stdout!"

Думаю, не составит труда выяснить, что выводят запущенные отдельно out и err, но вот, что из этого можно сделать с помощью pipeline:

./out | ./show

выводит

stdin содержит: "Привет из stdout!\n"

Когда же есть обращение к другому каналу вывода, то

./err | ./show

выводит

[*stderr*] Привет из stderr!
stdin содержит: "Привет из stdout!\n"

Вывод stderr можно перенаправить в stdout:

./err 2>&1 | ./show

выводит

stdin содержит: "[*stderr*] Привет из stderr!\nПривет из stdout!\n"

Материалы для самостоятельного изучения

Документация по bash

Комментарии 0 >>