Left join в pyspark работает как inner join
Вот мой код:
joined = deals_parsed.alias("deals").join(
transactions_parsed.alias("transactions"),
expr("""
deals.symbol = transactions.symbol AND
deals.timestamp >= transactions.timestamp AND
deals.timestamp <= transactions.timestamp + interval 1 seconds
"""),
"leftOuter"
)
joined_windowed = (
joined
.withColumn("window", window(col("deals.timestamp"), "5 minutes"))
)
def process_batch(batch: DataFrame, _batch_id: int):
if batch.isEmpty():
logger.info(f"Batch {_batch_id} is empty.")
return
windows = batch.select("window").distinct().collect()
for window in windows:
window_start = window['window'].start
window_end = window['window'].end
window_df = batch.filter((batch.window.start == window_start) & (batch.window.end == window_end))
broker_count = window_df.filter(col("broker_id").isNotNull()).count()
exchange_count = window_df.count()
logger.info(f"Window: {window_start} - {window_end}")
logger.info(f"Broker Transactions: {broker_count}")
logger.info(f"Exchange Transactions: {exchange_count}")
query = (
joined_windowed
.writeStream
.outputMode("append")
.foreachBatch(process_batch)
.trigger(processingTime='5 minutes')
.start()
)
query.awaitTermination()
У меня 2 потока. Количество элементов в потоке deals гарантированно меньше. Однако после join их столько же, сколько и transactions, потому что join работает как внутренний и просто удаляет строки из сделок без совпадений.
Пожалуйста, помогите мне найти, где моя ошибка.