Optimize Progress bar
This commit is contained in:
parent
e371ae1052
commit
6fe6a1c759
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
out/
|
||||||
|
settings.json
|
||||||
|
.idea/
|
||||||
|
venv/
|
||||||
|
venv_win/
|
||||||
|
test/
|
@ -5,6 +5,7 @@ import struct
|
|||||||
from base64 import b64decode
|
from base64 import b64decode
|
||||||
from multiprocessing import Process, Queue
|
from multiprocessing import Process, Queue
|
||||||
from queue import Empty
|
from queue import Empty
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
from progress.bar import Bar
|
from progress.bar import Bar
|
||||||
from Cryptodome.Cipher import AES
|
from Cryptodome.Cipher import AES
|
||||||
@ -16,7 +17,7 @@ from modules.functions.get_song import get_song_lyric
|
|||||||
from modules.utils.inputs import cinput, rinput
|
from modules.utils.inputs import cinput, rinput
|
||||||
|
|
||||||
|
|
||||||
def load_information_from_song(path):
|
def load_information_from_song(path) -> str | dict:
|
||||||
"""从音乐文件中的 Comment 字段获取 163 key 并解密返回歌曲信息"""
|
"""从音乐文件中的 Comment 字段获取 163 key 并解密返回歌曲信息"""
|
||||||
file = File(path) # 使用 mutagen 获取歌曲信息
|
file = File(path) # 使用 mutagen 获取歌曲信息
|
||||||
if os.path.splitext(path)[-1] == ".mp3": # 当文件为 mp3 时使用 ID3 格式读取
|
if os.path.splitext(path)[-1] == ".mp3": # 当文件为 mp3 时使用 ID3 格式读取
|
||||||
@ -64,10 +65,12 @@ def load_information_from_song(path):
|
|||||||
return "decrypt_failed"
|
return "decrypt_failed"
|
||||||
|
|
||||||
|
|
||||||
def load_and_decrypt_from_ncm(file_path, targetdir): # nondanee的源代码, 根据需求更改了某些东西
|
def load_and_decrypt_from_ncm(file_path, target_dir) -> dict: # nondanee的源代码, 根据需求更改了某些东西
|
||||||
core_key = binascii.a2b_hex("687A4852416D736F356B496E62617857")
|
core_key = binascii.a2b_hex("687A4852416D736F356B496E62617857")
|
||||||
meta_key = binascii.a2b_hex("2331346C6A6B5F215C5D2630553C2728")
|
meta_key = binascii.a2b_hex("2331346C6A6B5F215C5D2630553C2728")
|
||||||
unpad = lambda s: s[0:-(s[-1] if type(s[-1]) == int else ord(s[-1]))]
|
|
||||||
|
def unpad(s):
|
||||||
|
return s[0:-(s[-1] if type(s[-1]) == int else ord(s[-1]))]
|
||||||
f = open(file_path, 'rb')
|
f = open(file_path, 'rb')
|
||||||
header = f.read(8)
|
header = f.read(8)
|
||||||
assert binascii.b2a_hex(header) == b'4354454e4644414d'
|
assert binascii.b2a_hex(header) == b'4354454e4644414d'
|
||||||
@ -84,7 +87,6 @@ def load_and_decrypt_from_ncm(file_path, targetdir): # nondanee的源代码,
|
|||||||
key_length = len(key_data)
|
key_length = len(key_data)
|
||||||
key_data = bytearray(key_data)
|
key_data = bytearray(key_data)
|
||||||
key_box = bytearray(range(256))
|
key_box = bytearray(range(256))
|
||||||
c = 0
|
|
||||||
last_byte = 0
|
last_byte = 0
|
||||||
key_offset = 0
|
key_offset = 0
|
||||||
for i in range(256):
|
for i in range(256):
|
||||||
@ -110,13 +112,13 @@ def load_and_decrypt_from_ncm(file_path, targetdir): # nondanee的源代码,
|
|||||||
meta_data = json.loads(meta_data)
|
meta_data = json.loads(meta_data)
|
||||||
crc32 = f.read(4)
|
crc32 = f.read(4)
|
||||||
crc32 = struct.unpack('<I', bytes(crc32))[0]
|
crc32 = struct.unpack('<I', bytes(crc32))[0]
|
||||||
|
..., crc32
|
||||||
f.seek(5, 1)
|
f.seek(5, 1)
|
||||||
image_size = f.read(4)
|
image_size = f.read(4)
|
||||||
image_size = struct.unpack('<I', bytes(image_size))[0]
|
image_size = struct.unpack('<I', bytes(image_size))[0]
|
||||||
image_data = f.read(image_size)
|
image_data = f.read(image_size)
|
||||||
file_name = f.name.split("/")[-1].split(".ncm")[0] + '.' + meta_data['format']
|
file_name = f.name.split("/")[-1].split(".ncm")[0] + '.' + meta_data['format']
|
||||||
m = open(os.path.join(targetdir, file_name), 'wb')
|
m = open(os.path.join(target_dir, file_name), 'wb')
|
||||||
chunk = bytearray()
|
|
||||||
while True:
|
while True:
|
||||||
chunk = bytearray(f.read(0x8000))
|
chunk = bytearray(f.read(0x8000))
|
||||||
chunk_length = len(chunk)
|
chunk_length = len(chunk)
|
||||||
@ -131,7 +133,7 @@ def load_and_decrypt_from_ncm(file_path, targetdir): # nondanee的源代码,
|
|||||||
|
|
||||||
# 对解密后的文件进行信息补全
|
# 对解密后的文件进行信息补全
|
||||||
if meta_data["format"] == "mp3": # 针对 mp3 使用 ID3 进行信息补全
|
if meta_data["format"] == "mp3": # 针对 mp3 使用 ID3 进行信息补全
|
||||||
audio = ID3(os.path.join(targetdir, os.path.splitext(file_path.split("/")[-1])[0] + ".mp3"))
|
audio = ID3(os.path.join(target_dir, os.path.splitext(file_path.split("/")[-1])[0] + ".mp3"))
|
||||||
artists = []
|
artists = []
|
||||||
for i in meta_data["artist"]:
|
for i in meta_data["artist"]:
|
||||||
artists.append(i[0])
|
artists.append(i[0])
|
||||||
@ -142,7 +144,7 @@ def load_and_decrypt_from_ncm(file_path, targetdir): # nondanee的源代码,
|
|||||||
audio["TALB"] = TALB(encoding=3, text=[meta_data["album"]]) # 插入专辑名
|
audio["TALB"] = TALB(encoding=3, text=[meta_data["album"]]) # 插入专辑名
|
||||||
audio.save()
|
audio.save()
|
||||||
elif meta_data["format"] == "flac": # 针对 flac 使用 FLAC 进行信息补全
|
elif meta_data["format"] == "flac": # 针对 flac 使用 FLAC 进行信息补全
|
||||||
audio = flac.FLAC(os.path.join(targetdir, os.path.splitext(file_path.split("/")[-1])[0] + ".flac"))
|
audio = flac.FLAC(os.path.join(target_dir, os.path.splitext(file_path.split("/")[-1])[0] + ".flac"))
|
||||||
artists = []
|
artists = []
|
||||||
for i in meta_data["artist"]:
|
for i in meta_data["artist"]:
|
||||||
artists.append(i[0])
|
artists.append(i[0])
|
||||||
@ -198,6 +200,7 @@ def get_lyric_from_folder(self):
|
|||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
target_path = ""
|
||||||
if ncm_files:
|
if ncm_files:
|
||||||
while True:
|
while True:
|
||||||
print(f"\n发现{len(ncm_files)}个ncm加密文件!")
|
print(f"\n发现{len(ncm_files)}个ncm加密文件!")
|
||||||
@ -214,7 +217,7 @@ def get_lyric_from_folder(self):
|
|||||||
target_path = self.settings.lyric_path
|
target_path = self.settings.lyric_path
|
||||||
break
|
break
|
||||||
elif select == '3':
|
elif select == '3':
|
||||||
target_path = input("请输入: ").strip()
|
target_path = cinput("请输入: ")
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
print("输入无效!按回车继续...")
|
print("输入无效!按回车继续...")
|
||||||
@ -226,18 +229,22 @@ def get_lyric_from_folder(self):
|
|||||||
max_process = 20 # 最大进程数
|
max_process = 20 # 最大进程数
|
||||||
current_process = 0 # 当前正在活动的进程数
|
current_process = 0 # 当前正在活动的进程数
|
||||||
passed = 0 # 总共结束的进程数
|
passed = 0 # 总共结束的进程数
|
||||||
with Bar("正在破解", max=len(ncm_files)) as bar:
|
with Bar(f"正在破解 %(index){len(str(len(ncm_files)))}d/%(max)d",
|
||||||
|
suffix="", max=len(ncm_files), color="blue") as bar:
|
||||||
total = len(ncm_files)
|
total = len(ncm_files)
|
||||||
finished = 0 # 已经分配的任务数量
|
allocated = 0 # 已经分配的任务数量
|
||||||
while True: # 进入循环,执行 新建进程->检测队列->检测任务完成 的循环
|
while True: # 进入循环,执行 新建进程->检测队列->检测任务完成 的循环
|
||||||
if current_process <= max_process and finished < total: # 分配进程
|
sleep(0)
|
||||||
|
if current_process <= max_process and allocated < total: # 分配进程
|
||||||
Process(target=process_work,
|
Process(target=process_work,
|
||||||
args=(os.path.join(path, ncm_files[finished]),
|
args=(os.path.join(path, ncm_files[allocated]),
|
||||||
ncm_files[finished],
|
ncm_files[allocated],
|
||||||
target_path,
|
target_path,
|
||||||
q_err,
|
q_err,
|
||||||
q_info)).start()
|
q_info)).start()
|
||||||
finished += 1
|
allocated += 1
|
||||||
|
bar.suffix = f"已分配: {ncm_files[allocated-1]}"
|
||||||
|
bar.update()
|
||||||
while True: # 错误队列检测
|
while True: # 错误队列检测
|
||||||
try:
|
try:
|
||||||
errors.append(q_err.get_nowait())
|
errors.append(q_err.get_nowait())
|
||||||
@ -253,6 +260,8 @@ def get_lyric_from_folder(self):
|
|||||||
musics.append({"id": r['musicId'], "name": r["musicName"], "artists": r["artist"]})
|
musics.append({"id": r['musicId'], "name": r["musicName"], "artists": r["artist"]})
|
||||||
passed += 1
|
passed += 1
|
||||||
current_process -= 1
|
current_process -= 1
|
||||||
|
bar.suffix = f"已完成: {r['musicName']} - "\
|
||||||
|
f"{''.join([x + ', ' for x in [x[0] for x in r['artist']]])[:-2]}"
|
||||||
bar.next()
|
bar.next()
|
||||||
except Empty:
|
except Empty:
|
||||||
break
|
break
|
||||||
@ -279,6 +288,7 @@ def get_lyric_from_folder(self):
|
|||||||
lyric_path = path
|
lyric_path = path
|
||||||
break
|
break
|
||||||
elif r == "2":
|
elif r == "2":
|
||||||
|
lyric_path = self.settings.lyric_path
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
|
Loading…
Reference in New Issue
Block a user