From 97eb7e0bf17b476d516262e0af462ec7eeb8f505 Mon Sep 17 00:00:00 2001 From: Even Rouault Date: Wed, 29 Apr 2020 11:50:17 +0200 Subject: [PATCH] Add multithreading support in the T1 (entropy phase) encoder - API wise, opj_codec_set_threads() can be used on the encoding side - opj_compress has a -threads switch similar to opj_uncompress --- src/bin/jp2/opj_compress.c | 35 ++++- src/lib/openjp2/openjpeg.c | 6 + src/lib/openjp2/openjpeg.h | 11 +- src/lib/openjp2/t1.c | 296 ++++++++++++++++++++++++------------- src/lib/openjp2/t1.h | 4 +- src/lib/openjp2/tcd.c | 16 +- 6 files changed, 239 insertions(+), 129 deletions(-) diff --git a/src/bin/jp2/opj_compress.c b/src/bin/jp2/opj_compress.c index cbc30fba..68274840 100644 --- a/src/bin/jp2/opj_compress.c +++ b/src/bin/jp2/opj_compress.c @@ -301,6 +301,10 @@ static void encode_help_display(void) fprintf(stdout, " Currently supports only RPCL order.\n"); fprintf(stdout, "-C \n"); fprintf(stdout, " Add in the comment marker segment.\n"); + if (opj_has_thread_support()) { + fprintf(stdout, " -threads \n" + " Number of threads to use for encoding or ALL_CPUS for all available cores.\n"); + } /* UniPG>> */ #ifdef USE_JPWL fprintf(stdout, "-W \n"); @@ -579,7 +583,8 @@ static int parse_cmdline_encoder(int argc, char **argv, img_fol_t *img_fol, raw_cparameters_t *raw_cp, char *indexfilename, size_t indexfilename_size, int* pOutFramerate, - OPJ_BOOL* pOutPLT) + OPJ_BOOL* pOutPLT, + int* pOutNumThreads) { OPJ_UINT32 i, j; int totlen, c; @@ -596,7 +601,8 @@ static int parse_cmdline_encoder(int argc, char **argv, {"jpip", NO_ARG, NULL, 'J'}, {"mct", REQ_ARG, NULL, 'Y'}, {"IMF", REQ_ARG, NULL, 'Z'}, - {"PLT", NO_ARG, NULL, 'A'} + {"PLT", NO_ARG, NULL, 'A'}, + {"threads", REQ_ARG, NULL, 'B'} }; /* parse the command line */ @@ -1679,6 +1685,19 @@ static int parse_cmdline_encoder(int argc, char **argv, } break; + /* ----------------------------------------------------- */ + case 'B': { /* Number of threads */ + if (strcmp(opj_optarg, 
"ALL_CPUS") == 0) { + *pOutNumThreads = opj_get_num_cpus(); + if (*pOutNumThreads == 1) { + *pOutNumThreads = 0; + } + } else { + sscanf(opj_optarg, "%d", pOutNumThreads); + } + } + break; + /* ------------------------------------------------------ */ @@ -1860,6 +1879,7 @@ int main(int argc, char **argv) OPJ_FLOAT64 t = opj_clock(); OPJ_BOOL PLT = OPJ_FALSE; + int num_threads = 0; /* set encoding parameters to default values */ opj_set_default_encoder_parameters(&parameters); @@ -1880,7 +1900,7 @@ int main(int argc, char **argv) parameters.tcp_mct = (char) 255; /* This will be set later according to the input image or the provided option */ if (parse_cmdline_encoder(argc, argv, &parameters, &img_fol, &raw_cp, - indexfilename, sizeof(indexfilename), &framerate, &PLT) == 1) { + indexfilename, sizeof(indexfilename), &framerate, &PLT, &num_threads) == 1) { ret = 1; goto fin; } @@ -2141,6 +2161,15 @@ int main(int argc, char **argv) } } + if (num_threads >= 1 && + !opj_codec_set_threads(l_codec, num_threads)) { + fprintf(stderr, "failed to set number of threads\n"); + opj_destroy_codec(l_codec); + opj_image_destroy(image); + ret = 1; + goto fin; + } + /* open a byte stream for writing and allocate memory for all tiles */ l_stream = opj_stream_create_default_file_stream(parameters.outfile, OPJ_FALSE); if (! l_stream) { diff --git a/src/lib/openjp2/openjpeg.c b/src/lib/openjp2/openjpeg.c index 1e2d60a6..9c9b6eb0 100644 --- a/src/lib/openjp2/openjpeg.c +++ b/src/lib/openjp2/openjpeg.c @@ -657,6 +657,9 @@ opj_codec_t* OPJ_CALLCONV opj_create_compress(OPJ_CODEC_FORMAT p_format) const char* const*, struct opj_event_mgr *)) opj_j2k_encoder_set_extra_options; + l_codec->opj_set_threads = + (OPJ_BOOL(*)(void * p_codec, OPJ_UINT32 num_threads)) opj_j2k_set_threads; + l_codec->m_codec = opj_j2k_create_compress(); if (! 
l_codec->m_codec) { opj_free(l_codec); @@ -700,6 +703,9 @@ opj_codec_t* OPJ_CALLCONV opj_create_compress(OPJ_CODEC_FORMAT p_format) const char* const*, struct opj_event_mgr *)) opj_jp2_encoder_set_extra_options; + l_codec->opj_set_threads = + (OPJ_BOOL(*)(void * p_codec, OPJ_UINT32 num_threads)) opj_jp2_set_threads; + l_codec->m_codec = opj_jp2_create(OPJ_FALSE); if (! l_codec->m_codec) { opj_free(l_codec); diff --git a/src/lib/openjp2/openjpeg.h b/src/lib/openjp2/openjpeg.h index da84f399..4bbd9a8b 100644 --- a/src/lib/openjp2/openjpeg.h +++ b/src/lib/openjp2/openjpeg.h @@ -1348,15 +1348,14 @@ OPJ_API OPJ_BOOL OPJ_CALLCONV opj_setup_decoder(opj_codec_t *p_codec, * number, or "ALL_CPUS". If OPJ_NUM_THREADS is set and this function is called, * this function will override the behaviour of the environment variable. * - * Currently this function must be called after opj_setup_decoder() and - * before opj_read_header(). + * This function must be called after opj_setup_decoder() and + * before opj_read_header() for the decoding side, or after opj_setup_encoder() + * and before opj_start_compress() for the encoding side. * - * Note: currently only has effect on the decompressor. - * - * @param p_codec decompressor handler + * @param p_codec decompressor or compressor handler * @param num_threads number of threads. * - * @return OPJ_TRUE if the decoder is correctly set + * @return OPJ_TRUE if the function is successful. 
*/ OPJ_API OPJ_BOOL OPJ_CALLCONV opj_codec_set_threads(opj_codec_t *p_codec, int num_threads); diff --git a/src/lib/openjp2/t1.c b/src/lib/openjp2/t1.c index f6f76711..1b9556ea 100644 --- a/src/lib/openjp2/t1.c +++ b/src/lib/openjp2/t1.c @@ -177,18 +177,18 @@ static OPJ_FLOAT64 opj_t1_getwmsedec( const OPJ_FLOAT64 * mct_norms, OPJ_UINT32 mct_numcomps); -static void opj_t1_encode_cblk(opj_t1_t *t1, - opj_tcd_cblk_enc_t* cblk, - OPJ_UINT32 orient, - OPJ_UINT32 compno, - OPJ_UINT32 level, - OPJ_UINT32 qmfbid, - OPJ_FLOAT64 stepsize, - OPJ_UINT32 cblksty, - OPJ_UINT32 numcomps, - opj_tcd_tile_t * tile, - const OPJ_FLOAT64 * mct_norms, - OPJ_UINT32 mct_numcomps); +/** Return "cumwmsedec" that should be used to increase tile->distotile */ +static double opj_t1_encode_cblk(opj_t1_t *t1, + opj_tcd_cblk_enc_t* cblk, + OPJ_UINT32 orient, + OPJ_UINT32 compno, + OPJ_UINT32 level, + OPJ_UINT32 qmfbid, + OPJ_FLOAT64 stepsize, + OPJ_UINT32 cblksty, + OPJ_UINT32 numcomps, + const OPJ_FLOAT64 * mct_norms, + OPJ_UINT32 mct_numcomps); /** Decode 1 code-block @@ -2100,124 +2100,210 @@ static OPJ_BOOL opj_t1_decode_cblk(opj_t1_t *t1, } +typedef struct { + OPJ_UINT32 compno; + OPJ_UINT32 resno; + opj_tcd_cblk_enc_t* cblk; + opj_tcd_tile_t *tile; + opj_tcd_band_t* band; + opj_tcd_tilecomp_t* tilec; + opj_tccp_t* tccp; + const OPJ_FLOAT64 * mct_norms; + OPJ_UINT32 mct_numcomps; + volatile OPJ_BOOL* pret; + opj_mutex_t* mutex; +} opj_t1_cblk_encode_processing_job_t; + +/** Procedure to deal with a asynchronous code-block encoding job. + * + * @param user_data Pointer to a opj_t1_cblk_encode_processing_job_t* structure + * @param tls TLS handle. 
+ */ +static void opj_t1_clbl_encode_processor(void* user_data, opj_tls_t* tls) +{ + opj_t1_cblk_encode_processing_job_t* job = + (opj_t1_cblk_encode_processing_job_t*)user_data; + opj_tcd_cblk_enc_t* cblk = job->cblk; + const opj_tcd_band_t* band = job->band; + const opj_tcd_tilecomp_t* tilec = job->tilec; + const opj_tccp_t* tccp = job->tccp; + const OPJ_UINT32 resno = job->resno; + opj_t1_t* t1; + const OPJ_UINT32 tile_w = (OPJ_UINT32)(tilec->x1 - tilec->x0); + + OPJ_INT32* OPJ_RESTRICT tiledp; + OPJ_UINT32 cblk_w; + OPJ_UINT32 cblk_h; + OPJ_UINT32 i, j, tileLineAdvance; + OPJ_SIZE_T tileIndex = 0; + + OPJ_INT32 x = cblk->x0 - band->x0; + OPJ_INT32 y = cblk->y0 - band->y0; + + if (!*(job->pret)) { + opj_free(job); + return; + } + + t1 = (opj_t1_t*) opj_tls_get(tls, OPJ_TLS_KEY_T1); + if (t1 == NULL) { + t1 = opj_t1_create(OPJ_TRUE); /* OPJ_TRUE == T1 for encoding */ + opj_tls_set(tls, OPJ_TLS_KEY_T1, t1, opj_t1_destroy_wrapper); + } + + if (band->bandno & 1) { + opj_tcd_resolution_t *pres = &tilec->resolutions[resno - 1]; + x += pres->x1 - pres->x0; + } + if (band->bandno & 2) { + opj_tcd_resolution_t *pres = &tilec->resolutions[resno - 1]; + y += pres->y1 - pres->y0; + } + + if (!opj_t1_allocate_buffers( + t1, + (OPJ_UINT32)(cblk->x1 - cblk->x0), + (OPJ_UINT32)(cblk->y1 - cblk->y0))) { + *(job->pret) = OPJ_FALSE; + opj_free(job); + return; + } + + cblk_w = t1->w; + cblk_h = t1->h; + tileLineAdvance = tile_w - cblk_w; + + tiledp = &tilec->data[(OPJ_SIZE_T)y * tile_w + (OPJ_SIZE_T)x]; + t1->data = tiledp; + t1->data_stride = tile_w; + if (tccp->qmfbid == 1) { + /* Do multiplication on unsigned type, even if the + * underlying type is signed, to avoid potential + * int overflow on large value (the output will be + * incorrect in such situation, but whatever...) 
+ * This assumes complement-to-2 signed integer + * representation + * Fixes https://github.com/uclouvain/openjpeg/issues/1053 + */ + OPJ_UINT32* OPJ_RESTRICT tiledp_u = (OPJ_UINT32*) tiledp; + for (j = 0; j < cblk_h; ++j) { + for (i = 0; i < cblk_w; ++i) { + tiledp_u[tileIndex] <<= T1_NMSEDEC_FRACBITS; + tileIndex++; + } + tileIndex += tileLineAdvance; + } + } else { /* if (tccp->qmfbid == 0) */ + const OPJ_INT32 bandconst = 8192 * 8192 / ((OPJ_INT32) floor( + band->stepsize * 8192)); + + for (j = 0; j < cblk_h; ++j) { + for (i = 0; i < cblk_w; ++i) { + OPJ_INT32 tmp = tiledp[tileIndex]; + tiledp[tileIndex] = + opj_int_fix_mul_t1( + tmp, + bandconst); + tileIndex++; + } + tileIndex += tileLineAdvance; + } + } + + { + OPJ_FLOAT64 cumwmsedec = + opj_t1_encode_cblk( + t1, + cblk, + band->bandno, + job->compno, + tilec->numresolutions - 1 - resno, + tccp->qmfbid, + band->stepsize, + tccp->cblksty, + job->tile->numcomps, + job->mct_norms, + job->mct_numcomps); + if (job->mutex) { + opj_mutex_lock(job->mutex); + } + job->tile->distotile += cumwmsedec; + if (job->mutex) { + opj_mutex_unlock(job->mutex); + } + } + + opj_free(job); +} -OPJ_BOOL opj_t1_encode_cblks(opj_t1_t *t1, +OPJ_BOOL opj_t1_encode_cblks(opj_tcd_t* tcd, opj_tcd_tile_t *tile, opj_tcp_t *tcp, const OPJ_FLOAT64 * mct_norms, OPJ_UINT32 mct_numcomps ) { + volatile OPJ_BOOL ret = OPJ_TRUE; + opj_thread_pool_t* tp = tcd->thread_pool; OPJ_UINT32 compno, resno, bandno, precno, cblkno; + opj_mutex_t* mutex = opj_mutex_create(); tile->distotile = 0; /* fixed_quality */ for (compno = 0; compno < tile->numcomps; ++compno) { opj_tcd_tilecomp_t* tilec = &tile->comps[compno]; opj_tccp_t* tccp = &tcp->tccps[compno]; - OPJ_UINT32 tile_w = (OPJ_UINT32)(tilec->x1 - tilec->x0); for (resno = 0; resno < tilec->numresolutions; ++resno) { opj_tcd_resolution_t *res = &tilec->resolutions[resno]; for (bandno = 0; bandno < res->numbands; ++bandno) { opj_tcd_band_t* OPJ_RESTRICT band = &res->bands[bandno]; - OPJ_INT32 bandconst; /* 
Skip empty bands */ if (opj_tcd_is_band_empty(band)) { continue; } - - bandconst = 8192 * 8192 / ((OPJ_INT32) floor(band->stepsize * 8192)); for (precno = 0; precno < res->pw * res->ph; ++precno) { opj_tcd_precinct_t *prc = &band->precincts[precno]; for (cblkno = 0; cblkno < prc->cw * prc->ch; ++cblkno) { opj_tcd_cblk_enc_t* cblk = &prc->cblks.enc[cblkno]; - OPJ_INT32* OPJ_RESTRICT tiledp; - OPJ_UINT32 cblk_w; - OPJ_UINT32 cblk_h; - OPJ_UINT32 i, j, tileLineAdvance; - OPJ_SIZE_T tileIndex = 0; - OPJ_INT32 x = cblk->x0 - band->x0; - OPJ_INT32 y = cblk->y0 - band->y0; - if (band->bandno & 1) { - opj_tcd_resolution_t *pres = &tilec->resolutions[resno - 1]; - x += pres->x1 - pres->x0; + opj_t1_cblk_encode_processing_job_t* job = + (opj_t1_cblk_encode_processing_job_t*) opj_calloc(1, + sizeof(opj_t1_cblk_encode_processing_job_t)); + if (!job) { + ret = OPJ_FALSE; + goto end; } - if (band->bandno & 2) { - opj_tcd_resolution_t *pres = &tilec->resolutions[resno - 1]; - y += pres->y1 - pres->y0; - } - - if (!opj_t1_allocate_buffers( - t1, - (OPJ_UINT32)(cblk->x1 - cblk->x0), - (OPJ_UINT32)(cblk->y1 - cblk->y0))) { - return OPJ_FALSE; - } - - cblk_w = t1->w; - cblk_h = t1->h; - tileLineAdvance = tile_w - cblk_w; - - tiledp = &tilec->data[(OPJ_SIZE_T)y * tile_w + (OPJ_SIZE_T)x]; - t1->data = tiledp; - t1->data_stride = tile_w; - if (tccp->qmfbid == 1) { - /* Do multiplication on unsigned type, even if the - * underlying type is signed, to avoid potential - * int overflow on large value (the output will be - * incorrect in such situation, but whatever...) 
- * This assumes complement-to-2 signed integer - * representation - * Fixes https://github.com/uclouvain/openjpeg/issues/1053 - */ - OPJ_UINT32* OPJ_RESTRICT tiledp_u = (OPJ_UINT32*) tiledp; - for (j = 0; j < cblk_h; ++j) { - for (i = 0; i < cblk_w; ++i) { - tiledp_u[tileIndex] <<= T1_NMSEDEC_FRACBITS; - tileIndex++; - } - tileIndex += tileLineAdvance; - } - } else { /* if (tccp->qmfbid == 0) */ - for (j = 0; j < cblk_h; ++j) { - for (i = 0; i < cblk_w; ++i) { - OPJ_INT32 tmp = tiledp[tileIndex]; - tiledp[tileIndex] = - opj_int_fix_mul_t1( - tmp, - bandconst); - tileIndex++; - } - tileIndex += tileLineAdvance; - } - } - - opj_t1_encode_cblk( - t1, - cblk, - band->bandno, - compno, - tilec->numresolutions - 1 - resno, - tccp->qmfbid, - band->stepsize, - tccp->cblksty, - tile->numcomps, - tile, - mct_norms, - mct_numcomps); + job->compno = compno; + job->tile = tile; + job->resno = resno; + job->cblk = cblk; + job->band = band; + job->tilec = tilec; + job->tccp = tccp; + job->mct_norms = mct_norms; + job->mct_numcomps = mct_numcomps; + job->pret = &ret; + job->mutex = mutex; + opj_thread_pool_submit_job(tp, opj_t1_clbl_encode_processor, job); } /* cblkno */ } /* precno */ } /* bandno */ } /* resno */ } /* compno */ - return OPJ_TRUE; + +end: + opj_thread_pool_wait_completion(tcd->thread_pool, 0); + if (mutex) { + opj_mutex_destroy(mutex); + } + + return ret; } /* Returns whether the pass (bpno, passtype) is terminated */ @@ -2252,18 +2338,17 @@ static int opj_t1_enc_is_term_pass(opj_tcd_cblk_enc_t* cblk, /** mod fixed_quality */ -static void opj_t1_encode_cblk(opj_t1_t *t1, - opj_tcd_cblk_enc_t* cblk, - OPJ_UINT32 orient, - OPJ_UINT32 compno, - OPJ_UINT32 level, - OPJ_UINT32 qmfbid, - OPJ_FLOAT64 stepsize, - OPJ_UINT32 cblksty, - OPJ_UINT32 numcomps, - opj_tcd_tile_t * tile, - const OPJ_FLOAT64 * mct_norms, - OPJ_UINT32 mct_numcomps) +static OPJ_FLOAT64 opj_t1_encode_cblk(opj_t1_t *t1, + opj_tcd_cblk_enc_t* cblk, + OPJ_UINT32 orient, + OPJ_UINT32 compno, + 
OPJ_UINT32 level, + OPJ_UINT32 qmfbid, + OPJ_FLOAT64 stepsize, + OPJ_UINT32 cblksty, + OPJ_UINT32 numcomps, + const OPJ_FLOAT64 * mct_norms, + OPJ_UINT32 mct_numcomps) { OPJ_FLOAT64 cumwmsedec = 0.0; @@ -2297,7 +2382,7 @@ static void opj_t1_encode_cblk(opj_t1_t *t1, T1_NMSEDEC_FRACBITS) : 0; if (cblk->numbps == 0) { cblk->totalpasses = 0; - return; + return cumwmsedec; } bpno = (OPJ_INT32)(cblk->numbps - 1); @@ -2343,7 +2428,6 @@ static void opj_t1_encode_cblk(opj_t1_t *t1, tempwmsedec = opj_t1_getwmsedec(nmsedec, compno, level, orient, bpno, qmfbid, stepsize, numcomps, mct_norms, mct_numcomps) ; cumwmsedec += tempwmsedec; - tile->distotile += tempwmsedec; pass->distortiondec = cumwmsedec; if (opj_t1_enc_is_term_pass(cblk, cblksty, bpno, passtype)) { @@ -2425,4 +2509,6 @@ static void opj_t1_encode_cblk(opj_t1_t *t1, } } #endif + + return cumwmsedec; } diff --git a/src/lib/openjp2/t1.h b/src/lib/openjp2/t1.h index 171dfb0a..bc8a8111 100644 --- a/src/lib/openjp2/t1.h +++ b/src/lib/openjp2/t1.h @@ -216,13 +216,13 @@ typedef struct opj_t1 { /** Encode the code-blocks of a tile -@param t1 T1 handle +@param tcd TCD handle @param tile The tile to encode @param tcp Tile coding parameters @param mct_norms FIXME DOC @param mct_numcomps Number of components used for MCT */ -OPJ_BOOL opj_t1_encode_cblks(opj_t1_t *t1, +OPJ_BOOL opj_t1_encode_cblks(opj_tcd_t* tcd, opj_tcd_tile_t *tile, opj_tcp_t *tcp, const OPJ_FLOAT64 * mct_norms, diff --git a/src/lib/openjp2/tcd.c b/src/lib/openjp2/tcd.c index 3a1c3026..108462ca 100644 --- a/src/lib/openjp2/tcd.c +++ b/src/lib/openjp2/tcd.c @@ -2506,16 +2506,10 @@ static OPJ_BOOL opj_tcd_dwt_encode(opj_tcd_t *p_tcd) static OPJ_BOOL opj_tcd_t1_encode(opj_tcd_t *p_tcd) { - opj_t1_t * l_t1; const OPJ_FLOAT64 * l_mct_norms; OPJ_UINT32 l_mct_numcomps = 0U; opj_tcp_t * l_tcp = p_tcd->tcp; - l_t1 = opj_t1_create(OPJ_TRUE); - if (l_t1 == 00) { - return OPJ_FALSE; - } - if (l_tcp->mct == 1) { l_mct_numcomps = 3U; /* irreversible encoding */ @@ -2529,13 
+2523,9 @@ static OPJ_BOOL opj_tcd_t1_encode(opj_tcd_t *p_tcd) l_mct_norms = (const OPJ_FLOAT64 *)(l_tcp->mct_norms); } - if (! opj_t1_encode_cblks(l_t1, p_tcd->tcd_image->tiles, l_tcp, l_mct_norms, - l_mct_numcomps)) { - opj_t1_destroy(l_t1); - return OPJ_FALSE; - } - - opj_t1_destroy(l_t1); + return opj_t1_encode_cblks(p_tcd, + p_tcd->tcd_image->tiles, l_tcp, l_mct_norms, + l_mct_numcomps); return OPJ_TRUE; }