각 Ftp 파일에 대해 동시에 Spring Integration 흐름 실행

Abdur Rehman

Ftp 서버에서 파일 Ftp.inboundChannelAdapter을으로 변환하는 Java DSL을 사용하여 구성된 통합 흐름이 있습니다 JobRequest. 그러면 .handle()배치 작업을 트리거 하는 방법이 있으며 모든 것이 필요에 따라 작동하지만 FTP 내부의 각 파일에 대해 순차적으로 실행되는 프로세스가 있습니다. 폴더

currentThreadName내 Transformer Endpoint에 추가 하여 각 파일에 대해 동일한 스레드 이름을 인쇄했습니다.

여기 내가 지금까지 시도한 것입니다

1. 작업 실행자 빈

 @Bean
    public TaskExecutor taskExecutor(){
        return new SimpleAsyncTaskExecutor("Integration");

    }

2. 통합 흐름

  @Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
    return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
                    .remoteDirectory("/bar")
                    .localDirectory(localDir.getFile())
            ,c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
            .transform(fileMessageToJobRequest(importUserJob(step1())))
            .handle(jobLaunchingGateway)
            .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
            .route(JobExecution.class,j->j.getStatus().isUnsuccessful()?"jobFailedChannel":"jobSuccessfulChannel")
            .get();
}

3.I 또한 내가해야 할 또 다른 SO 스레드에서 읽을 ExecutorChannel내가 하나를 구성 할 수 있도록하지만 난 내이 채널을 주입하는 방법을 모른다 Ftp.inboundAdapter로그에서 채널이 항상 것을 볼 수있다, integrationFlow.channel#0나는 추측한다DirectChannel

 @Bean
public MessageChannel inputChannel() {
    return new ExecutorChannel(taskExecutor());
}

내가 여기서 무엇을 놓치고 있는지 모르겠거나 Spring 및 Spring-Integration에 매우 익숙하기 때문에 Spring Messaging System을 제대로 이해하지 못했을 수 있습니다.

어떤 도움을 주시면 감사하겠습니다

감사

아르 템 빌란

ExecutorChannel당신은 단순히 흐름에 주입 할 수 있으며이에 적용 할 것입니다 SourcePollingChannelAdapter프레임 워크. 그래서 inputChannel이것을 빈으로 정의 하면 다음과 같이합니다.

.channel(inputChannel())

당신의 전 .transform(fileMessageToJobRequest(importUserJob(step1()))). 문서에서 더보기 : https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels

반면에 .taskExecutor(taskExecutor())구성 에 따라 파일을 병렬로 처리 하려면 .maxMessagesPerPoll(20)as 1. 의 논리는 다음 AbstractPollingEndpoint과 같습니다.

