Left join в pyspark работает как inner join

Вот мой код:

joined = deals_parsed.alias("deals").join(
    transactions_parsed.alias("transactions"),
    expr("""
    deals.symbol = transactions.symbol AND
    deals.timestamp >= transactions.timestamp AND
    deals.timestamp <= transactions.timestamp + interval 1 seconds
    """),
    "leftOuter"
)

joined_windowed = (
    joined
    .withColumn("window", window(col("deals.timestamp"), "5 minutes"))
)

def process_batch(batch: DataFrame, _batch_id: int):
    if batch.isEmpty():
        logger.info(f"Batch {_batch_id} is empty.")
        return
    windows = batch.select("window").distinct().collect()
    for window in windows:
        window_start = window['window'].start
        window_end = window['window'].end
        window_df = batch.filter((batch.window.start == window_start) & (batch.window.end == window_end))
        broker_count = window_df.filter(col("broker_id").isNotNull()).count()
        exchange_count = window_df.count()
        logger.info(f"Window: {window_start} - {window_end}")
        logger.info(f"Broker Transactions: {broker_count}")
        logger.info(f"Exchange Transactions: {exchange_count}")

query = (
    joined_windowed
    .writeStream 
    .outputMode("append")
    .foreachBatch(process_batch)
    .trigger(processingTime='5 minutes')
    .start()
)

query.awaitTermination()

У меня 2 потока. Количество элементов в потоке deals гарантированно меньше. Однако после join их столько же, сколько и transactions, потому что join работает как внутренний и просто удаляет строки из сделок без совпадений.

Пожалуйста, помогите мне найти, где моя ошибка.


Ответы (0 шт):