simplified `ThreadExecutor` class by moving some code out of it / fixed some thread safety issues (#4849)

This commit is contained in:
Oliver Stöneberg 2023-03-04 12:05:17 +01:00 committed by GitHub
parent 9291421840
commit a00b6e1f8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 221 additions and 122 deletions

View File

@ -643,7 +643,7 @@ cli/cppcheckexecutorseh.o: cli/cppcheckexecutorseh.cpp cli/cppcheckexecutor.h cl
cli/cppcheckexecutorsig.o: cli/cppcheckexecutorsig.cpp cli/cppcheckexecutor.h cli/cppcheckexecutorsig.h cli/stacktrace.h lib/color.h lib/config.h lib/errorlogger.h lib/errortypes.h lib/suppressions.h
$(CXX) ${INCLUDE_FOR_CLI} $(CPPFLAGS) $(CXXFLAGS) -c -o $@ cli/cppcheckexecutorsig.cpp
cli/executor.o: cli/executor.cpp cli/executor.h
cli/executor.o: cli/executor.cpp cli/executor.h lib/color.h lib/config.h lib/errorlogger.h lib/errortypes.h lib/importproject.h lib/library.h lib/mathlib.h lib/platform.h lib/settings.h lib/standards.h lib/suppressions.h lib/timer.h lib/utils.h
$(CXX) ${INCLUDE_FOR_CLI} $(CPPFLAGS) $(CXXFLAGS) -c -o $@ cli/executor.cpp
cli/filelister.o: cli/filelister.cpp cli/filelister.h lib/config.h lib/path.h lib/pathmatch.h lib/utils.h

View File

@ -345,6 +345,7 @@ int CppCheckExecutor::check_internal(CppCheck& cppcheck)
}
}
// TODO: not performed when multiple jobs are being used
// second loop to parse all markup files which may not work until all
// c/cpp files have been parsed and checked
for (std::map<std::string, std::size_t>::const_iterator i = mFiles.cbegin(); i != mFiles.cend(); ++i) {
@ -462,6 +463,7 @@ void CppCheckExecutor::reportStatus(std::size_t fileindex, std::size_t filecount
oss << fileindex << '/' << filecount
<< " files checked " << percentDone
<< "% done";
// TODO: do not unconditionally print in color
std::cout << Color::FgBlue << oss.str() << Color::Reset << std::endl;
}
}

View File

@ -18,6 +18,11 @@
#include "executor.h"
#include "errorlogger.h"
#include "settings.h"
#include <algorithm>
Executor::Executor(const std::map<std::string, std::size_t> &files, Settings &settings, ErrorLogger &errorLogger)
: mFiles(files), mSettings(settings), mErrorLogger(errorLogger)
{}
@ -25,3 +30,18 @@ Executor::Executor(const std::map<std::string, std::size_t> &files, Settings &se
Executor::~Executor()
{}
bool Executor::hasToLog(const ErrorMessage &msg)
{
if (!mSettings.nomsg.isSuppressed(msg))
{
std::string errmsg = msg.toString(mSettings.verbose);
std::lock_guard<std::mutex> lg(mErrorListSync);
if (std::find(mErrorList.cbegin(), mErrorList.cend(), errmsg) == mErrorList.cend()) {
mErrorList.emplace_back(std::move(errmsg));
return true;
}
}
return false;
}

View File

@ -22,10 +22,12 @@
#include <cstddef>
#include <list>
#include <map>
#include <mutex>
#include <string>
class Settings;
class ErrorLogger;
class ErrorMessage;
/// @addtogroup CLI
/// @{
@ -45,9 +47,19 @@ public:
virtual unsigned int check() = 0;
protected:
/**
* @brief Check if message is being suppressed and unique.
* @param msg the message to check
* @return true if message is not suppressed and unique
*/
bool hasToLog(const ErrorMessage &msg);
const std::map<std::string, std::size_t> &mFiles;
Settings &mSettings;
ErrorLogger &mErrorLogger;
private:
std::mutex mErrorListSync;
std::list<std::string> mErrorList;
};

View File

