Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/java/joinery/DataFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -2286,6 +2286,7 @@ public final void writeSql(final Connection c, final String sql)
writeSql(c.prepareStatement(sql));
}


/**
* Write the data from the data frame to a database by
* executing the provided prepared SQL statement.
Expand All @@ -2298,6 +2299,12 @@ public final void writeSql(final PreparedStatement stmt)
Serialization.writeSql(this, stmt);
}

public final void writeSql(final PreparedStatement stmt, int chunkSize)
throws SQLException {
Serialization.writeSql(this, stmt, chunkSize);
}


public final String toString(final int limit) {
return Serialization.toString(this, limit);
}
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/joinery/impl/Serialization.java
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,39 @@ public static <V> void writeSql(final DataFrame<V> df, final PreparedStatement s
stmt.close();
}
}

public static <V> void writeSql(final DataFrame<V> df, final PreparedStatement stmt, int chunkSize)
throws SQLException {
try {
ParameterMetaData md = stmt.getParameterMetaData();
List<Integer> columns = new ArrayList<>();
for (int i = 1; i <= md.getParameterCount(); i++) {
columns.add(md.getParameterType(i));
}
int length = df.length();
int parts = (int) Math.ceil((double) length / chunkSize);
// int remainder = parts*chunkSize - length;
System.out.println(parts);
for (int i = 0; i < parts; i++) {
if (i != parts - 1) {
for (int r = i * chunkSize; r < (i + 1) * chunkSize; r++) {
for (int c = 1; c <= df.size(); c++) {
stmt.setObject(c, df.get(r, c - 1));
}
stmt.addBatch();
}
} else {
for (int r = i * chunkSize; r < (i*chunkSize) + ((parts*chunkSize) - (i*chunkSize)); r++) {
for (int c = 1; c <= df.size(); c++) {
stmt.setObject(c, df.get(r, c - 1));
}
stmt.addBatch();
}
}
stmt.executeBatch();
}
} finally {
stmt.close();
}
}
}
69 changes: 69 additions & 0 deletions src/test/java/joinery/DataFrameSerializationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,73 @@ public void testToFromSql()
);
}
}

@Test
public void testToFromSqlChunkSize()
throws Exception {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
try (Connection dbc = DriverManager.getConnection("jdbc:derby:memory:testdb;create=true")) {
dbc.createStatement().executeUpdate("create table test (category varchar(32), name varchar(32), value int)");
PreparedStatement stmt = dbc.prepareStatement("insert into test values (?,?,?)");
df.writeSql(stmt, 3);

Map<Object, Object> names = new HashMap<>();
names.put("CATEGORY", "category");
names.put("NAME", "name");
names.put("VALUE", "value");

DataFrame<Object> other = DataFrame.readSql(dbc, "select * from test").rename(names);
DataFrame<String> cmp = DataFrame.compare(df, other);
assertArrayEquals(
cmp.col("value").toArray(),
new String[] { "1 | 1", "2 | 2", "3 | 3", "4 | 4", "5 | 5", "6 | 6" }
);
}
}

@Test
public void testToFromSqlChunkSize2()
throws Exception {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
try (Connection dbc = DriverManager.getConnection("jdbc:derby:memory:testdb;create=true")) {
dbc.createStatement().executeUpdate("create table test (category varchar(32), name varchar(32), value int)");
PreparedStatement stmt = dbc.prepareStatement("insert into test values (?,?,?)");
df.writeSql(stmt, 2);

Map<Object, Object> names = new HashMap<>();
names.put("CATEGORY", "category");
names.put("NAME", "name");
names.put("VALUE", "value");

DataFrame<Object> other = DataFrame.readSql(dbc, "select * from test").rename(names);
DataFrame<String> cmp = DataFrame.compare(df, other);
assertArrayEquals(
cmp.col("value").toArray(),
new String[] { "1 | 1", "2 | 2", "3 | 3", "4 | 4", "5 | 5", "6 | 6" }
);
}
}

@Test
public void testToFromSqlChunkSize3()
throws Exception {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
try (Connection dbc = DriverManager.getConnection("jdbc:derby:memory:testdb;create=true")) {
dbc.createStatement().executeUpdate("create table test (category varchar(32), name varchar(32), value int)");
PreparedStatement stmt = dbc.prepareStatement("insert into test values (?,?,?)");
df.writeSql(stmt, 1);

Map<Object, Object> names = new HashMap<>();
names.put("CATEGORY", "category");
names.put("NAME", "name");
names.put("VALUE", "value");

DataFrame<Object> other = DataFrame.readSql(dbc, "select * from test").rename(names);
DataFrame<String> cmp = DataFrame.compare(df, other);
assertArrayEquals(
cmp.col("value").toArray(),
new String[] { "1 | 1", "2 | 2", "3 | 3", "4 | 4", "5 | 5", "6 | 6" }
);
}
}
}