Вопросы с тегами [apache-spark]

1

голосов
2

ответ
920

Просмотры

pyspark: получение параметров наилучшей модели в после gridsearch пусто {}

может кто-то помочь мне извлечь параметры лучшего исполнения модели от моего поиска сетки? Это пустой словарь по какой-то причине. от pyspark.ml.tuning импорта ParamGridBuilder, TrainValidationSplit, CrossValidator из pyspark.ml.evaluation импорта BinaryClassificationEvaluator поезда, тест = df.randomSplit ([0.66, 0.34], семена = 12345) paramGrid = (ParamGridBuilder () .addGrid (lr.regParam , [0.01,0.1]) .addGrid (lr.elasticNetParam, [1,0,]) .addGrid (lr.maxIter, [3,]) .build ()) оценщик = BinaryClassificationEvaluator (rawPredictionCol = "rawPrediction", labelCol = "купить «) evaluator.setMetricName ( 'areaUnderROC') = CrossValidator резюме (оценка = трубопровод, estimatorParamMaps = paramGrid, вычислитель = оценщик,
user798719
2

голосов
0

ответ
30

Просмотры

Есть ли способ для кэширования на нагрузке?

Есть вариант с sparksession.read () для кэширования на нагрузке? Я читаю XML-файлы из s3, и первый сканирует файлы, чтобы получить схему. Поскольку это читает файлы в любом случае, я бы просто загрузить в то время, так что он только читает все файлы из s3 один раз. Есть какой-либо способ сделать это? Я уже искал каждую комбинацию «искры», «кэш», «загрузка» и «читать» и пошел по крайней мере две страницы глубоко. sparkSession.read () формат ( "com.databricks.spark.xml") .load ( "S3A: //").
user2661771
1

голосов
1

ответ
31

Просмотры

Избегайте печати коды при выполнении сценариев в свече оболочке

Я пытаюсь подключиться к БД с искровой в оболочке с использованием сценариев в файле лестница. При подключении скрипта требуется пароль из другого места, но это печать в консоли искрового оболочки. Я просто хочу, чтобы избежать их. Код в Scala, как показано ниже, Вэл конфигурации = Map ( "драйвер" -> "имя_драйвера", "URL" -> "DbUrl", "пользователь" -> "Имя пользователя", "пароль" -> "741852963"); При загрузке этого кода в искровой оболочке печатается код в искровой оболочке тоже. Я хочу, чтобы они одни части, чтобы не печатать в свече консоли. Как я могу добиться этого?
John Humanyun
0

голосов
0

ответ
11

Просмотры

how to pass a SparkContext from pyspark file to a scala UDF?

У меня есть файл pyspark и мои основные коды, написанные на Python в этом файле. а также у меня есть файл Scala, который содержит некоторые функции, написанные в Scala и использовать их как UDF, в pyspark коде. Теперь мне нужно прочитать файл CSV как Спарк DataFrame в функциях Scala. для этого мне нужно создать SparkSession или SparkContext. Это моя функция Scala: объект GetProductNameScalaUDF {VAL искра = org.apache.spark.sql.SparkSession.builder.master ( "нить") APPNAME ( "некоторое название приложения") getOrCreate () Вал csv_df = spark.read... . формат ( "CSV") вариант ( "заголовок", "истина") нагрузка ( "/ путь / file.csv") Защита SomeFunction (someParameterFromPythonCode: Int). {// Я использую csv_df в этой функции! ...} Защита getProductName (): UserDefinedFunction = UDF (SomeFunction _)}, но SparkContext уже создан в pyspark файл и когда я запускаю его, я сталкиваюсь ошибку вроде этого: org.apache.spark.SparkException: Не удалось получить broadcast_16_piece0 из broadcast_16 я уже видел этот вопрос это как у меня, и я обнаружил, что проблема несколько SparkContext, созданных как в pyspark файле и Scala файл, но я попробовал свои ответы, и они не работали. Я также видел этот вопрос, чтобы пройти SparkContext или SparkSession в качестве параметра из pyspark файла в функции Scala, но от pyspark к функции питона и что не работает ни. Мой вопрос, я создаю SpaekContext и SparkSession в pyspark файл и хочу, чтобы передать их в качестве параметров функций Scala. Как я могу добиться этого? это мой питон код: СБН = SparkContext () sqlContext = SQL.
Ali AzG
0

голосов
0

ответ
3

Просмотры

Как выполнять функции SQL DB2 в SPARKSQL

Мы понимаем, что MYSQL и DB2 являются реляционными базами данных. SQL используется в MYSQL отличается от SQL в DB2 (с помощью некоторых дополнительных функций). При работе с искровым SQL работы с использованием SQL DB2 Я столкнулся с вопросом, что функция не найдено. Но на самом деле эта функция доступна в DB2, но не в MYSQL. multiply_alt это ошибка функции метания. Искра-SQL только для нормального SQL Server? Запуск Спарк-SQL работы с SQL DB2 Exception в потоке "основной" org.apache.spark.sql.AnalysisException: Неопределенная функция: 'multiply_alt'. Эта функция не является ни зарегистрированной временной функцией, ни постоянная функция, зарегистрированной в «по умолчанию» в базе данных .; линия 830 поз 7
Goutham ssc
1

голосов
1

ответ
92

Просмотры

Java Spark: com.mongodb.spark.config.writeconfig вопрос

Я пытаюсь соединиться с MongoDB через искровой разъем Java и я получаю сообщение об ошибке «com.mongodb.spark.config.writeconfig», когда я представить банку и запустить банку в свече оболочки. Здесь скриншот ошибки: Не могли бы вы помочь мне решить эту проблему. Я попытался это, как хорошо, но никакого успеха. . $ / Бен / sparkR --conf "spark.mongodb.input.uri = MongoDB: //127.0.0.1/test.myCollection readPreference = primaryPreferred" ./bin/sparkR --conf «spark.mongodb.output.uri = MongoDB: //127.0.0.1/db.test»./bin/spark-shell --packages org.mongodb.spark: монго-искровой connector_2.11: 2.2.0 $ искровых представить --master местного - класс com.test.spark.SparkClient /home/otalogin/SparkClient.jar $ искровым представить --master местного --class com.test.spark.SparkClient /home/otalogin/SparkClient.jar --jar Монго-искровым connector_2 +0,11: 2.2.0, но получить ту же ошибку.
Tom Swayer
1

голосов
1

ответ
724

Просмотры

Иерархическая манипулирование данными в Apache Спарк

Я имею Dataset в Спарк (v2.1.1) с 3-х столбцов (как показано ниже), содержащий иерархические данные. Моя цель цель состоит в том, чтобы назначить инкрементный нумерацию для каждой строки на основе иерархии родитель-потомок. Графически это можно сказать, что иерархические данные представляет собой совокупность деревьев. Согласно таблице ниже, у меня уже есть строки, сгруппированные на основе «Global_ID». Теперь я хотел бы, чтобы создать колонку «Value» в инкрементном порядке, но на основе иерархии данных из «Родителя» и «ребенок» столбцов. Табличное представление (Значение требуемый выход): + ----------- + -------- + ------- + ----------- + ------- - + ------- + ------- + | Текущий Dataset | | Желаемый Dataset (выход) | + ----------- + -------- + ------- + ----------- + ------- - + ------- + ------- + | Global_ID | родитель | Ребенок | | Global_ID | родитель | Ребенок | Значение | + ----------- + -------- + ------- + ----------- + ------- - + ------- + ------- + | 111 | 111 | 123 | | 111 | 111 | 111 | 1 | | 111 | 135 | 246 | | 111 | 111 | 123 | 2 | | 111 | 123 | 456 | | 111 | 123 | 789 | 3 | | 111 | 123 | 789 | | 111 | 123 | 456 | 4 | | 111 | 111 | 111 | | 111 | 111 | 135 | 5 | | 111 | 135 | 468 | | 111 | 135 | 246 | 6 | | 111 | 135 | 268 | | 111 | 135 | 468 | 7 | | 111 | 268 | 321 | | 111 | 135 | 268 | 8 | | 111 | 138 | 139 | | 111 | 268 | 321 | 9 | | 111 | 111 | 135 | | 111 | 111 | 138 | 10 | | 111 | 111 | 138 | | 111 | 138 | 139 | 11 | | 222 | 222 | 654 | | 222 | 222 | 222 | 12 | | 222 | 654 | 721 | | 222 | 222 | 987 | 13 | | 222 | 222 | 222 | | 222 | 222 | 654 | 14 | | 222 | 721 | 127 | | 222 | 654 | 721 | 15 | | 222 | 222 | 987 | | 222 | 721 | 127 | 16 | | 333 | 333 | 398 | | 333 | 333 | 333 | 17 | | 333 | 333 | 498 | | 333 | 333 | 398 | 18 | | 333 | 333 | 333 | | 333 | 333 | 498 | 19 | | 333 | 333 | 598 | | 333 | 333 | 598 | 20 | + ----------- + -------- + ------- + ----------- + ------- - + ------- + ------- + Дерево Представление (Заданное значение представлено рядом с каждым узлом): + ----- + + ----- + 1 | 111 | 17 | 333 | + - + - + + - + - + | | + --------------- + -------- + ----------------- + + ----- ----- + ---------- + | | | | | | + - v - + + - v - + + - v - + + - v - + + - v - + + - v - + 2 | 123 | 5 | 135 | 10 | 138 | | 398 | | 498 | | 598 | + - + - + - + - + - + - + - + - + - + - + - + - + + + ----- + - ---- + -------- + -------- + | 18 19 20 | | | | | | + - v - + + - v - + + - v - + + - v - + + - v - + + - v - + | 789 | | 456 | | 246 | | 468 | | 268 | | 139 | + ----- + ----- + ----- + ----- + ----- + - + - + + + + ----- 12 | 222 | 3 4 6 7 8 | 11 + - + - + + - v - + | | 321 | + ------ + ------- + + - + - + | | 9 + - v - + + - v - + 13 | 987 | 14 | 654 | + - + - + + - + - + | + - v - + 15 | 721 | + - + - + | + - v - + 16 | 127 | + - + - + Код сниппета: Dataset myDataset = искра .sql ( "выберите Global_ID, родитель, ребенок из ЗАПИСИ"); Технология Стек: Apache Спарк (v2.1.1) Java-8 AWS ОГО кластера (Спарк приложение развертывание) Объем данных: около ~ 20 миллионов строк в наборе данных Подходов Пытался: Свечи Graphx + GraphFrames: Используя эту комбинацию, я мог бы достичь только соотношения между вершины и ребра, но он не подходит для моего случая использования. Справка: https://graphframes.github.io/user-guide.html Спарк Graphx ПРЕГЕЛЯ API: Это ближе всего я мог бы получить к достижению ожидаемого результата, но, к сожалению, я не смог найти код Java фрагмент кода для того же. Пример, приведенный в одном из блогов в Scala, которые я не очень хорошо знаком с. Ссылка: https: // DZone. ком / статьи / обработка иерархической-данные с помощью искры-Graphx пр Любых предложений альтернатив (или) изменений в существующих подходах были бы очень полезно, поскольку я полностью потерял в выяснении решения для этого случая использования. Ценю твою помощь! Спасибо!
Sridher
1

голосов
0

ответ
41

Просмотры

Как сделать Спарк работник чтения данных из локальной MongoDB с MongoDB-искровым разъем?

У меня два «MongoDB» на двух компьютерах. И есть также «Спарк работник» на каждом компьютере. Но когда я запускаю «искру», он не читает данные из локального «MongoDB». Вместо этого он читает от одного из них. Таким образом, получили только частичные данные. Существует страница. https://docs.mongodb.com/spark-connector/master/faq/ .Однако, после того, как я прочитал это, я не понимаю, как настроить на всех.
BobXWu
1

голосов
1

ответ
283

Просмотры

Искру UDF написана на Java Lambda повышает ClassCastException

Вот исключение: java.lang.ClassCastException: не может назначить экземпляр java.lang.invoke.SerializedLambda для ... типа org.apache.spark.sql.api.java.UDF2 в экземпляре ... Если я не» т реализовать UDF на лямбда-выражения, это нормально. Как: частный UDF2 funUdf = новый UDF2 () {@Override общественного Строка вызова (Строка, строка б) бросает исключение {возвращать удовольствие (а, б); }}; dataset.sparkSession () UDF () регистр ( "Fun", funUdf, DataTypes.StringType)..; functions.callUDF ( "удовольствие" functions.col ( "а"), functions.col ( "б")); Я бегу в местной, так что этот ответ не поможет: https://stackoverflow.com/a/28367602/4164722 Почему? Как я могу это исправить?
secfree
1

голосов
0

ответ
68

Просмотры

Подсчитайте Départ рейсы из отсортированных данных с использованием Спарк

У меня есть набор данных полетов в виде + ---------------- + ---------- + ---------- --- + | flightID | depart_ts | arrival_ts | + ---------------- + ---------- + ------------- + | 1 | 1451603468 | 1451603468 | | 2 | 1451603468 | 1451603468 | | 3 | 1451603468 | 1451603468 | | 4 | 1451603468 | 1451603468 | | 5 | 1451603468 | 1451603468 | + ---------------- + ---------- + ------------- + и моя работа состоит в том, чтобы использовать Apache Спарк, чтобы найти обратный рейс для каждого полета данного некоторых условий (время отправления обратного полета B должен быть в течение 2-х часов с времени прибытия рейса A). Выполнение перекрестного соединения оГО Record, чтобы проверить эти условия не является эффективным и будет стоить много времени. Я думал об использовании функции окна с 1 раздела и пользовательского UDAF сделать расчет. Что-то вроде этого 1. Вал flightsWindow = Window.orderBy ( "depart_ts"). rangeBetween (0, 7200) 2. flights.withColumn ( "returnFlightID", calcReturn ($ "arrival_ts", $ "depart_ts"). над (flightsWindow)). показать () Учитывая, что такой подход приведет к решению, я столкнулся с некоторыми проблемами: в строке 1, я хочу, чтобы интервал диапазона кадров из текущей строки до arrival_ts + 7200, но, видимо, я не могу сделать динамический диапазон в искры, нет? В строке 1 и при условии, что 2 рейса имеет одинаковое время прибытия, это сделает невозможным для извлечения значений второго полета, когда CURRENT_ROW указатель перемещается туда, так как разность между первым полетом и вторым полетом 0. Можно явно указать диапазоне, чтобы начать кадрирование от CURRENT_ROW? В строке 2, Я хочу, чтобы получить значение depart_ts для самой первой строки кадра для сравнения с другими рейсами в кадре. Можно ли сделать это. Я попробовал первую функцию (), но она не подходит в моем случае.
Assem
1

голосов
2

ответ
612

Просмотры

Нажатие вниз предикат фильтра в Спарк JDBC Свойства

Как я могу настроить мою искровые варианты JDBC, чтобы убедиться, что я нажимаю вниз предикат фильтра в базу данных, а не загружать все первым? Я использую искру 2.1. Не удается получить правильный синтаксис для использования, и я знаю, что могу добавить, где положение после загрузки (), но это, очевидно, погрузить все первым. Я пытаюсь ниже, но в то время как этот фильтр будет занять несколько секунд при работе в моем дб клиенте не возвращает ничего и просто продолжает работать, пытаясь оттолкнуть предикат от искры JDBC. Вал jdbcReadOpts = Map ( "URL" -> URL, "драйвер" -> драйвер, "пользователь" -> пользователя, "пароль" -> пройти, "dbtable" -> tblQuery, "inferSchema" -> "истина") Вэл предикат = "ДАТА (TS_COLUMN) = '2018-01-01'" // Также пробовал ->
horatio1701d
1

голосов
0

ответ
145

Просмотры

Spark Dataset.groupBy().count() returns empty table

У меня есть Dataset.where ( «theColumn == число»), поэтому нет пустого поля theColumn. Dataset получен из базы данных Cassandra и все данные присутствуют. Dataset.groupBy ( "theColumn") рассчитывать () шоу ()..; возвращает пустую таблицу, хотя все строки существуют в Dataset, полученные из базы данных. Что может быть проблема? Как это исправить? Я установить значение spark.default.parallelism в конфигурации, но никаких изменений не произошло. У меня нет каких-либо других специальных конфигураций набора. Я называю этот метод в JavaDStream.foreachRDD и имею JavaStreamingContext и SparkSession работают. SparkSession: SparkSession искра = SparkSession .builder () .master ( "местный [4]") .appName (AppName) .config ( "spark.cassandra.connection.host", внутрибрюшинно). конфигурации ( "spark.cassandra.connection.port", порт) .config ( "spark.driver.allowMultipleContexts", "истина") .getOrCreate (); StreamingContext:. SparkConf sparkConfig = новый SparkConf () setMaster ( "местный [4]") .setAppName (AppName2) .set ( "spark.cassandra.connection.host", ф) .set ( "spark.cassandra.connection.port », порт) .set ( "spark.driver.allowMultipleContexts", "истинный"); JavaStreamingContext JSSC = новый JavaStreamingContext (sparkConfig, batchInterval); . ГруппеПо () расположение: logLines.foreachRDD (RDD -> {javaFunctions (РДД) .writerBuilder ( "my_keyspace", "имя_таблица", mapToRow (Table.class)) saveToCassandra (); Dataset DS = spark.read () .format (» шоу(); // Выводит пустую таблицу. }); ds.count () и ds.dropDuplicates.count () возвращает 0 Apache Spark версия 2.2.0 Кажется, что проблема возникает, когда оба JavaStreamingContext и SparkSession используются вместе. шоу(); // Выводит пустую таблицу. }); ds.count () и ds.dropDuplicates.count () возвращает 0 Apache Spark версия 2.2.0 Кажется, что проблема возникает, когда оба JavaStreamingContext и SparkSession используются вместе.
Elisabeth
1

голосов
0

ответ
109

Просмотры

Во время работы Docker контейнера с внешним Спарком Я получаю эту ошибку

Моя Искра работает в режиме кластера. Я сборка искровой Кассандры разъем баночка моего приложения банка затем посылающее задание от Докер. Тем не менее сталкиваются с этой проблемой. java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition на java.net.URLClassLoader.findClass (URLClassLoader.java:381) при java.lang.ClassLoader.loadClass (ClassLoader.java:424) в java.lang.ClassLoader.loadClass (ClassLoader.java:357) при java.lang.Class.forName0 (нативный метод) при java.lang.Class.forName (Class.java:348) при org.apache.spark.serializer. JavaDeserializationStream $$ Анон $ 1.resolveClass (JavaSerializer.scala: 67) в java.io.ObjectInputStream.readNonProxyDesc (ObjectInputStream.java:1863) в java.io.ObjectInputStream.readClassDesc (ObjectInputStream.java:1746) в java.io.
Chintamani
1

голосов
0

ответ
76

Просмотры

Как отправить Список выражения для агрегатной функции Спарк JAVA API

Я пытаюсь выполнить ниже код, используя Спарк Java API. sampleDS = sampleDS .select (колонка ( "столбец1"), Col ( "столбец2"), Col ( "price1") Col ( "price2")) .groupBy (колонка ( "столбец1"), Col ( "столбец2")) .agg (выражение ( "сумма (price1)"). как ( "MainPrice"), выражение ( "сумма (price2)"). как ( "ExtPrice")) .sort (COL ( "column1"), Col (» столбец2" )); Но эта часть в коде я хочу, чтобы быть динамической, .agg (выражение ( "сумма (price1)"). Как ( "MainPrice"), выражение ( "сумма (price2)"). Как ( "ExtPrice")) что означает только, если запрос имеет ExtPrice мне нужно ExtPrice быть там, ViceVersa для MainPrice т.е. если MainPrice выбран только код должен быть .agg (выражение ( «сумма (price1)»)), или при выборе только ExtPrice .agg ( выражение (»
John Humanyun
1

голосов
0

ответ
512

Просмотры

Read first line of huge Json file with Spark using Pyspark

I'm pretty new to Spark and to teach myself I have been using small json files, which work perfectly. I'm using Pyspark with Spark 2.2.1 However I don't get how to read in a single data line instead of the entire json file. I have been looking for documentation on this but it seems pretty scarce. I have to process a single large (larger than my RAM) json file (wikipedia dump: https://archive.org/details/wikidata-json-20150316) and want to do this in chuncks or line by line. I thought Spark was designed to do just that but can't find out how to do it and when I request the top 5 observations in a naive way I run out of memory. I have tried RDD . SparkRDD= spark.read.json("largejson.json").rdd SparkRDD.take(5) and Dataframe SparkDF= spark.read.json("largejson.json") SparkDF.show(5,truncate = False) So in short: 1) How do I read in just a fraction of a large JSON file? (Show first 5 entries) 2) How do I filter a large JSON file line by line to keep just the required results? Also: I don't want to predefine the datascheme for this to work. I must be overlooking something. Thanks Edit: With some help I have gotten a look at the first observation but it by itself is already too huge to post here so I'll just put a fraction of it here. [ { "id": "Q1", "type": "item", "aliases": { "pl": [{ "language": "pl", "value": "kosmos" }, { "language": "pl", "value": "\\u015bwiat" }, { "language": "pl", "value": "natura" }, { "language": "pl", "value": "uniwersum" }], "en": [{ "language": "en", "value": "cosmos" }, { "language": "en", "value": "The Universe" }, { "language": "en", "value": "Space" }], ...etc
Sleenee
1

голосов
1

ответ
118

Просмотры

Unknown javadoc format for JavaRDD error in Eclipse in Windows 10

I can't get the Javadoc for Spark core library to work on Eclipse and Windows 10. I have no JRE defined under preferences. I load the Javadoc I right-clicked on the jar file in eclipse-> project explorer -> maven -> download Javadoc. What I typically do. See attached image. How to fix this? Stack Trace is: Java Model Exception: Java Model Status [Unknown javadoc format for JavaRDD {key=Lorg/apache/spark/api/java/JavaRDD;} [in JavaRDD.class [in org.apache.spark.api.java [in C:\Users\karln\.m2\repository\org\apache\spark\spark-core_2.11\2.2.1\spark-core_2.11-2.2.1.jar]]]] at org.eclipse.jdt.internal.core.JavadocContents.getTypeDoc(JavadocContents.java:81) at org.eclipse.jdt.internal.core.BinaryType.getAttachedJavadoc(BinaryType.java:999) at org.eclipse.jdt.internal.ui.text.javadoc.JavadocContentAccess2.getHTMLContent(JavadocContentAccess2.java:538) at org.eclipse.jdt.internal.ui.text.java.hover.JavadocHover.getHoverInfo(JavadocHover.java:757) at org.eclipse.jdt.internal.ui.text.java.hover.JavadocHover.internalGetHoverInfo(JavadocHover.java:675) at org.eclipse.jdt.internal.ui.text.java.hover.JavadocHover.getHoverInfo2(JavadocHover.java:667) at org.eclipse.jdt.internal.ui.text.java.hover.BestMatchHover.getHoverInfo2(BestMatchHover.java:164) at org.eclipse.jdt.internal.ui.text.java.hover.BestMatchHover.getHoverInfo2(BestMatchHover.java:130) at org.eclipse.jdt.internal.ui.text.java.hover.JavaEditorTextHoverProxy.getHoverInfo2(JavaEditorTextHoverProxy.java:86) at org.eclipse.jface.text.TextViewerHoverManager$4.run(TextViewerHoverManager.java:166) And Eclipse Version Eclipse Java EE IDE for Web Developers. Version: Oxygen.2 Release (4.7.2) Build id: 20171218-0600 EDIT: Added Error Detail Screenshot.
K.Nicholas
1

голосов
0

ответ
159

Просмотры

Спарк кластер под управлением Пряжа бросает java.lang.ClassNotFoundException

Я новичок в пряжи, но есть некоторый опыт работы с искрой самостоятельным мастером, я недавно установил Пряжа + Spark, кластер с помощью Ambari. У меня есть искра программы (program.jar) скомпилирован в банку, которая опирается на другую банку работать (infra.jar). Я установил следующие конфигурации в Ambari: spark.executor.extraClassPath = / корень / infra.jar spark.driver.extraClassPath = / корень / infra.jar Я проверил файл существует на всех узлах и проверяется конфигурация была отодвинута до $ SPARK_HOME / конф / искровой defaults.conf на все узлы. На самом деле я также скопировал его $ SPARK_HOME / банки на все узлы. Я бег работы с помощью: $ SPARK_HOME / бен / искрового представить --master пряжи --class com.MyMainClass /root/program.jar У меня есть следующий переменный ENV: экспорт HADOOP_CONF_DIR = / USR / HDP / 2.6.3.0-235 / Hadoop / экспорт конф HADOOP_HOME = / USR / HDP / 2.6.3.0-235 / Hadoop Я получаю сообщение об ошибке: Вызванный: 1942) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:1808) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1353) в java.io.ObjectInputStream.readObject (ObjectInputStream.java:373) в орг. apache.spark.serializer.JavaDeserializationStream.readObject (JavaSerializer.scala: 75) при org.apache.spark.serializer.JavaSerializerInstance.deserialize (JavaSerializer.scala: 114) в org.apache.spark.executor.Executor $ TaskRunner.run ( Executor.scala: 312) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) в java.lang.Thread.run (Thread.java:745) класс MyPartition расположен в infra.jar, но по какой-то причине он не найден. Этот код выглядит как его называют в Исполнителю, также некоторый код мчит - что код драйвера. PS Я пытался добавить банку вручную либо с помощью --jars флага или addJars он все еще терпит неудачу с ClassNotFoundException (хотя по какой-то странной причине, по иному классу infra.jar)
Anton.P
1

голосов
0

ответ
179

Просмотры

Спарк потокового возвращается Кафка результат на местном, но не на ПРЯЖАХ

Я использую Cloudera В. М. CDH 5,12, свечи v1.6, Кафка (устанавливается ням) V0.10 и питона 2.66. Я по этой ссылке параметры свечи: Ниже приведен простой искровой приложение, которое я бегу. Он принимает события от Кафки и печатает его после того, как карта уменьшить. от __future__ импорта print_function импорта SYS из pyspark импорта SparkContext из pyspark.streaming импорта StreamingContext из pyspark.streaming.kafka импорта KafkaUtils если __name__ == "__main__": если Len (sys.argv) = 3: печать ( "Использование: kafka_wordcount. ру», файл = sys.stderr) выход (-1) = SparkContext подкожно (APPNAME = "PythonStreamingKafkaWordCount") ССК = StreamingContext (п, 1) zkQuorum, тема = sys.argv [1:] КВС = KafkaUtils.createStream (ССК , zkQuorum, "искра-потоковая-потребитель", {темы: 1}) линия = kvs.map (лямбда х: 1515766709025 окончательный статус: UNDEFINED отслеживания URL: HTTP: //quickstart.cloudera: 8088 / прокси / application_1515761416282_0010 / пользователь: Cloudera 40 INFO YarnClientSchedulerBackend: application_1515761416282_0010 Применение начал работать. 40 INFO Utils: Успешно начал службу 'org.apache.spark.network.netty.NettyBlockTransferService' на порт 53694. 40 INFO NettyBlockTransferService: Сервер создан на 53694 53 INFO YarnClientSchedulerBackend: SchedulerBackend готова для планирования начинающегося после ожидания maxRegisteredResourcesWaitingTime: 30000 (мс) 54 ИНФО BlockManagerMasterEndpoint: Регистрация блок-менеджер quickstart.cloudera: 56220 с 534,5 MB RAM, BlockManagerId (1, quickstart.cloudera, 56220) 07 ИНФО ReceiverTracker: с 1 приемников 07 ИНФО ReceiverTracker: ReceiverTracker начала 07 INFO PythonTransformedDStream: PythonTransformedDStream @ de77734 10 INFO MemoryStore: Блок broadcast_2 хранятся в виде значений в памяти (приблизительный размер 5,8 Кб, свободный 534,5 MB) 10 INFO MemoryStore: Блок broadcast_2_piece0 хранится в виде байтов в памяти (приблизительный размер 3.5 KB, свободный 534,5 МБ) 20 INFO JobScheduler: Добавлено рабочие места для времени 1515766760000 мс 30 INFO JobScheduler: Добавлены рабочие места для времени 1515766770000 мс 40 INFO JobScheduler: Добавлены задания на время 1515766780000 мс после этого, работа только начинает повторять следующие строки (после некоторой задержки, заданной контексте потока) и не суммируется и не распечатке Кафки потока, в то время как работа на хозяина местного с точно такой же код делает. Интересно, что печатает следующую строку каждый раз, когда происходит-событие Кафки (картинка увеличенных настроек искры памяти) Обратите внимание, что: данные в Кафке, и я могу видеть, что в потребительской консоли я также попытался увеличением исполнителя»
Samhash
1

голосов
0

ответ
727

Просмотры

Правильный синтаксис для создания таблицы с паркетным CTAS в определенном месте

Я пытаюсь создать таблицу, хранящуюся в виде паркета spark.sql с заранее заданным внешним расположением, но я, кажется, что-то не хватаю, или что-то пропущено из документации. Мое чтение документации предполагает следующее должно работать: создать таблицу, если не существует schema.test с помощью ПАРКЕТА разбит на разделы (год, месяц) местоположение «S3a: // пути / к / местоположению», как выбрать «1» test1, test2 истинных, «2017» года, «01» месяца но это возвращает ошибка: несовпадающий ввод «местоположение» ожидает (строка 4, позиция 0) документация предполагает внешнее автоматически подразумеваются, установив местоположение, но в любом случае добавление создать внешнюю таблицу дает ту же ошибку. Я был в состоянии успешно создать пустую таблицу с аналогичным синтаксисом: создать внешнюю таблицу, если не существует schema.test (test1 строку, test2 булево) разделены (год строки, месяц строка) сохраняется как паркетными местоположение «S3A: // путь / к / место» Моя альтернатива для сохранения выберите результаты паркета в / путь / к / место непосредственно первым, а затем создать таблица указывает на это, но это, кажется, круговое движение, когда оригинальный синтаксис кажется действительным и предназначен для этой цели. Что случилось с моим подходом?
MichaelChirico
1

голосов
0

ответ
159

Просмотры

Как выполнить Ttest Уэлча в Спарк 2.0.0 с помощью StreamingTest

Я хочу попробовать T-тест Уэлча в Спарк 2.0.0 Как я знаю, что могу использовать StreamingTest () из mllib на этом сайте. [Https://spark.apache.org/docs/2.0.0/api/scala/index.html#org.apache.spark.mllib.stat.test.StreamingTest] это мой код в искровой оболочке импорт орге. apache.spark.mllib.stat.test. {BinarySample, StreamingTest} импорт org.apache.spark. {SparkContext, SparkConf} импорт org.apache.spark.streaming. {Seconds, StreamingContext} импорт org.apache.spark.storage. StorageLevel импорт org.apache.log4j. {Уровень, Logger} Logger.getRootLogger.setLevel (Level.WARN) вал ЦМК = новый StreamingContext (SC, секунды (30)) ssc.checkpoint ( "./ контрольная точка") данные знач = ССК .textFileStream ( "/ datatest / sample1.csv"). карта (линия => line.split ( "") совпадение {случай Массив (метка, значение) => BinarySample (label.toBoolean, значение. toDouble)}) Вал streamingTest = новый StreamingTest (). setPeacePeriod (0) .setWindowSize (0) .setTestMethod ( "Welch") = вал из streamingTest.registerStream (данные) out.print () ssc.start () ssc.awaitTermination () и sample1.csv будет, как это значение TRUE, FALSE, 11, 43 FALSE, 2 ИСТИНЫ, 5 ИСТИНЫ, 3 ИСТИНЫ, 4 ИСТИНЫ, 122 ИСТИНЫ, 153 FALSE, 153 FALSE, 151 ИСТИНЫ, 153 ИСТИНЫ, 154 ... и я получил эту ошибку. 18/01/17 15:50:30 WARN FileInputDStream: Ошибка поиска новых файлов java.lang.NullPointerException в scala.collection.mutable.ArrayOps $ ofRef $ .length $ расширения (ArrayOps.scala: 192) в scala.collection.mutable .ArrayOps $ ofRef.length (ArrayOps.scala: 192) при scala.collection.SeqLike $ class.size (SeqLike.scala: 106) в scala.collection.mutable.ArrayOps $ ofRef.size (ArrayOps.scala: 186) на scala.collection.mutable.Builder $ class.sizeHint (Builder. spark.streaming.scheduler.JobGenerator $$ Анон $ 1.onReceive (JobGenerator.scala: 89) в org.apache.spark.streaming.scheduler.JobGenerator $$ Анон $ 1.onReceive (JobGenerator.scala: 88) в org.apache. spark.util.EventLoop $$ Анон $ 1.run (EventLoop.scala: 48) Я думаю, что данные sample1.csv неправильно для применения к параметру StreamingTest.registerStream () Я не могу понять, какой тип данных требуется для параметр. или я только что нашел еще один WelchTTest (). Я еще не знаю, что он не отличается. [Https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/mllib/stat/test/WelchTTest.html] 48) Я думаю, что данные sample1.c неправильно для применения к параметру StreamingTest.registerStream () Я не могу понять, какой тип данных требуется для параметра. или я только что нашел еще один WelchTTest (). Я еще не знаю, что он не отличается. [Https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/mllib/stat/test/WelchTTest.html] 48) Я думаю, что данные sample1.c неправильно для применения к параметру StreamingTest.registerStream () Я не могу понять, какой тип данных требуется для параметра. или я только что нашел еще один WelchTTest (). Я еще не знаю, что он не отличается. [Https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/mllib/stat/test/WelchTTest.html]
Jchanho
1

голосов
0

ответ
160

Просмотры

Получение Задача не Сериализуемый при попытке совершить Кафку сдвиги в Спарк, которые транслируют переменные

Я написал Spark, рабочие места, которые читают от Кафки и совершают коррекции вручную. Он работал хорошо, но так как я ввел переменные вещания, я получаю сериализуемые исключения, потому что он пытается сериализовать KafkaInputDStream. Вот минимальный код, показывающий проблему (код написан в Котлин, но я считаю, что это произойдет в Java тоже): весело testBroadCast (оао: JavaStreamingContext, kafkaStream: JavaInputDStream) {вал keyPrefix = jsc.sparkContext () трансляции (. "Событие:") kafkaStream.foreachRDD {РДД -> Val offsetRanges = (rdd.rdd (), как HasOffsetRanges) .offsetRanges () = Val prefixedIds rdd.map { "$ {keyPrefix.value}: $ это"} (.collect ) (kafkaStream.dstream (), как CanCommitOffsets) .commitAsync (offsetRanges)}} приколы основные (арг: Array) {Val = ао JavaStreamingContext (SparkConf () setAppName (». 39) в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ пробег $ 1.Apply $ мкВ $ зр (JobScheduler.scala: 257) в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $ $ anonfun $ пробег $ 1.Apply (JobScheduler.scala: 257) в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ пробег $ 1.Apply (JobScheduler.scala: 257) в scala.util.DynamicVariable. withValue (DynamicVariable.scala: 58) при org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler.run (JobScheduler.scala: 256) при java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в Java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) в java.lang.Thread.run (Thread.java:745) Как использовать или не использовать переменное вещание относится к сериализации KafkaInputDStream? Спарк версия 2.2.0.
Reith
1

голосов
1

ответ
244

Просмотры

Apache Спарк Streaming пользовательский приемник (текстовый файл) с помощью Java

Я новичок в Apache Спарк. Мне нужно прочитать файлы журналов из локального / смонтированного каталога. Некоторый внешний источник записи файлов в локальный / смонтированном каталог. Например, внешние журналы источник записи в combined_file.txtfile и один раз запись файла завершена внешний источник создания нового файла с префиксом 0_, как 0_combined_file.txt. Тогда мне нужно прочитать файл журнала combined_file.txt и обработать его. Так что я пытаюсь написать пользовательский приемник для проверки записи файлов в локальный / смонтированном каталог входа завершен, а затем прочитать заполненный файл. Вот мой код @Override общественной ничтожной OnStart () {Runnable й = () -> {в то время (правда) {попробуйте {Thread.sleep (1000л); реж File = новый файл ( "/ главная / PK01 / Desktop / arcflash /"); Файл [] completedFiles = реж. listFiles ((DIRNAME, имя_файла) -> {вернуть fileName.toLowerCase () StartsWith ( "0_");}.); // metaDataFile -> 0_test.txt // completedFiles -> test.txt для (File metaDataFile: completedFiles) {Строка compFileName = metaDataFile.getName (); compFileName = compFileName.substring (2, compFileName.length ()); Файл DataFile = новый файл ( "/ главная / PK01 / Desktop / arcflash /" + compFileName); если (dataFile.exists ()) {байт [] данных = новый байт [(Int) dataFile.length ()]; fis.read (данные); fis.close (); магазин (новый String (данные)); dataFile.delete (); metaDataFile.delete (); }}} Задвижка (Исключение е) {e.printStackTrace (); }}}; новая тема (й); } Я пытаюсь обрабатывать данные, как показано ниже. Данные JavaReceiverInputDStream = jssc.receiverStream (приемник); data.foreachRDD (fileStreamRdd -> {processOnSingleFile (fileStreamRdd.flatMap (streamBatchData -> {вернуть Arrays.asList (streamBatchData.split ( "\\ п")) итератора ();.}));}); Но получение ниже исключения 18/01/19 12:08:39 WARN RandomBlockReplicationPolicy: Ожидать 1 реплику только с 0 пэром / с. 18/01/19 12:08:39 WARN BlockManager: runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) в java.lang.Thread.run (Thread.java:748) Кто-нибудь может помочь мне решить эту ошибку Вот. Любая помощь будет признателен
PVH
12

голосов
1

ответ
18.5k

Просмотры

Unable to understand error “SparkListenerBus has already stopped! Dropping event …”

Вопрос, который я хотел бы знать, если у кого есть магический метод, чтобы избежать таких сообщений в журналах Spark: 2015-08-30 19:30:44 ОШИБКА LiveListenerBus: 75 - SparkListenerBus уже останавливал! Удаление SparkListenerExecutorMetricsUpdate событий (41, WrappedArray ()) После дальнейших исследований, я понимаю, что LiveListenerBus расширяет AsynchronousListenerBus. И таким образом, в какой-то момент, метод .stop () вызывается. Затем, сообщения, которые могут быть отправлены / полученные будут сброшены и остаются необработанными. В принципе, некоторые SparkListenerExecutorMetricsUpdate сообщения, к сожалению, пока не получили, и как только они, они становятся не упали в никуда. Это не выглядит критическим, поскольку SparkListenerExecutorMetricsUpdate просто соответствует периодическим обновлениям от исполнителей. Что такое стыдно, что я абсолютно не» т понять, почему это происходит, и бездельники относятся к этому вопросу. Обратите внимание, что это абсолютно не детерминированный, и я не могу воспроизвести это, вероятно, из-за асинхронной природы и мое отсутствие понять о том, как / когда остановка () должна быть вызвана. О исполняемый код Плотный образец: Val СБН = новый SparkContext (sparkConf) вал metricsMap = Metrics.values.toSeq.map (V => V -> sc.accumulator (0, v.toString)) .toMap вал outFiles = подкожно .textFile (outPaths) И нет никакой другой ссылки на подкожно или SparkContent экземпляра.
Adrien M.
1

голосов
1

ответ
93

Просмотры

optimize/tune setting to spark job, where the job uses groupbyKey and reduceGroups

Привет Я пытаюсь увидеть, если есть какие-либо параметры, такие как исполнитель память, ядер, раздел в случайном порядке или что мы можем думать о том, что может ускорить работу, которая включает в себя объединение, GroupByKey и reduceGroups операция Я понимаю эти интенсивные операции выполнять и ее в настоящее время с 5 часов, чтобы закончить это. (Ключевые _.) .ReduceGroups .union (переходный) .union (семейство) .groupByKey ((левый, правый) => искра подать "Step5_Spark_Command": Пример: «Команда-runner.jar, искровым представить, - класс, com.ms.eng.link.modules.linkmod.Links, - название, \\\ "Ссылки \\\", - мастер, пряжа, - развернуть режим, клиент, - исполнитель-памяти, 32G, --executor-сердечники, 4, - CONF, spark.sql.shuffle.partitions = 2020, / главная / Hadoop / linking.jar, JobId = # {myJobId}, окружающая среда = Prod», функция Val семьи = generateFamilyLinks ( ссылки, superNodes.filter (_.
dedpo
1

голосов
0

ответ
77

Просмотры

Почему искра SQL выбросит другое место раздела не существует исключение?

Ниже мой питон код, чтобы получить ульи данных с искровым SQL warehouse_location = 'Файл: /// путь / к / склад' искра = SparkSession \ .builder \ .config ( "spark.sql.warehouse.dir", warehouse_location) \. enableHiveSupport () \ .getOrCreate () spark.sql ( 'выберите * из ТПС, где Dt =' A ') Но Спарк сгенерирует исключение FileNotFound об / путь / к / warehoues / TBL / Dt = B. Действительно, я создал раздел B и имел какие-то данные, то я удалил каталог B напрямую (не опускалась раздел B). Но я думаю, что я не должен получить не ошибку, если я только запрос, где Dt = «A» как в улье. Разве это не имеет значения Dt = «B»? Я не видел исходный код о Спарк SQL, я думаю Спарк просканирует весь warehouse.dir и вывод схемы, так что она будет проверять действительность. Могу ли я отключить это?
MoreFreeze
1

голосов
1

ответ
253

Просмотры

How can Apache Spark history-server refer to Amazon S3?

[Версия] Apache Spark 2.2.0 Hadoop 2.7 Я хочу настроить сервер Apache Спарк histroy. Свечи Журнал событий находится в Amazon S3. Я могу сохранить файл войти S3, но не могу читать из истории сервера. Apache Spark установлен в / USR / местные / искры так, $ SPARK_HOME является / USR / местные / искрой $ кд / USR / местные / искровой / SBIN $ ш start-history-server.sh я получил следующее исключение ошибок в потоке "основной" java.lang.ClassNotFoundException: org.apache.hadoop.fs.s3a.S3AFileSystem в java.net.URLClassLoader.findClass (URLClassLoader.java:381) в java.lang.ClassLoader.loadClass (ClassLoader.java:424) на солнце. misc.Launcher $ AppClassLoader.loadClass (Launcher.java:335) в java.lang.ClassLoader.loadClass (ClassLoader.java:357) в java.lang.Class.forName0 (Native Method) в java.lang.Class.forName ( Class.java:348) в org.apache.spark.util.Utils $ .classForName (Utils.scala: 230) .... мой искровой defaults.conf ниже spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem spark.history.provider org.apache.hadoop.fs.s3a.S3AFileSystem spark.history.fs. logDirectory S3A: // XXXXXXXXXXXXX spark.eventLog.enabled верно spark.eventLog.dir S3A: // xxxxxxxxxxxxxxx Я установил это 2 фляги файлы в / USR / местные / искровым / банки / AWS-ява-СДК-1.7.4.jar Hadoop -aws-2.7.3.jar но ошибка та же. Что случилось?
Blue-Pixel
1

голосов
0

ответ
775

Просмотры

Спарк структурированных потокового с обеспеченным Кафка получает заморозил сообщение журнала [INFO StreamExecution: Запуск нового потоковый запроса.]

Для того, чтобы использовать структурированные потоковый в моем проекте, я тестирую искру 2.2.0 и Кафка 0.10.1 интеграцию с Kerberos на моем Hortonworks 2.6.3 среды, я бег ниже пример кода для проверки интеграции. Я могу запустить программу ниже на IntelliJ на свече локальном режиме без каких-либо проблем, но и та же программа при перемещении в кластер пряжи / режим клиента на Hadoop кластера он застрянет на ниже сообщения журнала 18/01/25 14:19:59 INFO StreamExecution: Запуск нового потокового запроса. Для проверки вменяемости на обеспеченном кластере Hadoop, я побежал искру 2.2.0 ПИ примера, это работает отлично, и я также способен производить и потреблять сообщения от Кафки на Hadoop кластере с использованием kafka-console-producer.sh и kafka- console-consumer.sh. Извините за этот долгий пост, поскольку это является очень широким вопросом, мне нужно ваши материалы для того, чтобы сделать эту работу. Пример программы: APPNAME ( "KAFKA_SPARK_TEST_APP") .getOrCreate ()}} искры kafka_jaas.conf: KafkaClient {com.sun.security.auth.module.Krb5LoginModule требуется useTicketCache = истина renewTicket = истина ServiceName = "{{kafka_bare_jaas_principal}}"; }; Клиент {com.sun.security.auth.module.Krb5LoginModule требуется useKeyTab = истина Keytab = "{{kafka_keytab_path}}" storeKey = TRUE useTicketCache = FALSE ServiceName = "Zookeeper" основные "= {{kafka_jaas_principal}}"; }; Спарк подать команду: ./spark-submit \ --name TEST_JOB_SPARK_STREAM_WITH_KAFKA \ --verbose \ --master пряжа \ --deploy режим клиента \ --num-исполнители 1 \ --executor-памяти 1g \ --executor- ядра 1 \ --repositories http://repo.hortonworks.com/content/repositories/releases/ \ --packages org.apache.spark: искровой SQL-Кафка-0-10_2.11: 2.2.0.2.6.3. 0-235 \ --conf spark.yarn.maxAppAttempts = 1 \ --files /home/user/sparktest/kafka_jaas.conf#kafka_jaas.conf,/home/user/sparktest/user.headless.keytab#user.headless. Keytab \ --conf "spark.driver.extraJavaOptions = -XX: + UseG1GC -Djava.security.auth.login.config = / дом / пользователь / sparktest / kafka_jaas.conf" \ --conf «spark.executor.extraJavaOptions = -XX: + UseG1GC -Djava.security.auth.login.config = / дом / пользователь / sparktest / kafka_jaas.conf»\ --class com.brokerfmc.demo.DemoSparkStreamingWithKafkaJob /home/user/sparktest/demo.jar kafka01. broker.com:6667,kafka02.broker.com:6667 src_topic dest_topic SASL_PLAINTEXT Спарк Консоль вывода: 18/01/25 14:19:59 INFO AbstractLogin: Успешно вошли в 18/01/25 14:19:59 INFO KerberosLogin.: TGT обновить поток начал. 18/01/25 14:19:59 INFO KerberosLogin: TGT действителен начиная с: Чт 25 января 13:07:
nilesh1212
1

голосов
0

ответ
64

Просмотры

Add a column value in a row based on every values of this same row

Мой вопрос может быть тупым или что-нибудь еще. Но мне было интересно: я хочу сделать структурированные потоковым я хочу как агрегировать и оценку данных с помощью модели газированной воды Так у меня есть этот VAL data_processed = data_raw .withWatermark ( «метка», «10 минут») .groupBy (окно ( Седло ( "метки времени"), "1 минута")) .agg (*** все агрегирование ***) То, что я хочу добавить, как: .withColumn ( "row_scored", скоринг (all_others_cols)) Так что для каждой строки структурированная потоковый он забьет после агрегации. Но я не думаю, что может быть возможно. Так что мне интересно, если вы думаете о другом подходе. Я использую газированную воду, так что функции озвучивания нуждается в H2O Frame. Я думал создать UDF вроде: выбрать все остальные столбцы, создать строку и преобразовать его в dataframe преобразовать dataframe, состоящий из одного ряда в H2O Рама предсказать H2O Рама из одной строки преобразовать предсказание от H20 ​​рамы для dataframe получить счет в dataframe удвоить и вернуть его с UDF Но я не» т думает, что это вполне оптимизирован, может быть, у вас есть новый подход или замечания, которые сделают увидеть еще один способ сделать это. заранее спасибо
tricky
1

голосов
0

ответ
104

Просмотры

AKKA FSM in spark streaming

У меня есть требование, чтобы создать реализацию государственной машины в моей искры потокового приложения. После прочтения через несколько постов нашли AKKA приходит с FSM из коробки. Я создал простой AKKA FSM, и я могу запустить его локально. Я не уверен, как интегрировать этот код в свече структурирована потоковый (mapGroupsWithState). Я хочу взять эту FSM и подключить к mapGroupsWithState, так что я могу хранить последнее состояние, даже если мое приложение идет вниз. объект BuilderFSM {// States запечатаны признак объект MachineState случай настроенный расширяет MachineState случае объект трюк проходит MachineState случае объект nonTuned расширяет MachineState случая класса TuningEvent (Меджид: String, acctNumber: String, типСобытие:
Nats
1

голосов
0

ответ
250

Просмотры

UnicodeDecodeError when filtering then counting a pyspark DataFrame in spark-submit

У меня есть таблица паркета, что мне нужно фильтровать на основе UDF, а затем подсчитать результаты для дальнейшей обработки. Это работает в интерактивном режиме, но не в искровом представить. Я вареные вопрос вниз к простейшим примером. Вот это интерактивный код: В работе [26]: Таблица = table.filter (F.udf (лямбда х: Правда, T.BooleanType ()) ( 'my_field')) В работе [27]: table.count () Выход [ 27]: 819876 Тем не менее, когда я исполняю тот же код, с помощью искры подати (режим кластера пряжи), я получаю следующее: Вызванный: org.apache.spark.api.python.PythonException: Traceback (самый последний вызов последнего) : Файл "{путь} /pyspark/worker.py", строка 137, в главном spark_files_dir = utf8_deserializer.loads (входной_файл) Файл "{путь} /pyspark/serializers.py", строка 549, в нагрузках возврата s.decode ( "UTF-8"), если self.use_unicode ELSE сек UnicodeDecodeError: 'UTF-8' кодек не может декодировать байт 0x80 в позиции 13: неверный старт байт я полностью сбит с толку этим. Любые идеи, для чего может идти неправильно, когда я исполняю выше с помощью искрового представить? версия Python: 3.6.3 искры версия: 2.1.1 Обновление следующих работ в интерактивном режиме: В работе [33]: table.rdd.filter (лямбда х: 'my_term' в x.my_field) .toDF () рассчитывать () Выход [. 33]: 753137 Но когда я пытаюсь работает, что с помощью искрового представить, я получаю следующее сообщение об ошибке: {} some_path /pyspark/serializers.py», строка 547, в грузе с = stream.read (длина) ValueError: чтение длина должна быть положительным или -1
LateCoder
0

голосов
0

ответ
4

Просмотры

Как сделать длинный запрос с использованием WHERE IN Спарк?

У меня есть большая база данных с приблизительно 500 гигабайтами данных столбцов. Я пытаюсь получить доступ к данным с использованием искрового на DataBricks, но запрос слишком долго, чтобы позволить мне получить данные, которые я заинтересован в. Я новичок в этом, поэтому, пожалуйста, простите меня, если вопрос не имеет полный смысл. Единственный способ, которым я могу это сделать прямо сейчас, чтобы разбить запрос и использовать часть его и повторить его. SELECT * FROM myDataTable WHERE rollID в ( '1', '2', '148', '123', '21432' ....) Ожидаемый: получить все данные сразу, так что я могу сделать некоторый анализ на кластере ,
user11385784
1

голосов
0

ответ
197

Просмотры

spark.task.cpus changed runtime - speedup pairwise comparisons

I've read some of the previous posts and sadly I think I can't do what I would like to do, but maybe there is a recent solution that I don't know about. I would like at a certain point to perform some parallel operations within a map using the parallel collections provided by the Scala standard library. However, I understood that I should set at the beginning of my Spark execution the spark.task.cpus to assign to each task a sufficient number of cores. My configuration is made of 14 nodes with 8 cores each. Within a map, I have two collections of points that should be compared using the Euclidean distance to check if at least one pair respects a distance threshold. Consequently, I end up with something like: collection1.exists(point1 => collection2.exists(point2 => dist(point1, point2)
Luca
1

голосов
0

ответ
355

Просмотры

Спарк Csv указать символ новой строки

Я пишу фрейм данных с использованием библиотеки искры CSV. Я использую искру 1.6. Мне было интересно, если есть способ, чтобы указать символ новой строки. Обычно, я думаю, что это \ п. Или, если нет, то есть хорошее решение для изменения новой строки? view.coalesce (1) .write.format ( "com.databricks.spark.csv"). Опция ( "заголовок", "истина"). Опция ( "разделитель", "\ 036"). Опция ( "кодировка" ., "ASCII") сохранить (местоположение);
Defcon
1

голосов
1

ответ
776

Просмотры

Apache Спарк Структурированные потоковый (DataStreamWriter) запись в улье таблицы

Я ищу использовать искру Structured потокового чтения данных с Кафкой и обработать его и записать в таблицу Hive. Вал искра = SparkSession .builder .appName ( "Кафка Тест") .config ( "spark.sql.streaming.metricsEnabled", правда) .config ( "spark.streaming.backpressure.enabled", "истина") .enableHiveSupport () .getOrCreate () знач события = искра .readStream .format ( "Кафка") .option ( "kafka.bootstrap.servers", "ХХХХХХХ") .option ( "startingOffsets", "последний") .option ( "подписываются", "YYYYYY") .load VAL данные = events.select (..... некоторые столбцы ...) data.writeStream .format ( "паркетным") .option ( "сжатие", "мгновенным") .outputMode ( "добавить ") .partitionBy (" DS ") .option ("
Anant Furia
1

голосов
1

ответ
580

Просмотры

искра-на-K8S ступенчатого ресурсов сервера с Python

Я после Running искру на Kubernetes документы с искровым по-K8S v2.2.0-kubernetes-0.5.0, Kubernetes v1.9.0 и Minikube v0.25.0. Я в состоянии успешно выполнить задание на Python с помощью этой команды: бен / искрового отправить \ --deploy режим кластера \ --master K8S: // https: //10.128.0.4: 8443 \ --kubernetes-пространство имен по умолчанию \ --conf spark.executor.instances = 1 \ --conf spark.app.name = искровой пи \ --conf spark.kubernetes.driver.docker.image = kubespark / искровые драйвер-р: v2.2.0-kubernetes -0.5.0 \ --conf spark.kubernetes.executor.docker.image = kubespark / искровой исполнитель-р: v2.2.0-kubernetes-0.5.0 \ --jars местный: /// неавтоматический / искровой / примеры / банки / искровой examples_2.11-2.2.0-K8S-0.5.0.jar \ местный: /// неавтоматический / искра / примеры / SRC / главный / питон / пи. баночка Можно ли запустить работу Python с местными зависимостями? Я попробовал эту команду, и это не удалось: бен / искрового отправить \ --deploy режим кластера \ --master K8S: // https: //10.128.0.4: 8443 \ --kubernetes-пространства имен по умолчанию \ --conf искры. executor.instances = 1 \ --conf spark.app.name = искровым р \ --conf spark.kubernetes.driver.docker.image = kubespark / искровым водитель-ру: v2.2.0-kubernetes-0.5.0 \ --conf spark.kubernetes.executor.docker.image = kubespark / искровой исполнитель-р: v2.2.0-kubernetes-0.5.0 \ --conf spark.kubernetes.initcontainer.docker.image = kubespark / искровой INIT: v2.2.0-kubernetes-0.5.0 \ --conf spark.kubernetes.resourceStagingServer.uri = HTTP: //10.128.0.4: 31000 \ ./examples/src/main/python/pi.py 10 Я получаю эту ошибку в водительская журналы: Ошибка: не удалось найти или загрузить основной класс .opt.spark.jars.RoaringBitmap-0.5.11.
David
1

голосов
0

ответ
95

Просмотры

искры кластер с использованием пряжи и справедливым планировщика попадет максимальное количество ядер для каждого пользователя

Я делаю несколько представлений пряжи работы в кластер с помощью справедливого планировщика и динамического распределения. Я могу загрузить много рабочих мест в очереди. То, что я вижу в том, что рабочие места будут работать до 18 ядер рабочих, то это будет сдерживать рабочие места вниз по течению, пока один из активных ядер не закончена. Где бы это ограничение быть от? это установка нити или установка справедливой планировщик?
bhomass
1

голосов
1

ответ
27

Просмотры

Riak искры разъем не работает

Мой разъем Riak-Спарк не работает. Я могу запустить искру: / OPT / искра / бен / искровой оболочка \ --jars com.fasterxml.jackson.module_jackson-модуль-scala_2.11-2.4.4.jar \ --conf spark.riak.connection.host = 127.0.0.1: 8087 \ --packages com.basho.riak: искровой Riak-connector_2.11: 1.6.3 Но когда я бег: импорт com.basho.riak.spark._ знач данные = Array (1, 2 , 3, 4, 5) вал testRDD = sc.parallelize (данные) Я получил сообщение об ошибке: SCALA> импорт com.basho.riak.spark._ SCALA> данных знач = Array (1, 2, 3, 4, 5) данные: Массив [Int] = Array (1, 2, 3, 4, 5) Скала> Val testRDD = sc.parallelize (данные) java.lang.VerifyError: класс com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer отменяет последний метод withResolved (Lcom / fasterxml / / DataBind JACKSON / BeanProperty; LCOM. / fasterxml / / DataBind JACKSON / jsontype ... Может кто-нибудь помочь в установке и использовании данного разъема ??? Большое спасибо заранее, J
user2405703
1

голосов
1

ответ
567

Просмотры

How to assign the python interpreter spark worker used?

Как назначить питон переводчика искровой работника используется? я попробовать несколько метод, как: 1) установить окр Варс экспорт PYSPARK_DRIVER_PYTHON = / python_path / бен / экспорт питон PYSPARK_PYTHON = / python_path / bin / питон не работает. Я уверен, что использование окр набор успеха PYSPARK_DRIVER_PYTHON PYSPARK_PYTHON: окр | Grep PYSPARK_PYTHON я хочу pyspark использование / python_path / bin / Python как интерпретатор Отправной питона, но рабочий начать использовать: питон -m Deamon я не хочу, чтобы связать по умолчанию питона / python_path / bin / питона в том, что это может влияет на другой УБС, BCZ питон по умолчанию и / python_path / bin / питон не совпадает с версией, и как при использовании продукции. Также установлено spark-env.sh не работает: spark.pyspark.driver.python = / python_path / bin / питон spark.pyspark.python = / python_path / bin / питон, когда водитель запуска некоторые предупредительные журналы как: конф / искрового окр. ш: строка 63: искры.
Moon.Hou
1

голосов
0

ответ
348

Просмотры

Какой искровой SQL-Кафка использовать для Apche Кафки 1.0.0.?

Какие искровым SQL-Кафка .jar как зависимость при работе с искровым отправить следует использовать для Apache Кафки 1.0.0.? Я использую Scale 2.11 и Спарк 2.2.1. Я хотел бы запустить Спарк Структурированного приложения Streaming.
Erhan
1

голосов
1

ответ
217

Просмотры

ClassCastException while deserializing with Java's native readObject from Spark driver

I have two spark jobs A and B such that A must run before B. The output of A must be readable from: The spark job B A standalone Scala program outside of Spark environment (no Spark dependency in) I am currently using the Java's native serialization with Scala case classes. From the A Spark Job: val model = ALSFactorizerModel(...) context.writeSerializable(resultOutputPath, model) with serialization method: def writeSerializable[T OK From the B Spark job running on a (Dataproc) Spark cluster => Fails with the below exception: The exception: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field mycompany.algo.als.common.io.model.factorizer.ALSFactorizerModel.stores of type scala.collection.Seq in instance of mycompany.algo.als.common.io.model.factorizer.ALSFactorizerModel at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at mycompany.fs.gcs.SimpleGCSFileSystem.readSerializable(SimpleGCSFileSystem.scala:71) at mycompany.algo.als.batch.strategy.ALSClusterer$.run(ALSClusterer.scala:38) at mycompany.batch.SinglePredictorEbapBatch$$anonfun$3.apply(SinglePredictorEbapBatch.scala:55) at mycompany.batch.SinglePredictorEbapBatch$$anonfun$3.apply(SinglePredictorEbapBatch.scala:55) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Am I missing something ? Should I configure Dataproc/Spark to support the use of the Java Serialization for this code ? I submit the job with the --jars and never had other issues before. The spark dependencies are not included in this Jar, the scope is Provided. Scala version: 2.11.8 Spark version: 2.0.2 SBT version: 0.13.13 Thanks for your help
ogen

Просмотр дополнительных вопросов