Size: a a a

2019 September 19

VE

Vladimir E. in Data Engineers
А как это в kylo было реализовано?
источник

VE

Vladimir E. in Data Engineers
Конекшон между nifi и спарком
источник

RI

Rustam Iksanov in Data Engineers
Инженеры! Есть вопрос по spark kafka streaming и последующих вычислениях. Единственный вариант, при котором у меня получается передать sparkSession внутрь rdd это такой:
 kafkaDStream.foreachRDD { rdd =>
     if (!rdd.isEmpty()) {
       val df = rdd.toDF().as[A]
       val spark =
         SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
       import spark.implicits._
       df.collect().foreach(_ => ())
но этот вариант плох тем, что все вычисление пойдет на одном драйвере(как я понимаю). если убрать collect сразу получаю ошибку с A master URL must be set in your configuration
источник

AZ

Anton Zadorozhniy in Data Engineers
Vladimir E.
А как это в kylo было реализовано?
Нафаня в кайле только для первичного приземления в сторадж и оркестрации джобов, данные напрямую из нафани в Спарк не передаются
источник

VE

Vladimir E. in Data Engineers
Ага, то есть там Спарк стриминг бежит и просто мониторит какую то папку или там батчевый Спарк по тригеру ?
источник

AZ

Anton Zadorozhniy in Data Engineers
Там батчевый спарк используется, входящий стрим лэндится на сторадж нафаней тоже
источник

AB

Andrei Boaghe in Data Engineers
Всем привет,
нужна помощь :)
Вылезает вот такой вот странный эксэпшн из Spark Job'a (2.1.0):

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/tu099trasparenza/cache/condizioniArricchite/_temporary/0/_temporary/attempt_20190919123523_0014_m_000151_0/part-00151-0b7285cc-52d5-49db-97d2-129c716ff5e4.snappy.parquet (inode 59805817): File does not exist. Holder DFSClient_attempt_20190919123523_0014_m_000149_0_-1044167922_250 does not have any open files.

Никто не сталкивался?
PS: в  папку "cache" я руками сохраняю промежуточные DataFrame (parquet), чтобы потом (при следующем запуске джоба) их переиспользовать (считываю parquet, а не генерю заново DF)
источник

DP

Dumitru Preguza in Data Engineers
Andrei Boaghe
Всем привет,
нужна помощь :)
Вылезает вот такой вот странный эксэпшн из Spark Job'a (2.1.0):

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/tu099trasparenza/cache/condizioniArricchite/_temporary/0/_temporary/attempt_20190919123523_0014_m_000151_0/part-00151-0b7285cc-52d5-49db-97d2-129c716ff5e4.snappy.parquet (inode 59805817): File does not exist. Holder DFSClient_attempt_20190919123523_0014_m_000149_0_-1044167922_250 does not have any open files.

Никто не сталкивался?
PS: в  папку "cache" я руками сохраняю промежуточные DataFrame (parquet), чтобы потом (при следующем запуске джоба) их переиспользовать (считываю parquet, а не генерю заново DF)
источник

RI

Rustam Iksanov in Data Engineers
Rustam Iksanov
Инженеры! Есть вопрос по spark kafka streaming и последующих вычислениях. Единственный вариант, при котором у меня получается передать sparkSession внутрь rdd это такой:
 kafkaDStream.foreachRDD { rdd =>
     if (!rdd.isEmpty()) {
       val df = rdd.toDF().as[A]
       val spark =
         SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
       import spark.implicits._
       df.collect().foreach(_ => ())
но этот вариант плох тем, что все вычисление пойдет на одном драйвере(как я понимаю). если убрать collect сразу получаю ошибку с A master URL must be set in your configuration
У кого-нибудь есть опыт?
источник

AC

Alexander Chermenin in Data Engineers
Rustam Iksanov
Инженеры! Есть вопрос по spark kafka streaming и последующих вычислениях. Единственный вариант, при котором у меня получается передать sparkSession внутрь rdd это такой:
 kafkaDStream.foreachRDD { rdd =>
     if (!rdd.isEmpty()) {
       val df = rdd.toDF().as[A]
       val spark =
         SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
       import spark.implicits._
       df.collect().foreach(_ => ())
но этот вариант плох тем, что все вычисление пойдет на одном драйвере(как я понимаю). если убрать collect сразу получаю ошибку с A master URL must be set in your configuration
чего вы этим хотите добиться?
источник

RI

Rustam Iksanov in Data Engineers
Я хочу добится вычисления. Проблема в том, что подкапотом foreach куча селектов и джоинов.
источник

AC

Alexander Chermenin in Data Engineers
версии спарка после 2.0 позволяют джойнить стримы с static-DF, может лучше в эту сторону покопать?
источник

RI

Rustam Iksanov in Data Engineers
Alexander Chermenin
версии спарка после 2.0 позволяют джойнить стримы с static-DF, может лучше в эту сторону покопать?
из кафки приходят данные для селекта. Пока не очень понимаю, как там делать джоин.
источник

RI

Rustam Iksanov in Data Engineers
Еще вопросик. Есть спарк приложение. Один из методов внутри создает темпвью из него делает селект и возвращает результат селекта. Вопрос как долго будет жить эта темпвью?
источник

R

Renarde in Data Engineers
Rustam Iksanov
Еще вопросик. Есть спарк приложение. Один из методов внутри создает темпвью из него делает селект и возвращает результат селекта. Вопрос как долго будет жить эта темпвью?
пока приложение не умрет, кто-то не дропнет этот вью (в приложении) или сделает .createOrReplaceTempView
источник

RI

Rustam Iksanov in Data Engineers
Renarde
пока приложение не умрет, кто-то не дропнет этот вью (в приложении) или сделает .createOrReplaceTempView
То есть, если данных много, то они будут захламлять память?
источник

R

Renarde in Data Engineers
вью не держит никаких данных. это lazyval, который вычислится в момент запроса к нему
источник

RI

Rustam Iksanov in Data Engineers
Renarde
вью не держит никаких данных. это lazyval, который вычислится в момент запроса к нему
Вот он вычислился. Процесс по приложению идет дальше, а этот вью занимает место, так?
источник

R

Renarde in Data Engineers
Rustam Iksanov
Вот он вычислился. Процесс по приложению идет дальше, а этот вью занимает место, так?
вычислился, отдал результат и забыл его.
вот допустим, у вас есть фрейм:

df = spark.read.format("parquet").load("s3a://something")
df.createOrReplaceTempView("view1")


Он просто указывает в SparkSQL ссылку, на то, что этот вью указывает на данные в S3.
Если вы дальше в приложении выполните:

so
me_results = spark.sql("select c1,c2 from view1").collect()

То в переменную some_results будут собраны результаты исполнения запроса, но сам по себе view1 не будет держать никаких данных.
источник

R

Renarde in Data Engineers
view без кеша / персиста - это просто ссылка на какие-то данные.

другая опция - это когда хочется держать фрейм в памяти, чтобы быстро к нему обращаться. тогда будет вот так:

df = spark.read.format("parquet").load("s3a://something-else").cache()

print(df.count()) // trigger caching

df.createOrReplaceTempView("view2")


тогда этот вью будет висеть в памяти, пока кто-нибудь его не дропнет (или экзекьютор с данными вылетит) или spark-app не получит SIGTERM
источник