使用SIMD优化列式最大值

恩佐·费拉扎诺(Enzo Ferrazzano)

我在花费大量时间的代码中使用了此函数,并且如果可能的话,我想通过vectorization-SIMD-compiler内部函数对其进行优化。

它实质上是在列上的矩阵上找到最大值的值和位置,并将其存储:

  • val_ptr:输入矩阵:主要列(Fortran样式)n_rows-by-n_cols(通常为n_rows >> n_cols)
  • opt_pos_ptr:长度为n_rows的int向量,用于存储最大值的位置。输入时填充零。
  • max_ptr:长度为n_rows的浮点向量,用于存储最大值。在输入时填充了val_ptr第一列的副本
  • 该函数将在并行循环中调用
  • 保证存储区域不重叠
  • 我真的不需要填充max_ptr,目前它仅用于簿记和避免内存分配
  • 我在Windows 10上使用MSVC,C ++ 17。这意味着要运行现代的Intel CPU。

代码,其中模板类型应为float或double:

template <typename eT>
find_max(const int n_cols, 
         const int n_rows, 
         const eT* val_ptr,
         int* opt_pos_ptr,
         eT* max_ptr){
    for (int col = 1; col < n_cols; ++col)
    {
        //Getting the pointer to the beginning of the column
        const auto* value_col = val_ptr + col * n_rows;
        //Looping over the rows
        for (int row = 0; row < n_rows; ++row)
        {
            //If the value is larger than the current maximum, we replace and we store its positions
            if (value_col[row] > max_ptr[row])
            {
                max_ptr[row] = value_col[row];
                opt_pos_ptr[row] = col;
            }
        }
    }
}

到目前为止我尝试过的是:

  • 我试图在内部循环上使用OpenMP并行,但是仅在非常大的行上带来一些东西,比我当前的使用情况大一些。
  • 内部循环中的if阻止了#pragma omp simd正常工作,没有它我无法重写它。
安德烈·谢马舍夫(Andrey Semashev)

根据您发布的代码示例,您似乎想要计算垂直最大值,这意味着在您的情况下,“列”是水平的。在C / C ++中,元素的水平序列(即两个相邻元素在内存中一个元素的距离)通常称为行和垂直(其中两个相邻元素在内存中行大小的距离)-列。在下面的回答中,我将使用传统的术语,其中行是水平的,列是垂直的。

另外,为简洁起见,我将重点介绍矩阵元素的一种可能类型float的基本思想相同double,主要区别在于每个向量的元素数量和_ps/_pd内部选择。最后我将提供一个版本double


这个想法是,您可以使用_mm_max_ps/并行计算多列的垂直最大值_mm_max_pd为了也记录找到的最大值的位置,您可以将先前的最大值与当前元素进行比较。比较的结果是一个掩码,其中元素是全1,其中最大值被更新。该遮罩还可用于选择需要更新的位置。

我必须注意,以下算法假定,如果一列中有多个相等的max元素,则记录哪个max元素的位置并不重要。另外,我假设矩阵不包含NaN值,这会影响比较。稍后对此进行更多讨论。

void find_max(const int n_cols, 
         const int n_rows, 
         const float* val_ptr,
         int* opt_pos_ptr,
         float* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 4 floats or 2 doubles
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128 mm_max = _mm_loadu_ps(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128 mm_value = _mm_loadu_ps(col_ptr);

            // See if this value is greater than the old maximum
            __m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));

            // Compute the maximum
            mm_max = _mm_max_ps(mm_value, mm_max);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_ps(max_ptr + row, mm_max);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}

由于混合内在函数,以上代码需要SSE4.1。您可以使用_mm_and_si128/ _ps_mm_andnot_si128/_ps_mm_or_si128/的组合替换它们_ps,在这种情况下,要求将降低到SSE2。有关特定内在函数的详细信息,请参阅《Intel Intrinsics Guide》,包括它们需要的指令集扩展。


关于NaN值的注释。如果您的矩阵可以包含NaN,则_mm_cmplt_ps测试将始终返回false。至于_mm_max_ps,通常不知道它将返回什么。maxps如果其中一个操作数是NaN,则内部函数转换为指令将返回其第二个(源)操作数,因此,通过排列指令的操作数,您可以实现任一行为。但是,没有记录_mm_max_ps内在函数的哪个参数表示指令的哪个操作数,甚至编译器在不同情况下可能使用不同的关联。有关更多详细信息,请参见此答案。

为了确保wrt的正确行为。NaN可以使用内联汇编器来强制maxps操作数的正确顺序不幸的是,这不是您说过使用的x86-64目标MSVC的选项,因此您可以将_mm_cmplt_ps结果重用于第二次混合,如下所示:

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);

这将抑制NaNs产生的最大值。如果要保留NaN,则可以使用第二个比较来检测NaN:

// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));

如果使用更宽的向量(__m256__m512)并以较小的比例展开外循环,则可能会进一步提高上述算法的性能,以便在内循环的每次迭代中至少加载一条有价值的行数据缓存行。


这是的实现示例double这里要注意的重要一点是,因为double每个向量只有两个元素,每个向量仍然有四个位置,所以我们必须展开外循环以一次处理两个向量,double然后通过与以前的最大值来混合32位位置。

void find_max(const int n_cols, 
         const int n_rows, 
         const double* val_ptr,
         int* opt_pos_ptr,
         double* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 2 doubles, but we want to process
    // two vectors at a time.
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);
            __m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            __m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
            // Compress the 2 masks into one
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);
            mm_max2 = _mm_max_pd(mm_value2, mm_max2);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        _mm_storeu_pd(max_ptr + row + 2, mm_max2);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process 2 doubles at once
    if (tail_size >= 2)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            // Compress the mask. The upper half doesn't matter.
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        // Only store the lower two positions
        _mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);

        row += 2;
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

优化最大值查询

来自分类Dev

使用Python Pandas搜索最大值和最大值百分比时的优化

来自分类Dev

SIMD / SSE:短点积和最大值

来自分类Dev

使用粒子群优化算法在海浪图上找到最大值

来自分类Dev

选择最大值而不使用最大值?

来自分类Dev

优化算法以查找所有局部最大值

来自分类Dev

Rails:优化查询关联表中的最大值

来自分类Dev

使用折叠的列表的局部最大值

来自分类Dev

使用if语句确定最大值

来自分类Dev

SQL:使用最大值过滤行

来自分类Dev

使用保留来跟踪最大值

来自分类Dev

使用OpenMP的数组中的最大值

来自分类Dev

无法使用注释获取最大值

来自分类Dev

使用 subQuery SQL 选择最大值

来自分类Dev

熊猫数据框通过绝对值优化获得最大值和最小值

来自分类Dev

使用 SIMD 进行 HOG 优化

来自分类Dev

通过合并行的值来优化和查找Pandas Df中的最大值

来自分类Dev

在pyspark中使用自定义顺序选择最大值/最大值

来自分类Dev

预期的最大值

来自分类Dev

varchar的最大值

来自分类Dev

每行的最大值

来自分类Dev

AND运算的最大值

来自分类Dev

每行的最大值

来自分类Dev

JTextField的最大值

来自分类Dev

计算最大值

来自分类Dev

选择的最大值

来自分类Dev

使用最小值和最大值的置信带

来自分类Dev

使用awk的最大值和最小值

来自分类Dev

使用Comparable查找最大值/最小值