Is Ruby pipe streaming asynchronous?


I have a map/reduce streaming pipeline written in Ruby that is behaving strangely. The pipeline looks like this:

mapper | sort | reducer | expander | sort | splitter | uploader

The mapper writes to STDOUT (via puts), the reducer reads from STDIN (via ARGF.each) and writes to STDOUT (via puts), and so on.
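To make that stage contract concrete, here is a minimal sketch of a reducer-style stage. It assumes tab-separated "key<TAB>value" records, since the real record format isn't shown in the question, and reduce_stream is a hypothetical name. It only relies on each_line and puts, so it works both on ARGF/$stdout inside the pipeline and on in-memory StringIO objects.

```ruby
require 'stringio' # only needed for the in-memory example in the comments

# Hypothetical reducer-style stage (assumed record format "key<TAB>value").
# Reads lines already sorted by key and writes one "key<TAB>count" line
# per run of identical keys.
def reduce_stream(input, output)
  current_key = nil
  count = 0
  input.each_line do |line|
    next if line.strip.empty? # ignore blank lines
    key, _value = line.chomp.split("\t", 2)
    if key != current_key
      # A new key starts: emit the total for the previous one.
      output.puts "#{current_key}\t#{count}" unless current_key.nil?
      current_key = key
      count = 0
    end
    count += 1
  end
  # Emit the total for the final key.
  output.puts "#{current_key}\t#{count}" unless current_key.nil?
end

# As a real pipeline stage the script would end with:
#   $stdout.sync = true
#   reduce_stream(ARGF, $stdout)
#
# In-memory example:
#   out = StringIO.new
#   reduce_stream(StringIO.new("a\t1\na\t2\nb\t3\n"), out)
#   out.string  # => "a\t2\nb\t1\n"
```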

It seems that when the uploader is executed, the files the splitter should have created have not been created yet, so the uploader doesn't upload anything.
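On the title question: the stages of a pipe are not run one after another. They are all started at once, and each stage only blocks when it reads input that hasn't arrived yet. So if the uploader looks for files on disk instead of reading its standard input, it can run before the splitter has written anything. A minimal way to see the concurrency, assuming a Unix-like system with a sleep command (time_two_stage_pipeline is a hypothetical helper): two one-second stages joined by a pipe finish in about one second, not two.

```ruby
require 'open3'

# Runs `sleep 1 | sleep 1` (without a shell) via Open3.pipeline, which starts
# every stage at the same time and waits for all of them, and returns the
# wall-clock time taken in seconds.
def time_two_stage_pipeline
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  statuses = Open3.pipeline(['sleep', '1'], ['sleep', '1'])
  raise 'a stage failed' unless statuses.all?(&:success?)
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end
```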

Here's the Pipeline class:

```ruby
class Pipeline
  def run(context)
    raise ArgumentError, 'context is nil' unless context
    raise ArgumentError, 'context[:logger] is nil' unless context[:logger]

    current_path = File.dirname(__FILE__)
    mapper       = File.join(current_path, 'mapper.rb')
    reducer      = File.join(current_path, 'reducer.rb')
    expander     = File.join(current_path, 'expander.rb')
    splitter     = File.join(current_path, 'splitter.rb')
    uploader     = File.join(current_path, 'uploader.rb')

    mapper_args = context[:order_id].nil? ? nil : " #{context[:order_id]}"

    command_line = "ruby #{mapper}#{mapper_args} | sort | ruby #{reducer} | ruby #{expander} | sort | ruby #{splitter} | ruby #{uploader}"

    context[:logger].debug command_line

    %x{#{command_line}}
  end
end
```

If piped streaming is asynchronous in Ruby, I'm wondering whether doing what RubyMine does would fix this. For example, before running a Ruby script, RubyMine prepends this to the command line: ruby -e $stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)
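One caveat if that prefix is pasted into a command line that %x hands to a shell: the -e program contains $, ; and parentheses, so an unquoted copy would be split at each ; and have $stdout expanded by the shell. A minimal sketch using Ruby's standard Shellwords module to build a correctly quoted stage command; SYNC_WRAPPER and synced_ruby_command are hypothetical names.

```ruby
require 'shellwords'

# The RubyMine-style wrapper program: turn on sync for STDOUT/STDERR, then
# load the real script (the first remaining argument) with $0 set to its path.
SYNC_WRAPPER = '$stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)'

# Builds a shell-safe command line for one pipeline stage. Shellwords.join
# escapes $, ; and ( so the shell passes the -e program to Ruby as a single
# argument instead of interpreting it.
def synced_ruby_command(script, *args)
  Shellwords.join(['ruby', '-e', SYNC_WRAPPER, script, *args.map(&:to_s)])
end
```

For example, the mapper stage could be built with synced_ruby_command(mapper, *context[:order_id]); splatting nil adds no argument, so the optional order id needs no separate check.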

I've updated the code to use this technique; however, I want to know whether I did it correctly, or whether there is a better way.

```ruby
class Pipeline
  def run(context)
    raise ArgumentError, 'context is nil' unless context
    raise ArgumentError, 'context[:logger] is nil' unless context[:logger]

    current_path = File.dirname(__FILE__)
    # The -e program is single-quoted so the shell does not split it at ';'
    ruby         = %q{ruby -e '$stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)'}
    mapper       = File.join(current_path, 'mapper.rb')
    reducer      = File.join(current_path, 'reducer.rb')
    expander     = File.join(current_path, 'expander.rb')
    splitter     = File.join(current_path, 'splitter.rb')
    uploader     = File.join(current_path, 'uploader.rb')

    mapper_args = context[:order_id].nil? ? nil : " #{context[:order_id]}"

    create_reports_command_line = "#{ruby} #{mapper}#{mapper_args} | sort | #{ruby} #{reducer} | #{ruby} #{expander} | sort | #{ruby} #{splitter}"

    context[:logger].debug create_reports_command_line

    %x{#{create_reports_command_line}}

    sleep 60 # sleep 1 min, just in case...

    upload_reports_command_line = "#{ruby} #{uploader}"

    context[:logger].debug upload_reports_command_line

    %x{#{upload_reports_command_line}}
  end
end
```
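As one possible alternative to the fixed sleep, here is a sketch using the standard Open3 library (run_stages and PipelineError are hypothetical names). Open3.pipeline_r connects the stages with pipes without going through a shell, hands back the last stage's output, and waits for every stage's process, so the upload can be started only after the report stages have all exited successfully.

```ruby
require 'open3'

# Raised when any stage of the pipeline exits unsuccessfully.
class PipelineError < StandardError; end

# Runs the given commands connected by pipes (like `a | b | c`), returns the
# last stage's standard output, and raises PipelineError if any stage failed.
# Each stage is an argument array such as ['ruby', 'mapper.rb'] or ['sort'].
def run_stages(*stages)
  Open3.pipeline_r(*stages) do |last_stdout, wait_threads|
    output = last_stdout.read              # drain the final stage's output
    statuses = wait_threads.map(&:value)   # block until every stage exits
    failed = stages.zip(statuses).reject { |_stage, status| status.success? }
    unless failed.empty?
      names = failed.map { |stage, _status| Array(stage).join(' ') }
      raise PipelineError, "failed stage(s): #{names.join(', ')}"
    end
    output
  end
end
```

With the question's scripts, the report stages might be run as run_stages(['ruby', mapper, *[context[:order_id]].compact.map(&:to_s)], ['sort'], ['ruby', reducer], ['ruby', expander], ['sort'], ['ruby', splitter]), followed by run_stages(['ruby', uploader]) once that returns. Because the array form bypasses the shell, nothing needs quoting, and each stage script could set $stdout.sync = true itself instead of relying on the -e wrapper.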

You need to enable sync. It tells Ruby not to buffer its output, but instead to send it on immediately.

Sets the “sync mode” to true or false. When sync mode is true, all output is immediately flushed to the underlying operating system and is not buffered internally.
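To see what sync changes without running the whole pipeline, here is a minimal sketch that uses an in-process IO.pipe in place of a real stage's $stdout (visible_after_write is a hypothetical helper). With sync off, the written bytes stay in Ruby's internal buffer, so a non-blocking read on the other end finds nothing yet; with sync on, they reach the OS pipe immediately.

```ruby
# Writes `text` into a fresh pipe with the writer's sync mode set to `sync`,
# then returns whatever a non-blocking read on the other end can see
# right away ('' if nothing has reached the OS pipe yet).
def visible_after_write(text, sync:)
  reader, writer = IO.pipe
  writer.sync = sync
  writer.write(text)
  result = reader.read_nonblock(1024, exception: false)
  result == :wait_readable ? '' : result
ensure
  writer&.close # closing also flushes anything still buffered
  reader&.close
end
```

In a stage script the equivalent is $stdout.sync = true near the top, or an explicit $stdout.flush after writing.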

Pipelines should still work with sync disabled (false), but you won't see any output until the first, and then each subsequent, stage either sees its input closed or fills its buffer and flushes it, which can take a while.

See IO#sync= for more information.

