스칼라에서 kinesis 애플리케이션으로 간단한 바닐라 스파크 스트리밍을 실행할 때 몇 가지 문제가 발생했습니다. Snowplow 및 WordCountASL 과 같은 일부 자습서의 기본 지침을 따랐습니다 .
그러나 다음 Kinesis 작업자 오류로 인해 여전히 작동하게 할 수 없습니다.
16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318)
at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174)
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822)
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
... 4 more
다음은 내 코드 샘플입니다.
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
/**
* Created by franco on 11/11/16.
*/
object TestApp {
// === Configurations for Kinesis streams ===
val awsAccessKeyId = "XXXXXX"
val awsSecretKey = "XXXXXXX"
val kinesisStreamName = "MyStream"
val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" //example "https://kinesis.us-west-2.amazonaws.com"
val appName = "MyAppName"
def main(args: Array[String]): Unit = {
val credentials = new BasicAWSCredentials(awsAccessKeyId,awsSecretKey)
val provider = new StaticCredentialsProvider(credentials)
val kinesisClient = new AmazonKinesisClient(provider)
kinesisClient.setEndpoint(kinesisEndpointUrl)
val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()
val streams = shards
val batchInterval = Milliseconds(2000)
val kinesisCheckpointInterval = batchInterval
val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName
val cores : Int = Runtime.getRuntime.availableProcessors()
println("Available Cores : " + cores.toString)
val config = new SparkConf().setAppName("MyAppName").setMaster("local[" + (cores / 2 ) + "]" )
val ssc = new StreamingContext(config, batchInterval)
// Create the Kinesis DStreams
val kinesisStreams = (0 until streams).map { i =>
KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
}
ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()
// Start the streaming context and await termination
ssc.start()
ssc.awaitTermination()
}
}
내 IAM 정책은 다음과 같습니다.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt123",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords"
],
"Resource": [
"arn:aws:kinesis:region:account:stream/name"
]
},
{
"Sid": "Stmt456",
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:DeleteItem",
"dynamodb:DescribeTable",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:Scan",
"dynamodb:UpdateItem"
],
"Resource": [
"arn:aws:dynamodb:region:account:table/name"
]
},
{
"Sid": "Stmt789",
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": [
"*"
]
}
]
}
이 앱의 문제점을 파악할 수 없습니다. 이 주제에 대한 모든 지침을 주시면 감사하겠습니다.
AWS 액세스 키 및 보안 키를 전달할 수있는 DStream의 다른 생성자가 있습니다.
예를 들어 아래 링크의 첫 번째 및 다섯 번째 생성자는 시스템 속성을 설정하는 대신 생성자에 전달할 수 있습니다 (시스템을 통해 전달해야 함).
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다