· 6 years ago · Apr 20, 2019, 12:34 PM
1From 725d8e55bc5b7d17f3a30649f457ff6e87cc95b1 Mon Sep 17 00:00:00 2001
2From: Ryan Curtin <ryan@ratml.org>
3Date: Wed, 10 Apr 2019 08:35:48 -0400
4Subject: [PATCH] Partial attempt to add better bulk insertion support.
5
6---
7 src/DataLoader/DataLoader.jl | 125 +++++----
8 src/DataLoader/check_sorted_sink.jl | 113 ++++++++
9 src/DataLoader/sorted_bulkinsert_sink.jl | 242 ++++++++++++++++++
10 src/PagedDataStructures/betree_bulkinsert.jl | 136 +++++++++-
11 .../paged_vector_dict_bulkinsert.jl | 17 ++
12 5 files changed, 582 insertions(+), 51 deletions(-)
13 create mode 100644 src/DataLoader/check_sorted_sink.jl
14 create mode 100644 src/DataLoader/sorted_bulkinsert_sink.jl
15
16diff --git a/src/DataLoader/DataLoader.jl b/src/DataLoader/DataLoader.jl
17index 7b528d4d..b167850a 100644
18--- a/src/DataLoader/DataLoader.jl
19+++ b/src/DataLoader/DataLoader.jl
20@@ -16,6 +16,8 @@ include("convert.jl")
21 include("../REPL/conversion.jl")
22 include("source.jl")
23 include("sink.jl")
24+include("check_sorted_sink.jl")
25+include("sorted_bulkinsert_sink.jl")
26
27 # The second element of this tuple indicates whether or not the data is sorted.
28 struct DataFile
29@@ -76,62 +78,85 @@ function load_data(fullpath::Union{AbstractString,IO},
30 rows_for_type_detect=1,
31 append=append_to_relation_if_exists, kwargs...)
32 else
33- # Since the data is sorted, let's attempt to read the entire CSV in,
34- # then use bulkinsert to build the relation.
35- print("loading data file\n")
36- @time data = CSV.read(fullpath;
37- allowmissing=:none,
38- rows_for_type_detect=1,
39- kwargs...)
40+ # First, we'll make a pass to find the size of the CSV, to see if
41+ # it's sorted.
42+ @time sink_result = CSV.read(fullpath, CheckSortedSink(relation_columns),
43+ allowmissing=:none, rows_for_type_detect=1, kwargs...)
44+
45+ if sink_result.sorted
46+ # The file is sorted for all relevant keys, so now we can stream
47+ # directly to a BeTreeDict. Let's create all necessary
48+ # BeTreeDicts.
49+ pager = get_pager()
50+ @time bulkinsert_sink_result = CSV.read(fullpath,
51+ SortedBulkInsertSink(relation_columns,
52+ env,
53+ sink_result.rows,
54+ create_relation_if_notexists,
55+ pager),
56+ allowmissing=:none,
57+ rows_for_type_detect=1,
58+ append=append_to_relation_if_exists,
59+ kwargs...)
60+ else
61+
62+            # Since the data is not sorted, let's read the entire CSV in,
63+ # then use bulkinsert to build the relation.
64+ print("loading data file\n")
65+ @time data = CSV.read(fullpath;
66+ allowmissing=:none,
67+ rows_for_type_detect=1,
68+ kwargs...)
69
70- # Compute the types that we will have in our relation.
71- rel_attr_types = map(convert_type, tuple(eltypes(data)...))
72- types = Tuple{rel_attr_types...}
73+ # Compute the types that we will have in our relation.
74+ rel_attr_types = map(convert_type, tuple(eltypes(data)...))
75+ types = Tuple{rel_attr_types...}
76
77- pager = get_pager()
78+ pager = get_pager()
79
80- # Create temporary RelationSinks for use with setup_relation().
81- relation_sinks = [RelationSink(
82- relation_name,
83- columns,
84- create_relation_if_notexists,
85- BeTreeConfig(),
86- Nullable{RelKey}(),
87- Nullable{BeTreeDict}()
88- ) for (relation_name, columns) in relation_columns]
89+ # Create temporary RelationSinks for use with setup_relation().
90+ relation_sinks = [RelationSink(
91+ relation_name,
92+ columns,
93+ create_relation_if_notexists,
94+ BeTreeConfig(),
95+ Nullable{RelKey}(),
96+ Nullable{BeTreeDict}()
97+ ) for (relation_name, columns) in relation_columns]
98
99- for relation_sink in relation_sinks
100- relation_sink.columns =
101- convert_columns(Dict(String(k) => v
102- for (k, v) in getfield(data, :colindex).lookup),
103- relation_sink.columns)
104- print("setup_relation()\n")
105- @time setup_relation(env, pager, append_to_relation_if_exists, types,
106- rel_attr_types, relation_sink)
107+ for relation_sink in relation_sinks
108+ relation_sink.columns =
109+ convert_columns(Dict(String(k) => v
110+ for (k, v) in getfield(data, :colindex).lookup),
111+ relation_sink.columns)
112+ print("setup_relation()\n")
113+ @time setup_relation(env, pager, append_to_relation_if_exists, types,
114+ rel_attr_types, relation_sink)
115
116- # Create something we can pass to bulkinsert!(). Note that
117- # relation_sink.columns[1] is an array of indices for key columns and
118- # relation_sink.columns[2] is an array of indices for data columns.
119- # Unfortunately we have to do some strange conversions because DataFrames are
120- # not actually iterable.
121- print("doing data transformation\n")
122- @time d = if data !== nothing
123- # Unfortunately this may be a costly conversion and should be improved or
124- # avoided.
125- keys = [tuple([row[i] for i in 1:length(relation_sink.columns[1])]...)
126- for row in eachrow(data[relation_sink.columns[1]])]
127- values = [tuple([row[i] for i in 1:length(relation_sink.columns[2])]...)
128- for row in eachrow(data[relation_sink.columns[2]])]
129- TupleWithKeyValueIter(keys, values)
130- else
131- keys = [tuple([row[i] for i in 1:length(relation_sink.columns[1])]...)
132- for row in eachrow(data[relation_sink.columns[1]])]
133- TupleWithTupledKeyAndEmptyValueIter(keys)
134+ # Create something we can pass to bulkinsert!(). Note that
135+ # relation_sink.columns[1] is an array of indices for key columns and
136+ # relation_sink.columns[2] is an array of indices for data columns.
137+ # Unfortunately we have to do some strange conversions because DataFrames are
138+ # not actually iterable.
139+ print("doing data transformation\n")
140+ @time d = if data !== nothing
141+ # Unfortunately this may be a costly conversion and should be improved or
142+ # avoided.
143+ keys = [tuple([row[i] for i in 1:length(relation_sink.columns[1])]...)
144+ for row in eachrow(data[relation_sink.columns[1]])]
145+ values = [tuple([row[i] for i in 1:length(relation_sink.columns[2])]...)
146+ for row in eachrow(data[relation_sink.columns[2]])]
147+ TupleWithKeyValueIter(keys, values)
148+ else
149+ keys = [tuple([row[i] for i in 1:length(relation_sink.columns[1])]...)
150+ for row in eachrow(data[relation_sink.columns[1]])]
151+ TupleWithTupledKeyAndEmptyValueIter(keys)
152+ end
153+ # d has to be a key-value-type dict thing of some sort
154+ print("doing bulkinsert!()\n")
155+ @time bulkinsert!(get(relation_sink.relation_data), d)
156+ markimmutable(get(relation_sink.relation_data))
157 end
158- # d has to be a key-value-type dict thing of some sort
159- print("doing bulkinsert!()\n")
160- @time bulkinsert!(get(relation_sink.relation_data), d)
161- markimmutable(get(relation_sink.relation_data))
162 end
163 end
164 finally
165diff --git a/src/DataLoader/check_sorted_sink.jl b/src/DataLoader/check_sorted_sink.jl
166new file mode 100644
167index 00000000..bb42f4e2
168--- /dev/null
169+++ b/src/DataLoader/check_sorted_sink.jl
170@@ -0,0 +1,113 @@
171+using DataStreams
172+import Delve.Database: EMPTY_VAL_TYPE
173+
174+const NON_EXISTING_ROW_ID = 0
175+const EMPTY_ROW = Vector
176+
177+"""
178+CheckSortedSink is a DataStreams sink that we can use to check whether or not a
179+CSV is sorted by its keys, and it also counts the number of rows in the CSV
180+(which we will need for a later bulkinsert operation).
181+"""
182+mutable struct CheckSortedSink <: Data.Sink
183+ # Whether or not the CSV is sorted. True if we haven't encountered any
184+ # out-of-order keys. This must be true for *all* keys being used in this
185+ # file.
186+ sorted::Bool
187+ # The current number of rows we've seen.
188+ rows::Int
189+ # The indices of the key columns.
190+ keys::Vector{Vector{Int}}
191+ # The last key we saw, for comparison.
192+ last_vals::NamedTuple
193+ # Input columns. We will match all the keys with column indices later.
194+ input_columns::RelationColumns
195+
196+ # Create a CheckSortedSink.
197+ CheckSortedSink(keys::RelationColumns) = new(true, NON_EXISTING_ROW_ID, [[]],
198+ NamedTuple(), keys)
199+
200+ # These constructors are required by the DataStreams interface.
201+ CheckSortedSink(sch::Data.Schema, T::DataType, append::Bool, args...;
202+ reference::Vector{UInt8}=UInt8[], kwargs...) = new(true,
203+ NON_EXISTING_ROW_ID, [[]], NamedTuple(), nothing)
204+
205+ # Using the given Data.Schema, match columns in sink.input_columns with the
206+ # numeric indices of those columns found by Data.Schema. Note that we have
207+ # multiple sets of keys, so we have to call convert_columns() on each set of
208+ # key/value pairs in sink.input_columns, and then keep only the keys after
209+ # conversion to ints.
210+ CheckSortedSink(sink::CheckSortedSink, sch::Data.Schema, T::DataType, append::Bool,
211+ args...; reference::Vector{UInt8}=UInt8[]) = new(sink.sorted, sink.rows,
212+ [convert_columns(sch.index, c[2])[1] for c in sink.input_columns],
213+ NamedTuple(), sink.input_columns)
214+end
215+
216+Data.streamtypes(::Type{CheckSortedSink}) = [Data.Row]
217+
218+"""
219+ Data.streamto!(sink::CheckSortedSink, t::Type{Data.Field}, val, row, col)
220+
221+Implementation of DataStreams streaming API for the CheckSortedSink. This adds
222+the value to the current list of keys, and then checks the keys if we are at a
223+new row.
224+"""
225+function Data.streamto!(sink::CheckSortedSink, t::Type{Data.Row}, vals::NamedTuple, row, col)
226+ # separate function so code_warntype is easy to use
227+ streamto_call!(sink, t, vals, row, col)
228+end
229+
230+function streamto_call!(sink::CheckSortedSink, t::Type{Data.Row}, vals::NamedTuple, row, col)
231+ # Given the whole row, check the keys in it.
232+ sink.rows = row
233+
234+ # No need to check if it is the first row.
235+ if sink.rows == 1
236+ sink.last_vals = vals
237+ return
238+ end
239+
240+ # If we already know it isn't sorted we're done.
241+ # TODO: probably this should terminate entirely.
242+ if !sink.sorted
243+ return
244+ end
245+
246+    # Check all the keys to see if it is sorted. This is type unstable and
247+    # really needs to be rewritten as a generated function. NOTE(review): equal
248+    # consecutive keys yield sorted=false, and `sorted = break_early` keeps only the last key set's result — verify.
249+ for i in 1:length(sink.keys)
250+ break_early = false
251+ for key in sink.keys[i]
252+ if vals[key] > sink.last_vals[key]
253+ break_early = true
254+ break
255+ end
256+ end
257+
258+ sink.sorted = break_early
259+ end
260+
261+ sink.last_vals = vals
262+ return
263+end
264+
265+"""
266+ Data.cleanup!(sink::CheckSortedSink)
267+
268+Clean up the stream in case of problems during reading. In our case, for
269+CheckSortedSink, we don't have to do anything.
270+"""
271+function Data.cleanup!(sink::CheckSortedSink)
272+ nothing
273+end
274+
275+"""
276+ Data.close!(sink::CheckSortedSink)
277+
278+Finish reading a stream. For CheckSortedSink, this means just checking the last
279+row of keys to see if they are sorted.
280+"""
281+function Data.close!(sink::CheckSortedSink)::CheckSortedSink
282+ sink
283+end
284diff --git a/src/DataLoader/sorted_bulkinsert_sink.jl b/src/DataLoader/sorted_bulkinsert_sink.jl
285new file mode 100644
286index 00000000..d9d72cb6
287--- /dev/null
288+++ b/src/DataLoader/sorted_bulkinsert_sink.jl
289@@ -0,0 +1,242 @@
290+# sorted_bulkinsert_sink.jl
291+#
292+# The SortedBulkInsertSink is used after the CheckSortedSink to insert in bulk
293+# into a BeTreeDict from a DataStream. This is a second pass over the file.
294+
295+using DataStreams
296+using ..Database
297+using ..PagedDataStructures
298+using ..ExtendedPagedDataStructures
299+import ..PagedDataStructures: PMAImpl
300+using InteractiveUtils
301+
302+"""
303+ BulkRelationSink{K, V, E}
304+
305+This holds the data for a single relation so that bulk insertion can be done.
306+Although this is now partially type stable, two issues remain:
307+
308+ 1. We need to avoid the use of Nullable{}
309+ 2. Insertion is not type stable and extremely slow.
310+"""
311+mutable struct BulkRelationSink{K, V, E}
312+ relation_name::Symbol
313+ key_indices::Vector{Int}
314+ values_indices::Vector{Int}
315+
316+ # Relation key.
317+ relation_key::Nullable{RelKey}
318+ # Relation data.
319+ relation_data::Nullable{BeTreeDict{K, V, E}}
320+
321+ # Information used during bulk insertion.
322+ leaf_pages::Vector{Tuple{Pager.PagePtr, Blob{BeLeafNode{K, V, E}}}}
323+ current_pma_index::Int
324+ current_pma::Nullable{PMAImpl{K, V}}
325+ current_pma_capacity::Int
326+ current_pma_item_index::Int
327+ internal_nodes_data::Vector{Tuple{K, PageId}}
328+end
329+
330+"""
331+ SortedBulkInsertSink
332+
333+This is basically a wrapper class around all the individual bulk relation sinks.
334+"""
335+mutable struct SortedBulkInsertSink <: Data.Sink
336+ # Database environment.
337+ env::Database.Environment
338+
339+ # Total number of rows, found by CheckSortedSink.
340+ rows::Int
341+ # Current row we are parsing.
342+ current_row::Int
343+ # Number of rows we have finished processing (this could be different than
344+ # current_row because of CSV headers).
345+ rows_processed::Int
346+ # Keys and values for each relation.
347+ input_columns::RelationColumns
348+ # Type of the relation arguments.
349+ rel_attr_types::Tuple
350+
351+ create_relation_if_notexists::Bool
352+ pager::AbstractPager
353+ create_config::StorageConfig
354+ # Collection of RelationSinks, which holds the BeTreeDicts.
355+ relation_sinks::Vector{BulkRelationSink}
356+
357+ # Not sure if this is going to be the way to go with this.
358+ SortedBulkInsertSink(input_columns::RelationColumns,
359+ env::Database.Environment,
360+ rows::Int,
361+ create_relation_if_notexists::Bool,
362+ pager::AbstractPager,
363+ create_config::StorageConfig = BeTreeConfig()) = new(env, rows, 1, 0,
364+ input_columns, EMPTY_VAL_TYPE(), create_relation_if_notexists,
365+ pager, create_config, [])
366+
367+ function SortedBulkInsertSink(sink::SortedBulkInsertSink, sch::Data.Schema,
368+ T::DataType, append::Bool; reference::Vector{UInt8}=UInt8[])
369+
370+ # Get the types for each column in the schema.
371+ rel_attr_types = map(convert_type, Data.types(sch))
372+
373+ # Check that an invalid type isn't specified...
374+ for T in rel_attr_types
375+ if T isa Union
376+ error("Missing data is not supported in SortedBulkInsertSink")
377+ end
378+ end
379+
380+ pager = sink.pager
381+ rec_type = Tuple{rel_attr_types...}
382+
383+ # Next, set up the current_keys and current_values vectors, as well as
384+ # the relations themselves.
385+ relation_sinks = Vector{BulkRelationSink}()
386+ for (rname, cols) in sink.input_columns
387+ mapped_cols = convert_columns(sch.index, cols)
388+
389+ key_types = Tuple{[fieldtype(rec_type, k) for k in mapped_cols[1]]...}
390+ val_types = Tuple{[fieldtype(rec_type, v) for v in mapped_cols[2]]...}
391+
392+ push!(relation_sinks, BulkRelationSink(rname,
393+ mapped_cols[1], mapped_cols[2], Nullable{RelKey}(),
394+ Nullable{BeTreeDict{key_types, val_types,
395+ sink.create_config.epsilon}}(),
396+ Vector{Tuple{Pager.PagePtr, Blob{BeLeafNode{key_types,
397+ val_types, sink.create_config.epsilon}}}}(),
398+ 0, Nullable{PMAImpl{key_types,
399+ val_types}}(), 0, 0, Vector{Tuple{key_types, PageId}}()))
400+
401+ setup_bulk_relation(sink.env, pager, append, mapped_cols, rec_type,
402+ rel_attr_types, sink.create_relation_if_notexists,
403+ sink.create_config, last(relation_sinks))
404+
405+ # Allocate space for the relation and prepare for bulk insertion.
406+ tree = get(last(relation_sinks).relation_data)
407+
408+ last(relation_sinks).leaf_pages = bulkinsert_allocate!(tree, sink.rows, pager)
409+ (last(relation_sinks).current_pma_index,
410+ last(relation_sinks).current_pma,
411+ last(relation_sinks).current_pma_capacity,
412+ last(relation_sinks).internal_nodes_data) = bulkinsert_get_first_pma!(last(relation_sinks).leaf_pages, sink.rows)
413+ last(relation_sinks).current_pma_item_index = 1
414+ end
415+
416+ new(sink.env, sink.rows, 1, 0, sink.input_columns, rel_attr_types,
417+ sink.create_relation_if_notexists, pager, sink.create_config,
418+ relation_sinks)
419+ end
420+end
421+
422+# Read a whole row at a time. It helps speed but isn't great.
423+Data.streamtypes(::Type{SortedBulkInsertSink}) = [Data.Row]
424+
425+function setup_bulk_relation(env::Database.Environment, pager::AbstractPager,
426+ append::Bool, cols::Columns, rec_type::DataType, rel_attr_types::Tuple,
427+ create_relation_if_notexists::Bool,
428+ create_config::StorageConfig,
429+ relation_sink::BulkRelationSink) :: Nothing
430+ (key_types, value_types) =
431+ if isa(cols, Nothing)
432+ (Tuple{rel_attr_types...}, Tuple{})
433+ else begin
434+ (key_types, value_types) = map_columns(DataType, i -> rec_type.parameters[i], cols)
435+ (key_types, value_types) = (Tuple{key_types...}, Tuple{value_types...})
436+ end
437+ end
438+ relkey = RelKey(relation_sink.relation_name, key_types, value_types)
439+ relation_sink.relation_key = relkey
440+ ((page, tree), rel) = try
441+ # TODO Note that this will find the relation if it has value
442+ # arguments. It would probably be necessary to iterate over all
443+ # relations with this name and heuristically search for
444+ # matches. Or, drop the two modes and simply require the relation
445+ # to either be clearly specified upfront (so no get here), or
446+ # always created without functional dependencies.
447+ r = get_relation(relkey, env.schema)
448+ if !append
449+ error("Relation $(to_string(r)) already exists, but the user requested not to append to an existing table.")
450+ end
451+ (load_relation_mutable(r, env.schema, pager), r)
452+ # TODO catch a more specific exception here or check without exception
453+ catch e
454+ if !create_relation_if_notexists
455+ error("Relation $(to_string(relkey)) does not exist and the user requested not to create it.")
456+ else
457+ @warn("Relation $(to_string(relkey)) is being created.")
458+
459+ data = MutableBeTreeSet{rec_type,1.0}(get_pager(env))
460+ (data, RelationInfo(relation_sink.relation_name, data))
461+ end
462+ create_relation(relkey, get_schema(env), create_config), get_relation(relkey, get_schema(env))
463+ end
464+ (K,V,E) = treetypes(tree)
465+ relation_sink.relation_data = MutableBeTreeDict{K,V,E}(page, tree, pager)
466+ nothing
467+end
468+
469+# Process a row for an individual relation sink. Although this works reasonably
470+# because of multiple dispatch, it is still type unstable.
471+function streamrow_into_sink!(relation_sink::BulkRelationSink, rows_processed::Int, rows::Int, vals::NamedTuple)
472+ tree = get(relation_sink.relation_data)
473+
474+ # Before we insert, do we need a new PMA?
475+ # Checking here means that we will never insert past the end of the
476+ # PMAs.
477+ if relation_sink.current_pma_item_index > relation_sink.current_pma_capacity
478+ (relation_sink.current_pma_index,
479+ relation_sink.current_pma,
480+ relation_sink.current_pma_capacity) = bulkinsert_get_next_pma!(
481+ relation_sink.leaf_pages,
482+ relation_sink.internal_nodes_data,
483+ relation_sink.current_pma_index,
484+ relation_sink.current_pma_capacity,
485+ rows_processed,
486+ rows)
487+
488+ # Reset to 1 since we are now inserting items into a new PMA.
489+ relation_sink.current_pma_item_index = 1
490+ end
491+
492+ # These lines are a disaster. We need a generated function and maybe a
493+ # generated mutable tuple type.
494+ ks = tuple([convert_value(vals[i]) for i in relation_sink.key_indices]...)
495+ vs = tuple([convert_value(vals[i]) for i in relation_sink.values_indices]...)
496+ bulkinsert_at_location!(get(relation_sink.current_pma), ks, vs,
497+ relation_sink.current_pma_item_index)
498+ relation_sink.current_pma_item_index += 1
499+end
500+
501+function Data.streamto!(sink::SortedBulkInsertSink, t::Type{Data.Row}, vals::NamedTuple, row, col)
502+ for i in 1:length(sink.relation_sinks)
503+ relation_sink = sink.relation_sinks[i]
504+ #@code_warntype
505+ streamrow_into_sink!(relation_sink, sink.rows_processed,
506+ sink.rows, vals)
507+ end
508+
509+ sink.rows_processed += 1
510+end
511+
512+function Data.cleanup!(sink::SortedBulkInsertSink)
513+ # TODO: handle problems
514+end
515+
516+function Data.close!(sink::SortedBulkInsertSink)::SortedBulkInsertSink
517+ # Lastly, handle finishing the bulk insertion process.
518+ for relation_sink in sink.relation_sinks
519+ tree = get(relation_sink.relation_data)
520+
521+ bulkinsert_finalize!(tree.tree, relation_sink.leaf_pages,
522+ relation_sink.internal_nodes_data,
523+ relation_sink.current_pma_capacity, sink.rows, sink.pager)
524+
525+ rel_data = markimmutable(tree)
526+ rel_info = relation_info(sink.env, get(relation_sink.relation_key))
527+ set_relation(sink.env, rel_info, rel_data)
528+ end
529+
530+ sink
531+end
532diff --git a/src/PagedDataStructures/betree_bulkinsert.jl b/src/PagedDataStructures/betree_bulkinsert.jl
533index b38facef..78774f44 100644
534--- a/src/PagedDataStructures/betree_bulkinsert.jl
535+++ b/src/PagedDataStructures/betree_bulkinsert.jl
536@@ -2,13 +2,144 @@ using Blobs
537 using ..Pager
538 import ..PagedDataStructures: BeTree, BeInternalNode, BeLeafNode, setrightmost!,
539 bulkinsert!, set_root, get_mutable_root!, alloc_paged_obj,
540- usable_bulk_capacity, PMAImpl
541+ usable_bulk_capacity, PMAImpl, MutableBeTreeDict
542 import ..MiniSearchAPI: iter_init, iter_next
543 import ..Common: @dassert1
544
545 #######################################
546 # Bulk Insert Impl for BeTree
547 #######################################
548+
549+"""
550+ bulkinsert_allocate!(tree, num_elems, pager)
551+
552+Allocate the leaf pages for bulk insertion into a BeTree. Note that after
553+calling this, you'll have to use bulkinsert_get_first_pma!() (then
554+bulkinsert_get_next_pma!()) to get a PMAImpl{} to insert data into; finally, call
555+bulkinsert_finalize!() to finish the bulk insertion process.
556+"""
557+function bulkinsert_allocate!(tree::MutableBeTreeDict{K, V, E}, num_elems::Int, pager::AbstractPager) where {K, V, E}
558+
559+ # Don't call this on a non-empty tree!
560+ @dassert1 isempty(tree.tree)
561+
562+ capacity = tree.tree.info.capacity
563+ leaf_pages = create_leaf_pages(tree.tree, num_elems, pager)
564+
565+ # Now it is the user's responsibility to imitate fill_node_pages!() via
566+ # bulkinsert_get_next_pma!().
567+ leaf_pages
568+end
569+
570+"""
571+ bulkinsert_get_first_pma!(leaf_pages, total_num_tuples)
572+
573+Once the leaf pages are allocated, call this to get the first PMAImpl to insert
574+elements into with bulkinsert_at_location!(). This returns four values:
575+
576+ 1. current_pma_index (1)
577+ 2. current_pma (PMAImpl{K, V})
578+ 3. current_pma_capacity (number of elements we can put in it)
579+ 4. internal_nodes_data (empty vector to be passed to bulkinsert_get_next_pma!())
580+"""
581+function
582+bulkinsert_get_first_pma!(leaf_pages::Vector{Tuple{Pager.PagePtr,
583+ Blob{BeLeafNode{K, V, E}}}},
584+ total_num_tuples::Int)::Tuple{Int,
585+                                                     Any, # PMAImpl{K, V},
586+ Int,
587+ Vector{Tuple{K, PageId}}} where {K, V, E}
588+
589+ # Compute the desired length of the first PMAImpl.
590+ total_pmas = length(leaf_pages)
591+ pma_capacity = Int(ceil(total_num_tuples / total_pmas))
592+
593+ # For the internal_nodes_data we return a new empty vector. That will get
594+ # filled by bulkinsert_get_next_pma!().
595+ (1, leaf_pages[1][2].items[], pma_capacity, Vector{Tuple{K, PageId}}())
596+
597+end
598+
599+"""
600+ bulkinsert_get_next_pma!(leaf_pages, internal_nodes_data, current_pma_index,
601+ current_pma_capacity, current_tuple_index, total_num_tuples)
602+
603+Get the next PMA to insert into given the index of the current one we are
604+looking at. This returns a Tuple{Int, PMAImpl{K, V}, Int}; the first element is
605+the index of the new PMA; the second is the PMAImpl itself, and the third is the
606+number of elements that should be inserted into the PMAImpl before calling this
607+function again to get a new one.
608+
609+This also does cleanup on the previous PMA, and updates the internal_nodes_data
610+that will be needed to pass to bulkinsert_finalize!() when bulk insertion is
611+done.
612+"""
613+function bulkinsert_get_next_pma!(leaf_pages::Vector{Tuple{Pager.PagePtr,
614+ Blob{BeLeafNode{K, V, E}}}},
615+ internal_nodes_data::Vector{Tuple{K, PageId}},
616+ current_pma_index::Int,
617+ current_pma_capacity::Int,
618+ current_tuple_index::Int,
619+ total_num_tuples::Int)::Tuple{Int, PMAImpl{K, V}, Int} where {K, V, E}
620+ # We need the last element key, which we can extract from the end of the
621+ # current PMAImpl{}, as well as the page pointer.
622+ last_elem_key = leaf_pages[current_pma_index][2].items[].keys[current_pma_capacity]
623+ page_ptr = leaf_pages[current_pma_index][1]
624+ push!(internal_nodes_data, (last_elem_key, get_page_id(page_ptr)))
625+
626+ # Clean up current PMAImpl{} after all the insertions.
627+ setlength!(leaf_pages[current_pma_index][2].items[], current_pma_capacity)
628+ set_sorted_length!(leaf_pages[current_pma_index][2].items[], current_pma_capacity)
629+
630+ # Compute the number of points that will fall into this next PMAImpl{}.
631+ # Note that this may cause earlier PMAImpl{}s to have more points than
632+ # later ones.
633+ tuples_left = total_num_tuples - current_tuple_index
634+ total_pmas = length(leaf_pages)
635+
636+ pma_capacity = Int(ceil(tuples_left / (total_pmas - current_pma_index)))
637+
638+ return current_pma_index + 1, leaf_pages[current_pma_index + 1][2].items[], pma_capacity
639+end
640+
641+"""
642+ bulkinsert_finalize!(tree, leaf_pages, internal_nodes_data,
643+ final_pma_capacity, num_elems, pager)
644+
645+After bulk insertion is completed and all the PMAs are full, this should be
646+called to build the rest of the tree. After calling this the tree is ready.
647+"""
648+function bulkinsert_finalize!(tree::BeTree{K, V, E},
649+ leaf_pages::Vector{Tuple{Pager.PagePtr,
650+ Blob{BeLeafNode{K, V, E}}}},
651+ internal_nodes_data::Vector{Tuple{K, PageId}},
652+ final_pma_capacity::Int,
653+ num_elems::Int,
654+ pager::AbstractPager) where {K, V, E}
655+ # Finalize last PMA state.
656+ last_elem_key = last(leaf_pages)[2].items[].keys[final_pma_capacity]
657+ page_ptr = last(leaf_pages)[1]
658+ push!(internal_nodes_data, (last_elem_key, get_page_id(page_ptr)))
659+
660+ setlength!(last(leaf_pages)[2].items[], final_pma_capacity)
661+ set_sorted_length!(last(leaf_pages)[2].items[], final_pma_capacity)
662+
663+ t = tree.state
664+ if length(leaf_pages) > 1
665+ # Some internal nodes will be required.
666+ init_height = 2 # The height of internal nodes above leaf nodes.
667+ (root_page, root_node, height) = bulk_create_internal_nodes(tree,
668+ convert(Array{Tuple{K, PageId}}, internal_nodes_data),
669+ init_height,
670+ pager)
671+
672+ set_root(t, root_page)
673+ t.height[] = height
674+ end
675+
676+ t.count[] = num_elems
677+end
678+
679 @inline function bulkinsert!(node::Blob{BeInternalNode{K,V,E}}, iter, iter_state,
680 height::Int, num_elems_per_bucket::Int) where {K,V,E}
681 #(num_elems_per_bucket-1) pages are added as pivots
682@@ -154,3 +285,6 @@ function create_internal_pages(tree::BeTree{K,V,E}, num_elems::Int, height::Int,
683 end
684 internal_pages
685 end
686+
687+# probably not the right place to do this
688+export bulkinsert_allocate!, bulkinsert_get_first_pma!, bulkinsert_get_next_pma!, bulkinsert_finalize!
689diff --git a/src/PagedDataStructures/paged_vector_dict_bulkinsert.jl b/src/PagedDataStructures/paged_vector_dict_bulkinsert.jl
690index 3495012f..e5bf12c8 100644
691--- a/src/PagedDataStructures/paged_vector_dict_bulkinsert.jl
692+++ b/src/PagedDataStructures/paged_vector_dict_bulkinsert.jl
693@@ -25,3 +25,20 @@ function bulkinsert!(s::PagedVectorDict{K,V}, iter, iter_state,
694
695 last_iter_next_res
696 end
697+
698+"""
699+ bulkinsert_at_location!(s::PagedVectorDict{K, V}, key, value, loc)
700+
701+Insert the given key, value pair at the given location in the PagedVectorDict s.
702+It's up to you to ensure that 'loc' is between 1 and the length of s. There are
703+no guarantees on what happens if you get that wrong.
704+"""
705+function bulkinsert_at_location!(s::PagedVectorDict{K, V}, key::K, value::V, loc::Int) where {K, V}
706+
707+ s.keys[loc] = key
708+ s.values[loc] = value
709+
710+end
711+
712+# probably not the right place to do this
713+export bulkinsert_at_location!
714--
7152.20.1