Add multithreading support in the T1 (entropy phase) encoder

- API-wise, opj_codec_set_threads() can be used on the encoding side
- opj_compress has a -threads switch similar to opj_uncompress
This commit is contained in:
Even Rouault 2020-04-29 11:50:17 +02:00
parent 1d358f25c8
commit 97eb7e0bf1
No known key found for this signature in database
GPG Key ID: 33EBBFC47B3DD87D
6 changed files with 239 additions and 129 deletions

View File

@ -301,6 +301,10 @@ static void encode_help_display(void)
fprintf(stdout, " Currently supports only RPCL order.\n"); fprintf(stdout, " Currently supports only RPCL order.\n");
fprintf(stdout, "-C <comment>\n"); fprintf(stdout, "-C <comment>\n");
fprintf(stdout, " Add <comment> in the comment marker segment.\n"); fprintf(stdout, " Add <comment> in the comment marker segment.\n");
if (opj_has_thread_support()) {
fprintf(stdout, " -threads <num_threads|ALL_CPUS>\n"
" Number of threads to use for encoding or ALL_CPUS for all available cores.\n");
}
/* UniPG>> */ /* UniPG>> */
#ifdef USE_JPWL #ifdef USE_JPWL
fprintf(stdout, "-W <params>\n"); fprintf(stdout, "-W <params>\n");
@ -579,7 +583,8 @@ static int parse_cmdline_encoder(int argc, char **argv,
img_fol_t *img_fol, raw_cparameters_t *raw_cp, char *indexfilename, img_fol_t *img_fol, raw_cparameters_t *raw_cp, char *indexfilename,
size_t indexfilename_size, size_t indexfilename_size,
int* pOutFramerate, int* pOutFramerate,
OPJ_BOOL* pOutPLT) OPJ_BOOL* pOutPLT,
int* pOutNumThreads)
{ {
OPJ_UINT32 i, j; OPJ_UINT32 i, j;
int totlen, c; int totlen, c;
@ -596,7 +601,8 @@ static int parse_cmdline_encoder(int argc, char **argv,
{"jpip", NO_ARG, NULL, 'J'}, {"jpip", NO_ARG, NULL, 'J'},
{"mct", REQ_ARG, NULL, 'Y'}, {"mct", REQ_ARG, NULL, 'Y'},
{"IMF", REQ_ARG, NULL, 'Z'}, {"IMF", REQ_ARG, NULL, 'Z'},
{"PLT", NO_ARG, NULL, 'A'} {"PLT", NO_ARG, NULL, 'A'},
{"threads", REQ_ARG, NULL, 'B'}
}; };
/* parse the command line */ /* parse the command line */
@ -1679,6 +1685,19 @@ static int parse_cmdline_encoder(int argc, char **argv,
} }
break; break;
/* ----------------------------------------------------- */
case 'B': { /* Number of threads */
if (strcmp(opj_optarg, "ALL_CPUS") == 0) {
*pOutNumThreads = opj_get_num_cpus();
if (*pOutNumThreads == 1) {
*pOutNumThreads = 0;
}
} else {
sscanf(opj_optarg, "%d", pOutNumThreads);
}
}
break;
/* ------------------------------------------------------ */ /* ------------------------------------------------------ */
@ -1860,6 +1879,7 @@ int main(int argc, char **argv)
OPJ_FLOAT64 t = opj_clock(); OPJ_FLOAT64 t = opj_clock();
OPJ_BOOL PLT = OPJ_FALSE; OPJ_BOOL PLT = OPJ_FALSE;
int num_threads = 0;
/* set encoding parameters to default values */ /* set encoding parameters to default values */
opj_set_default_encoder_parameters(&parameters); opj_set_default_encoder_parameters(&parameters);
@ -1880,7 +1900,7 @@ int main(int argc, char **argv)
parameters.tcp_mct = (char) parameters.tcp_mct = (char)
255; /* This will be set later according to the input image or the provided option */ 255; /* This will be set later according to the input image or the provided option */
if (parse_cmdline_encoder(argc, argv, &parameters, &img_fol, &raw_cp, if (parse_cmdline_encoder(argc, argv, &parameters, &img_fol, &raw_cp,
indexfilename, sizeof(indexfilename), &framerate, &PLT) == 1) { indexfilename, sizeof(indexfilename), &framerate, &PLT, &num_threads) == 1) {
ret = 1; ret = 1;
goto fin; goto fin;
} }
@ -2141,6 +2161,15 @@ int main(int argc, char **argv)
} }
} }
if (num_threads >= 1 &&
!opj_codec_set_threads(l_codec, num_threads)) {
fprintf(stderr, "failed to set number of threads\n");
opj_destroy_codec(l_codec);
opj_image_destroy(image);
ret = 1;
goto fin;
}
/* open a byte stream for writing and allocate memory for all tiles */ /* open a byte stream for writing and allocate memory for all tiles */
l_stream = opj_stream_create_default_file_stream(parameters.outfile, OPJ_FALSE); l_stream = opj_stream_create_default_file_stream(parameters.outfile, OPJ_FALSE);
if (! l_stream) { if (! l_stream) {

View File

@ -657,6 +657,9 @@ opj_codec_t* OPJ_CALLCONV opj_create_compress(OPJ_CODEC_FORMAT p_format)
const char* const*, const char* const*,
struct opj_event_mgr *)) opj_j2k_encoder_set_extra_options; struct opj_event_mgr *)) opj_j2k_encoder_set_extra_options;
l_codec->opj_set_threads =
(OPJ_BOOL(*)(void * p_codec, OPJ_UINT32 num_threads)) opj_j2k_set_threads;
l_codec->m_codec = opj_j2k_create_compress(); l_codec->m_codec = opj_j2k_create_compress();
if (! l_codec->m_codec) { if (! l_codec->m_codec) {
opj_free(l_codec); opj_free(l_codec);
@ -700,6 +703,9 @@ opj_codec_t* OPJ_CALLCONV opj_create_compress(OPJ_CODEC_FORMAT p_format)
const char* const*, const char* const*,
struct opj_event_mgr *)) opj_jp2_encoder_set_extra_options; struct opj_event_mgr *)) opj_jp2_encoder_set_extra_options;
l_codec->opj_set_threads =
(OPJ_BOOL(*)(void * p_codec, OPJ_UINT32 num_threads)) opj_jp2_set_threads;
l_codec->m_codec = opj_jp2_create(OPJ_FALSE); l_codec->m_codec = opj_jp2_create(OPJ_FALSE);
if (! l_codec->m_codec) { if (! l_codec->m_codec) {
opj_free(l_codec); opj_free(l_codec);

View File

@ -1348,15 +1348,14 @@ OPJ_API OPJ_BOOL OPJ_CALLCONV opj_setup_decoder(opj_codec_t *p_codec,
* number, or "ALL_CPUS". If OPJ_NUM_THREADS is set and this function is called, * number, or "ALL_CPUS". If OPJ_NUM_THREADS is set and this function is called,
* this function will override the behaviour of the environment variable. * this function will override the behaviour of the environment variable.
* *
* Currently this function must be called after opj_setup_decoder() and * This function must be called after opj_setup_decoder() and
* before opj_read_header(). * before opj_read_header() for the decoding side, or after opj_setup_encoder()
* and before opj_start_compress() for the encoding side.
* *
* Note: currently only has effect on the decompressor. * @param p_codec decompressor or compressor handler
*
* @param p_codec decompressor handler
* @param num_threads number of threads. * @param num_threads number of threads.
* *
* @return OPJ_TRUE if the decoder is correctly set * @return OPJ_TRUE if the function is successful.
*/ */
OPJ_API OPJ_BOOL OPJ_CALLCONV opj_codec_set_threads(opj_codec_t *p_codec, OPJ_API OPJ_BOOL OPJ_CALLCONV opj_codec_set_threads(opj_codec_t *p_codec,
int num_threads); int num_threads);

View File

@ -177,7 +177,8 @@ static OPJ_FLOAT64 opj_t1_getwmsedec(
const OPJ_FLOAT64 * mct_norms, const OPJ_FLOAT64 * mct_norms,
OPJ_UINT32 mct_numcomps); OPJ_UINT32 mct_numcomps);
static void opj_t1_encode_cblk(opj_t1_t *t1, /** Return "cumwmsedec" that should be used to increase tile->distotile */
static double opj_t1_encode_cblk(opj_t1_t *t1,
opj_tcd_cblk_enc_t* cblk, opj_tcd_cblk_enc_t* cblk,
OPJ_UINT32 orient, OPJ_UINT32 orient,
OPJ_UINT32 compno, OPJ_UINT32 compno,
@ -186,7 +187,6 @@ static void opj_t1_encode_cblk(opj_t1_t *t1,
OPJ_FLOAT64 stepsize, OPJ_FLOAT64 stepsize,
OPJ_UINT32 cblksty, OPJ_UINT32 cblksty,
OPJ_UINT32 numcomps, OPJ_UINT32 numcomps,
opj_tcd_tile_t * tile,
const OPJ_FLOAT64 * mct_norms, const OPJ_FLOAT64 * mct_norms,
OPJ_UINT32 mct_numcomps); OPJ_UINT32 mct_numcomps);
@ -2100,42 +2100,37 @@ static OPJ_BOOL opj_t1_decode_cblk(opj_t1_t *t1,
} }
/** Description of one code-block encoding job submitted to the thread pool.
 *
 * One instance is allocated with opj_calloc() per code-block by
 * opj_t1_encode_cblks() and handed to opj_t1_clbl_encode_processor(),
 * which releases it with opj_free() on every exit path.
 */
typedef struct {
/* Index of the component in the tile; forwarded to opj_t1_encode_cblk(). */
OPJ_UINT32 compno;
/* Index of the resolution inside tilec->resolutions. */
OPJ_UINT32 resno;
/* Code-block to encode. */
opj_tcd_cblk_enc_t* cblk;
/* Tile being encoded: numcomps is read from it, and distotile is increased
 * by the value returned by opj_t1_encode_cblk(). */
opj_tcd_tile_t *tile;
/* Band that contains the code-block (bandno, stepsize, x0/y0 are read). */
opj_tcd_band_t* band;
/* Tile component that contains the band. */
opj_tcd_tilecomp_t* tilec;
/* Entry of tcp->tccps[] for component compno (qmfbid and cblksty are read). */
opj_tccp_t* tccp;
/* Forwarded unchanged to opj_t1_encode_cblk(). */
const OPJ_FLOAT64 * mct_norms;
/* Forwarded unchanged to opj_t1_encode_cblk(). */
OPJ_UINT32 mct_numcomps;
/* Points to the result flag shared by all jobs of one opj_t1_encode_cblks()
 * call. A job returns immediately when it is already OPJ_FALSE, and sets it
 * to OPJ_FALSE when it fails. */
volatile OPJ_BOOL* pret;
/* When not NULL, locked around the update of tile->distotile. */
opj_mutex_t* mutex;
} opj_t1_cblk_encode_processing_job_t;
/** Procedure to deal with an asynchronous code-block encoding job.
OPJ_BOOL opj_t1_encode_cblks(opj_t1_t *t1, *
opj_tcd_tile_t *tile, * @param user_data Pointer to a opj_t1_cblk_encode_processing_job_t* structure
opj_tcp_t *tcp, * @param tls TLS handle.
const OPJ_FLOAT64 * mct_norms, */
OPJ_UINT32 mct_numcomps static void opj_t1_clbl_encode_processor(void* user_data, opj_tls_t* tls)
)
{ {
OPJ_UINT32 compno, resno, bandno, precno, cblkno; opj_t1_cblk_encode_processing_job_t* job =
(opj_t1_cblk_encode_processing_job_t*)user_data;
opj_tcd_cblk_enc_t* cblk = job->cblk;
const opj_tcd_band_t* band = job->band;
const opj_tcd_tilecomp_t* tilec = job->tilec;
const opj_tccp_t* tccp = job->tccp;
const OPJ_UINT32 resno = job->resno;
opj_t1_t* t1;
const OPJ_UINT32 tile_w = (OPJ_UINT32)(tilec->x1 - tilec->x0);
tile->distotile = 0; /* fixed_quality */
for (compno = 0; compno < tile->numcomps; ++compno) {
opj_tcd_tilecomp_t* tilec = &tile->comps[compno];
opj_tccp_t* tccp = &tcp->tccps[compno];
OPJ_UINT32 tile_w = (OPJ_UINT32)(tilec->x1 - tilec->x0);
for (resno = 0; resno < tilec->numresolutions; ++resno) {
opj_tcd_resolution_t *res = &tilec->resolutions[resno];
for (bandno = 0; bandno < res->numbands; ++bandno) {
opj_tcd_band_t* OPJ_RESTRICT band = &res->bands[bandno];
OPJ_INT32 bandconst;
/* Skip empty bands */
if (opj_tcd_is_band_empty(band)) {
continue;
}
bandconst = 8192 * 8192 / ((OPJ_INT32) floor(band->stepsize * 8192));
for (precno = 0; precno < res->pw * res->ph; ++precno) {
opj_tcd_precinct_t *prc = &band->precincts[precno];
for (cblkno = 0; cblkno < prc->cw * prc->ch; ++cblkno) {
opj_tcd_cblk_enc_t* cblk = &prc->cblks.enc[cblkno];
OPJ_INT32* OPJ_RESTRICT tiledp; OPJ_INT32* OPJ_RESTRICT tiledp;
OPJ_UINT32 cblk_w; OPJ_UINT32 cblk_w;
OPJ_UINT32 cblk_h; OPJ_UINT32 cblk_h;
@ -2144,6 +2139,18 @@ OPJ_BOOL opj_t1_encode_cblks(opj_t1_t *t1,
OPJ_INT32 x = cblk->x0 - band->x0; OPJ_INT32 x = cblk->x0 - band->x0;
OPJ_INT32 y = cblk->y0 - band->y0; OPJ_INT32 y = cblk->y0 - band->y0;
if (!*(job->pret)) {
opj_free(job);
return;
}
t1 = (opj_t1_t*) opj_tls_get(tls, OPJ_TLS_KEY_T1);
if (t1 == NULL) {
t1 = opj_t1_create(OPJ_TRUE); /* OPJ_TRUE == T1 for encoding */
opj_tls_set(tls, OPJ_TLS_KEY_T1, t1, opj_t1_destroy_wrapper);
}
if (band->bandno & 1) { if (band->bandno & 1) {
opj_tcd_resolution_t *pres = &tilec->resolutions[resno - 1]; opj_tcd_resolution_t *pres = &tilec->resolutions[resno - 1];
x += pres->x1 - pres->x0; x += pres->x1 - pres->x0;
@ -2157,7 +2164,9 @@ OPJ_BOOL opj_t1_encode_cblks(opj_t1_t *t1,
t1, t1,
(OPJ_UINT32)(cblk->x1 - cblk->x0), (OPJ_UINT32)(cblk->x1 - cblk->x0),
(OPJ_UINT32)(cblk->y1 - cblk->y0))) { (OPJ_UINT32)(cblk->y1 - cblk->y0))) {
return OPJ_FALSE; *(job->pret) = OPJ_FALSE;
opj_free(job);
return;
} }
cblk_w = t1->w; cblk_w = t1->w;
@ -2185,6 +2194,9 @@ OPJ_BOOL opj_t1_encode_cblks(opj_t1_t *t1,
tileIndex += tileLineAdvance; tileIndex += tileLineAdvance;
} }
} else { /* if (tccp->qmfbid == 0) */ } else { /* if (tccp->qmfbid == 0) */
const OPJ_INT32 bandconst = 8192 * 8192 / ((OPJ_INT32) floor(
band->stepsize * 8192));
for (j = 0; j < cblk_h; ++j) { for (j = 0; j < cblk_h; ++j) {
for (i = 0; i < cblk_w; ++i) { for (i = 0; i < cblk_w; ++i) {
OPJ_INT32 tmp = tiledp[tileIndex]; OPJ_INT32 tmp = tiledp[tileIndex];
@ -2198,26 +2210,100 @@ OPJ_BOOL opj_t1_encode_cblks(opj_t1_t *t1,
} }
} }
{
OPJ_FLOAT64 cumwmsedec =
opj_t1_encode_cblk( opj_t1_encode_cblk(
t1, t1,
cblk, cblk,
band->bandno, band->bandno,
compno, job->compno,
tilec->numresolutions - 1 - resno, tilec->numresolutions - 1 - resno,
tccp->qmfbid, tccp->qmfbid,
band->stepsize, band->stepsize,
tccp->cblksty, tccp->cblksty,
tile->numcomps, job->tile->numcomps,
tile, job->mct_norms,
mct_norms, job->mct_numcomps);
mct_numcomps); if (job->mutex) {
opj_mutex_lock(job->mutex);
}
job->tile->distotile += cumwmsedec;
if (job->mutex) {
opj_mutex_unlock(job->mutex);
}
}
opj_free(job);
}
OPJ_BOOL opj_t1_encode_cblks(opj_tcd_t* tcd,
opj_tcd_tile_t *tile,
opj_tcp_t *tcp,
const OPJ_FLOAT64 * mct_norms,
OPJ_UINT32 mct_numcomps
)
{
volatile OPJ_BOOL ret = OPJ_TRUE;
opj_thread_pool_t* tp = tcd->thread_pool;
OPJ_UINT32 compno, resno, bandno, precno, cblkno;
opj_mutex_t* mutex = opj_mutex_create();
tile->distotile = 0; /* fixed_quality */
for (compno = 0; compno < tile->numcomps; ++compno) {
opj_tcd_tilecomp_t* tilec = &tile->comps[compno];
opj_tccp_t* tccp = &tcp->tccps[compno];
for (resno = 0; resno < tilec->numresolutions; ++resno) {
opj_tcd_resolution_t *res = &tilec->resolutions[resno];
for (bandno = 0; bandno < res->numbands; ++bandno) {
opj_tcd_band_t* OPJ_RESTRICT band = &res->bands[bandno];
/* Skip empty bands */
if (opj_tcd_is_band_empty(band)) {
continue;
}
for (precno = 0; precno < res->pw * res->ph; ++precno) {
opj_tcd_precinct_t *prc = &band->precincts[precno];
for (cblkno = 0; cblkno < prc->cw * prc->ch; ++cblkno) {
opj_tcd_cblk_enc_t* cblk = &prc->cblks.enc[cblkno];
opj_t1_cblk_encode_processing_job_t* job =
(opj_t1_cblk_encode_processing_job_t*) opj_calloc(1,
sizeof(opj_t1_cblk_encode_processing_job_t));
if (!job) {
ret = OPJ_FALSE;
goto end;
}
job->compno = compno;
job->tile = tile;
job->resno = resno;
job->cblk = cblk;
job->band = band;
job->tilec = tilec;
job->tccp = tccp;
job->mct_norms = mct_norms;
job->mct_numcomps = mct_numcomps;
job->pret = &ret;
job->mutex = mutex;
opj_thread_pool_submit_job(tp, opj_t1_clbl_encode_processor, job);
} /* cblkno */ } /* cblkno */
} /* precno */ } /* precno */
} /* bandno */ } /* bandno */
} /* resno */ } /* resno */
} /* compno */ } /* compno */
return OPJ_TRUE;
end:
opj_thread_pool_wait_completion(tcd->thread_pool, 0);
if (mutex) {
opj_mutex_destroy(mutex);
}
return ret;
} }
/* Returns whether the pass (bpno, passtype) is terminated */ /* Returns whether the pass (bpno, passtype) is terminated */
@ -2252,7 +2338,7 @@ static int opj_t1_enc_is_term_pass(opj_tcd_cblk_enc_t* cblk,
/** mod fixed_quality */ /** mod fixed_quality */
static void opj_t1_encode_cblk(opj_t1_t *t1, static OPJ_FLOAT64 opj_t1_encode_cblk(opj_t1_t *t1,
opj_tcd_cblk_enc_t* cblk, opj_tcd_cblk_enc_t* cblk,
OPJ_UINT32 orient, OPJ_UINT32 orient,
OPJ_UINT32 compno, OPJ_UINT32 compno,
@ -2261,7 +2347,6 @@ static void opj_t1_encode_cblk(opj_t1_t *t1,
OPJ_FLOAT64 stepsize, OPJ_FLOAT64 stepsize,
OPJ_UINT32 cblksty, OPJ_UINT32 cblksty,
OPJ_UINT32 numcomps, OPJ_UINT32 numcomps,
opj_tcd_tile_t * tile,
const OPJ_FLOAT64 * mct_norms, const OPJ_FLOAT64 * mct_norms,
OPJ_UINT32 mct_numcomps) OPJ_UINT32 mct_numcomps)
{ {
@ -2297,7 +2382,7 @@ static void opj_t1_encode_cblk(opj_t1_t *t1,
T1_NMSEDEC_FRACBITS) : 0; T1_NMSEDEC_FRACBITS) : 0;
if (cblk->numbps == 0) { if (cblk->numbps == 0) {
cblk->totalpasses = 0; cblk->totalpasses = 0;
return; return cumwmsedec;
} }
bpno = (OPJ_INT32)(cblk->numbps - 1); bpno = (OPJ_INT32)(cblk->numbps - 1);
@ -2343,7 +2428,6 @@ static void opj_t1_encode_cblk(opj_t1_t *t1,
tempwmsedec = opj_t1_getwmsedec(nmsedec, compno, level, orient, bpno, qmfbid, tempwmsedec = opj_t1_getwmsedec(nmsedec, compno, level, orient, bpno, qmfbid,
stepsize, numcomps, mct_norms, mct_numcomps) ; stepsize, numcomps, mct_norms, mct_numcomps) ;
cumwmsedec += tempwmsedec; cumwmsedec += tempwmsedec;
tile->distotile += tempwmsedec;
pass->distortiondec = cumwmsedec; pass->distortiondec = cumwmsedec;
if (opj_t1_enc_is_term_pass(cblk, cblksty, bpno, passtype)) { if (opj_t1_enc_is_term_pass(cblk, cblksty, bpno, passtype)) {
@ -2425,4 +2509,6 @@ static void opj_t1_encode_cblk(opj_t1_t *t1,
} }
} }
#endif #endif
return cumwmsedec;
} }

View File

@ -216,13 +216,13 @@ typedef struct opj_t1 {
/** /**
Encode the code-blocks of a tile Encode the code-blocks of a tile
@param t1 T1 handle @param tcd TCD handle
@param tile The tile to encode @param tile The tile to encode
@param tcp Tile coding parameters @param tcp Tile coding parameters
@param mct_norms FIXME DOC @param mct_norms FIXME DOC
@param mct_numcomps Number of components used for MCT @param mct_numcomps Number of components used for MCT
*/ */
OPJ_BOOL opj_t1_encode_cblks(opj_t1_t *t1, OPJ_BOOL opj_t1_encode_cblks(opj_tcd_t* tcd,
opj_tcd_tile_t *tile, opj_tcd_tile_t *tile,
opj_tcp_t *tcp, opj_tcp_t *tcp,
const OPJ_FLOAT64 * mct_norms, const OPJ_FLOAT64 * mct_norms,

View File

@ -2506,16 +2506,10 @@ static OPJ_BOOL opj_tcd_dwt_encode(opj_tcd_t *p_tcd)
static OPJ_BOOL opj_tcd_t1_encode(opj_tcd_t *p_tcd) static OPJ_BOOL opj_tcd_t1_encode(opj_tcd_t *p_tcd)
{ {
opj_t1_t * l_t1;
const OPJ_FLOAT64 * l_mct_norms; const OPJ_FLOAT64 * l_mct_norms;
OPJ_UINT32 l_mct_numcomps = 0U; OPJ_UINT32 l_mct_numcomps = 0U;
opj_tcp_t * l_tcp = p_tcd->tcp; opj_tcp_t * l_tcp = p_tcd->tcp;
l_t1 = opj_t1_create(OPJ_TRUE);
if (l_t1 == 00) {
return OPJ_FALSE;
}
if (l_tcp->mct == 1) { if (l_tcp->mct == 1) {
l_mct_numcomps = 3U; l_mct_numcomps = 3U;
/* irreversible encoding */ /* irreversible encoding */
@ -2529,13 +2523,9 @@ static OPJ_BOOL opj_tcd_t1_encode(opj_tcd_t *p_tcd)
l_mct_norms = (const OPJ_FLOAT64 *)(l_tcp->mct_norms); l_mct_norms = (const OPJ_FLOAT64 *)(l_tcp->mct_norms);
} }
if (! opj_t1_encode_cblks(l_t1, p_tcd->tcd_image->tiles, l_tcp, l_mct_norms, return opj_t1_encode_cblks(p_tcd,
l_mct_numcomps)) { p_tcd->tcd_image->tiles, l_tcp, l_mct_norms,
opj_t1_destroy(l_t1); l_mct_numcomps);
return OPJ_FALSE;
}
opj_t1_destroy(l_t1);
return OPJ_TRUE; return OPJ_TRUE;
} }