· 7 years ago · Feb 15, 2019, 07:09 AM
1# -*- encoding : utf-8 -*-
class Stream < Sequel::Model(DB[:streams])
  # extend StreamVerifier

  # One uploaded CSV/TXT report file ("stream"). The abbrev-specific parsing
  # rules (csv_columns, csv_header, has_check_info?, normalize_data,
  # fill_column_indexes and the column-index ivars such as @_filial_id) are
  # mixed in at runtime from the module named by the :abbrev parameter.
  attr_reader :id, :csv_file, :file_rows, :filial_id, :start_report_period, :end_report_period, :abbrev, :form_date, :file_lines
  # NOTE(review): these attr_* declarations are partially shadowed by the
  # Sequel association accessors and the explicit #id below — confirm which
  # accessor callers actually rely on before removing either.
  attr_writer :csv_file, :abbrev
  one_to_one :csv_file

  # Builds the model and mixes in the abbrev-specific parsing module.
  # @param params [Hash] must contain :abbrev — the name of a mixin module.
  def initialize(params = {})
    @errors = []
    extend params[:abbrev].constantize
    super(params)
  end

  # Dataset of the parsed rows belonging to this stream; the row model class
  # is resolved from the abbrev (e.g. "Stream::FOO"). Nil when abbrev unset.
  def data
    "Stream::#{self[:abbrev]}".constantize.where(stream_id: self[:id]) if self[:abbrev]
  end

  # Primary-key reader (overrides the attr_reader above).
  def id
    self[:id]
  end

  include AASM
  aasm do
    state :not_processed, initial: true
    state :checking_name
    state :parsing
    state :checking_metadata
    state :validating_data
    state :normalizing_data
    state :saving_data
    state :processed

    # Error states
    state :name_error
    state :first_row_name_error
    state :last_row_counter_error

    # Happy-path transitions: each one runs its worker method after the
    # state switch, so a raise inside the worker surfaces from the bang
    # event call in #process.
    event(:check_name) { transitions from: :not_processed, to: :checking_name, after: Proc.new { |*args| check_file_name } }
    event(:parse) { transitions from: :checking_name, to: :parsing, after: Proc.new { |*args| parse_file } }
    event(:check_metadata) { transitions from: :parsing, to: :checking_metadata, after: Proc.new { |*args| check_info_data } }
    event(:validate_data) { transitions from: :checking_metadata, to: :validating_data, after: Proc.new { |*args| validate_data } }
    event(:normalize_data) { transitions from: :validating_data, to: :normalizing_data, after: Proc.new { |*args| fill_column_indexes } }
    event(:save_data) { transitions from: :normalizing_data, to: :saving_data, after: Proc.new { |*args| save_to_db } }
    event(:clean_up) { transitions from: :saving_data, to: :processed, after: Proc.new { |*args| clean_variables } }

    # Error events
    event(:name_error) { transitions from: :not_processed, to: :name_error }
    event(:first_row_name_error) { transitions from: :parsing, to: :first_row_name_error }
    event(:last_row_counter_error) { transitions from: :parsing, to: :last_row_counter_error }
  end

  # Runs the full processing pipeline on +csv_file+, driving the state
  # machine step by step. Known validation failures move the machine into a
  # dedicated error state and are reported via ErrorMailer; any other error
  # is mailed together with its backtrace. Never raises to the caller.
  def process(csv_file)
    update csv_file: csv_file
    check_name!
    parse!
    check_metadata!
    validate_data!
    normalize_data!
    save_data!
    update filial_id: @filial_id,
           file_lines: @file_lines,
           abbrev: @abbrev,
           start_of_report_period: @start_report_period,
           # BUGFIX: was @start_report_period — the end of the report period
           # was never persisted.
           end_of_report_period: @end_report_period,
           form_date: @form_date
    clean_up!
  rescue ArgumentError::WrongFileName
    name_error!
    clean_variables
    App.info('wrong_file_name_error!')
    ErrorMailer.metadata_error(aasm_state, @error_msg).deliver_now
  rescue ArgumentError::WrongFirstRowFileName
    first_row_name_error!
    clean_variables
    App.info('wrong_first_row_file_name_error!')
    ErrorMailer.metadata_error(aasm_state, @error_msg).deliver_now
  rescue ArgumentError::WrongFileRowsCounter
    last_row_counter_error!
    clean_variables
    App.info('wrong_file_rows_counter_error!')
    ErrorMailer.metadata_error(aasm_state, @error_msg).deliver_now
  rescue StandardError => e
    # BUGFIX: was `rescue Exception`, which also swallowed SignalException,
    # SystemExit and NoMemoryError. StandardError is the widest class that
    # should be caught and reported here.
    @error_msg = %{
      <h1>В файле потока #{ abbrev } произошла ошибка.</h1>
      <h2>Название файла: #{ @file_name }</h2>
      <h3>Ошибка произошла на этапе #{ aasm_state }!</h3>
      <p><b>#{ e.message }</b></p>
      <p>#{ e.backtrace.inspect }</p>
    }
    clean_variables
    ErrorMailer.metadata_error(aasm_state, @error_msg).deliver_now
  end

  # Extracts metadata (filial id, abbrev, dates) from the uploaded file
  # name. Two naming schemes are supported:
  #   <filial>_<ABBREV>_<form_date>_<start>_<end>.(csv|txt)
  #   <filial>_<ABBREV>_<start>_<end>.(csv|txt)
  # Also builds @regex, used later to verify the file's first (meta) row.
  # @raise [ArgumentError::WrongFileName] when neither scheme matches.
  def check_file_name
    @file_name = File.basename(csv_file.file.path)
    if (match = @file_name.match(/(\d*)_([A-Za-z]*)_(\d*)_(\d*)_(\d*)\.(csv|txt)$/)) && match.to_a.map(&:present?).uniq == [true]
      @filial_id = match[1]
      @abbrev = match[2]
      # Form date may come as a parseable date string or as %Y%d%m — try
      # both. (Was a bare `rescue` modifier that hid every StandardError.)
      @form_date =
        begin
          match[3].to_date
        rescue ArgumentError
          Date.strptime(match[3], '%Y%d%m')
        end
      @start_report_period = match[4].to_date
      @end_report_period = match[5].to_date
      @regex = /#{@filial_id}_#{@abbrev}_#{@form_date.to_s.gsub("-", "")}_#{@start_report_period.to_s.gsub("-", "")}_#{@end_report_period.to_s.gsub("-", "")}/
    elsif (match = @file_name.match(/(\d*)_([A-Za-z]*)_(\d*)_(\d*)\.(csv|txt)$/)) && match.to_a.map(&:present?).uniq == [true]
      @filial_id = match[1]
      @abbrev = match[2]
      @start_report_period = match[3].to_date
      @end_report_period = match[4].to_date
      @regex = /#{@filial_id}_#{@abbrev}_#{@start_report_period.to_s.gsub("-", "")}_#{@end_report_period.to_s.gsub("-", "")}/
    else
      @error_msg = Slim::Template.new() { File.read(App.root + '/app/mailers/views/file_naming_error.slim') }.render(binding)
      # FIX: message was mojibake ("Ðеверный формат …") — restored UTF-8.
      raise ArgumentError::WrongFileName, 'Неверный формат наименования файла'
    end
  end

  # Reads the meta line and the header line off the top of the file and
  # resets the data-row counter. Leaves the IO positioned at the first row.
  def parse_file
    @meta_line = csv_file.file.readline
    @header_line = csv_file.file.readline
    @rows_counter = 0
  end

  # Verifies the file's first (meta) row against @regex built from the file
  # name. The row-counter check against the file's last line is currently
  # disabled (see the commented-out last_row_match and the `true ||`
  # short-circuit), so the `elsif not last_row_match` branch is unreachable.
  def check_info_data
    if not human_error? and has_check_info?
      first_row_match = @meta_line.match(@regex)
      # last_row_match = last_line.match("#{ @rows_counter } строк в отчете")
      if first_row_match && (true || last_row_match) # && headers == csv_columns.keys.map(&:to_s)
        @file_lines = @rows_counter
      else
        if not first_row_match
          @error_msg = Slim::Template.new() { File.read(App.root + '/app/mailers/views/meta_error.slim') }.render(binding)
          # FIX: restored mojibake-damaged UTF-8 message text.
          raise ArgumentError::WrongFirstRowFileName, 'Наименование потока не совпадает с первой проверочной строкой файла!'
        elsif not last_row_match
          @error_msg = Slim::Template.new() { File.read(App.root + '/app/mailers/views/rows_counter_error.slim') }.render(binding)
          raise ArgumentError::WrongFileRowsCounter, 'Количество строк потока не совпадает с последней проверочной строкой файла!'
        else
          raise ArgumentError, 'Неизвестная ошибка проверки метаданных файла!'
        end
      end
    else
      # No meta row to check — account for the two header lines read in
      # parse_file when reporting the line count.
      @file_lines = @rows_counter + 2
    end
    true
  end

  # Bulk-loads the CSV rows into the database via COPY.
  # With partitions, a per-filial/per-period child table inheriting from the
  # ancestry table is (re)created first, then rows are streamed in chunks of
  # up to 500_000 lines. Without partitions the raw stream is copied as-is.
  def save_to_db
    if has_partitions?
      period = @start_report_period.to_s
      table_name = [ ancestry_table_name, @filial_id, period.gsub('-', '') ].join('_')
      # NOTE(review): interpolated identifiers/values originate from the
      # file-name regex (digits/letters only), so injection surface is
      # limited — but parameterized DDL helpers would still be safer.
      DB.execute("DROP TABLE IF EXISTS #{ table_name }")
      # The existence check is trivially true right after the DROP above;
      # kept as a guard in case the DROP is ever removed.
      unless DB.fetch("SELECT 1 FROM information_schema.tables WHERE table_name='#{ table_name }'").first
        DB.execute %{
          CREATE TABLE #{ table_name } ( id bigint NOT NULL DEFAULT nextval('#{ ancestry_table_name }_id_seq'::regclass),
          CONSTRAINT #{ table_name }_id_pk PRIMARY KEY (id)) INHERITS (#{ancestry_table_name});
          ALTER TABLE #{ table_name } ADD CONSTRAINT start_report_period_check
          CHECK (
            filial_id = #{ @filial_id } AND
            start_report_period >= '#{ period }'::date #{ "AND end_report_period < '#{ period }'::date + interval '1 month'" if @_end_report_period }
          );
          #{ create_index( @filial_id, period.gsub('-', '')) }
        }
      end
      copy_header = csv_header << :stream_id
      period = @start_report_period.to_s.gsub(/-/, '')
      start_report_period = @start_report_period.to_s
      filial = @filial_id.to_s
      chunk = ''
      chunk_counter = 0
      chunk_max_size = 500_000
      file = csv_file.file
      begin
        # readline raises EOFError at end of file — the rescue below is the
        # normal exit path and flushes the final partial chunk.
        while row = file.readline
          @rows_counter += 1
          # chomp! returns nil when the line had no trailing newline, so a
          # final unterminated line is skipped — presumably intentional for
          # the footer row; TODO confirm.
          next unless row.chomp!
          row.gsub!(QUOTES_REGEX, DOUBLE_SINGLE_QUOTE)
          row.gsub!(EMPTY_TAB_LINE, EMPTY_STRING)
          columns = row.split(TAB, -1)
          if columns[@_filial_id] == filial && ( columns[@_start_report_period] == start_report_period || columns[@_start_report_period] == period ) && columns.size == csv_columns_size
            chunk << normalize_data(columns)
            chunk_counter += 1
            if chunk_counter == chunk_max_size
              DB.copy_in(table_name, copy_header, chunk)
              chunk_counter = 0
              chunk = ''
            end
          end
        end
      rescue EOFError
        csv_file.file.close
        DB.copy_in(table_name, copy_header, chunk)
        chunk = ''
        GC.start
        # BUGFIX: removed `System.gc()` — Ruby has no System constant, so
        # that call raised NameError after every successful import.
      end
    else
      DB.copy_in(ancestry_table_name, csv_header << :stream_id, @file_stream)
    end
  end

  # Hook for abbrev mixins to drop per-file state; base keeps everything.
  def clean_variables() nil end

  # Capability flags, overridable by the abbrev mixin.
  def has_partitions?() false end
  def human_error?() false end

  # Mails a content-error report when any of the error collections is
  # non-empty.
  def send_errors(filial_errors, period_errors, size_errors)
    if (errors = [filial_errors, period_errors, size_errors]).map(&:present?).include?(true)
      ErrorMailer.content_error(abbrev, @file_name, csv_columns.keys.unshift('№ Строки'), errors).deliver_now
    end
  end

  # Row-level validation is currently disabled: the early `return true`
  # makes everything below dead code. Kept for reference — it filtered
  # @file_stream down to rows matching the filial/period/column-count
  # expectations.
  def validate_data
    return true
    filial = @filial_id.to_s
    period = @start_report_period.to_s.gsub(/-/, '')
    empty_str = ''
    filtered_string = ''
    @file_stream.each_with_index do |str, i|
      row = str.sub(/\n/, '').split("\t", -1)
      filtered_string << str unless (row[@_filial_id] || empty_str) != filial || (row[@_start_report_period] || empty_str).gsub(/-/, '') != period || row.size != csv_columns_size
    end
    @file_stream.close
    @file_stream = StringIO.new(filtered_string)
    filtered_string = nil
    return true
  end
end
221end