Skip to content

Instantly share code, notes, and snippets.

@Bouncner
Created February 10, 2023 15:26
Show Gist options
  • Select an option

  • Save Bouncner/658b089ac71325bac73f1b8ffa734c48 to your computer and use it in GitHub Desktop.

Select an option

Save Bouncner/658b089ac71325bac73f1b8ffa734c48 to your computer and use it in GitHub Desktop.
diff --git a/src/lib/operators/join_helper/join_output_writing.hpp b/src/lib/operators/join_helper/join_output_writing.cpp
index e7a618235..d4d827104 100644
--- a/src/lib/operators/join_helper/join_output_writing.hpp
+++ b/src/lib/operators/join_helper/join_output_writing.cpp
@@ -1,131 +1,94 @@
-#pragma once
+#include "join_output_writing.hpp"
#include <unordered_map>
#include <boost/functional/hash_fwd.hpp>
-#include "storage/create_iterable_from_segment.hpp"
+#include "hyrise.hpp"
+#include "scheduler/job_task.hpp"
#include "storage/segment_iterate.hpp"
-namespace hyrise {
+namespace {
-enum class OutputColumnOrder { LeftFirstRightSecond, RightFirstLeftSecond, RightOnly };
+using namespace hyrise; // NOLINT(build/namespaces)
using PosLists = std::vector<std::shared_ptr<const AbstractPosList>>;
-using PosListsByChunk = std::vector<std::shared_ptr<PosLists>>;
+using PosListsByColumn = std::vector<std::shared_ptr<PosLists>>;
+
+
+struct PosListsHasher {
+ size_t operator()(const PosLists& pos_lists) const {
+ return boost::hash_range(pos_lists.begin(), pos_lists.end());
+ }
+};
/**
- * Returns a vector where each entry with index i references a PosLists object. The PosLists object
- * contains the position list of every segment/chunk in column i.
- * @param input_table
+ * Returns one entry per input column: entry i references the PosLists (one pos list per chunk) of column i.
*/
-// See usage in _on_execute() for doc.
-inline PosListsByChunk setup_pos_lists_by_chunk(const std::shared_ptr<const Table>& input_table) {
+PosListsByColumn setup_pos_list_mapping(const std::shared_ptr<const Table>& input_table) {
Assert(input_table->type() == TableType::References, "Function only works for reference tables");
- struct PosListsHasher {
- size_t operator()(const PosLists& pos_lists) const {
- return boost::hash_range(pos_lists.begin(), pos_lists.end());
- }
- };
+ const auto chunk_count = input_table->chunk_count();
+ const auto column_count = input_table->column_count();
std::unordered_map<PosLists, std::shared_ptr<PosLists>, PosListsHasher> shared_pos_lists_by_pos_lists;
+ shared_pos_lists_by_pos_lists.reserve(column_count);
- PosListsByChunk pos_lists_by_segment(input_table->column_count());
- auto pos_lists_by_segment_it = pos_lists_by_segment.begin();
-
- const auto input_chunks_count = input_table->chunk_count();
- const auto input_columns_count = input_table->column_count();
+ auto pos_lists_by_column = PosListsByColumn(column_count);
+ auto pos_lists_by_column_it = pos_lists_by_column.begin();
// For every column, for every chunk
- for (auto column_id = ColumnID{0}; column_id < input_columns_count; ++column_id) {
- // Get all the input pos lists so that we only have to pointer cast the segments once
- auto pos_list_ptrs = std::make_shared<PosLists>(input_table->chunk_count());
- auto pos_lists_iter = pos_list_ptrs->begin();
+ for (auto column_id = ColumnID{0}; column_id < column_count; ++column_id) {
+ // Get all the input pos lists so that we only have to pointer cast the segments once. Without storing the
+ // pointers, we would need to fetch the position lists by fetching them from the input table (get chunk,
+ // get segment, cast segment to obtain PosList) in write_output_segments.
+ auto pos_list_ptrs = std::make_shared<PosLists>();
+ pos_list_ptrs->reserve(chunk_count);
// Iterate over every chunk and add the chunks segment with column_id to pos_list_ptrs
- for (ChunkID chunk_id{0}; chunk_id < input_chunks_count; ++chunk_id) {
+ for (auto chunk_id = ChunkID{0}; chunk_id < chunk_count; ++chunk_id) {
const auto chunk = input_table->get_chunk(chunk_id);
Assert(chunk, "Physically deleted chunk should not reach this point, see get_chunk / #1686.");
const auto& ref_segment_uncasted = chunk->get_segment(column_id);
const auto ref_segment = std::static_pointer_cast<const ReferenceSegment>(ref_segment_uncasted);
- *pos_lists_iter = ref_segment->pos_list();
- ++pos_lists_iter;
+ pos_list_ptrs->push_back(ref_segment->pos_list());
}
+ DebugAssert(pos_list_ptrs->size() == chunk_count, "Fewer position lists than expected.");
+
// pos_list_ptrs contains all position lists of the reference segments for the column_id.
auto iter = shared_pos_lists_by_pos_lists.emplace(*pos_list_ptrs, pos_list_ptrs).first;
- *pos_lists_by_segment_it = iter->second;
- ++pos_lists_by_segment_it;
+ *pos_lists_by_column_it = iter->second;
+ ++pos_lists_by_column_it;
}
- return pos_lists_by_segment;
+ return pos_lists_by_column;
}
/**
*
* @param output_segments [in/out] Vector to which the newly created reference segments will be written.
* @param input_table Table which all the position lists reference
- * @param input_pos_list_ptrs_sptrs_by_segments Contains all position lists to all columns of input table
+ * @param input_pos_list_ptrs_sptrs_by_segments Contains all position lists of all columns of input table
* @param pos_list contains the positions of rows to use from the input table
*/
-inline void write_output_segments(Segments& output_segments, const std::shared_ptr<const Table>& input_table,
- const PosListsByChunk& input_pos_list_ptrs_sptrs_by_segments,
- std::shared_ptr<RowIDPosList> pos_list) {
- std::map<std::shared_ptr<PosLists>, std::shared_ptr<RowIDPosList>> output_pos_list_cache;
+void write_output_segments(Segments& output_segments, const std::shared_ptr<const Table>& input_table,
+ const PosListsByColumn& input_pos_list_ptrs_sptrs_by_segments,
+ std::shared_ptr<RowIDPosList>& pos_list) {
+ std::unordered_map<std::shared_ptr<PosLists>, std::shared_ptr<RowIDPosList>> output_pos_list_cache;
std::shared_ptr<Table> dummy_table;
+ const auto chunk_count = input_table->chunk_count();
+ const auto column_count = input_table->column_count();
- // Add segments from input table to output chunk
- // for every column for every row in pos_list: get corresponding PosList of input_pos_list_ptrs_sptrs_by_segments
+ // Add segments from input table to output chunk.
+ // For every column and every row in pos_list: get corresponding PosList of input_pos_list_ptrs_sptrs_by_segments
// and add it to new_pos_list which is added to output_segments
- for (auto column_id = ColumnID{0}; column_id < input_table->column_count(); ++column_id) {
+ for (auto column_id = ColumnID{0}; column_id < column_count; ++column_id) {
if (input_table->type() == TableType::References) {
- if (input_table->chunk_count() > 0) {
- const auto& input_table_pos_lists = input_pos_list_ptrs_sptrs_by_segments[column_id];
-
- auto iter = output_pos_list_cache.find(input_table_pos_lists);
- if (iter == output_pos_list_cache.end()) {
- // Get the row ids that are referenced
- auto new_pos_list = std::make_shared<RowIDPosList>(pos_list->size());
- auto new_pos_list_iter = new_pos_list->begin();
- auto common_chunk_id = std::optional<ChunkID>{};
- for (const auto& row : *pos_list) {
- if (row.chunk_offset == INVALID_CHUNK_OFFSET) {
- *new_pos_list_iter = row;
- common_chunk_id = INVALID_CHUNK_ID;
- } else {
- const auto& referenced_pos_list = *(*input_table_pos_lists)[row.chunk_id];
- *new_pos_list_iter = referenced_pos_list[row.chunk_offset];
-
- // Check if the current row matches the ChunkIDs that we have seen in previous rows
- const auto referenced_chunk_id = referenced_pos_list[row.chunk_offset].chunk_id;
- if (!common_chunk_id) {
- common_chunk_id = referenced_chunk_id;
- } else if (*common_chunk_id != referenced_chunk_id) {
- common_chunk_id = INVALID_CHUNK_ID;
- }
- }
- ++new_pos_list_iter;
- }
- if (common_chunk_id && *common_chunk_id != INVALID_CHUNK_ID) {
- // Track the occuring chunk ids and set the single chunk guarantee if possible. Generally, this is the case
- // if both of the following are true: (1) The probe side input already had this guarantee and (2) no radix
- // partitioning was used. If multiple small PosLists were merged (see MIN_SIZE in join_hash.cpp), this
- // guarantee cannot be given.
- new_pos_list->guarantee_single_chunk();
- }
-
- iter = output_pos_list_cache.emplace(input_table_pos_lists, new_pos_list).first;
- }
-
- auto reference_segment = std::static_pointer_cast<const ReferenceSegment>(
- input_table->get_chunk(ChunkID{0})->get_segment(column_id));
- output_segments.push_back(std::make_shared<ReferenceSegment>(
- reference_segment->referenced_table(), reference_segment->referenced_column_id(), iter->second));
- } else {
+ if (chunk_count == 0) {
// If there are no Chunks in the input_table, we can't deduce the Table that input_table is referencing to.
// pos_list will contain only NULL_ROW_IDs anyway, so it doesn't matter which Table the ReferenceSegment that
// we output is referencing. HACK, but works fine: we create a dummy table and let the ReferenceSegment ref
@@ -134,7 +97,53 @@ inline void write_output_segments(Segments& output_segments, const std::shared_p
dummy_table = Table::create_dummy_table(input_table->column_definitions());
}
output_segments.push_back(std::make_shared<ReferenceSegment>(dummy_table, column_id, pos_list));
+ continue;
+ }
+
+ // This is the list of PosLists (one position list per input chunk) for the current column_id.
+ const auto& input_table_pos_lists = input_pos_list_ptrs_sptrs_by_segments[column_id];
+
+ Assert(output_pos_list_cache.size() > 0 || output_pos_list_cache.find(input_table_pos_lists) == output_pos_list_cache.end(), "Whoooot?");
+
+ auto iter = output_pos_list_cache.find(input_table_pos_lists);
+ if (iter == output_pos_list_cache.end()) {
+ // Get the row ids that are referenced
+ auto new_pos_list = std::make_shared<RowIDPosList>();
+ new_pos_list->reserve(pos_list->size());
+
+ auto common_chunk_id = std::optional<ChunkID>{};
+ for (const auto& row : *pos_list) {
+ if (row.chunk_offset == INVALID_CHUNK_OFFSET) {
+ new_pos_list->push_back(row);
+ common_chunk_id = INVALID_CHUNK_ID;
+ } else {
+ const auto& referenced_pos_list = *(*input_table_pos_lists)[row.chunk_id];
+ new_pos_list->push_back(referenced_pos_list[row.chunk_offset]);
+
+ // Check if the current row matches the ChunkIDs that we have seen in previous rows
+ const auto referenced_chunk_id = referenced_pos_list[row.chunk_offset].chunk_id;
+ if (!common_chunk_id) {
+ common_chunk_id = referenced_chunk_id;
+ } else if (*common_chunk_id != referenced_chunk_id) {
+ common_chunk_id = INVALID_CHUNK_ID;
+ }
+ }
+ }
+ if (common_chunk_id && *common_chunk_id != INVALID_CHUNK_ID) {
+ // Track the occurring chunk ids and set the single chunk guarantee if possible. Generally, this is the case
+ // if both of the following are true: (1) The probe side input already had this guarantee and (2) no radix
+ // partitioning was used. If multiple small PosLists were merged (see MIN_SIZE in join_hash.cpp), this
+ // guarantee cannot be given.
+ new_pos_list->guarantee_single_chunk();
+ }
+
+ iter = output_pos_list_cache.emplace(input_table_pos_lists, new_pos_list).first;
}
+
+ auto reference_segment = std::static_pointer_cast<const ReferenceSegment>(
+ input_table->get_chunk(ChunkID{0})->get_segment(column_id));
+ output_segments.push_back(std::make_shared<ReferenceSegment>(
+ reference_segment->referenced_table(), reference_segment->referenced_column_id(), iter->second));
} else {
// Check if the PosList references a single chunk. This is easier than tracking the flag through materialization,
// radix partitioning, and so on. Also, actually checking for this property instead of simply forwarding it may
@@ -165,11 +174,14 @@ inline void write_output_segments(Segments& output_segments, const std::shared_p
}
}
}
+} // namespace
-inline std::vector<std::shared_ptr<Chunk>> write_output_chunks(
+namespace hyrise {
+
+std::vector<std::shared_ptr<Chunk>> write_output_chunks(
std::vector<RowIDPosList>& pos_lists_left, std::vector<RowIDPosList>& pos_lists_right,
const std::shared_ptr<const Table>& left_input_table, const std::shared_ptr<const Table>& right_input_table,
- bool create_left_side_pos_lists_by_segment, bool create_right_side_pos_lists_by_segment,
+ bool create_left_side_pos_lists_by_column, bool create_right_side_pos_lists_by_column,
OutputColumnOrder output_column_order, bool allow_partition_merge) {
/**
* Two Caches to avoid redundant reference materialization for Reference input tables. As there might be
@@ -184,32 +196,32 @@ inline std::vector<std::shared_ptr<Chunk>> write_output_chunks(
* They hold one entry per column in the table, not per AbstractSegment in a single chunk
*/
- PosListsByChunk left_side_pos_lists_by_segment;
- PosListsByChunk right_side_pos_lists_by_segment;
+ auto left_side_pos_lists_by_column = PosListsByColumn{};
+ auto right_side_pos_lists_by_column = PosListsByColumn{};
- if (create_left_side_pos_lists_by_segment) {
- left_side_pos_lists_by_segment = setup_pos_lists_by_chunk(left_input_table);
+ if (create_left_side_pos_lists_by_column) {
+ left_side_pos_lists_by_column = setup_pos_list_mapping(left_input_table);
}
- if (create_right_side_pos_lists_by_segment) {
- right_side_pos_lists_by_segment = setup_pos_lists_by_chunk(right_input_table);
+ if (create_right_side_pos_lists_by_column) {
+ right_side_pos_lists_by_column = setup_pos_list_mapping(right_input_table);
}
const auto pos_lists_left_size = pos_lists_left.size();
-
auto expected_output_chunk_count = size_t{0};
- for (size_t partition_id = 0; partition_id < pos_lists_left_size; ++partition_id) {
+ for (auto partition_id = size_t{0}; partition_id < pos_lists_left_size; ++partition_id) {
if (!pos_lists_left[partition_id].empty() || !pos_lists_right[partition_id].empty()) {
++expected_output_chunk_count;
}
}
- std::vector<std::shared_ptr<Chunk>> output_chunks{};
- output_chunks.reserve(expected_output_chunk_count);
+ auto output_chunks = std::vector<std::shared_ptr<Chunk>>(expected_output_chunk_count);
+ auto write_output_segments_tasks = std::vector<std::shared_ptr<AbstractTask>>{};
// For every partition, create a reference segment.
auto partition_id = size_t{0};
- auto output_chunk_id = size_t{0};
+ auto chunk_input_position = size_t{0};
+
while (partition_id < pos_lists_left_size) {
// Moving the values into a shared pos list saves us some work in write_output_segments. We know that
// left_side_pos_list and right_side_pos_list will not be used again.
@@ -251,30 +263,44 @@ inline std::vector<std::shared_ptr<Chunk>> write_output_chunks(
}
}
- Segments output_segments;
- // Swap back the inputs, so that the order of the output columns is not changed.
- switch (output_column_order) {
- case OutputColumnOrder::LeftFirstRightSecond:
- write_output_segments(output_segments, left_input_table, left_side_pos_lists_by_segment, left_side_pos_list);
- write_output_segments(output_segments, right_input_table, right_side_pos_lists_by_segment, right_side_pos_list);
- break;
-
- case OutputColumnOrder::RightFirstLeftSecond:
- write_output_segments(output_segments, right_input_table, right_side_pos_lists_by_segment, right_side_pos_list);
- write_output_segments(output_segments, left_input_table, left_side_pos_lists_by_segment, left_side_pos_list);
- break;
-
- case OutputColumnOrder::RightOnly:
- write_output_segments(output_segments, right_input_table, right_side_pos_lists_by_segment, right_side_pos_list);
- break;
- }
+ // We need to pass the pos lists as parameters to ensure the shared_ptr is copied. Capturing by value results in
+ // const shared_ptrs and write_output_segments expects non-const.
+ auto write_output_segments_task = [&, chunk_input_position] (auto left_side_pos_list, auto right_side_pos_list) {
+ Segments output_segments;
- auto output_chunk = std::make_shared<Chunk>(std::move(output_segments));
+ // Swap back the inputs, so that the order of the output columns is not changed.
+ switch (output_column_order) {
+ case OutputColumnOrder::LeftFirstRightSecond:
+ write_output_segments(output_segments, left_input_table, left_side_pos_lists_by_column, left_side_pos_list);
+ write_output_segments(output_segments, right_input_table, right_side_pos_lists_by_column, right_side_pos_list);
+ break;
+
+ case OutputColumnOrder::RightFirstLeftSecond:
+ write_output_segments(output_segments, right_input_table, right_side_pos_lists_by_column, right_side_pos_list);
+ write_output_segments(output_segments, left_input_table, left_side_pos_lists_by_column, left_side_pos_list);
+ break;
+
+ case OutputColumnOrder::RightOnly:
+ write_output_segments(output_segments, right_input_table, right_side_pos_lists_by_column, right_side_pos_list);
+ break;
+ }
+
+ output_chunks[chunk_input_position] = std::make_shared<Chunk>(std::move(output_segments));
+ };
+
+ // Bind parameters to lambda before passing it to constructor of JobTask.
+ auto write_output_segments_task_params = std::bind(write_output_segments_task, left_side_pos_list,
+ right_side_pos_list);
+ write_output_segments_tasks.emplace_back(std::make_shared<JobTask>(write_output_segments_task_params));
+ write_output_segments_tasks.back()->schedule();
- output_chunks.emplace_back(output_chunk);
++partition_id;
- ++output_chunk_id;
+ ++chunk_input_position;
}
+
+ Hyrise::get().scheduler()->wait_for_tasks(write_output_segments_tasks);
+
+ output_chunks.resize(chunk_input_position);
return output_chunks;
}
} // namespace hyrise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment