
Deep Dive into PySpark Implementation: Multi‑Process Architecture, Java Integration, RDD/SQL Interfaces, Executor Communication, and Pandas UDF

This article explains PySpark's multi‑process architecture, how the Python driver uses Py4J to call Java/Scala APIs, the implementation of RDD and DataFrame interfaces, executor‑side process communication and serialization with Arrow, and the design of Pandas UDFs, while also discussing current limitations and future directions.

DataFunTalk

PySpark is built on a multi-process architecture: on both the driver and the executor hosts, a Python process runs alongside a JVM process. When a user submits a PySpark script via spark-submit, the driver runs the Python script, launches a JVM, and uses Py4J to invoke the Java/Scala Spark APIs.
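To make the two-process idea concrete, here is a toy standard-library sketch (not PySpark's actual code): a "driver" process sends text commands over a local socket to a second process that plays the role of the JVM gateway, much as Py4J relays method calls and returns results. The port handshake, the line-based protocol, and the call_gateway helper are all invented for this illustration.

```python
import socket
import subprocess
import sys
import textwrap

# Toy stand-in for the JVM gateway: a subprocess that answers line-delimited
# commands over a socket, echoing an uppercased "result" back.
SERVER = textwrap.dedent("""
    import socket
    srv = socket.socket()
    srv.bind(("127.0.0.1", 0))
    srv.listen(1)
    print(srv.getsockname()[1], flush=True)   # report the chosen port to the driver
    conn, _ = srv.accept()
    f = conn.makefile("rw")
    for line in f:
        cmd = line.strip()
        if cmd == "quit":
            break
        f.write(cmd.upper() + "\\n")          # stand-in for a JVM reply
        f.flush()
""")

def call_gateway(command: str) -> str:
    """Start the toy gateway, send one command, and return its reply."""
    proc = subprocess.Popen([sys.executable, "-c", SERVER],
                            stdout=subprocess.PIPE, text=True)
    port = int(proc.stdout.readline())        # like Py4J's gateway-port handshake
    sock = socket.create_connection(("127.0.0.1", port))
    f = sock.makefile("rw")
    f.write(command + "\n")
    f.flush()
    reply = f.readline().strip()
    f.write("quit\n")
    f.flush()
    sock.close()
    proc.wait()
    return reply
```

The real gateway is far richer (object references, callbacks, authentication), but the shape is the same: a per-driver port handshake followed by request/reply traffic over a local socket.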

On the driver side, the SparkContext is created by launching a gateway:

def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
    with SparkContext._lock:
        if not SparkContext._gateway:
            # Launch the Py4J gateway (and with it the JVM) only once per driver
            SparkContext._gateway = gateway or launch_gateway(conf)
            SparkContext._jvm = SparkContext._gateway.jvm

The gateway starts the JVM and imports essential Spark classes via a JavaGateway:

gateway = JavaGateway(
    gateway_parameters=GatewayParameters(
        port=gateway_port, auth_token=gateway_secret, auto_convert=True))
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")

Python RDD and DataFrame operations are thin wrappers around the corresponding JVM objects. For example, creating an RDD from Hadoop files calls the Scala PythonRDD companion object:

def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
                     valueConverter=None, conf=None, batchSize=0):
    jconf = self._dictToJavaMap(conf)
    jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                                valueClass, keyConverter, valueConverter,
                                                jconf, batchSize)
    return RDD(jrdd, self)

On the executor side, when a UDF must be evaluated, Spark obtains a Python worker process (freshly spawned, or reused via a daemon) and communicates with it over a socket. The executor wires up the worker like this:

val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
// Feed the input partition to the worker on a dedicated writer thread
val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context)
writerThread.start()
// Read the worker's results back from its output socket stream
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
val stdoutIterator = newReaderIterator(stream, writerThread, startTime, env, worker, releasedOrClosed, context)
new InterruptibleIterator(context, stdoutIterator)
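The wire protocol between executor and worker is length-delimited: each payload is preceded by a length header so the reader knows how many bytes to consume. The sketch below is a simplified stdlib illustration of that framing, not PySpark's actual worker code; the frame helpers and the 4-byte big-endian header are assumptions made for the example.

```python
import socket
import struct

# Toy sketch of length-prefixed framing on the executor <-> worker socket:
# a 4-byte big-endian length header followed by the payload bytes.

def write_frame(sock: socket.socket, payload: bytes) -> None:
    sock.sendall(struct.pack(">i", len(payload)) + payload)

def read_frame(sock: socket.socket) -> bytes:
    (length,) = struct.unpack(">i", _read_exact(sock, 4))
    return _read_exact(sock, length)

def _read_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("socket closed mid-frame")
        buf += chunk
    return buf

# Round trip over an in-process socket pair standing in for executor/worker.
executor_end, worker_end = socket.socketpair()
write_frame(executor_end, b"partition-0 rows")
received = read_frame(worker_end)
```

Framing like this is what lets the reader iterator on the JVM side, and worker.py on the Python side, interleave data batches with control messages (timing stats, accumulator updates, end-of-stream markers) on a single socket.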

Data is serialized using Apache Arrow for efficiency. The Arrow writer on the JVM side streams batches to the Python worker:

val arrowWriter = ArrowWriter.create(root)
val writer = new ArrowStreamWriter(root, null, dataOut)
writer.start()
while (inputIterator.hasNext) {
    val nextBatch = inputIterator.next()
    while (nextBatch.hasNext) { arrowWriter.write(nextBatch.next()) }
    arrowWriter.finish()
    writer.writeBatch()
    arrowWriter.reset()
}
writer.end()  // emit the end-of-stream marker so the Python reader terminates

In the Python worker, worker.py reads the socket, determines the evaluation type, and hands the stream to an Arrow serializer: load_stream deserializes incoming Arrow batches into Pandas Series, while dump_stream writes result batches back to the executor:

def dump_stream(self, iterator, stream):
    import pyarrow as pa
    writer = None
    for batch in iterator:
        if writer is None:
            writer = pa.RecordBatchStreamWriter(stream, batch.schema)
        writer.write_batch(batch)
    if writer is not None:
        writer.close()

def load_stream(self, stream):
    import pyarrow as pa
    reader = pa.ipc.open_stream(stream)
    for batch in reader:
        yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]

Pandas UDFs leverage this Arrow‑based communication, allowing users to write vectorized functions that operate on whole Pandas Series rather than row‑by‑row. An example from the official documentation:

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())
df.select(multiply(col("x"), col("x"))).show()
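A useful sanity check, and one that needs no Spark cluster, is to call the same function on plain pandas Series: this is exactly the shape of data the worker hands to a Pandas UDF for each Arrow batch.

```python
import pandas as pd

# The same vectorized function, applied locally to plain pandas Series --
# the per-batch view a Pandas UDF sees inside the worker.
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

x = pd.Series([1, 2, 3])
result = multiply_func(x, x)   # element-wise product: 1, 4, 9
```

Writing the function so it behaves identically on local Series and under pandas_udf keeps debugging cheap: test the logic locally, then register it for distributed execution unchanged.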

The article concludes that while PySpark’s integration of Arrow and Pandas UDFs greatly improves throughput and cache efficiency, there are still drawbacks such as extra CPU overhead for inter‑process communication, the need to understand Spark’s distributed model, and limitations of Pandas UDF return types. Ongoing work like Koalas and broader vectorized execution aims to further close the gap between single‑node Pandas and distributed Spark workloads.

Tags: big data, Python, UDF, distributed computing, Spark, PySpark, Arrow
Written by DataFunTalk

Dedicated to sharing and discussing big data and AI technology applications, aiming to empower a million data scientists. Regularly hosts live tech talks and curates articles on big data, recommendation/search algorithms, advertising algorithms, NLP, intelligent risk control, autonomous driving, and machine learning/deep learning.
