Como otimizar / paralelizar o seguinte algoritmo de clustering / joining:

5

Eu tenho algoritmo relativamente pequeno que ocupa ~ 60% do tempo de execução total do meu código científico (57 linhas de 3600), então eu gostaria de encontrar uma maneira de otimizar o que estou fazendo e fazer o código independente de ordem para que eu possa aplicar uma estrutura paralela cilk_for .

Veja o que ele faz, verbalmente : Eu tenho um std::vector de ponteiros para objetos personalizados chamados Segment ( vector<Segment*> newSegment ). Cada Segment contém std::vector de inteiros (índices de malha). Nesta função, eu gostaria de encontrar qualquer Segment que se sobreponha a qualquer outro, com a sobreposição sendo definida como o membro indices sobreposto na linha numérica. Se eles se sobrepuserem, gostaria de juntá-los (insira o A.indices em B.indices ) e exclua um (delete A ).

ex. 1: A.indices = {1,2,3} B.indices = {4,5,6} não se sobrepõem; não faça nada

ex. 2:   A.indices = {1,2,4} B.indices = {3,5,6} fazer sobreposição; A = excluído B.indices = {1,2,3,4,5,6}

As sobreposições são esparsas, mas existentes.

Este é o código atual :

algoritmo principal:

//make sure segments don't overlap
for (unsigned i = 0; i < newSegment.size(); ++i) {
    if (newSegment[i]->size() == 0) continue;
    for (unsigned j = i + 1; j < newSegment.size(); ++j) {
        if (newSegment[i]->size() == 0) continue;
        if (newSegment[j]->size() == 0) continue;
        int i1 = newSegment[i]->begin();
        int i2 = static_cast<int>(newSegment[i]->end());
        int j1 = newSegment[j]->begin();
        int j2 = static_cast<int>(newSegment[j]->end());
        int L1 = abs(i1 - i2); 
        int L2 = abs(j1 - j2); 
        int dist = max(i1,i2,j1,j2) - min(i1,i2,j1,j2);

        //if overlap, fold segments together
        //copy indices from shorter segment to taller segment
        if (dist <= L1 + L2) {
            unsigned more, less;
            if (newSegment[i]->slope == newSegment[j]->slope) {
                if (value_max[i] > value_max[j]) {
                    more = i;
                    less = j;
                } else {
                    more = j;
                    less = i;
                }
            } else if (newSegment[i]->size() == 1) {
                more = j; less = i;
            } else if (newSegment[j]->size() == 1) {
                more = i; less = j;
            } else assert(1 == 0);
              while(!newSegment[less]->indices.empty()) {
                unsigned index = newSegment[less]->indices.back();
                newSegment[less]->indices.pop_back();
                newSegment[more]->indices.push_back(index);
            }
        }
    }

}//end overlap check

//delete empty segments
vector<unsigned> delList;
for (unsigned i = 0; i < newSegment.size(); ++i) {
    if (newSegment[i]->size() == 0) {                            //delete empty
        delList.push_back(i);
        continue;
    }
}
while (delList.size() > 0) {
    unsigned index = delList.back();
    delete newSegment.at(index);
    newSegment.erase(newSegment.begin() + index);
    delList.pop_back();
}

Relevante Segment definição de classe de objeto e funções de membro:

class Segment{

    public:
    Segment();
    ~Segment();

    unsigned size();
    int begin();
    unsigned end();
    std::vector<int> indices;
    double slope;
};

int Segment::begin() {
    if (!is_sorted(indices.begin(),indices.end()))      std::sort(indices.begin(),indices.end());
    if (indices.size() == 0) return -1; 
    return indices[0];
}

unsigned Segment::end() {
    if (!is_sorted(indices.begin(),indices.end()))    std::sort(indices.begin(),indices.end());
    return indices.back();
}

unsigned Segment::size() {
    unsigned indSize = indices.size();
    if (indSize == 1) {
        if (indices[0] == -1) return 0;
    }   
    return indSize;
}

