Remove unnused functions of tsjson

pull/178/head
J-Rios 2022-12-30 17:52:16 +01:00
rodzic 6c1ac118a8
commit 3a95884cdb
Nie znaleziono w bazie danych klucza dla tego podpisu
1 zmienionych plików z 2 dodań i 255 usunięć

Wyświetl plik

@ -12,7 +12,7 @@ Creation date:
Last modified date:
30/12/2022
Version:
1.2.1
1.3.0
'''
###############################################################################
@ -61,8 +61,6 @@ class TSjson():
self.lock = Lock()
self.file_name = file_name
###########################################################################
### Raw Read-Write-Delete Methods
def read(self):
'''
@ -120,7 +118,7 @@ class TSjson():
def delete(self):
'''
Remove a JSON file.
Remove JSON file from filesystem.
'''
remove_ok = False
try:
@ -132,254 +130,3 @@ class TSjson():
logger.error(format_exc())
logger.error("Fail to remove JSON file %s", self.file_name)
return remove_ok
###########################################################################
### Data Content Methods
def read_content(self):
'''
Read JSON file content data.
It call to read() function to get the OrderedDict element of the
file JSON data and then return the specific JSON data from the
dict ("content" key).
'''
read = self.read()
if read is None:
return {}
if read == {}:
return {}
return read["Content"]
def write_content(self, data):
'''
Write JSON file content data.
It checks and creates all the needed directories to file path if
any does not exists.
'''
write_result_ok = False
if not data:
return False
# Check for directory path and create all needed directories
directory = os_path.dirname(self.file_name)
if not os_path.exists(directory):
os_makedirs(directory)
# Try to write the file
try:
with self.lock:
# Check if file exists and is not empty
if os_path.exists(self.file_name) \
and os_stat(self.file_name).st_size:
# Read the file, parse to JSON and add read data to
# dictionary content key
with open(self.file_name, "r", encoding="utf-8") as file:
content = json_load(
file,
object_pairs_hook=OrderedDict)
content["Content"].append(data)
# Overwrite the file with the new content data
with open(self.file_name, "w", encoding="utf-8") as file:
json_dump(
content, fp=file, ensure_ascii=False, indent=4)
write_result_ok = True
# If the file doesn't exist or is empty
else:
# Write the file with an empty content data
with open(self.file_name, "w", encoding="utf-8") as file:
file.write("\n{\n \"Content\": []\n}\n")
# Read the file, parse to JSON and add read data to
# dictionary content key
with open(self.file_name, "r", encoding="utf-8") as file:
content = json_load(file)
content["Content"].append(data)
# Overwrite the file with the new content data
with open(self.file_name, "w", encoding="utf-8") as file:
json_dump(
content, fp=file, ensure_ascii=False, indent=4)
write_result_ok = True
except IOError as error:
logger.error(format_exc())
logger.error("I/O fail (%s): %s", error.errno, error.strerror)
except ValueError:
logger.error(format_exc())
logger.error("Data conversion fail")
except Exception:
logger.error(format_exc())
logger.error(
"Fail to write content of JSON file %s",
self.file_name)
return write_result_ok
def is_in(self, data):
'''
Check if provided key exists in JSON file data.
It reads all the JSON file data and check if the key is present.
'''
# Read the file data
file_data = self.read()
if file_data is None:
return False
if file_data == {}:
return False
# Search element with UID
for _data in file_data["Content"]:
if data == _data:
return True
return False
def is_in_position(self, data):
'''
Check if provided key exists in JSON file data and get the
index from where it is located.
'''
i = 0
found = False
# Read the file data
file_data = self.read()
if file_data is None:
return False, -1
if file_data == {}:
return False, -1
# Search and get index of element with UID
for _data in file_data["Content"]:
if data == _data:
found = True
break
i = i + 1
return found, i
def remove_by_uid(self, element_value, uid):
'''
From the JSON file content, search and remove an element that
has a specified Unique Identifier (UID) key.
Note: If there are elements with same UIDs, only the first one
detected will be removed.
'''
found = False
# Read the file data
file_content = self.read_content()
if file_content == {}:
return False
# Search and remove element with UID
for data in file_content:
if data[uid] == element_value:
found = True
file_content.remove(data)
break
# Rewrite to file after deletion
self.clear_content()
if file_content:
self.write_content(file_content[0])
return found
def search_by_uid(self, element_value, uid):
'''
From the JSON file content, search and get an element that has
a specified Unique Identifier (UID) key.
Note: If there are elements with same UIDs, only the first one
detected will be detected.
'''
result = {}
result["found"] = False
result["data"] = None
# Read the file data
file_data = self.read()
if file_data is None:
return result
if file_data == {}:
return result
# Search and get element with UID
for element in file_data["Content"]:
if not element:
continue
if element[uid] == element_value:
result["found"] = True
result["data"] = element
break
return result
def update(self, data, uid):
'''
From the JSON file content, search and update an element that
has a specified Unique Identifier (UID) key.
Note: If there are elements with same UID, only the first one
detected will be updated.
'''
i = 0
found = False
# Read the file data
file_data = self.read()
if file_data is None:
return False
if file_data == {}:
return False
# Search and get index of element with UID
for msg in file_data["Content"]:
if data[uid] == msg[uid]:
found = True
break
i = i + 1
# Update UID element data and overwrite JSON file
if found:
file_data['Content'][i] = data
self.write(file_data)
else:
logger.error("Element with UID no found in JSON file.")
return found
def update_twice(self, data, uid1, uid2):
'''
From the JSON file content, search and update an element that
has two specified Unique Identifier (UID) keys.
Note: If there are elements with same UIDs, only the first one
detected will be updated.
'''
i = 0
found = False
# Read the file data
file_data = self.read()
if file_data is None:
return False
if file_data == {}:
return False
# Search and get index of element with both UIDs
for msg in file_data["Content"]:
if (data[uid1] == msg[uid1]) and (data[uid2] == msg[uid2]):
found = True
break
i = i + 1
# Update UID element data and overwrite JSON file
if found:
file_data["Content"][i] = data
self.write(file_data)
else:
logger.error("Element with UID no found in JSON file.")
return found
def clear_content(self):
'''
Clear data content of the JSON file.
It locks the Mutex access to the file, and if the file exists
and is not empty, the content is cleared to a default skelleton.
'''
clear_ok = False
try:
with self.lock:
if not os_path.exists(self.file_name):
return False
if not os_stat(self.file_name).st_size:
return False
with open(self.file_name, "w", encoding="utf-8") as file:
file.write("\n{\n \"Content\": [\n ]\n}\n")
clear_ok = True
except Exception:
logger.error(format_exc())
logger.error("Fail to clear JSON file %s", self.file_name)
return clear_ok