Deep Dive into PySpark Implementation: Multi‑Process Architecture, Java Integration, RDD/SQL Interfaces, Executor Communication, and Pandas UDF
This article explains PySpark's multi‑process architecture, how the Python driver uses Py4J to call Java/Scala APIs, the implementation of RDD and DataFrame interfaces, executor‑side process communication and serialization with Arrow, and the design of Pandas UDFs, while also discussing current limitations and future directions.
PySpark is built on a multi‑process architecture: on both the driver and the executors, a Python process runs alongside a JVM process. When a user submits a PySpark script via `spark-submit`, the driver runs the Python script, launches a JVM, and uses Py4J to invoke the Java/Scala Spark APIs.
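To make the division of labor concrete, the following toy sketch imitates the proxy pattern that Py4J implements: the Python "driver" writes method‑call commands over a local socket, and a peer thread standing in for the JVM executes them and sends back the result. This is not Py4J's actual wire protocol, and names such as `JvmStub` and `call_jvm` are invented for illustration.

```python
# Toy sketch of a Py4J-style bridge: Python sends textual commands over a
# socket; a stand-in for the JVM answers. NOT Py4J's real protocol.
import socket
import threading

class JvmStub:
    """Pretend JVM side: maps command strings to results."""
    def handle(self, command: str) -> str:
        name, _, arg = command.partition(" ")
        if name == "SparkConf.get":
            # Pretend the JVM holds the Spark configuration.
            return {"spark.master": "local[*]"}.get(arg, "")
        return "unknown command"

def serve(conn: socket.socket, stub: JvmStub) -> None:
    # "JVM" loop: read one command per line, write one reply per line.
    with conn, conn.makefile("rw") as f:
        for line in f:
            f.write(stub.handle(line.strip()) + "\n")
            f.flush()

def call_jvm(conn_file, command: str) -> str:
    # Python side of the bridge: write a command, read one reply line.
    conn_file.write(command + "\n")
    conn_file.flush()
    return conn_file.readline().strip()

# Wire the two "processes" together with a socket pair.
driver_sock, jvm_sock = socket.socketpair()
threading.Thread(target=serve, args=(jvm_sock, JvmStub()), daemon=True).start()
f = driver_sock.makefile("rw")
result = call_jvm(f, "SparkConf.get spark.master")
print(result)  # -> local[*]
```

Real Py4J adds object registries, reference counting, and callbacks on top of this idea, but the essential shape — a Python proxy object whose method calls are serialized over a socket to the JVM — is the same.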
On the driver side, creating a SparkContext first launches the Py4J gateway:
```python
@classmethod
def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
    with SparkContext._lock:
        if not SparkContext._gateway:
            SparkContext._gateway = gateway or launch_gateway(conf)
            SparkContext._jvm = SparkContext._gateway.jvm
```

The gateway starts the JVM and imports the essential Spark classes via `JavaGateway`:
```python
gateway = JavaGateway(
    gateway_parameters=GatewayParameters(
        port=gateway_port, auth_token=gateway_secret, auto_convert=True))
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
```

Python RDD and DataFrame operations are thin wrappers around the corresponding JVM objects. For example, creating an RDD from Hadoop files calls the Scala `PythonRDD` companion object:
```python
def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass,
                     keyConverter=None, valueConverter=None, conf=None,
                     batchSize=0):
    jconf = self._dictToJavaMap(conf)
    jrdd = self._jvm.PythonRDD.newAPIHadoopFile(
        self._jsc, path, inputFormatClass, keyClass, valueClass,
        keyConverter, valueConverter, jconf, batchSize)
    return RDD(jrdd, self)
```

On the executor side, when a UDF needs to be evaluated, Spark launches (or reuses) a Python worker process and communicates with it over a local socket. The executor creates the worker with:
```scala
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context)
writerThread.start()
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
val stdoutIterator = newReaderIterator(stream, writerThread, startTime, env, worker, releasedOrClosed, context)
new InterruptibleIterator(context, stdoutIterator)
```

Data is serialized with Apache Arrow for efficiency. On the JVM side, the Arrow writer streams record batches to the Python worker:
```scala
val arrowWriter = ArrowWriter.create(root)
val writer = new ArrowStreamWriter(root, null, dataOut)
writer.start()
while (inputIterator.hasNext) {
  val nextBatch = inputIterator.next()
  while (nextBatch.hasNext) {
    arrowWriter.write(nextBatch.next())
  }
  arrowWriter.finish()
  writer.writeBatch()
  arrowWriter.reset()
}
```

In the Python worker, `worker.py` reads from the socket, determines the evaluation type, and deserializes the Arrow batches back into pandas Series:
```python
def dump_stream(self, iterator, stream):
    import pyarrow as pa
    writer = None
    for batch in iterator:
        if writer is None:
            writer = pa.RecordBatchStreamWriter(stream, batch.schema)
        writer.write_batch(batch)
    if writer is not None:
        writer.close()

def load_stream(self, stream):
    import pyarrow as pa
    reader = pa.ipc.open_stream(stream)
    for batch in reader:
        yield [self.arrow_to_pandas(c)
               for c in pa.Table.from_batches([batch]).itercolumns()]
```

Pandas UDFs build on this Arrow‑based channel, letting users write vectorized functions that operate on whole pandas Series rather than row by row. An example adapted from the official documentation:
```python
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())
df.select(multiply(col("x"), col("x"))).show()
```

The article concludes that while PySpark's integration of Arrow and Pandas UDFs greatly improves throughput and cache efficiency, drawbacks remain: extra CPU overhead for inter‑process communication, the need to understand Spark's distributed execution model, and restrictions on Pandas UDF return types. Ongoing work such as Koalas and broader vectorized execution aims to further close the gap between single‑node pandas and distributed Spark workloads.
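As a closing illustration, the vectorized contract behind a scalar Pandas UDF can be checked with plain pandas, no cluster required: the function receives whole pandas Series (one per Arrow batch) and must return a Series of the same length. This is a local sketch of that contract, not Spark itself.

```python
# Local check of the scalar Pandas UDF contract: Series in, same-length
# Series out, computed elementwise over the whole batch.
import pandas as pd

def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b  # vectorized elementwise multiply, no Python-level loop

x = pd.Series([1, 2, 3])
result = multiply_func(x, x)
print(result.tolist())  # -> [1, 4, 9]
```

Because the function is ordinary pandas code, it can be unit‑tested this way before being wrapped with `pandas_udf` and shipped to executors.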
DataFunTalk
Dedicated to sharing and discussing big data and AI technology applications, aiming to empower a million data scientists. Regularly hosts live tech talks and curates articles on big data, recommendation/search algorithms, advertising algorithms, NLP, intelligent risk control, autonomous driving, and machine learning/deep learning.