@ -169,16 +169,11 @@ int ProcessExecutor::handleRead(int rpipe, unsigned int &result)
std::exit(EXIT_FAILURE);
}
if (!mSettings.nomsg.isSuppressed(msg)) {
// Alert only about unique errors
std::string errmsg = msg.toString(mSettings.verbose);
if (std::find(mErrorList.cbegin(), mErrorList.cend(), errmsg) == mErrorList.cend()) {
mErrorList.emplace_back(std::move(errmsg));
if (type == PipeWriter::REPORT_ERROR)
mErrorLogger.reportErr(msg);
else
mErrorLogger.reportInfo(msg);
}
if (hasToLog(msg)) {
if (type == PipeWriter::REPORT_ERROR)
mErrorLogger.reportErr(msg);
else
mErrorLogger.reportInfo(msg);
}
} else if (type == PipeWriter::CHILD_END) {
std::istringstream iss(buf);

View File

@ -19,6 +19,7 @@
#include "threadexecutor.h"
#include "color.h"
#include "config.h"
#include "cppcheck.h"
#include "cppcheckexecutor.h"
#include "errorlogger.h"
@ -45,27 +46,70 @@ ThreadExecutor::ThreadExecutor(const std::map<std::string, std::size_t> &files,
ThreadExecutor::~ThreadExecutor()
{}
class ThreadExecutor::SyncLogForwarder : public ErrorLogger
class Data
{
public:
explicit SyncLogForwarder(ThreadExecutor &threadExecutor)
: mThreadExecutor(threadExecutor), mProcessedFiles(0), mTotalFiles(0), mProcessedSize(0) {
Data(const std::map<std::string, std::size_t> &files, const std::list<ImportProject::FileSettings> &fileSettings)
: mFiles(files), mFileSettings(fileSettings), mProcessedFiles(0), mProcessedSize(0)
{
mItNextFile = mFiles.begin();
mItNextFileSettings = mFileSettings.begin();
const std::map<std::string, std::size_t>& files = mThreadExecutor.mFiles;
mItNextFile = files.begin();
mItNextFileSettings = mThreadExecutor.mSettings.project.fileSettings.begin();
mTotalFiles = files.size() + mThreadExecutor.mSettings.project.fileSettings.size();
mTotalFileSize = std::accumulate(files.cbegin(), files.cend(), std::size_t(0), [](std::size_t v, const std::pair<std::string, std::size_t>& p) {
mTotalFiles = mFiles.size() + mFileSettings.size();
mTotalFileSize = std::accumulate(mFiles.cbegin(), mFiles.cend(), std::size_t(0), [](std::size_t v, const std::pair<std::string, std::size_t>& p) {
return v + p.second;
});
}
bool finished() {
std::lock_guard<std::mutex> l(mFileSync);
return mItNextFile == mFiles.cend() && mItNextFileSettings == mFileSettings.cend();
}
bool next(const std::string *&file, const ImportProject::FileSettings *&fs, std::size_t &fileSize) {
std::lock_guard<std::mutex> l(mFileSync);
if (mItNextFile != mFiles.end()) {
file = &mItNextFile->first;
fileSize = mItNextFile->second;
++mItNextFile;
return true;
}
if (mItNextFileSettings != mFileSettings.end()) {
fs = &(*mItNextFileSettings);
fileSize = 0;
++mItNextFileSettings;
return true;
}
return false;
}
private:
const std::map<std::string, std::size_t> &mFiles;
std::map<std::string, std::size_t>::const_iterator mItNextFile;
const std::list<ImportProject::FileSettings> &mFileSettings;
std::list<ImportProject::FileSettings>::const_iterator mItNextFileSettings;
public:
std::size_t mProcessedFiles;
std::size_t mTotalFiles;
std::size_t mProcessedSize;
std::size_t mTotalFileSize;
std::mutex mFileSync;
};
class SyncLogForwarder : public ErrorLogger
{
public:
explicit SyncLogForwarder(ThreadExecutor &threadExecutor, ErrorLogger &errorLogger)
: mThreadExecutor(threadExecutor), mErrorLogger(errorLogger) {}
void reportOut(const std::string &outmsg, Color c) override
{
std::lock_guard<std::mutex> lg(mReportSync);
mThreadExecutor.mErrorLogger.reportOut(outmsg, c);
mErrorLogger.reportOut(outmsg, c);
}
void reportErr(const ErrorMessage &msg) override {
@ -76,18 +120,6 @@ public:
report(msg, MessageType::REPORT_INFO);
}
ThreadExecutor &mThreadExecutor;
std::map<std::string, std::size_t>::const_iterator mItNextFile;
std::list<ImportProject::FileSettings>::const_iterator mItNextFileSettings;
std::size_t mProcessedFiles;
std::size_t mTotalFiles;
std::size_t mProcessedSize;
std::size_t mTotalFileSize;
std::mutex mFileSync;
std::mutex mErrorSync;
std::mutex mReportSync;
private:
@ -95,47 +127,77 @@ private:
void report(const ErrorMessage &msg, MessageType msgType)
{
if (mThreadExecutor.mSettings.nomsg.isSuppressed(msg))
if (!mThreadExecutor.hasToLog(msg))
return;
// Alert only about unique errors
bool reportError = false;
std::lock_guard<std::mutex> lg(mReportSync);
{
std::string errmsg = msg.toString(mThreadExecutor.mSettings.verbose);
switch (msgType) {
case MessageType::REPORT_ERROR:
mErrorLogger.reportErr(msg);
break;
case MessageType::REPORT_INFO:
mErrorLogger.reportInfo(msg);
break;
}
}
std::lock_guard<std::mutex> lg(mErrorSync);
if (std::find(mThreadExecutor.mErrorList.cbegin(), mThreadExecutor.mErrorList.cend(), errmsg) == mThreadExecutor.mErrorList.cend()) {
mThreadExecutor.mErrorList.emplace_back(std::move(errmsg));
reportError = true;
}
ThreadExecutor &mThreadExecutor;
ErrorLogger &mErrorLogger;
};
static unsigned int STDCALL threadProc(Data *data, SyncLogForwarder* logForwarder, const Settings &settings)
{
unsigned int result = 0;
for (;;) {
if (data->finished()) {
break;
}
if (reportError) {
std::lock_guard<std::mutex> lg(mReportSync);
const std::string *file = nullptr;
const ImportProject::FileSettings *fs = nullptr;
std::size_t fileSize;
if (!data->next(file, fs, fileSize))
break;
switch (msgType) {
case MessageType::REPORT_ERROR:
mThreadExecutor.mErrorLogger.reportErr(msg);
break;
case MessageType::REPORT_INFO:
mThreadExecutor.mErrorLogger.reportInfo(msg);
break;
CppCheck fileChecker(*logForwarder, false, CppCheckExecutor::executeCommand);
fileChecker.settings() = settings;
if (fs) {
// file settings..
result += fileChecker.check(*fs);
if (settings.clangTidy)
fileChecker.analyseClangTidy(*fs);
} else {
// Read file from a file
result += fileChecker.check(*file);
}
{
std::lock_guard<std::mutex> l(data->mFileSync);
data->mProcessedSize += fileSize;
data->mProcessedFiles++;
if (!settings.quiet) {
std::lock_guard<std::mutex> lg(logForwarder->mReportSync);
CppCheckExecutor::reportStatus(data->mProcessedFiles, data->mTotalFiles, data->mProcessedSize, data->mTotalFileSize);
}
}
}
};
return result;
}
unsigned int ThreadExecutor::check()
{
std::vector<std::future<unsigned int>> threadFutures;
threadFutures.reserve(mSettings.jobs);
SyncLogForwarder logforwarder(*this);
Data data(mFiles, mSettings.project.fileSettings);
SyncLogForwarder logforwarder(*this, mErrorLogger);
for (unsigned int i = 0; i < mSettings.jobs; ++i) {
try {
threadFutures.emplace_back(std::async(std::launch::async, threadProc, &logforwarder));
threadFutures.emplace_back(std::async(std::launch::async, &threadProc, &data, &logforwarder, mSettings));
}
catch (const std::system_error &e) {
std::cerr << "#### ThreadExecutor::check exception :" << e.what() << std::endl;
@ -147,53 +209,3 @@ unsigned int ThreadExecutor::check()
return v + f.get();
});
}
unsigned int STDCALL ThreadExecutor::threadProc(SyncLogForwarder* logForwarder)
{
unsigned int result = 0;
std::map<std::string, std::size_t>::const_iterator &itFile = logForwarder->mItNextFile;
std::list<ImportProject::FileSettings>::const_iterator &itFileSettings = logForwarder->mItNextFileSettings;
// guard static members of CppCheck against concurrent access
logForwarder->mFileSync.lock();
for (;;) {
if (itFile == logForwarder->mThreadExecutor.mFiles.cend() && itFileSettings == logForwarder->mThreadExecutor.mSettings.project.fileSettings.cend()) {
logForwarder->mFileSync.unlock();
break;
}
CppCheck fileChecker(*logForwarder, false, CppCheckExecutor::executeCommand);
fileChecker.settings() = logForwarder->mThreadExecutor.mSettings;
std::size_t fileSize = 0;
if (itFile != logForwarder->mThreadExecutor.mFiles.end()) {
const std::string &file = itFile->first;
fileSize = itFile->second;
++itFile;
logForwarder->mFileSync.unlock();
// Read file from a file
result += fileChecker.check(file);
} else { // file settings..
const ImportProject::FileSettings &fs = *itFileSettings;
++itFileSettings;
logForwarder->mFileSync.unlock();
result += fileChecker.check(fs);
if (logForwarder->mThreadExecutor.mSettings.clangTidy)
fileChecker.analyseClangTidy(fs);
}
logForwarder->mFileSync.lock();
logForwarder->mProcessedSize += fileSize;
logForwarder->mProcessedFiles++;
if (!logForwarder->mThreadExecutor.mSettings.quiet) {
std::lock_guard<std::mutex> lg(logForwarder->mReportSync);
CppCheckExecutor::reportStatus(logForwarder->mProcessedFiles, logForwarder->mTotalFiles, logForwarder->mProcessedSize, logForwarder->mTotalFileSize);
}
}
return result;
}

View File

@ -19,8 +19,6 @@
#ifndef THREADEXECUTOR_H
#define THREADEXECUTOR_H
#include "config.h"
#include "executor.h"
#include <cstddef>
@ -46,9 +44,7 @@ public:
unsigned int check() override;
private:
class SyncLogForwarder;
static unsigned int STDCALL threadProc(SyncLogForwarder *logForwarder);
friend class SyncLogForwarder;
};
/// @}

View File

@ -25,7 +25,6 @@
/*
TODO:
- rename "file" to "single"
- synchronise map access in multithreaded mode or disable timing
- add unit tests
- for --showtime (needs input file)
- for Timer* classes
@ -48,9 +47,9 @@ void TimerResults::showResults(SHOWTIME_MODES mode) const
TimerResultsData overallData;
std::vector<dataElementType> data;
data.reserve(mResults.size());
{
std::lock_guard<std::mutex> l(mResultsSync);
data.reserve(mResults.size());
data.insert(data.begin(), mResults.cbegin(), mResults.cend());
}
std::sort(data.begin(), data.end(), more_second_sec);

View File

@ -102,7 +102,7 @@ bool TestFixture::prepareTest(const char testname[])
std::putchar('.'); // Use putchar to write through redirection of std::cout/cerr
std::fflush(stdout);
} else {
std::cout << classname << "::" << testname << std::endl;
std::cout << classname << "::" << mTestname << std::endl;
}
return true;
}
@ -316,7 +316,7 @@ void TestFixture::run(const std::string &str)
try {
if (quiet_tests) {
std::cout << '\n' << classname << ':';
REDIRECT;
SUPPRESS;
run();
}
else
@ -324,15 +324,15 @@ void TestFixture::run(const std::string &str)
}
catch (const InternalError& e) {
++fails_counter;
errmsg << "InternalError: " << e.errorMessage << std::endl;
errmsg << classname << "::" << mTestname << " - InternalError: " << e.errorMessage << std::endl;
}
catch (const std::exception& error) {
++fails_counter;
errmsg << "exception: " << error.what() << std::endl;
errmsg << classname << "::" << mTestname << " - Exception: " << error.what() << std::endl;
}
catch (...) {
++fails_counter;
errmsg << "Unknown exception" << std::endl;
errmsg << classname << "::" << mTestname << " - Unknown exception" << std::endl;
}
}

View File

@ -29,6 +29,7 @@ extern std::ostringstream output;
* @brief Utility class for capturing cout and cerr to ostringstream buffers
* for later use. Uses RAII to stop redirection when the object goes out of
* scope.
* NOTE: This is *not* thread-safe.
*/
class RedirectOutputError {
public:
@ -83,10 +84,38 @@ private:
std::streambuf *_oldCerr;
};
#define REDIRECT RedirectOutputError redir; do {} while (false)
class SuppressOutput {
public:
/** Set up suppression, flushing anything in the pipes. */
SuppressOutput() {
// flush all old output
std::cout.flush();
std::cerr.flush();
_oldCout = std::cout.rdbuf(); // back up cout's streambuf
_oldCerr = std::cerr.rdbuf(); // back up cerr's streambuf
std::cout.rdbuf(nullptr); // disable cout
std::cerr.rdbuf(nullptr); // disable cerr
}
/** Revert cout and cerr behaviour */
~SuppressOutput() {
std::cout.rdbuf(_oldCout); // restore cout's original streambuf
std::cerr.rdbuf(_oldCerr); // restore cerrs's original streambuf
}
private:
std::streambuf *_oldCout;
std::streambuf *_oldCerr;
};
#define REDIRECT RedirectOutputError redir
#define GET_REDIRECT_OUTPUT redir.getOutput()
#define CLEAR_REDIRECT_OUTPUT redir.clearOutput()
#define GET_REDIRECT_ERROUT redir.getErrout()
#define CLEAR_REDIRECT_ERROUT redir.clearErrout()
#define SUPPRESS SuppressOutput supprout
#endif

View File

@ -43,7 +43,7 @@ private:
* Execute check using n jobs for y files which are have
* identical data, given within data.
*/
void check(unsigned int jobs, int files, int result, const std::string &data, SHOWTIME_MODES showtime = SHOWTIME_MODES::SHOWTIME_NONE) {
void check(unsigned int jobs, int files, int result, const std::string &data, SHOWTIME_MODES showtime = SHOWTIME_MODES::SHOWTIME_NONE, const char* const plistOutput = nullptr) {
errout.str("");
output.str("");
@ -56,6 +56,9 @@ private:
settings.jobs = jobs;
settings.showtime = showtime;
if (plistOutput)
settings.plistOutput = plistOutput;
// TODO: test with settings.project.fileSettings;
ProcessExecutor executor(filemap, settings, *this);
std::vector<std::unique_ptr<ScopedFile>> scopedfiles;
scopedfiles.reserve(filemap.size());
@ -72,6 +75,7 @@ private:
TEST_CASE(deadlock_with_many_errors);
TEST_CASE(many_threads);
TEST_CASE(many_threads_showtime);
TEST_CASE(many_threads_plist);
TEST_CASE(no_errors_more_files);
TEST_CASE(no_errors_less_files);
TEST_CASE(no_errors_equal_amount_files);
@ -103,7 +107,7 @@ private:
// #11249 - reports TSAN errors
void many_threads_showtime() {
REDIRECT;
SUPPRESS;
check(16, 100, 100,
"int main()\n"
"{\n"
@ -112,6 +116,19 @@ private:
"}", SHOWTIME_MODES::SHOWTIME_SUMMARY);
}
void many_threads_plist() {
const char plistOutput[] = "plist";
ScopedFile plistFile("dummy", plistOutput);
SUPPRESS;
check(16, 100, 100,
"int main()\n"
"{\n"
" char *a = malloc(10);\n"
" return 0;\n"
"}", SHOWTIME_MODES::SHOWTIME_NONE, plistOutput);
}
void no_errors_more_files() {
check(2, 3, 0,
"int main()\n"

View File

@ -43,7 +43,7 @@ private:
* Execute check using n jobs for y files which are have
* identical data, given within data.
*/
void check(unsigned int jobs, int files, int result, const std::string &data, SHOWTIME_MODES showtime = SHOWTIME_MODES::SHOWTIME_NONE) {
void check(unsigned int jobs, int files, int result, const std::string &data, SHOWTIME_MODES showtime = SHOWTIME_MODES::SHOWTIME_NONE, const char* const plistOutput = nullptr) {
errout.str("");
output.str("");
@ -56,6 +56,9 @@ private:
settings.jobs = jobs;
settings.showtime = showtime;
if (plistOutput)
settings.plistOutput = plistOutput;
// TODO: test with settings.project.fileSettings;
ThreadExecutor executor(filemap, settings, *this);
std::vector<std::unique_ptr<ScopedFile>> scopedfiles;
scopedfiles.reserve(filemap.size());
@ -71,6 +74,7 @@ private:
TEST_CASE(deadlock_with_many_errors);
TEST_CASE(many_threads);
TEST_CASE(many_threads_showtime);
TEST_CASE(many_threads_plist);
TEST_CASE(no_errors_more_files);
TEST_CASE(no_errors_less_files);
TEST_CASE(no_errors_equal_amount_files);
@ -101,7 +105,7 @@ private:
// #11249 - reports TSAN errors - only applies to threads not processes though
void many_threads_showtime() {
REDIRECT;
SUPPRESS;
check(16, 100, 100,
"int main()\n"
"{\n"
@ -110,6 +114,19 @@ private:
"}", SHOWTIME_MODES::SHOWTIME_SUMMARY);
}
void many_threads_plist() {
const char plistOutput[] = "plist";
ScopedFile plistFile("dummy", plistOutput);
SUPPRESS;
check(16, 100, 100,
"int main()\n"
"{\n"
" char *a = malloc(10);\n"
" return 0;\n"
"}", SHOWTIME_MODES::SHOWTIME_NONE, plistOutput);
}
void no_errors_more_files() {
check(2, 3, 0,
"int main()\n"