pjfanning opened a new pull request, #1527:
URL: https://github.com/apache/pekko-connectors/pull/1527
Follow-up to #1526.
Two `javadsl` files exposed raw Scala types in public method signatures,
making them awkward to use from Java. `CassandraFlow.createUnloggedBatch` took
a Scala function literal instead of a `pekko.japi.function.Function2`, and all
four `Slick.flowWithPassThrough` overloads exposed
`scala.concurrent.ExecutionContext` directly instead of
`java.util.concurrent.Executor`.
### `CassandraFlow.scala`
Overloading wasn't possible (same erasure), so the old method is renamed and
deprecated:
- Renamed `createUnloggedBatch` →
`createUnloggedBatchWithScalaStatementBinder` (`@deprecated` since `2.0.0`)
- Added new `createUnloggedBatch` with `pekko.japi.function.Function2[T,
PreparedStatement, BoundStatement]` for `statementBinder`, consistent with
`create` and `withContext`
```scala
// Before (Scala-only callable)
def createUnloggedBatch[T, K](..., statementBinder: (T, PreparedStatement) => BoundStatement, ...)

// After (Java-callable)
def createUnloggedBatch[T, K](..., statementBinder: pekko.japi.function.Function2[T, PreparedStatement, BoundStatement], ...)
```
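For illustration, the bridging pattern the new overload relies on (adapting a Java-friendly SAM interface to the plain Scala function the scaladsl layer expects) can be sketched without pekko on the classpath. `JStatementBinder`, `scaladslCreateBatch`, and `javadslCreateBatch` are stand-ins invented for this sketch, not the actual connector code:

```scala
// Stand-in for pekko.japi.function.Function2: a single-abstract-method trait
// that Java callers can implement with a lambda.
trait JStatementBinder[T, S, B] {
  def apply(t: T, stmt: S): B
}

// Stand-in for the scaladsl layer, which expects a plain Scala function.
def scaladslCreateBatch[T, S, B](binder: (T, S) => B, t: T, stmt: S): B =
  binder(t, stmt)

// Stand-in for the javadsl wrapper: adapt the SAM to a Scala function.
def javadslCreateBatch[T, S, B](binder: JStatementBinder[T, S, B], t: T, stmt: S): B =
  scaladslCreateBatch((x: T, s: S) => binder.apply(x, s), t, stmt)

val bound = javadslCreateBatch[String, String, String](
  (t, stmt) => s"$stmt bound to $t", // SAM-converted lambda, also writable from Java
  "row-1",
  "INSERT INTO t VALUES (?)")
// bound == "INSERT INTO t VALUES (?) bound to row-1"
```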
### `Slick.scala`
Overloading is viable here (`ExecutionContext` and `Executor` have distinct
erasures):
- All 4 existing `flowWithPassThrough(ExecutionContext, ...)` overloads
marked `@deprecated` since `2.0.0`
- Added 4 new overloads accepting `java.util.concurrent.Executor`,
converting internally via `ExecutionContext.fromExecutor(executor)`
```scala
// Before
def flowWithPassThrough[T, R](session: SlickSession, executionContext: ExecutionContext, ...)

// After (new overload; old kept as deprecated)
def flowWithPassThrough[T, R](session: SlickSession, executor: Executor, ...)
```
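The conversion the new overloads use, `ExecutionContext.fromExecutor`, is plain Scala standard library. A self-contained sketch of the delegation pattern (the method names here are illustrative stand-ins, not the actual Slick connector code):

```scala
import java.util.concurrent.{ Executor, Executors }
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._

// Stand-in for an existing (now deprecated) ExecutionContext-based overload.
def flowWithPassThroughEc(ec: ExecutionContext): Future[Int] =
  Future(21 * 2)(ec)

// Java-friendly overload: accept a java.util.concurrent.Executor and
// convert internally, as the PR does.
def flowWithPassThroughExecutor(executor: Executor): Future[Int] =
  flowWithPassThroughEc(ExecutionContext.fromExecutor(executor))

val pool = Executors.newSingleThreadExecutor()
val result = Await.result(flowWithPassThroughExecutor(pool), 5.seconds)
pool.shutdown()
// result == 42
```

Because `Executor` and `ExecutionContext` are distinct JVM types, the compiler can resolve the overload from the argument alone, so Java callers pick up the new signature without source changes beyond the argument type.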
<!-- START COPILOT ORIGINAL PROMPT -->
<details>
<summary>Original prompt</summary>
## Problem
Two files in `javadsl` packages expose raw Scala types in their public
method signatures, making them awkward or impossible to use idiomatically from
Java.
### File 1:
`cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraFlow.scala`
**Method:** `createUnloggedBatch` (lines 91–102)
The `statementBinder` parameter uses a raw Scala function type `(T,
PreparedStatement) => BoundStatement` instead of a Java functional interface.
The other two methods in the same file (`create` and `withContext`) correctly
use `pekko.japi.function.Function2[T, PreparedStatement, BoundStatement]`.
```scala
def createUnloggedBatch[T, K](session: CassandraSession,
    writeSettings: CassandraWriteSettings,
    cqlStatement: String,
    statementBinder: (T, PreparedStatement) => BoundStatement, // ← raw Scala function
    groupingKey: pekko.japi.function.Function[T, K]): Flow[T, T, NotUsed]
```
### File 2:
`slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala`
**Methods:** All 4 `flowWithPassThrough` overloads (lines 157–253)
All four overloads expose `scala.concurrent.ExecutionContext` as a public
parameter. Per the project's own `contributor-advice.md`, Java APIs that need
an `ExecutionContext` should accept `java.util.concurrent.Executor` and convert
internally using `ExecutionContext.fromExecutor(executor)`.
```scala
def flowWithPassThrough[T, R](
    session: SlickSession,
    executionContext: ExecutionContext, // ← Scala type exposed to Java API
    toStatement: JFunction[T, String],
    mapper: JBiFunction[T, java.lang.Integer, R]): Flow[T, R, NotUsed]
```
---
## Required Changes
### Strategy: Overload where possible (keep old methods deprecated), rename
if overloading is not possible
#### `CassandraFlow.scala` — `createUnloggedBatch`
Scala does **not** allow overloading solely by changing a function type
parameter because `(T, PreparedStatement) => BoundStatement` and
`pekko.japi.function.Function2[T, PreparedStatement, BoundStatement]` would
have the same erasure after compilation. Therefore:
1. **Rename** the existing method to
`createUnloggedBatchWithScalaStatementBinder` (or similar clearly deprecated
name) and annotate it with `@deprecated("Use createUnloggedBatch with
pekko.japi.function.Function2 instead", "2.0.0")` and `@java.lang.Deprecated`.
2. **Add a new** `createUnloggedBatch` method with signature using
`pekko.japi.function.Function2[T, PreparedStatement, BoundStatement]` for
`statementBinder` — matching the style of `create` and `withContext`.
#### `Slick.scala` — `flowWithPassThrough`
The four `flowWithPassThrough` overloads are:
1. `(SlickSession, ExecutionContext, JFunction[T, String], JBiFunction[T,
Integer, R])`
2. `(SlickSession, ExecutionContext, Function2[T, Connection,
PreparedStatement], Function2[T, Integer, R])`
3. `(SlickSession, ExecutionContext, Int, JFunction[T, String],
JBiFunction[T, Integer, R])`
4. `(SlickSession, ExecutionContext, Int, Function2[T, Connection,
PreparedStatement], Function2[T, Integer, R])`
`ExecutionContext` and `Executor` are different types (not erased the same
way), so **overloading IS possible** here. Add 4 new overloads that replace
`executionContext: ExecutionContext` with `executor: Executor`, and mark all 4
existing overloads with `@deprecated("Use the overload with
java.util.concurrent.Executor instead", "2.0.0")` and `@java.lang.Deprecated`.
The new overloads internally convert using
`ExecutionContext.fromExecutor(executor)`.
An `import java.util.concurrent.Executor` is either already present in the file or can be added.
---
## Detailed Implementation
### `CassandraFlow.scala`
```scala
/**
 * Creates a flow that uses [[com.datastax.oss.driver.api.core.cql.BatchStatement]] and groups the
 * elements internally into batches using the `writeSettings` and per `groupingKey`.
 * Use this when most of the elements in the stream share the same partition key.
 * ...
 * @deprecated Use [[createUnloggedBatch]] with `pekko.japi.function.Function2` for `statementBinder` instead.
 */
@deprecated("Use createUnloggedBatch with pekko.japi.function.Function2 for statementBinder instead", "2.0.0")
@java.lang.Deprecated
def createUnloggedBatch[T, K](session: CassandraSession,
    writeSettings: CassandraWriteSettings,
    cqlStatement: String,
    statementBinder: (T, PreparedStatement) => BoundStatement,
    groupingKey: pekko.japi.function.Function[T, K]): Flow[T, T, NotUsed] = {
  scaladsl.CassandraFlow
    .createBatch(writeSettings,
      cqlStatement,
      (t, preparedStatement) => statementBinder.apply(t, preparedStatement),
      t => groupingKey.apply(t))(session.delegate)
    .asJava
}

/**
 * Creates a flow that uses [[com.datastax.oss.driver.api.core.cql.BatchStatement]] and groups the
 * elements internally into batches using the `writeSettings` and per `groupingKey`.
 * ...
 */
def createUnloggedBatch[T, K](se...
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]