Is Ruby pipe streaming asynchronous?
I have a map/reduce streaming pipeline written in Ruby that is behaving strangely. The pipeline looks like this:
mapper | sort | reducer | expander | sort | splitter | uploader
The mapper writes to stdout (via puts), the reducer reads from stdin (via ARGF.each) and writes to stdout (via puts), and so on.
It seems that when the uploader is executed, the files that the splitter should have created have not been created yet, so the uploader doesn't upload anything.
Here's the pipeline class:
    class Pipeline
      def run(context)
        raise ArgumentError, 'context is nil' unless context
        raise ArgumentError, 'context[:logger] is nil' unless context[:logger]

        current_path = File.dirname(__FILE__)
        mapper = File.join(current_path, 'mapper.rb')
        reducer = File.join(current_path, 'reducer.rb')
        expander = File.join(current_path, 'expander.rb')
        splitter = File.join(current_path, 'splitter.rb')
        uploader = File.join(current_path, 'uploader.rb')
        mapper_args = context[:order_id] == nil ? nil : " #{context[:order_id]}"

        command_line = "ruby #{mapper}#{mapper_args} | sort | ruby #{reducer} | ruby #{expander} | sort | ruby #{splitter} | ruby #{uploader}"
        context[:logger].debug command_line
        %x{#{command_line}}
      end
    end

If piped streaming is asynchronous in Ruby, I'm wondering if doing what RubyMine does will fix this. For example, before it runs a Ruby script, it prepends the command line with this: ruby -e $stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift).
I've updated my code with this technique; however, I want to know if I'm doing this correctly. Or is there a better way?
    class Pipeline
      def run(context)
        raise ArgumentError, 'context is nil' unless context
        raise ArgumentError, 'context[:logger] is nil' unless context[:logger]

        current_path = File.dirname(__FILE__)
        # The -e script is wrapped in shell single quotes; without them the shell
        # would expand $stdout, $stderr and $0 and treat ';' as a command separator.
        ruby = %q{ruby -e '$stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)'}
        mapper = File.join(current_path, 'mapper.rb')
        reducer = File.join(current_path, 'reducer.rb')
        expander = File.join(current_path, 'expander.rb')
        splitter = File.join(current_path, 'splitter.rb')
        uploader = File.join(current_path, 'uploader.rb')
        mapper_args = context[:order_id] == nil ? nil : " #{context[:order_id]}"

        create_reports_command_line = "#{ruby} #{mapper}#{mapper_args} | sort | #{ruby} #{reducer} | #{ruby} #{expander} | sort | #{ruby} #{splitter}"
        context[:logger].debug create_reports_command_line
        %x{#{create_reports_command_line}}

        sleep 60 # sleep 1 min, in case...

        upload_reports_command_line = "#{ruby} #{uploader}"
        context[:logger].debug upload_reports_command_line
        %x{#{upload_reports_command_line}}
      end
    end
You need to enable sync. That tells Ruby not to buffer its output, and instead to send it on as soon as it is written.
Sets the “sync mode” to true or false. When sync mode is true, all output is immediately flushed to the underlying operating system and is not buffered internally.
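To make the effect of sync mode concrete, here is a minimal sketch. The helper name visible_after_write is hypothetical and not part of the original scripts. It writes one line into a pipe and checks what a reader can see before any flush happens.

```ruby
# Hypothetical helper: returns what a reader can see on a pipe immediately
# after the writer calls puts, before any explicit flush or close.
def visible_after_write(sync)
  reader, writer = IO.pipe
  writer.sync = sync            # true: every write goes straight to the OS
  writer.puts 'record'
  begin
    reader.read_nonblock(100)   # data has already reached the pipe
  rescue IO::WaitReadable
    nil                         # still sitting in Ruby's internal buffer
  ensure
    writer.close                # closing flushes anything still buffered
    reader.close
  end
end

p visible_after_write(true)
p visible_after_write(false)
```

With sync enabled, the line should be readable right away; with it disabled, the reader is expected to find nothing until the writer flushes or closes.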
Pipelines should still work with sync disabled (false), but a downstream stage won't see anything until the stage before it either closes its output or fills its buffer and flushes it, which can take a while.
See IO#sync= for more information.
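On the "better way" part of the question, one option that might be worth trying is Open3.pipeline from the standard library, which starts each stage as its own process and returns only after all of them have exited. This is a sketch under assumptions, not the asker's code: run_pipeline is a hypothetical wrapper, and the commented-out usage reuses the script paths from the question.

```ruby
require 'open3'
require 'rbconfig'

# Hypothetical wrapper: runs the given commands as a pipeline and returns
# true only if every stage exited successfully. Each stage is an array of
# [program, arg, ...], so no shell quoting is involved.
def run_pipeline(*stages)
  statuses = Open3.pipeline(*stages)   # blocks until all stages have exited
  statuses.all?(&:success?)
end

# Assumed usage with the scripts from the question:
#   ruby = RbConfig.ruby
#   reports_ok = run_pipeline([ruby, mapper], ['sort'], [ruby, reducer],
#                             [ruby, expander], ['sort'], [ruby, splitter])
#   run_pipeline([ruby, uploader]) if reports_ok
```

Because the call blocks until the splitter has exited, any files it writes should be complete before the uploader starts, which would make the sleep unnecessary.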