this.taskExecutor.execute(() -> {
                int count = 0;
                while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                    if (pollForMessage() == null) {
                        break;
                    }
                    count++;
                }

그래서 우리는 병렬로 작업을해야합니까, 그러나 그 도달 할 경우에만 maxMessagesPerPoll그것은 어디 20현재의 경우. 문서에 몇 가지 설명이 있습니다 : https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer

maxMessagesPerPoll 속성은 지정된 폴링 작업 내에서 수신 할 최대 메시지 수를 지정합니다. 이는 폴러가 널이 리턴되거나 최대 값에 도달 할 때까지 기다리지 않고 receive ()를 계속 호출 함을 의미합니다. 예를 들어, 폴러에 10 초 간격 트리거가 있고 maxMessagesPerPoll 설정이 25이고 큐에 100 개의 메시지가있는 채널을 폴링하는 경우 40 초 이내에 100 개의 메시지를 모두 검색 할 수 있습니다. 25 개를 잡고 10 초 동안 기다렸다가 다음 25 개를 잡는 식입니다.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

각 줄에 대해 명령을 실행하기 위해 파일을 반복 할 때 출력 리디렉션

분류에서Dev

GPU 실행 "흐름"대 CPU

분류에서Dev

Airflow : 각 파일에 대해 DAG를 실행하는 적절한 방법

분류에서Dev

여러 SQS 대기열에서 소비하는 작업자로서의 여러 Spring Integration 흐름

분류에서Dev

각 문자에 대해 새 행을 쓰는 CSV 파일

분류에서Dev

Powershell : 각 폴더에 대해 모든 파일 이름 추가

분류에서Dev

각 실행에 대해 Julia에 새 디렉토리 만들기

분류에서Dev

Spring Integration : 실제 메시지에 대한 응답 추가

분류에서Dev

각 줄에 대해 명령을 실행하기 위해 파일을 반복 할 때 출력 리디렉션

분류에서Dev

각 비대화 형 bash 세션에 대해 동일한 명령 실행

분류에서Dev

각 비대화 형 bash 세션에 대해 동일한 명령 실행

분류에서Dev

Makefile에서 각 C 파일에 대해 별도의 실행 파일을 어떻게 빌드합니까?

분류에서Dev

각도 js에서 지시문의 실행 흐름과 속성 사용

분류에서Dev

노새에서 흐름의 동시 실행

분류에서Dev

하나의 Spring Integration 흐름에서 2 개의 집계자를 사용할 수 있습니까?

분류에서Dev

여러 단기 TPL 데이터 흐름 대 단일 장기 실행 흐름

분류에서Dev

Node.js의 비동기 실행 흐름에 대해

분류에서Dev

목록 / 배열의 각 항목에 대해 동일한 함수 실행

분류에서Dev

Spring Integration의 메시지 헤더에 파일 이름 설정

분류에서Dev

Spring Integration의 메시지 헤더에 파일 이름 설정

분류에서Dev

Spring Integration에서 흐름 상태 / 실행 상태를 모니터링하는 방법

분류에서Dev

Spring Integration DSL JMS 요청 / 응답 흐름

분류에서Dev

동시에 여러 파일에 대해 동일한 명령 실행

분류에서Dev

아래의 자바 프로그램에서 실행의 흐름과 "this"키워드 실행에 대해 이해하지 못했습니다.

분류에서Dev

Spring Integration-애플리케이션 시작시 흐름 실행

분류에서Dev

파일의 각 행에 대해 병렬로 쉘 스크립트 실행

분류에서Dev

find 명령으로 찾은 각 파일에 대해 Bash 기능 실행

분류에서Dev

텍스트 파일의 각 항목에 대해 Powershell 실행 명령

분류에서Dev

Ubuntu에서 텐서 흐름 실행시 경고

Related 관련 기사

  1. 1

    각 줄에 대해 명령을 실행하기 위해 파일을 반복 할 때 출력 리디렉션

  2. 2

    GPU 실행 "흐름"대 CPU

  3. 3

    Airflow : 각 파일에 대해 DAG를 실행하는 적절한 방법

  4. 4

    여러 SQS 대기열에서 소비하는 작업자로서의 여러 Spring Integration 흐름

  5. 5

    각 문자에 대해 새 행을 쓰는 CSV 파일

  6. 6

    Powershell : 각 폴더에 대해 모든 파일 이름 추가

  7. 7

    각 실행에 대해 Julia에 새 디렉토리 만들기

  8. 8

    Spring Integration : 실제 메시지에 대한 응답 추가

  9. 9

    각 줄에 대해 명령을 실행하기 위해 파일을 반복 할 때 출력 리디렉션

  10. 10

    각 비대화 형 bash 세션에 대해 동일한 명령 실행

  11. 11

    각 비대화 형 bash 세션에 대해 동일한 명령 실행

  12. 12

    Makefile에서 각 C 파일에 대해 별도의 실행 파일을 어떻게 빌드합니까?

  13. 13

    각도 js에서 지시문의 실행 흐름과 속성 사용

  14. 14

    노새에서 흐름의 동시 실행

  15. 15

    하나의 Spring Integration 흐름에서 2 개의 집계자를 사용할 수 있습니까?

  16. 16

    여러 단기 TPL 데이터 흐름 대 단일 장기 실행 흐름

  17. 17

    Node.js의 비동기 실행 흐름에 대해

  18. 18

    목록 / 배열의 각 항목에 대해 동일한 함수 실행

  19. 19

    Spring Integration의 메시지 헤더에 파일 이름 설정

  20. 20

    Spring Integration의 메시지 헤더에 파일 이름 설정

  21. 21

    Spring Integration에서 흐름 상태 / 실행 상태를 모니터링하는 방법

  22. 22

    Spring Integration DSL JMS 요청 / 응답 흐름

  23. 23

    동시에 여러 파일에 대해 동일한 명령 실행

  24. 24

    아래의 자바 프로그램에서 실행의 흐름과 "this"키워드 실행에 대해 이해하지 못했습니다.

  25. 25

    Spring Integration-애플리케이션 시작시 흐름 실행

  26. 26

    파일의 각 행에 대해 병렬로 쉘 스크립트 실행

  27. 27

    find 명령으로 찾은 각 파일에 대해 Bash 기능 실행

  28. 28

    텍스트 파일의 각 항목에 대해 Powershell 실행 명령

  29. 29

    Ubuntu에서 텐서 흐름 실행시 경고

뜨겁다태그

보관