Size: a a a

2022 February 08

ДМ

Дмитрий Морозов... in Moscow Spark
def path_length(fromnode, tonode):

   try:
       return nx.dijkstra_path_length(graph.value, fromnode, tonode, weight='weight')
   except Exception:
       return ''

len_udf = F.udf(path_length)
data = data.withColumn('path_length', len_udf(data.fromnode, data.tonode))
источник

DZ

Dmitry Zuev in Moscow Spark
откуда graph.value ?
источник

ДМ

Дмитрий Морозов... in Moscow Spark
до этого граф пришлось вне спарка собрать

data_list = data.collect()

G = nx.DiGraph()

for graph_row in data_list:
   G.add_edge(graph_row['fromnode'], graph_row['tonode'], weight=graph_row['weight'])

broadcastGraph = spark_session.sparkContext.broadcast(G)
источник

DZ

Dmitry Zuev in Moscow Spark
источник

ДМ

Дмитрий Морозов... in Moscow Spark
если есть варианты, как собрать граф по другому буду рад услышать)
источник

DB

Dmitry Buslov in Moscow Spark
Если это разовая задачка - можно активировать фри инстанс SAP HANA Cloud и там оч. быстро все посчитать для всех маршрутов. Пример параллельного поиска множества кратчайших путей - https://github.com/SAP-samples/hana-graph-examples/blob/main/POKEC/POKEC_1k_SP_pairs_bench_sequential_and_parallel.sql
источник

DZ

Dmitry Zuev in Moscow Spark
источник

DZ

Dmitry Zuev in Moscow Spark
graphx
источник

ЕГ

Евгений Глотов... in Moscow Spark
Graphframes рекомендую, это надстройка над graphx
источник

DZ

Dmitry Zuev in Moscow Spark
да вроде на дейкстре не поможет
источник

ЕГ

Евгений Глотов... in Moscow Spark
А, там чёт сильно сложное надо крутануть, понял)
источник

РБ

Руслан Бикмаев... in Moscow Spark
Не по вариациям сборки, а по оптимизации.
При выборе кратчайшего маршрута, если дропать заведомо ненужные варианты на этапе сборки цепочек, объем вычислений может сократиться на неск. порядков и вписаться в возможности реального железа.
К примеру граф междугородней логистики или курьерской доставки.
Вариант ограничения по максимальной длине. Вычисляется  заранее длина наикратчашего маршрута, в отдельную табличку. И если трасса по маршруту заткнулась, авария, дорожные работы, нашествие лягушек, обрабатываются цепочки маршрутов , с предельным превышением = эталонное_ расстояние * 1.7
Примерный коэффициент, полученный несколькими пробами.
В этом случае петли маршрутов, идущие в противоположную сторону, или  по спирали будут отсекаться на ранних стадиях.
Можно фильтровать "полосой пропускания" , например ограничив построение  альтернативных маршрутов отклонением на 1 - 2 района в сторону от наикратчайшего маршрута.

Кстати при озвученных исходных условиях  можно решать не графовыми алгоритмами, а отранжировав с десяток альтернативных маршрутов  из точки А в точку В, и выбраковывая из них те, на которые поступила информация о непригодности для движения. Сложнее, когда данные о весе ребер поступают динамически, из расчета дорожной обстановки. Здесь обработка графа необходима.

Похожая ситуация возникает при селф-джоине, когда таблица джоинится сама на себя с целью анализа сочетаний и сходства. Добавив фильтры и подропав таким образом ненужные варианты, я сократил объем двух промежуточных таблиц по 1.2 Тб весом до одной в 56 Гб.

Это такой же случай, когда сначала производится вычисление всех возможных вариантов, а потом выбираются оптимальные из них.
источник
2022 February 09

Н

Никита in Moscow Spark
Всем привет!
Читаю spark.read.load() AVRO на HDFS с папками в виде дат /a/b/c/2020-01-01/part-0001.avro
Задача прочитать только даты на 1 число TRUNC(month, dt)
Получаю spark reading partition values: [empty row].
Можно ли указать, где либо партицирование на этапе загрузки датафрейма или единственный вариант делать динамический путь типа /a/b/c/20**-**-01, чтобы читать только первое число месяца?
источник

ЕГ

Евгений Глотов... in Moscow Spark
Надо наоборот, trunc(dt, 'MM')
источник

Н

Никита in Moscow Spark
да это я написал так для понимания
metrics = metrics.withColumn("dt", extract_date_from_input_file())
metrics = metrics.filter(metrics["dt"] == F.date_trunc("month", metrics["dt"]))

мне кажется он сначала все дни загружает потом фильтрует, а я хочу чтобы он загрузил только 1 число месяца сразу
источник

ЕГ

Евгений Глотов... in Moscow Spark
Варик с подстановкой числа в хдфс годный, но надо вписать .option("basePath", "/a/b/c")
источник

ЕГ

Евгений Глотов... in Moscow Spark
Чтоб саму дату не потерять
источник

Н

Никита in Moscow Spark
ну я так и думал, просто мб есть какой-то option, чтобы сказать ему типа тут есть партиции, как в хайве
источник

ЕГ

Евгений Глотов... in Moscow Spark
Ну это и есть указание
источник

ЕГ

Евгений Глотов... in Moscow Spark
А так пиши where, прунинг должен работать
источник