Spark Streaming Kinesis 통합 : 작업자에서 LeaseCoordinator를 초기화하는 동안 오류가 발생했습니다.

프랑코 테 스토리

스칼라에서 kinesis 애플리케이션으로 간단한 바닐라 스파크 스트리밍을 실행할 때 몇 가지 문제가 발생했습니다. SnowplowWordCountASL같은 일부 자습서의 기본 지침을 따랐습니다 .

그러나 다음 Kinesis 작업자 오류로 인해 여전히 작동하게 할 수 없습니다.

16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318)
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174)
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822)
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
    ... 4 more

다음은 내 코드 샘플입니다.

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/**
  * Created by franco on 11/11/16.
  */
object TestApp {
  // === Configurations for Kinesis streams ===
  val awsAccessKeyId = "XXXXXX"
  val awsSecretKey = "XXXXXXX"
  val kinesisStreamName = "MyStream"
  val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" //example "https://kinesis.us-west-2.amazonaws.com"
  val appName = "MyAppName"

  def main(args: Array[String]): Unit = {

    val credentials = new BasicAWSCredentials(awsAccessKeyId,awsSecretKey)

    val provider = new StaticCredentialsProvider(credentials)

    val kinesisClient = new AmazonKinesisClient(provider)
    kinesisClient.setEndpoint(kinesisEndpointUrl)

    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()

    val streams = shards

    val batchInterval = Milliseconds(2000)

    val kinesisCheckpointInterval = batchInterval

    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName

    val cores : Int = Runtime.getRuntime.availableProcessors()
    println("Available Cores : " + cores.toString)
    val config = new SparkConf().setAppName("MyAppName").setMaster("local[" + (cores / 2 ) + "]" )
    val ssc = new StreamingContext(config, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until streams).map { i =>
      KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
    }

    ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }


}

내 IAM 정책은 다음과 같습니다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:region:account:stream/name"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DeleteItem",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:Scan",
                "dynamodb:UpdateItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:region:account:table/name"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

이 앱의 문제점을 파악할 수 없습니다. 이 주제에 대한 모든 지침을 주시면 감사하겠습니다.

Ryan Slipetz

AWS 액세스 키 및 보안 키를 전달할 수있는 DStream의 다른 생성자가 있습니다.

예를 들어 아래 링크의 첫 번째 및 다섯 번째 생성자는 시스템 속성을 설정하는 대신 생성자에 전달할 수 있습니다 (시스템을 통해 전달해야 함).

KinesisUtil 생성자

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

Related 관련 기사

뜨겁다태그

보관