Clean up code after long-running testing on Arch Linux

* Clean up code where required in preparation for further testing on other platforms
This commit is contained in:
abraunegg 2024-05-18 10:41:06 +10:00
parent 837933d196
commit fb77411b1f
8 changed files with 449 additions and 278 deletions

View file

@ -163,9 +163,7 @@ class ClientSideFiltering {
// Test the entire path working backwards from child
string path = buildNormalizedPath(name);
string checkPath;
auto paths = pathSplitter(path);
foreach_reverse(directory; paths) {
foreach_reverse(directory; pathSplitter(path)) {
if (directory != "/") {
// This will add a leading '/' but that needs to be stripped to check
checkPath = "/" ~ directory ~ checkPath;

View file

@ -10,6 +10,7 @@ import std.file;
import std.json;
import std.stdio;
import std.range;
import core.memory;
// What other modules that we have created do we need to import?
import log;
@ -197,7 +198,7 @@ class CurlEngine {
// Is the actual http instance is stopped?
if (!http.isStopped) {
// HTTP instance was not stopped .. need to stop it
// HTTP instance was not stopped .. we need to stop it
http.shutdown();
object.destroy(http); // Destroy, however we cant set to null
}
@ -216,7 +217,7 @@ class CurlEngine {
return new CurlEngine; // Constructs a new CurlEngine with a fresh HTTP instance
} else {
CurlEngine curlEngine = curlEnginePool[$ - 1];
curlEnginePool.popBack();
curlEnginePool.popBack(); // assumes a LIFO (last-in, first-out) usage pattern
// Is this engine stopped?
if (curlEngine.http.isStopped) {
@ -236,30 +237,41 @@ class CurlEngine {
// Release all curl instances
static void releaseAllCurlInstances() {
addLogEntry("CurlEngine releaseAllCurlInstances() called", ["debug"]);
synchronized (CurlEngine.classinfo) {
// What is the current pool size
addLogEntry("CurlEngine curlEnginePool size to release: " ~ to!string(curlEnginePool.length), ["debug"]);
// Safely iterate and clean up each CurlEngine instance
foreach (curlEngineInstance; curlEnginePool) {
try {
curlEngineInstance.cleanup(); // Cleanup instance by resetting values
curlEngineInstance.shutdownCurlHTTPInstance(); // Assume proper cleanup of any resources used by HTTP
} catch (Exception e) {
// Log the error or handle it appropriately
// e.g., writeln("Error during cleanup/shutdown: ", e.toString());
}
// It's safe to destroy the object here assuming no other references exist
object.destroy(curlEngineInstance); // Destroy, then set to null
curlEngineInstance = null;
}
// Clear the array after all instances have been handled
curlEnginePool.length = 0; // More explicit than curlEnginePool = [];
if (curlEnginePool.length > 0) {
// Safely iterate and clean up each CurlEngine instance
foreach (curlEngineInstance; curlEnginePool) {
try {
curlEngineInstance.cleanup(true); // Cleanup instance by resetting values and flushing cookie cache
curlEngineInstance.shutdownCurlHTTPInstance(); // Assume proper cleanup of any resources used by HTTP
} catch (Exception e) {
// Log the error or handle it appropriately
// e.g., writeln("Error during cleanup/shutdown: ", e.toString());
}
// It's safe to destroy the object here assuming no other references exist
object.destroy(curlEngineInstance); // Destroy, then set to null
curlEngineInstance = null;
// Perform Garbage Collection on this destroyed curl engine
GC.collect();
}
// Clear the array after all instances have been handled
curlEnginePool.length = 0; // More explicit than curlEnginePool = [];
}
}
// Perform Garbage Collection on this destroyed curl engine
GC.collect();
}
// Destroy all curl instances
// Report the number of CurlEngine instances currently held in the shared pool
static ulong curlEnginePoolLength() {
// Expose only the pool's size, never the pool array itself
ulong poolSize = curlEnginePool.length;
return poolSize;
}
// Destroy all curl instances
static void destroyAllCurlInstances() {
addLogEntry("CurlEngine destroyAllCurlInstances() called", ["debug"]);
// Release all 'curl' instances
@ -271,11 +283,15 @@ class CurlEngine {
// Log that we are releasing this engine back to the pool
addLogEntry("CurlEngine releaseEngine() called on instance id: " ~ to!string(internalThreadId), ["debug"]);
addLogEntry("CurlEngine curlEnginePool size before release: " ~ to!string(curlEnginePool.length), ["debug"]);
cleanup();
// cleanup this curl instance before putting it back in the pool
cleanup(true); // Cleanup instance by resetting values and flushing cookie cache
synchronized (CurlEngine.classinfo) {
curlEnginePool ~= this;
addLogEntry("CurlEngine curlEnginePool size after release: " ~ to!string(curlEnginePool.length), ["debug"]);
}
// Perform Garbage Collection
GC.collect();
}
// Initialise this curl instance
@ -472,7 +488,7 @@ class CurlEngine {
}
// Cleanup this instance internal variables that may have been set
void cleanup() {
void cleanup(bool flushCookies = false) {
// Reset any values to defaults, freeing any set objects
addLogEntry("CurlEngine cleanup() called on instance id: " ~ to!string(internalThreadId), ["debug"]);
@ -488,9 +504,14 @@ class CurlEngine {
return 0;
};
http.contentLength = 0;
http.flushCookieJar();
http.clearSessionCookies();
http.clearAllCookies();
// We only do this if we are pushing the curl engine back to the curl pool
if (flushCookies) {
// Flush the cookie cache as well
http.flushCookieJar();
http.clearSessionCookies();
http.clearAllCookies();
}
}
// set the response to null
@ -513,6 +534,12 @@ class CurlEngine {
http.shutdown();
object.destroy(http); // Destroy, however we cant set to null
addLogEntry("HTTP instance shutdown and destroyed: " ~ to!string(internalThreadId), ["debug"]);
} else {
// Already stopped .. destroy it
object.destroy(http); // Destroy, however we cant set to null
addLogEntry("Stopped HTTP instance shutdown and destroyed: " ~ to!string(internalThreadId), ["debug"]);
}
// Perform Garbage Collection
GC.collect();
}
}

View file

@ -733,24 +733,9 @@ final class ItemDatabase {
return items;
}
// Select all items associated with the provided driveId
Item[] selectAllItemsByDriveId(const(char)[] driveId) {
assert(driveId);
// Accumulate every database row whose driveId matches the value provided
Item[] matchedItems;
auto selectStatement = db.prepare("SELECT * FROM item WHERE driveId = ?1");
selectStatement.bind(1, driveId);
// Walk the result set row by row, building an Item from each record
for (auto res = selectStatement.exec(); !res.empty; res.step()) {
matchedItems ~= buildItem(res);
}
return matchedItems;
}
// Perform a vacuum on the database, commit WAL / SHM to file
void performVacuum() {
addLogEntry("Attempting to perform a database vacuum to merge any temporary data", ["debug"]);
try {
auto stmt = db.prepare("VACUUM;");
stmt.exec();

View file

@ -63,10 +63,10 @@ class LogBuffer {
// Wait for the flush thread to finish outside of the synchronized block to avoid deadlocks
if (flushThread.isRunning()) {
// Flush any remaining log
flushBuffer();
// Join all threads
flushThread.join();
// Flush any remaining log
flushBuffer();
}
// Flush anything remaining
@ -164,6 +164,8 @@ class LogBuffer {
}
}
}
// Clear Messages
messages = [];
}
}

View file

@ -3,7 +3,6 @@ module main;
// What does this module require to function?
import core.stdc.stdlib: EXIT_SUCCESS, EXIT_FAILURE, exit;
import core.stdc.signal;
import core.sys.posix.signal;
import core.memory;
import core.time;
@ -55,9 +54,6 @@ bool dryRun = false;
string runtimeDatabaseFile = "";
int main(string[] cliArgs) {
// Setup CTRL-C handler
setupSignalHandler();
// Application Start Time - used during monitor loop to detail how long it has been running for
auto applicationStartTime = Clock.currTime();
// Disable buffering on stdout - this is needed so that when we are using plain write() it will go to the terminal without flushing
@ -160,8 +156,9 @@ int main(string[] cliArgs) {
// Who are we running as? This will print the ProcessID, UID, GID and username the application is running as
runtimeUserName = getUserName();
// Print in debug the application version as soon as possible
// Print the application version and how this was compiled as soon as possible
addLogEntry("Application Version: " ~ applicationVersion, ["debug"]);
addLogEntry("Application Compiled With: " ~ compilerDetails(), ["debug"]);
// How was this application started - what options were passed in
addLogEntry("Passed in 'cliArgs': " ~ to!string(cliArgs), ["debug"]);
@ -656,7 +653,7 @@ int main(string[] cliArgs) {
checkForNoMountScenario();
// Set the default thread pool value
defaultPoolThreads(to!int(appConfig.getValueLong("threads")));
defaultPoolThreads(1);
// Is the sync engine initialised correctly?
if (appConfig.syncEngineWasInitialised) {
@ -708,7 +705,10 @@ int main(string[] cliArgs) {
// Display that we are syncing from a specific path due to --single-directory
addLogEntry("Syncing changes from this selected path: " ~ singleDirectoryPath, ["verbose"]);
}
// Handle SIGINT and SIGTERM
setupSignalHandler();
// Are we doing a --sync operation? This includes doing any --single-directory operations
if (appConfig.getValueBool("synchronize")) {
// Did the user specify --upload-only?
@ -982,11 +982,12 @@ int main(string[] cliArgs) {
// Detail the outcome of the sync process
displaySyncOutcome();
if (appConfig.fullScanTrueUpRequired) {
// Write WAL and SHM data to file for this loop
addLogEntry("Merge contents of WAL and SHM files into main database file", ["debug"]);
itemDB.performVacuum();
}
// Cleanup sync process arrays
syncEngineInstance.cleanupArrays();
// Write WAL and SHM data to file for this loop and release memory used by in-memory processing
addLogEntry("Merge contents of WAL and SHM files into main database file", ["debug"]);
itemDB.performVacuum();
} else {
// Not online
addLogEntry("Microsoft OneDrive service is not reachable at this time. Will re-try on next sync attempt.");
@ -999,7 +1000,9 @@ int main(string[] cliArgs) {
// Release all the curl instances used during this loop
// New curl instances will be established on next loop
addLogEntry("CurlEngine Pool Size PRE Cleanup: " ~ to!string(CurlEngine.curlEnginePoolLength()), ["debug"]);
CurlEngine.releaseAllCurlInstances();
addLogEntry("CurlEngine Pool Size POST Cleanup: " ~ to!string(CurlEngine.curlEnginePoolLength()) , ["debug"]);
// Display memory details before garbage collection
if (displayMemoryUsage) displayMemoryUsagePreGC();
@ -1007,6 +1010,7 @@ int main(string[] cliArgs) {
GC.collect();
// Return free memory to the OS
GC.minimize();
// Display memory details after garbage collection
if (displayMemoryUsage) displayMemoryUsagePostGC();
@ -1189,7 +1193,6 @@ void performStandardSyncProcess(string localPath, Monitor filesystemMonitor = nu
filesystemMonitor.update(false);
}
// Perform the local database consistency check, picking up locally modified data and uploading this to OneDrive
syncEngineInstance.performDatabaseConsistencyAndIntegrityCheck();
if (appConfig.getValueBool("monitor")) {
@ -1333,24 +1336,13 @@ auto assumeNoGC(T) (T t) if (isFunctionPointer!T || isDelegate!T) {
return cast(SetFunctionAttributes!(T, functionLinkage!T, attrs)) t;
}
// Configure the signal handler to catch SIGINT (CTRL-C) and SIGTERM (kill)
void setupSignalHandler() {
sigaction_t sa;
sa.sa_flags = SA_RESETHAND | SA_NODEFER; // Use reset and no defer flags to handle reentrant signals
sa.sa_handler = &exitHandler; // Direct function pointer assignment
sigemptyset(&sa.sa_mask); // Initialize the signal set to empty
// Register the signal handler for SIGINT
if (sigaction(SIGINT, &sa, null) != 0) {
writeln("FATAL: Failed to install SIGINT handler");
exit(EXIT_FAILURE);
}
// Register the signal handler for SIGTERM
if (sigaction(SIGTERM, &sa, null) != 0) {
writeln("FATAL: Failed to install SIGTERM handler");
exit(EXIT_FAILURE);
}
sigaction_t action;
action.sa_handler = &exitHandler; // Direct function pointer assignment
sigemptyset(&action.sa_mask); // Initialize the signal set to empty
action.sa_flags = 0;
sigaction(SIGINT, &action, null); // Interrupt from keyboard
sigaction(SIGTERM, &action, null); // Termination signal
}
// Catch SIGINT (CTRL-C) and SIGTERM (kill), handle rapid repeat presses
@ -1358,39 +1350,40 @@ extern(C) nothrow @nogc @system void exitHandler(int value) {
if (shutdownInProgress) {
return; // Ignore subsequent presses
}
shutdownInProgress = true;
} else {
// Disable logging suppression
appConfig.suppressLoggingOutput = false;
// Flag we are shutting down
shutdownInProgress = true;
try {
assumeNoGC ( () {
addLogEntry("\nReceived termination signal, initiating cleanup");
// Wait for all parallel jobs that depend on the database to complete
addLogEntry("Waiting for any existing upload|download process to complete");
shutdownSyncEngine();
// Perform the shutdown process
performSynchronisedExitProcess("exitHandler");
})();
} catch (Exception e) {
// Any output here will cause a GC allocation
// - Error: `@nogc` function `main.exitHandler` cannot call non-@nogc function `std.stdio.writeln!string.writeln`
// - Error: cannot use operator `~` in `@nogc` function `main.exitHandler`
// writeln("Exception during shutdown: " ~ e.msg);
try {
assumeNoGC ( () {
addLogEntry("\nReceived termination signal, initiating application cleanup");
// Wait for all parallel jobs that depend on the database to complete
addLogEntry("Waiting for any existing upload|download process to complete");
shutdownSyncEngine();
// Perform the shutdown process
performSynchronisedExitProcess("SIGINT-SIGTERM-HANDLER");
})();
} catch (Exception e) {
// Any output here will cause a GC allocation
// - Error: `@nogc` function `main.exitHandler` cannot call non-@nogc function `std.stdio.writeln!string.writeln`
// - Error: cannot use operator `~` in `@nogc` function `main.exitHandler`
// writeln("Exception during shutdown: " ~ e.msg);
}
// Exit the process with the provided exit code
exit(value);
}
// Exit the process with the provided exit code
exit(value);
}
// Handle application exit
void performSynchronisedExitProcess(string scopeCaller = null) {
synchronized {
// Logging the caller of the shutdown procedure
if (!scopeCaller.empty) {
addLogEntry("performSynchronisedExitProcess called by: " ~ scopeCaller, ["debug"]);
}
// Perform cleanup and shutdown of various services and resources
try {
// Log who called this function
addLogEntry("performSynchronisedExitProcess called by: " ~ scopeCaller, ["debug"]);
// Shutdown the OneDrive Webhook instance
shutdownOneDriveWebhook();
// Shutdown the client side filtering objects
@ -1414,68 +1407,71 @@ void performSynchronisedExitProcess(string scopeCaller = null) {
// Shutdown application logging
shutdownApplicationLogging();
}
// Perform Garbage Cleanup
GC.collect();
// Return free memory to the OS
GC.minimize();
}
}
void shutdownOneDriveWebhook() {
if (oneDriveWebhook !is null) {
addLogEntry("Shutdown OneDrive Webhook instance", ["debug"]);
addLogEntry("Shutting down OneDrive Webhook instance", ["debug"]);
oneDriveWebhook.stop();
object.destroy(oneDriveWebhook);
oneDriveWebhook = null;
addLogEntry("Shutdown of OneDrive Webhook instance is complete", ["debug"]);
}
}
void shutdownFilesystemMonitor() {
if (filesystemMonitor !is null) {
addLogEntry("Shutdown Filesystem Monitoring instance", ["debug"]);
addLogEntry("Shutting down Filesystem Monitoring instance", ["debug"]);
filesystemMonitor.shutdown();
object.destroy(filesystemMonitor);
filesystemMonitor = null;
addLogEntry("Shut down of Filesystem Monitoring instance is complete", ["debug"]);
}
}
void shutdownSelectiveSync() {
if (selectiveSync !is null) {
addLogEntry("Shutdown Client Side Filtering instance", ["debug"]);
addLogEntry("Shutting down Client Side Filtering instance", ["debug"]);
selectiveSync.shutdown();
object.destroy(selectiveSync);
selectiveSync = null;
addLogEntry("Shut down of Client Side Filtering instance is complete", ["debug"]);
}
}
void shutdownSyncEngine() {
if (syncEngineInstance !is null) {
addLogEntry("Shutdown Sync Engine instance", ["debug"]);
addLogEntry("Shutting down Sync Engine instance", ["debug"]);
syncEngineInstance.shutdown(); // Make sure any running thread completes first
object.destroy(syncEngineInstance);
syncEngineInstance = null;
addLogEntry("Shut down Sync Engine instance is complete", ["debug"]);
}
}
void shutdownDatabase() {
if (itemDB !is null && itemDB.isDatabaseInitialised()) {
addLogEntry("Shutdown Database instance", ["debug"]);
addLogEntry("Shutting down Database instance", ["debug"]);
addLogEntry("Performing a database vacuum" , ["debug"]);
itemDB.performVacuum();
addLogEntry("Database vacuum is complete" , ["debug"]);
object.destroy(itemDB);
itemDB = null;
addLogEntry("Shut down Database instance is complete", ["debug"]);
}
}
void shutdownAppConfig() {
if (appConfig !is null) {
addLogEntry("Shutdown Application Configuration instance", ["debug"]);
addLogEntry("Shutting down Application Configuration instance", ["debug"]);
if (dryRun) {
// We were running with --dry-run , clean up the applicable database
cleanupDryRunDatabaseFiles(runtimeDatabaseFile);
}
object.destroy(appConfig);
appConfig = null;
addLogEntry("Shut down of Application Configuration instance is complete", ["debug"]);
}
}
@ -1490,4 +1486,14 @@ void shutdownApplicationLogging() {
// Destroy the shared logging buffer
(cast() logBuffer).shutdown();
object.destroy(logBuffer);
}
}
// Build a human-readable string identifying the D compiler this binary was
// built with, together with the front-end version number (__VERSION__).
string compilerDetails() {
// Resolve the compiler identity at compile time via version conditions
version(DigitalMars) enum compiler = "DMD";
else version(LDC) enum compiler = "LDC";
else version(GNU) enum compiler = "GDC";
else enum compiler = "Unknown compiler";
// Join the compiler name and the numeric front-end version
return compiler ~ " " ~ to!string(__VERSION__);
}

View file

@ -118,13 +118,11 @@ class OneDriveApi {
// The destructor should only clean up resources owned directly by this instance
~this() {
if (curlEngine !is null) {
curlEngine = null;
}
if (response !is null) {
response = null;
}
object.destroy(response);
object.destroy(curlEngine);
response = null;
curlEngine = null;
appConfig = null;
}
// Initialise the OneDrive API class
@ -518,6 +516,8 @@ class OneDriveApi {
} else {
url = itemByPathUrl ~ encodeComponent(path) ~ ":/";
}
// Add select clause
url ~= "?select=id,name,eTag,cTag,deleted,file,folder,root,fileSystemInfo,remoteItem,parentReference,size";
return get(url);
}
@ -526,7 +526,7 @@ class OneDriveApi {
JSONValue getPathDetailsById(string driveId, string id) {
string url;
url = driveByIdUrl ~ driveId ~ "/items/" ~ id;
//url ~= "?select=id,name,eTag,cTag,deleted,file,folder,root,fileSystemInfo,remoteItem,parentReference,size";
url ~= "?select=id,name,eTag,cTag,deleted,file,folder,root,fileSystemInfo,remoteItem,parentReference,size";
return get(url);
}
@ -551,6 +551,7 @@ class OneDriveApi {
// https://learn.microsoft.com/en-us/onedrive/developer/rest-api/concepts/addressing-driveitems?view=odsp-graph-online
// Required format: /drives/{drive-id}/root:/{item-path}:
url = driveByIdUrl ~ driveId ~ "/root:/" ~ encodeComponent(path) ~ ":";
url ~= "?select=id,name,eTag,cTag,deleted,file,folder,root,fileSystemInfo,remoteItem,parentReference,size";
return get(url);
}
@ -570,6 +571,8 @@ class OneDriveApi {
// configure deltaLink to query
if (deltaLink.empty) {
url = driveByIdUrl ~ driveId ~ "/items/" ~ id ~ "/delta";
// Reduce what we ask for in the response - which reduces the data transferred back to us, and reduces what is held in memory during initial JSON processing
url ~= "?select=id,name,eTag,cTag,deleted,file,folder,root,fileSystemInfo,remoteItem,parentReference,size";
} else {
url = deltaLink;
}
@ -588,7 +591,7 @@ class OneDriveApi {
// configure URL to query
if (nextLink.empty) {
url = driveByIdUrl ~ driveId ~ "/items/" ~ id ~ "/children";
//url ~= "?select=id,name,eTag,cTag,deleted,file,folder,root,fileSystemInfo,remoteItem,parentReference,size";
url ~= "?select=id,name,eTag,cTag,deleted,file,folder,root,fileSystemInfo,remoteItem,parentReference,size";
} else {
url = nextLink;
}

File diff suppressed because it is too large Load diff

View file

@ -1088,31 +1088,29 @@ void displayMemoryUsagePreGC() {
// Display internal memory stats post garbage collection + RSS (actual memory being used)
void displayMemoryUsagePostGC() {
// Display memory usage
addLogEntry("Memory Usage POST Garbage Collection (KB)");
addLogEntry("-----------------------------------------------------");
writeMemoryStats();
// Query the actual Resident Set Size (RSS) for the PID
pid_t pid = getCurrentPID();
ulong rss = getRSS(pid);
addLogEntry("current Resident Set Size (RSS) = " ~ to!string(rss)); // actual memory in RAM used by the process - this needs to remain stable, already in KB
// Is there a previous value
if (previousRSS != 0) {
addLogEntry("previous Resident Set Size (RSS) = " ~ to!string(previousRSS)); // actual memory in RAM used by the process - this needs to remain stable, already in KB
// Increase or decrease in RSS
if (rss > previousRSS) {
addLogEntry("difference in Resident Set Size (RSS) = +" ~ to!string((rss - previousRSS))); // Difference in actual memory used
} else {
addLogEntry("difference in Resident Set Size (RSS) = -" ~ to!string((previousRSS - rss))); // Difference in actual memory used
}
}
// Update previous RSS with new value
previousRSS = rss;
// Closout
// Display memory usage title
addLogEntry("Memory Usage POST Garbage Collection (KB)");
addLogEntry("-----------------------------------------------------");
writeMemoryStats(); // Assuming this function logs memory stats correctly
// Query the actual Resident Set Size (RSS) for the PID
pid_t pid = getCurrentPID();
ulong rss = getRSS(pid);
// Check and log the previous RSS value
if (previousRSS != 0) {
addLogEntry("previous Resident Set Size (RSS) = " ~ to!string(previousRSS) ~ " KB");
// Calculate and log the difference in RSS
long difference = rss - previousRSS; // 'difference' can be negative, use 'long' to handle it
string sign = difference > 0 ? "+" : (difference < 0 ? "" : ""); // Determine the sign for display, no sign for zero
addLogEntry("difference in Resident Set Size (RSS) = " ~ sign ~ to!string(difference) ~ " KB");
}
// Update previous RSS with the new value
previousRSS = rss;
// Closout
addLogEntry();
}
@ -1121,6 +1119,15 @@ void writeMemoryStats() {
addLogEntry("current memory usedSize = " ~ to!string((GC.stats.usedSize/1024))); // number of used bytes on the GC heap (might only get updated after a collection)
addLogEntry("current memory freeSize = " ~ to!string((GC.stats.freeSize/1024))); // number of free bytes on the GC heap (might only get updated after a collection)
addLogEntry("current memory allocatedInCurrentThread = " ~ to!string((GC.stats.allocatedInCurrentThread/1024))); // number of bytes allocated for current thread since program start
// Query the actual Resident Set Size (RSS) for the PID
pid_t pid = getCurrentPID();
ulong rss = getRSS(pid);
// The RSS includes all memory that is currently marked as occupied by the process.
// Over time, the heap can become fragmented. Even after garbage collection, fragmented memory blocks may not be contiguous enough to be returned to the OS, leading to an increase in the reported memory usage despite having free space.
// This includes memory that might not be actively used but has not been returned to the system.
// The GC.minimize() function can sometimes cause an increase in RSS due to how memory pages are managed and freed.
addLogEntry("current Resident Set Size (RSS) = " ~ to!string(rss) ~ " KB"); // actual memory in RAM used by the process at this point in time
}
// Return the username of the UID running the 'onedrive' process