Skip to content

Commit c570491

Browse files
authored
Merge pull request #187 from maciekpac/release/ccl_2021.15.7-arc
Intel(R) oneAPI Collective Communications Library (oneCCL) 2021.15.7
2 parents 0a67730 + cc828d4 commit c570491

30 files changed

+738
-157
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -335,7 +335,7 @@ endif()
335335

336336
set(CCL_MAJOR_VERSION "2021")
337337
set(CCL_MINOR_VERSION "15")
338-
set(CCL_UPDATE_VERSION "6")
338+
set(CCL_UPDATE_VERSION "7")
339339
set(CCL_PRODUCT_STATUS "Gold")
340340
string(TIMESTAMP CCL_PRODUCT_BUILD_DATE "%Y-%m-%dT %H:%M:%SZ")
341341
get_vcs_properties("git")

examples/benchmark/include/benchmark.hpp

Lines changed: 37 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -377,7 +377,9 @@ void store_to_csv(const user_options_t& options,
377377
double max_time,
378378
double avg_time,
379379
double stddev,
380-
double wait_avg_time) {
380+
double wait_avg_time,
381+
double algbw,
382+
double busbw) {
381383
std::ofstream csvf;
382384
csvf.open(options.csv_filepath, std::ofstream::out | std::ofstream::app);
383385

@@ -396,7 +398,7 @@ void store_to_csv(const user_options_t& options,
396398
<< "," << ccl::get_datatype_size(dtype) << "," << elem_count << ","
397399
<< ccl::get_datatype_size(dtype) * elem_count << "," << buf_count << ","
398400
<< iter_count << "," << min_time << "," << max_time << "," << avg_time << ","
399-
<< stddev << "," << wait_avg_time << std::endl;
401+
<< stddev << "," << wait_avg_time << "," << algbw << "," << busbw << std::endl;
400402
}
401403
csvf.close();
402404
}
@@ -472,13 +474,42 @@ void print_timings(const ccl::communicator& comm,
472474
max_time /= iter_count;
473475

474476
size_t bytes = elem_count * ccl::get_datatype_size(dtype) * buf_count;
477+
478+
double algbw = bytes / total_avg_time / 1000;
479+
480+
if (ncolls == 1) {
481+
if (options.coll_names.front() == "allgather" ||
482+
options.coll_names.front() == "allgatherv" ||
483+
options.coll_names.front() == "reduce_scatter" ||
484+
options.coll_names.front() == "alltoall" ||
485+
options.coll_names.front() == "alltoallv") {
486+
algbw = algbw * nranks;
487+
}
488+
}
489+
490+
double busbw = algbw;
491+
if (ncolls == 1) {
492+
if (options.coll_names.front() == "allreduce") {
493+
busbw = algbw * 2 * (nranks - 1) / nranks;
494+
}
495+
else if (options.coll_names.front() == "allgather" ||
496+
options.coll_names.front() == "allgatherv" ||
497+
options.coll_names.front() == "reduce_scatter" ||
498+
options.coll_names.front() == "alltoall" ||
499+
options.coll_names.front() == "alltoallv") {
500+
busbw = algbw * (nranks - 1) / nranks;
501+
}
502+
}
503+
475504
std::stringstream ss;
476505
ss << std::right << std::fixed << std::setw(COL_WIDTH) << bytes << std::setw(COL_WIDTH)
477506
<< elem_count * buf_count << std::setw(COL_WIDTH) << iter_count << std::setw(COL_WIDTH)
478507
<< std::setprecision(COL_PRECISION) << min_time << std::setw(COL_WIDTH)
479508
<< std::setprecision(COL_PRECISION) << max_time << std::setw(COL_WIDTH)
480509
<< std::setprecision(COL_PRECISION) << total_avg_time << std::setw(COL_WIDTH - 3)
481-
<< std::setprecision(COL_PRECISION) << stddev << std::setw(COL_WIDTH + 3);
510+
<< std::setprecision(COL_PRECISION) << stddev << std::setw(COL_WIDTH)
511+
<< std::setprecision(COL_PRECISION) << algbw << std::setw(COL_WIDTH)
512+
<< std::setprecision(COL_PRECISION) << busbw << std::setw(COL_WIDTH + 3);
482513

483514
if (show_extened_info(options.show_additional_info)) {
484515
ss << std::right << std::fixed << std::setprecision(COL_PRECISION) << wait_avg_time;
@@ -497,7 +528,9 @@ void print_timings(const ccl::communicator& comm,
497528
max_time,
498529
total_avg_time,
499530
stddev,
500-
wait_avg_time);
531+
wait_avg_time,
532+
algbw,
533+
busbw);
501534
}
502535
}
503536