Idéias :

  1. Como não me importo com a ordem dos objetos Segment , eles poderiam estar em um contêiner sem ordem?
  2. No meu algoritmo, encontro a sobreposição observando o primeiro e o último indices de cada segmento. Eu faço um std::is_sorted (e, em seguida, talvez um std::sort ) quando eu buscar o indices porque a lista pode mudar quando mais índices são inseridos. Talvez eu possa colocar o indices em std::set em vez de std::vector para salvar a classificação / verificação explícita de classificação?
  3. Tenho certeza de que, editando o indices conforme eu vou, isso o torna dependente da ordem. Talvez eu pudesse dividir o código na organização a seguir usando o conceito de um gráfico não direcionado para torná-lo independente da ordem:

    • descoberta de borda (sem modificar indices )
    • junte-se a clusters de nós conectados ( Segment de objetos que se sobrepõem) usando uma passagem de gráfico
    • excluir vazios Segment objects

Perguntas

  1. As ideias acima valem a pena ou são insignificantes para o desempenho?
  2. Como posso otimizar isso?
  3. Como (se não o acima) posso tornar o algoritmo independente da ordem?
por Stershic 08.11.2015 / 16:15
fonte

2 respostas

4

A função is_sorted() é provavelmente cara e, portanto, você deve evitá-la. Por que não classificar tudo de uma só vez no início antes de entrar nos loops?

A melhor maneira de otimizar seu código é inventando um novo algoritmo que evita os loops aninhados de N, porque isso tem uma complexidade de O (N ^ 2) (veja "notação big-Oh".) Veja Bart van Ingen O comentário de Schenau abaixo sobre como conseguir isso.

    
por 08.11.2015 / 18:52
fonte
0

Eu cheguei ao algoritmo idêntico do que @BartVanIngenSchenau em este comentário Basicamente, classifique o conjunto de segmentos com base no elemento min de cada segmento. Então dois elementos adjacentes se sobrepõem se e somente se Segment[i].max >= Segment[i+1].min

Mas eu gostaria de acrescentar que a classificação parece desnecessária e manter apenas o elemento max e min. Basta atualizá-los ao mesclar segmentos. Além disso, se o segmento for classificado por elemento min, você terá (segment1+segment2).min = min(segment1.min,segment2.min) (mas isso pode ser uma otimização prematura). Observei (segment1+segment2).max = max(segment1.max,segment2.max) a mesclagem de dois segmentos.

Para a localidade do cache, o melhor para mesclagem pode ser ter um layout semelhante ao layout a seguir

ptr_to_2nd_segment
n_elt_of_1st_segment,
min_elt_of_1st_segment,
[
[other_elts_of_1st_segment,]
max_elt_of_1st_segment,]

ptr_to_3rd_segment
n_elt_of_2nd_segment,
min_elt_of_2nd_segment,
[
[other_elts_of_2nd_segment,]
max_elt_of_2nd_segment,]

...

mesclar dois elementos nessa configuração seria bem simples, seria apenas uma questão de atualizar o ptr para o próximo elemento, adicionando o número de elementos, deslocando os elementos do segundo segmento e trocando os elementos max, se necessário. Isso deixaria algum lixo após cada mesclagem (8 bytes na arquitetura de 32 bits e 16 bytes na arquitetura de 64 bits). Saber se você pode suportar esse tipo de lixo é dependente do aplicativo (além disso, você poderia fazer um tipo de coleta de lixo entre duas iterações do algoritmo).

Para paralelizar, uma vez que o conjunto de segmentos seja classificado por min, você pode dividir em n parte o conjunto de segmentos e fazer a mesclagem independentemente. Em seguida, apenas mescle na borda de cada parte. Mas como o @MikeNakis diz em este comentário como a fusão é bastante memória ligada, eles podem não ser bem paralelizar

    
por 10.11.2015 / 19:19
fonte