병렬 처리를 위해 ThreadedProcessPoolExecutor에서 함수를 실행하는 Django 서버를 실행하고 있지만 완벽하게 실행 한 후에는 멈췄습니다!
나는 그것에 대해 약간의 조사를했고 이제 교착 상태라고 생각하고 모든 것을 변경하려고 시도했지만 아무것도 작동하지 않습니다.
아래는 그 코드입니다.
import json
import os
from time import sleep
import shutil
from HardCode.scripts import BL0
from HardCode.scripts.cibil.Analysis import analyse
from HardCode.scripts.cibil.apicreditdata import convert_to_df
from analysisnode.settings import PROCESSING_DOCS, CHECKSUM_KEY, FINAL_RESULT
from threadedprocess import ThreadedProcessPoolExecutor
from analysisnode import Checksum
import requests
def parallel_proccess_user_records(user_id):
user_data = json.load(open(PROCESSING_DOCS + str(user_id) + '/user_data.json'))
cibil_df = {'status': False, 'data': None, 'message': 'None'}
if os.path.exists(PROCESSING_DOCS + str(user_id) + '/experian_cibil.xml'):
response_parser = convert_to_df(open(PROCESSING_DOCS + str(user_id) + '/experian_cibil.xml'))
cibil_df = response_parser
sms_json = json.load(open(PROCESSING_DOCS + str(user_id) + '/sms_data.json', 'rb'))
try:
if len(sms_json) == 0:
limit = analyse(user_id=user_id, current_loan=user_data['current_loan_amount'], cibil_df=cibil_df,
new_user=user_data['new_user'], cibil_score=user_data['cibil_score'])
response_bl0 = {
"cust_id": user_id,
"status": True,
"message": "No messages found in sms_json",
"result": {
"loan_salary": -9,
"loan": -9,
"salary": -9,
"cibil": limit
}
}
else:
response_bl0 = BL0.bl0(cibil_xml=cibil_df, cibil_score=user_data['cibil_score'], user_id=int(user_id)
, new_user=user_data['new_user'], list_loans=user_data['all_loan_amount'],
current_loan=user_data['current_loan_amount'], sms_json=sms_json)
shutil.rmtree(PROCESSING_DOCS + str(user_id))
try:
os.makedirs(FINAL_RESULT + str(user_id))
except FileExistsError:
pass
except Exception as e:
print(f"error in middleware {e}")
limit = analyse(user_id=user_id, current_loan=user_data['current_loan_amount'], cibil_df=cibil_df,
new_user=user_data['new_user'], cibil_score=user_data['cibil_score'])
response_bl0 = {
"cust_id": user_id,
"status": True,
"message": "Exception occurred, I feel lonely in middleware",
"result": {
"loan_salary": -9,
"loan": -9,
"salary": -9,
"cibil": limit
}
}
with open(FINAL_RESULT + str(user_id) + '/user_data.json', 'w') as json_file:
json.dump(response_bl0, json_file, ensure_ascii=True, indent=4)
def process_user_records(user_ids):
with ThreadedProcessPoolExecutor(max_processes=8, max_threads=16) as p:
p.map(parallel_proccess_user_records, user_ids)
if __name__ == "__main__":
while True:
no_of_dirs = len(os.listdir(PROCESSING_DOCS))
if no_of_dirs > 0:
directories = os.listdir(PROCESSING_DOCS)
user_ids = [user_id for user_id in directories]
process_user_records(user_ids)
print("***********")
print("Done : ")
print(user_ids)
print("SLEEPING.....zzzzzz")
sleep(10)
코드 설명 :이 코드는 PROCESSING_DOCS라는 폴더의 변경 사항을 찾기 위해 무한 루프로 실행됩니다. 새 파일이 해당 폴더에 추가되면 다른 모든 파일에서 코드가 자동으로 실행되어 유휴 상태로 유지됩니다.
내가 여기서 뭘 잘못하고 있니?
확실하지 않지만 ProcessPoolExecutor
호출을 실행하기 위해 프로세스 풀을 사용 하기 때문에 사용해 볼 수 있습니다 asynchronously
. 그리고 그것은 GIL을 회피 합니다.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다