examples/benchmark/src/benchmark.cpp

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -105,7 +105,8 @@ void run(ccl::communicator& service_comm,
105105
<< "#elem_count" << std::setw(COL_WIDTH) << "#repetitions"
106106
<< std::setw(COL_WIDTH) << "t_min[usec]" << std::setw(COL_WIDTH) << "t_max[usec]"
107107
<< std::setw(COL_WIDTH) << "t_avg[usec]" << std::setw(COL_WIDTH - 3)
108-
<< "stddev[%]";
108+
<< "stddev[%]" << std::setw(COL_WIDTH) << "algbw[GB/s]" << std::setw(COL_WIDTH)
109+
<< "busbw[GB/s]";
109110

110111
if (show_extened_info(options.show_additional_info)) {
111112
ss << std::right << std::setw(COL_WIDTH + 3) << "wait_t_avg[usec]";
@@ -435,7 +436,9 @@ int main(int argc, char* argv[]) {
435436
<< "t_max[usec],"
436437
<< "t_avg[usec],"
437438
<< "stddev[%],"
438-
<< "wait_t_avg[usec]" << std::endl;
439+
<< "wait_t_avg[usec],"
440+
<< "algbw[GB/s],"
441+
<< "busbw[GB/s]" << std::endl;
439442
csvf.close();
440443
}
441444

man/doxconfig

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
PROJECT_NAME = "Intel® oneAPI Collective Communications Library"
2-
PROJECT_NUMBER = "2021.15.6"
2+
PROJECT_NUMBER = "2021.15.7"
33

44
INPUT = ../src/common/env/vars.hpp ../src/common/env/vars_experimental.hpp
55

src/atl/ofi/atl_ofi.cpp

Lines changed: 40 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,46 @@ atl_status_t atl_ofi::init(int* argc,
101101
base_hints->domain_attr->data_progress = FI_PROGRESS_MANUAL;
102102
base_hints->caps = FI_TAGGED;
103103

104+
/*
105+
* Domain selection based on local rank index
106+
* If CCL_OFI_DOMAIN_NAMES is set, parse the comma-separated list and assign
107+
* the domain corresponding to local rank index for better fabric resource utilization
108+
*/
109+
if (ccl::global_data::env().ofi_domain_names != CCL_ENV_STR_NOT_SPECIFIED) {
110+
std::string domain_names_str = ccl::global_data::env().ofi_domain_names;
111+
std::vector<std::string> domain_list;
112+
std::stringstream ss(domain_names_str);
113+
std::string domain_name;
114+
115+
// Parse comma-separated domain names
116+
while (std::getline(ss, domain_name, ',')) {
117+
// Trim whitespace
118+
size_t start = domain_name.find_first_not_of(" \t");
119+
size_t end = domain_name.find_last_not_of(" \t");
120+
if (start != std::string::npos && end != std::string::npos) {
121+
domain_list.push_back(domain_name.substr(start, end - start + 1));
122+
}
123+
}
124+
125+
// Select domain based on local rank index
126+
int local_idx = coord.local_idx;
127+
if (!domain_list.empty() && local_idx < static_cast<int>(domain_list.size())) {
128+
base_hints->domain_attr->name = strdup(domain_list[local_idx].c_str());
129+
LOG_INFO("Selected OFI domain: ",
130+
base_hints->domain_attr->name,
131+
" for local rank: ",
132+
local_idx,
133+
", global rank: ",
134+
pmi->get_rank());
135+
}
136+
else {
137+
LOG_WARN("Cannot select domain for local rank: ",
138+
local_idx,
139+
", available domains: ",
140+
domain_list.size());
141+
}
142+
}
143+
104144
prov_env = getenv("FI_PROVIDER");
105145

106146
ctx.enable_hmem = 0;

src/coll/algorithms/allgatherv/sycl/allgatherv_pcie.cpp

Lines changed: 25 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -43,19 +43,32 @@ ccl::event allgatherv_ll_ring(const void *send_buf,
4343
size_t send_size = send_count * ccl_dtype.size();
4444

4545
bool p2p = node_comm->get_topo_manager().has_p2p_access();
46-
uint32_t pattern = comm->get_rt_pattern(pattern_type::collective, -1);
46+
uint32_t pattern = node_comm->get_rt_pattern(pattern_type::collective, -1);
4747

4848
auto lambda = [&]<typename T, template <typename, int> class Proto>(int NRanks) {
4949
const size_t *offs = offsets.empty() ? NULL : offsets.data();
5050

5151
T *peerbuf0[NRanks];
5252
T *peerbuf1[NRanks];
53-
for (int i = 0; i < NRanks; i++) {
54-
peerbuf0[i] = (T *)get_remote_node_tmp_buf(0, comm)[i];
55-
peerbuf1[i] = (T *)get_remote_node_tmp_buf(1, comm)[i];
53+
T *ipcbuf0;
54+
T *ipcbuf1;
55+
if (ccl::global_data::env().sycl_ll_buffer_global) {
56+
for (int i = 0; i < NRanks; i++) {
57+
peerbuf0[i] = (T *)get_remote_node_tmp_buf(0, comm)[i];
58+
peerbuf1[i] = (T *)get_remote_node_tmp_buf(1, comm)[i];
59+
}
60+
ipcbuf0 = (T *)get_tmp_buf(0, comm);
61+
ipcbuf1 = (T *)get_tmp_buf(1, comm);
62+
}
63+
else {
64+
auto [local_tmp_buf, remote_ptrs] = node_comm->get_all_tmp_bufs(true);
65+
for (int i = 0; i < NRanks; i++) {
66+
peerbuf0[i] = (T *)remote_ptrs[i];
67+
peerbuf1[i] = (T *)((char *)remote_ptrs[i] + ccl_tmp_bufs::buf_size / 2);
68+
}
69+
ipcbuf0 = (T *)local_tmp_buf;
70+
ipcbuf1 = (T *)((char *)local_tmp_buf + ccl_tmp_bufs::buf_size / 2);
5671
}
57-
T *ipcbuf0 = (T *)get_tmp_buf(0, comm);
58-
T *ipcbuf1 = (T *)get_tmp_buf(1, comm);
5972
sycl::event e = AllGather<T, Proto, RingTransmit>::launch(NRanks,
6073
(T *)send_buf,
6174
(T *)recv_buf,
@@ -68,13 +81,17 @@ ccl::event allgatherv_ll_ring(const void *send_buf,
6881
comm_rank,
6982
pattern,
7083
q,
84+
node_comm,
7185
p2p,
7286
done);
73-
// update pattern
74-
comm->update_rt_pattern(pattern_type::collective, -1, pattern);
7587
return e;
7688
};
7789

90+
if (ccl::global_data::env().sycl_ll_buffer_global) {
91+
const bool is_cpu_barrier = ccl::global_data::env().sycl_ccl_barrier;
92+
sycl::event barrier_event = invoke_barrier(node_comm, q, {}, is_cpu_barrier);
93+
}
94+
7895
if (send_size <= ccl::global_data::env().sycl_allgatherv_ll_threshold) {
7996
// small ring with LL
8097
sycl_e = invoke_pcie_type<Rt64_PCIE>(lambda, comm_size, dtype);

src/coll/algorithms/allgatherv/sycl/allgatherv_pcie.hpp

Lines changed: 20 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,11 @@ struct AllGather : public Transmit<T, Proto, SubGroupSize> {
5858
p2p),
5959
workSize(calcWorkSize(input, output, nelems * sizeof(T))) {}
6060

61-
sycl::nd_range<1> getLaunchParam(uint32_t& updateSeqNo) const {
61+
sycl::nd_range<1> getLaunchParam(sycl::queue q,
62+
const std::shared_ptr<ccl_comm> comm,
63+
T* ipcbuf0,
64+
T* ipcbuf1,
65+
uint32_t& updateSeqNo) const {
6266
constexpr uint32_t nThreads = 64; /* TODO: get EU/thread config */
6367
#if defined(CCL_SYCL_ENABLE_PVC)
6468
constexpr size_t maxSS = 64;
@@ -73,7 +77,18 @@ struct AllGather : public Transmit<T, Proto, SubGroupSize> {
7377
size_t nSS = divUp(nWire, wirePerSS);
7478
auto actualSS = std::min(nSS, maxSS);
7579
auto nSteps = divUp(nWire, actualSS * wirePerSS);
76-
updateSeqNo += nSteps;
80+
auto nSlot = Transmit<T, Proto, SubGroupSize>::nSlot;
81+
nSteps = (nSteps + nSlot - 1) / nSlot;
82+
auto newSeqNo = comm->increase_rt_pattern(pattern_type::collective, -1, updateSeqNo, nSteps);
83+
// check for pattern wraparound
84+
rt_check_pattern<T>(q,
85+
comm,
86+
updateSeqNo,
87+
newSeqNo,
88+
ipcbuf0,
89+
ipcbuf1,
90+
RingTransmit<int, Rt64_128_PCIE>::ringSize / sizeof(T));
91+
updateSeqNo = newSeqNo;
7792
//
7893
// XXX: we over updated sequence number. Should be nSteps / nSlot
7994
// No harm, but not nice.
@@ -94,6 +109,7 @@ struct AllGather : public Transmit<T, Proto, SubGroupSize> {
94109
int rank,
95110
uint32_t& step,
96111
sycl::queue queue,
112+
const std::shared_ptr<ccl_comm> comm,
97113
bool p2p,
98114
bool& done) {
99115
sycl::event e;
@@ -105,8 +121,9 @@ struct AllGather : public Transmit<T, Proto, SubGroupSize> {
105121
}
106122
done = true;
107123

124+
const sycl::nd_range<1> ndrange = offload.getLaunchParam(queue, comm, ipcbuf0, ipcbuf1, step);
108125
e = queue.submit([&](sycl::handler& cgh) {
109-
cgh.parallel_for(offload.getLaunchParam(step), offload);
126+
cgh.parallel_for(ndrange, offload);
110127
});
111128
return e;
112129
}

src/coll/algorithms/allreduce/sycl/allreduce_pcie.cpp

Lines changed: 27 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -42,17 +42,32 @@ ccl::event allreduce_ll_ring(const void *src,
4242
q.memcpy(dst, src, dt_sz * count);
4343

4444
bool p2p = node_comm->get_topo_manager().has_p2p_access();
45-
uint32_t pattern = comm->get_rt_pattern(pattern_type::collective, -1);
45+
uint32_t pattern = node_comm->get_rt_pattern(pattern_type::collective, -1);
4646

4747
auto lambda = [&]<typename T, template <typename, int> class Proto>(int NRanks) {
4848
T *peerbuf0[NRanks];
4949
T *peerbuf1[NRanks];
50-
for (int i = 0; i < NRanks; i++) {
51-
peerbuf0[i] = (T *)get_remote_node_tmp_buf(0, comm)[i];
52-
peerbuf1[i] = (T *)get_remote_node_tmp_buf(1, comm)[i];
50+
T *ipcbuf0;
51+
T *ipcbuf1;
52+
if (ccl::global_data::env().sycl_ll_buffer_global) {
53+
// large buffer
54+
for (int i = 0; i < NRanks; i++) {
55+
peerbuf0[i] = (T *)get_remote_node_tmp_buf(0, comm)[i];
56+
peerbuf1[i] = (T *)get_remote_node_tmp_buf(1, comm)[i];
57+
}
58+
ipcbuf0 = (T *)get_tmp_buf(0, comm);
59+
ipcbuf1 = (T *)get_tmp_buf(1, comm);
60+
}
61+
else {
62+
// small buffer
63+
auto [local_tmp_buf, remote_ptrs] = node_comm->get_all_tmp_bufs(true);
64+
for (int i = 0; i < NRanks; i++) {
65+
peerbuf0[i] = (T *)remote_ptrs[i];
66+
peerbuf1[i] = (T *)((char *)remote_ptrs[i] + ccl_tmp_bufs::buf_size / 2);
67+
}
68+
ipcbuf0 = (T *)local_tmp_buf;
69+
ipcbuf1 = (T *)((char *)local_tmp_buf + ccl_tmp_bufs::buf_size / 2);
5370
}
54-
T *ipcbuf0 = (T *)get_tmp_buf(0, comm);
55-
T *ipcbuf1 = (T *)get_tmp_buf(1, comm);
5671
sycl::event e = AllReduce<T, Proto, RingTransmit>::launch(NRanks,
5772
(T *)dst,
5873
ipcbuf0,
@@ -63,12 +78,17 @@ ccl::event allreduce_ll_ring(const void *src,
6378
comm_rank,
6479
pattern,
6580
q,
81+
node_comm,
6682
p2p,
6783
done);
68-
comm->update_rt_pattern(pattern_type::collective, -1, pattern);
6984
return e;
7085
};
7186

87+
if (ccl::global_data::env().sycl_ll_buffer_global) {
88+
const bool is_cpu_barrier = ccl::global_data::env().sycl_ccl_barrier;
89+
sycl::event barrier_event = invoke_barrier(node_comm, q, {}, is_cpu_barrier);
90+
}
91+
7292
if (count * dt_sz <= ccl::global_data::env().sycl_allreduce_ll_threshold) {
7393
// small ring with LL
7494
sycl_e = invoke_pcie_type<Rt64_PCIE>(lambda, comm_size, dtype);

src/coll/algorithms/allreduce/sycl/allreduce_pcie.hpp

Lines changed: 22 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,11 @@ struct AllReduce : public Transmit<T, Proto, SubGroupSize> {
5656
static int scatterVerify(uint32_t* host, int rank, uint32_t flag, size_t nWorkElemsInInt);
5757
static int stage2Verify(T* host, int rank, uint32_t flag, size_t nWorkElemsInInt);
5858

59-
sycl::nd_range<1> getLaunchParam(uint32_t& updateSeqNo) const {
59+
sycl::nd_range<1> getLaunchParam(sycl::queue q,
60+
const std::shared_ptr<ccl_comm> comm,
61+
T* ipcbuf0,
62+
T* ipcbuf1,
63+
uint32_t& updateSeqNo) const {
6064
constexpr uint32_t nThreads = 64; /* TODO: get EU/thread config */
6165
// TODO: can be queried
6266
#if defined(CCL_SYCL_ENABLE_PVC)
@@ -72,7 +76,19 @@ struct AllReduce : public Transmit<T, Proto, SubGroupSize> {
7276
size_t nSS = divUp(nWire, wirePerSS);
7377
auto actualSS = std::min(nSS, maxSS);
7478
auto nSteps = divUp(nWire, actualSS * wirePerSS);
75-
updateSeqNo += nSteps;
79+
auto nSlot = Transmit<T, Proto, SubGroupSize>::nSlot;
80+
nSteps = (nSteps + nSlot - 1) / nSlot;
81+
auto newSeqNo =
82+
comm->increase_rt_pattern(pattern_type::collective, -1, updateSeqNo, nSteps);
83+
// check for pattern wraparound
84+
rt_check_pattern<T>(q,
85+
comm,
86+
updateSeqNo,
87+
newSeqNo,
88+
ipcbuf0,
89+
ipcbuf1,
90+
RingTransmit<int, Rt64_128_PCIE>::ringSize / sizeof(T));
91+
updateSeqNo = newSeqNo;
7692
//
7793
// XXX: we over updated sequence number. Should be nSteps / nSlot
7894
// No harm, but not nice.
@@ -91,6 +107,7 @@ struct AllReduce : public Transmit<T, Proto, SubGroupSize> {
91107
int rank,
92108
uint32_t& step,
93109
sycl::queue queue,
110+
const std::shared_ptr<ccl_comm> comm,
94111
bool p2p,
95112
bool& done) {
96113
sycl::event e;
@@ -102,8 +119,10 @@ struct AllReduce : public Transmit<T, Proto, SubGroupSize> {
102119
}
103120
done = true;
104121

122+
const sycl::nd_range<1> ndrange =
123+
offload.getLaunchParam(queue, comm, ipcbuf0, ipcbuf1, step);
105124
e = queue.submit([&](sycl::handler& cgh) {
106-
cgh.parallel_for(offload.getLaunchParam(step), offload);
125+
cgh.parallel_for(ndrange, offload);
107126
});
108127
return e;
109128
}

0 commit comments

Comments
 (0)