Arrow language (#8512)

Initial implementation of the Arrow language. Closes #7755.
Currently supported logical types are:
- Date (days and milliseconds)
- Int (8, 16, 32, 64)

One can currently:
- allocate a new fixed-length, nullable Arrow vector - `new[<name-of-the-type>]`
- cast an already existing fixed-length Arrow vector from a memory address - `cast[<name-of-the-type>]`

Closes #7755.
This commit is contained in:
Hubert Plociniczak 2024-01-12 19:19:36 +01:00 committed by GitHub
parent 6ae35abc46
commit 3c29a58829
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1505 additions and 1 deletions

View File

@ -1014,6 +1014,7 @@
- [Export of non-existing symbols results in error][7960] - [Export of non-existing symbols results in error][7960]
- [Upgrade GraalVM to 23.1.0 JDK21][7991] - [Upgrade GraalVM to 23.1.0 JDK21][7991]
- [Added opt-in type checks of return type][8502] - [Added opt-in type checks of return type][8502]
- [Introduce Arrow language][8512]
- [DataflowError.withoutTrace doesn't store stacktrace][8608] - [DataflowError.withoutTrace doesn't store stacktrace][8608]
[3227]: https://github.com/enso-org/enso/pull/3227 [3227]: https://github.com/enso-org/enso/pull/3227
@ -1166,6 +1167,7 @@
[7960]: https://github.com/enso-org/enso/pull/7960 [7960]: https://github.com/enso-org/enso/pull/7960
[7991]: https://github.com/enso-org/enso/pull/7991 [7991]: https://github.com/enso-org/enso/pull/7991
[8502]: https://github.com/enso-org/enso/pull/8502 [8502]: https://github.com/enso-org/enso/pull/8502
[8512]: https://github.com/enso-org/enso/pull/8512
[8608]: https://github.com/enso-org/enso/pull/8608 [8608]: https://github.com/enso-org/enso/pull/8608
# Enso 2.0.0-alpha.18 (2021-10-12) # Enso 2.0.0-alpha.18 (2021-10-12)

View File

@ -293,6 +293,7 @@ lazy val enso = (project in file("."))
`runtime-parser`, `runtime-parser`,
`runtime-compiler`, `runtime-compiler`,
`runtime-language-epb`, `runtime-language-epb`,
`runtime-language-arrow`,
`runtime-instrument-common`, `runtime-instrument-common`,
`runtime-instrument-id-execution`, `runtime-instrument-id-execution`,
`runtime-instrument-repl-debugger`, `runtime-instrument-repl-debugger`,
@ -496,6 +497,7 @@ val hamcrestVersion = "1.3"
val netbeansApiVersion = "RELEASE180" val netbeansApiVersion = "RELEASE180"
val fansiVersion = "0.4.0" val fansiVersion = "0.4.0"
val httpComponentsVersion = "4.4.1" val httpComponentsVersion = "4.4.1"
// Version of the `org.apache.arrow` artifacts (arrow-vector, arrow-memory-netty) used by the build.
val apacheArrowVersion = "14.0.1"
// ============================================================================ // ============================================================================
// === Utility methods ===================================================== // === Utility methods =====================================================
@ -1487,6 +1489,50 @@ lazy val `runtime-language-epb` =
) )
) )
// sbt project for the Arrow Truffle language located in `engine/runtime-language-arrow`.
// It is compiled as the explicit Java module `org.enso.interpreter.arrow` (see `javaModuleName`).
lazy val `runtime-language-arrow` =
(project in file("engine/runtime-language-arrow"))
.enablePlugins(JPMSPlugin)
.settings(
crossPaths := false,
autoScalaLibrary := false,
inConfig(Compile)(truffleRunOptionsSettings),
instrumentationSettings,
// Besides the GraalVM modules, every dependency listed here (including Apache Arrow) is Test-scoped.
libraryDependencies ++= GraalVM.modules ++ Seq(
"junit" % "junit" % junitVersion % Test,
"com.github.sbt" % "junit-interface" % junitIfVersion % Test,
"org.slf4j" % "slf4j-nop" % slf4jVersion % Test,
"org.slf4j" % "slf4j-api" % slf4jVersion % Test,
"org.apache.arrow" % "arrow-vector" % apacheArrowVersion % Test,
"org.apache.arrow" % "arrow-memory-netty" % apacheArrowVersion % Test
),
javaModuleName := "org.enso.interpreter.arrow",
// Module path = GraalVM modules picked from the Test update report
// + the first Compile product directory of this very project.
modulePath := {
val updateReport = (Test / update).value
JPMSUtils.filterModulesFromUpdate(
updateReport,
GraalVM.modules,
streams.value.log,
shouldContainAll = true
) ++ Seq(
(LocalProject(
"runtime-language-arrow"
) / Compile / productDirectories).value.head
)
},
// Test classes are patched into the language module rather than forming a module of their own.
Test / patchModules := {
val testClassesDir = (Test / productDirectories).value.head
Map(javaModuleName.value -> Seq(testClassesDir))
},
Test / addModules := Seq(javaModuleName.value),
Test / javaOptions ++= Seq(
s"--add-opens=java.base/java.nio=${javaModuleName.value}", // DirectByteBuffer in MemoryUtil init is inaccessible
"--add-opens=java.base/java.nio=ALL-UNNAMED" // Tests use Apache Arrow
),
// Let the language module read the unnamed module (the Test-scoped libraries above).
Test / addReads := {
Map(javaModuleName.value -> Seq("ALL-UNNAMED"))
}
)
/** `runtime-test-instruments` project contains Truffle instruments that are used solely for testing. /** `runtime-test-instruments` project contains Truffle instruments that are used solely for testing.
* It is compiled into an explicit Java module. Note that this project cannot have compile-time dependency on `runtime` * It is compiled into an explicit Java module. Note that this project cannot have compile-time dependency on `runtime`
* project, so if you need access to classes from `runtime`, you need to use reflection. * project, so if you need access to classes from `runtime`, you need to use reflection.
@ -2075,7 +2121,8 @@ lazy val `engine-runner` = project
"com.sun.imageio", "com.sun.imageio",
"com.sun.jna.internal.Cleaner", "com.sun.jna.internal.Cleaner",
"com.sun.jna.Structure$FFIType", "com.sun.jna.Structure$FFIType",
"akka.http" "akka.http",
"org.enso.interpreter.arrow.util.MemoryUtil"
) )
) )
.dependsOn(assembly) .dependsOn(assembly)

View File

@ -0,0 +1,6 @@
/**
 * Module descriptor of the Arrow language. The module is open, requires the Truffle API and
 * registers {@code ArrowLanguageProvider} as a {@code TruffleLanguageProvider} service.
 */
open module org.enso.interpreter.arrow {
requires org.graalvm.truffle;
provides com.oracle.truffle.api.provider.TruffleLanguageProvider with
org.enso.interpreter.arrow.ArrowLanguageProvider;
}

View File

@ -0,0 +1,15 @@
package org.enso.interpreter.arrow;
import com.oracle.truffle.api.TruffleLanguage;
/** Per-context state of the Arrow language; currently it only keeps the Truffle environment. */
final class ArrowContext {
// Environment handed over by Truffle when the context is created.
private final TruffleLanguage.Env env;
/**
 * Creates the context.
 *
 * @param env the Truffle environment this context belongs to
 */
public ArrowContext(TruffleLanguage.Env env) {
this.env = env;
}
/** Initialization hook invoked by {@code ArrowLanguage}; there is nothing to initialize yet. */
public void initialize() {
// intentionally empty
}
}

View File

@ -0,0 +1,48 @@
package org.enso.interpreter.arrow;
import com.oracle.truffle.api.CallTarget;
import com.oracle.truffle.api.TruffleLanguage;
import org.enso.interpreter.arrow.node.ArrowEvalNode;
/**
 * An internal language that implements Arrow specification.
 *
 * <p>Sources are handed to {@link ArrowParser}; a successfully parsed source is turned into an
 * {@link ArrowEvalNode} whose call target is returned to the caller.
 */
@TruffleLanguage.Registration(
    id = ArrowLanguage.ID,
    name = "Truffle implementation of Arrow",
    characterMimeTypes = {ArrowLanguage.MIME},
    defaultMimeType = ArrowLanguage.MIME,
    contextPolicy = TruffleLanguage.ContextPolicy.SHARED)
public class ArrowLanguage extends TruffleLanguage<ArrowContext> {
  /** Identifier under which the language is registered. */
  public static final String ID = "arrow";

  /** The only (and default) MIME type of the language. */
  public static final String MIME = "application/vnd.apache.arrow.file";

  public ArrowLanguage() {}

  /** Creates a fresh {@link ArrowContext} wrapping the given environment. */
  @Override
  protected ArrowContext createContext(TruffleLanguage.Env env) {
    return new ArrowContext(env);
  }

  /** Delegates to {@link ArrowContext#initialize()}. */
  @Override
  protected void initializeContext(ArrowContext context) {
    context.initialize();
  }

  /**
   * Parses the requested source into an executable call target.
   *
   * @param request the parsing request carrying the source
   * @return call target of the root node evaluating the parsed expression
   * @throws IllegalArgumentException when the parser does not recognize the source
   */
  @Override
  protected CallTarget parse(ParsingRequest request) {
    var source = request.getSource();
    ArrowParser.Result parsed = ArrowParser.parse(source);
    if (parsed == null) {
      throw new IllegalArgumentException(
          "unable to parse the code: " + source.getCharacters().toString());
    }
    return ArrowEvalNode.create(this, parsed).getCallTarget();
  }

  /** Access is allowed from every thread, single- or multi-threaded. */
  @Override
  protected boolean isThreadAccessAllowed(Thread thread, boolean singleThreaded) {
    return true;
  }
}

View File

@ -0,0 +1,46 @@
package org.enso.interpreter.arrow;
import com.oracle.truffle.api.source.Source;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Recognizes the two expression forms of the Arrow language: {@code new[<layout>]} and {@code
 * cast[<layout>]}, where {@code <layout>} is the name of a {@link LogicalLayout} constant.
 */
public final class ArrowParser {
  private static final Pattern ARRAY_PATTERN = Pattern.compile("new\\[(.+)\\]");
  private static final Pattern CAST_PATTERN = Pattern.compile("cast\\[(.+)\\]");

  private ArrowParser() {}

  /** Outcome of a successful parse. */
  public record Result(PhysicalLayout physicalLayout, LogicalLayout logicalLayout, Mode mode) {}

  /** What the parsed expression asks for. */
  public enum Mode {
    Allocate,
    Cast
  }

  /**
   * Parses the characters of {@code source}.
   *
   * <p>The allocation form is tried first; when it is found, the cast form is not consulted even
   * if the layout name turns out to be unknown.
   *
   * @param source the source to parse
   * @return the parsed result, or {@code null} when neither form is found or the layout name is
   *     not a {@link LogicalLayout} constant
   */
  public static Result parse(Source source) {
    String text = source.getCharacters().toString();
    Matcher allocation = ARRAY_PATTERN.matcher(text);
    if (allocation.find()) {
      return primitiveResult(allocation.group(1), Mode.Allocate);
    }
    Matcher cast = CAST_PATTERN.matcher(text);
    return cast.find() ? primitiveResult(cast.group(1), Mode.Cast) : null;
  }

  /** Builds a primitive-layout result for the named logical layout; {@code null} if unknown. */
  private static Result primitiveResult(String layoutName, Mode mode) {
    try {
      return new Result(PhysicalLayout.Primitive, LogicalLayout.valueOf(layoutName), mode);
    } catch (IllegalArgumentException ignored) {
      // unknown layout name: reported to the caller as a failed parse
      return null;
    }
  }
}

View File

@ -0,0 +1,23 @@
package org.enso.interpreter.arrow;
import org.enso.interpreter.arrow.runtime.SizeInBytes;
/** Logical element types supported by the Arrow language, each with its width in bits. */
public enum LogicalLayout implements SizeInBytes {
  Date32(32),
  Date64(64),
  Int8(8),
  Int16(16),
  Int32(32),
  Int64(64);

  /** Width of a single element in bits. */
  private final int bits;

  LogicalLayout(int bits) {
    this.bits = bits;
  }

  /**
   * @return width of a single element in bytes
   */
  @Override
  public int sizeInBytes() {
    return bits / Byte.SIZE;
  }
}

View File

@ -0,0 +1,6 @@
package org.enso.interpreter.arrow;
/** Physical memory layouts known to the Arrow language. */
public enum PhysicalLayout {
// The only layout the parser in this package currently produces.
Primitive,
VariableSizeBinary
}

View File

@ -0,0 +1,16 @@
package org.enso.interpreter.arrow.node;
import com.oracle.truffle.api.nodes.Node;
import org.enso.interpreter.arrow.LogicalLayout;
import org.enso.interpreter.arrow.runtime.ArrowCastToFixedSizeArrayFactory;
/** Node producing a factory that casts existing memory to a fixed-size Arrow array. */
public class ArrowCastFixedSizeNode extends Node {

  /** Creates a new instance of the node. */
  static ArrowCastFixedSizeNode create() {
    return new ArrowCastFixedSizeNode();
  }

  /**
   * @param layoutType logical layout of the elements of the array to cast to
   * @return a new {@link ArrowCastToFixedSizeArrayFactory} for the layout
   */
  public Object execute(LogicalLayout layoutType) {
    var castFactory = new ArrowCastToFixedSizeArrayFactory(layoutType);
    return castFactory;
  }
}

View File

@ -0,0 +1,34 @@
package org.enso.interpreter.arrow.node;
import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.frame.FrameDescriptor;
import com.oracle.truffle.api.frame.VirtualFrame;
import com.oracle.truffle.api.nodes.RootNode;
import org.enso.interpreter.arrow.ArrowLanguage;
import org.enso.interpreter.arrow.ArrowParser;
public class ArrowEvalNode extends RootNode {
private final ArrowParser.Result code;
@Child private ArrowFixedSizeNode fixedPhysicalLayout = ArrowFixedSizeNode.create();
@Child private ArrowCastFixedSizeNode castToFixedPhysicalLayout = ArrowCastFixedSizeNode.create();
public static ArrowEvalNode create(ArrowLanguage language, ArrowParser.Result code) {
return new ArrowEvalNode(language, code);
}
private ArrowEvalNode(ArrowLanguage language, ArrowParser.Result code) {
super(language, new FrameDescriptor());
this.code = code;
}
public Object execute(VirtualFrame frame) {
return switch (code.physicalLayout()) {
case Primitive -> switch (code.mode()) {
case Allocate -> fixedPhysicalLayout.execute(code.logicalLayout());
case Cast -> castToFixedPhysicalLayout.execute(code.logicalLayout());
};
default -> throw CompilerDirectives.shouldNotReachHere("unsupported physical layout");
};
}
}

View File

@ -0,0 +1,16 @@
package org.enso.interpreter.arrow.node;
import com.oracle.truffle.api.nodes.Node;
import org.enso.interpreter.arrow.LogicalLayout;
import org.enso.interpreter.arrow.runtime.ArrowFixedSizeArrayFactory;
/** Node producing a factory that allocates new fixed-size Arrow arrays. */
public class ArrowFixedSizeNode extends Node {

  /** Creates a new instance of the node. */
  static ArrowFixedSizeNode create() {
    return new ArrowFixedSizeNode();
  }

  /**
   * @param layoutType logical layout of the elements of the arrays to allocate
   * @return a new {@link ArrowFixedSizeArrayFactory} for the layout
   */
  public Object execute(LogicalLayout layoutType) {
    var allocationFactory = new ArrowFixedSizeArrayFactory(layoutType);
    return allocationFactory;
  }
}

View File

@ -0,0 +1,144 @@
package org.enso.interpreter.arrow.runtime;
import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.dsl.Cached;
import com.oracle.truffle.api.dsl.Fallback;
import com.oracle.truffle.api.dsl.ImportStatic;
import com.oracle.truffle.api.dsl.Specialization;
import com.oracle.truffle.api.interop.ArityException;
import com.oracle.truffle.api.interop.InteropLibrary;
import com.oracle.truffle.api.interop.TruffleObject;
import com.oracle.truffle.api.interop.UnsupportedMessageException;
import com.oracle.truffle.api.interop.UnsupportedTypeException;
import com.oracle.truffle.api.library.CachedLibrary;
import com.oracle.truffle.api.library.ExportLibrary;
import com.oracle.truffle.api.library.ExportMessage;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.enso.interpreter.arrow.LogicalLayout;
import org.enso.interpreter.arrow.util.MemoryUtil;
/**
 * Executable interop object that wraps already allocated memory into a fixed-size Arrow array of
 * the configured logical layout.
 *
 * <p>It is executed with two or three arguments: the address of the data, the number of
 * elements, and optionally the address of the non-null (validity) bitmap.
 */
@ExportLibrary(InteropLibrary.class)
public class ArrowCastToFixedSizeArrayFactory implements TruffleObject {
  private final LogicalLayout logicalLayout;

  /**
   * @param logicalLayout logical layout of the elements of the arrays created by this factory
   */
  public ArrowCastToFixedSizeArrayFactory(LogicalLayout logicalLayout) {
    this.logicalLayout = logicalLayout;
  }

  @ExportMessage
  public boolean isExecutable() {
    return true;
  }

  /** Logical layout this factory was created for; used by the specialization guards. */
  public LogicalLayout getLayout() {
    return logicalLayout;
  }

  /** Implementation of the {@code execute} message, one specialization per logical layout. */
  @ExportMessage
  @ImportStatic(LogicalLayout.class)
  static class Execute {
    @Specialization(guards = "receiver.getLayout() == Date32")
    static Object doDate32(
        ArrowCastToFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
      var unit = ArrowFixedArrayDate.DateUnit.Day;
      return new ArrowFixedArrayDate(pointer(args, iop, unit), unit);
    }

    @Specialization(guards = "receiver.getLayout() == Date64")
    static Object doDate64(
        ArrowCastToFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
      var unit = ArrowFixedArrayDate.DateUnit.Millisecond;
      return new ArrowFixedArrayDate(pointer(args, iop, unit), unit);
    }

    @Specialization(guards = "receiver.getLayout() == Int8")
    static Object doInt8(
        ArrowCastToFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
      var unit = ArrowFixedArrayInt.IntUnit.Byte1;
      return new ArrowFixedArrayInt(pointer(args, iop, unit), unit);
    }

    @Specialization(guards = "receiver.getLayout() == Int16")
    static Object doInt16(
        ArrowCastToFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
      var unit = ArrowFixedArrayInt.IntUnit.Byte2;
      return new ArrowFixedArrayInt(pointer(args, iop, unit), unit);
    }

    @Specialization(guards = "receiver.getLayout() == Int32")
    static Object doInt32(
        ArrowCastToFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
      var unit = ArrowFixedArrayInt.IntUnit.Byte4;
      return new ArrowFixedArrayInt(pointer(args, iop, unit), unit);
    }

    @Specialization(guards = "receiver.getLayout() == Int64")
    static Object doInt64(
        ArrowCastToFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
      var unit = ArrowFixedArrayInt.IntUnit.Byte8;
      return new ArrowFixedArrayInt(pointer(args, iop, unit), unit);
    }

    /**
     * Validates the arguments and wraps the referenced memory.
     *
     * @param args {@code [address, size]} or {@code [address, size, bitmapAddress]}, where {@code
     *     size} is the number of elements
     * @param interop library used to inspect and convert the arguments
     * @param unit element type; provides the width of a single element in bytes
     * @return the data buffer paired with its non-null bitmap; when no bitmap address is given,
     *     the three-argument form is not used and all values are treated as non-null
     * @throws ArityException when fewer than two or more than three arguments are passed
     * @throws UnsupportedTypeException when an address is not a long, or the size is not a
     *     non-negative int whose byte size fits into an int
     */
    @CompilerDirectives.TruffleBoundary
    private static ByteBufferDirect pointer(Object[] args, InteropLibrary interop, SizeInBytes unit)
        throws ArityException, UnsupportedTypeException, UnsupportedMessageException {
      if (args.length < 2 || args.length > 3) {
        throw ArityException.create(2, 3, args.length);
      }
      if (!interop.isNumber(args[0]) || !interop.fitsInLong(args[0])) {
        throw UnsupportedTypeException.create(
            new Object[] {args[0]}, "Address of Arrow vector is invalid");
      }
      if (!interop.isNumber(args[1]) || !interop.fitsInInt(args[1])) {
        // report the offending size argument, not the address
        throw UnsupportedTypeException.create(
            new Object[] {args[1]}, "Size of allocated memory is invalid");
      }
      var size = interop.asInt(args[1]);
      // compute in long so that an overflowing byte size is detected instead of wrapping around
      long targetSize = (long) size * unit.sizeInBytes();
      if (size < 0 || targetSize > Integer.MAX_VALUE) {
        throw UnsupportedTypeException.create(
            new Object[] {args[1]}, "Size of allocated memory is invalid");
      }
      ByteBuffer buffer = MemoryUtil.directBuffer(interop.asLong(args[0]), (int) targetSize);
      buffer.order(ByteOrder.LITTLE_ENDIAN);
      if (args.length == 3) {
        if (!interop.isNumber(args[2]) || !interop.fitsInLong(args[2])) {
          throw UnsupportedTypeException.create(
              new Object[] {args[2]}, "Address of non-null bitmap is invalid");
        }
        // size / 8 + 1 bytes; same value as the original `(int) Math.ceil(size / 8) + 1`
        // and as the bitmap size ByteBufferDirect computes for freshly allocated buffers
        ByteBuffer validityMap = MemoryUtil.directBuffer(interop.asLong(args[2]), size / 8 + 1);
        return new ByteBufferDirect(buffer, validityMap);
      } else {
        return new ByteBufferDirect(buffer, size);
      }
    }

    @Fallback
    static Object doOther(ArrowCastToFixedSizeArrayFactory receiver, Object[] args) {
      throw CompilerDirectives.shouldNotReachHere(unknownLayoutMessage(receiver.getLayout()));
    }

    @CompilerDirectives.TruffleBoundary
    private static String unknownLayoutMessage(LogicalLayout layout) {
      return "unknown layout: " + layout.toString();
    }
  }
}

View File

@ -0,0 +1,258 @@
package org.enso.interpreter.arrow.runtime;
import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.dsl.Cached;
import com.oracle.truffle.api.dsl.ImportStatic;
import com.oracle.truffle.api.dsl.Specialization;
import com.oracle.truffle.api.interop.InteropLibrary;
import com.oracle.truffle.api.interop.TruffleObject;
import com.oracle.truffle.api.interop.UnsupportedMessageException;
import com.oracle.truffle.api.library.CachedLibrary;
import com.oracle.truffle.api.library.ExportLibrary;
import com.oracle.truffle.api.library.ExportMessage;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
@ExportLibrary(InteropLibrary.class)
public final class ArrowFixedArrayDate implements TruffleObject {
private final int size;
private final ByteBufferDirect buffer;
private final DateUnit unit;
public ArrowFixedArrayDate(int size, DateUnit unit) {
this.size = size;
this.unit = unit;
this.buffer = allocateBuffer(size * unit.sizeInBytes(), size);
}
public ArrowFixedArrayDate(ByteBufferDirect buffer, DateUnit unit)
throws UnsupportedMessageException {
this.size = buffer.capacity() / unit.sizeInBytes();
this.unit = unit;
this.buffer = buffer;
}
public DateUnit getUnit() {
return unit;
}
@ExportMessage
public boolean hasArrayElements() {
return true;
}
@ExportMessage
@ImportStatic(ArrowFixedArrayDate.DateUnit.class)
static class ReadArrayElement {
@Specialization(guards = "receiver.getUnit() == Day")
static Object doDay(ArrowFixedArrayDate receiver, long index)
throws UnsupportedMessageException {
if (receiver.buffer.isNull((int) index)) {
return NullValue.get();
}
var at = typeAdjustedIndex(index, receiver.unit);
var daysSinceEpoch = receiver.buffer.getInt(at);
var localDate = localDateFromDays(daysSinceEpoch);
return new ArrowDate(localDate);
}
@Specialization(guards = "receiver.getUnit() == Millisecond")
static Object doMilliseconds(ArrowFixedArrayDate receiver, long index)
throws UnsupportedMessageException {
if (receiver.buffer.isNull((int) index)) {
return NullValue.get();
}
var at = typeAdjustedIndex(index, receiver.unit);
var secondsPlusNanoSinceEpoch = receiver.buffer.getLong(at);
var seconds = Math.floorDiv(secondsPlusNanoSinceEpoch, nanoDiv);
var nano = Math.floorMod(secondsPlusNanoSinceEpoch, nanoDiv);
var zonedDateTime = zonedDateTimeFromSeconds(seconds, nano, utc);
return new ArrowZonedDateTime(zonedDateTime);
}
}
@ExportMessage
@ImportStatic(ArrowFixedArrayDate.DateUnit.class)
static class WriteArrayElement {
@Specialization(guards = "receiver.getUnit() == Day")
static void doDay(
ArrowFixedArrayDate receiver,
long index,
Object value,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
throws UnsupportedMessageException {
if (!iop.isDate(value)) {
throw UnsupportedMessageException.create();
}
var at = typeAdjustedIndex(index, receiver.unit);
var time = iop.asDate(value).toEpochDay();
receiver.buffer.putInt(at, Math.toIntExact(time));
}
@Specialization(guards = {"receiver.getUnit() == Millisecond", "!iop.isNull(value)"})
static void doMilliseconds(
ArrowFixedArrayDate receiver,
long index,
Object value,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
throws UnsupportedMessageException {
if (!iop.isDate(value) || !iop.isTime(value)) {
throw UnsupportedMessageException.create();
}
var at = typeAdjustedIndex(index, receiver.unit);
if (iop.isTimeZone(value)) {
var zoneDateTimeInstant =
instantForZone(iop.asDate(value), iop.asTime(value), iop.asTimeZone(value), utc);
var secondsPlusNano =
zoneDateTimeInstant.getEpochSecond() * nanoDiv + zoneDateTimeInstant.getNano();
receiver.buffer.putLong(at, secondsPlusNano);
} else {
var dateTime = instantForOffset(iop.asDate(value), iop.asTime(value), ZoneOffset.UTC);
var secondsPlusNano = dateTime.getEpochSecond() * nanoDiv + dateTime.getNano();
receiver.buffer.putLong(at, secondsPlusNano);
}
}
@Specialization(guards = "iop.isNull(value)")
static void doNull(
ArrowFixedArrayDate receiver,
long index,
Object value,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop) {
receiver.buffer.setNull((int) index);
}
}
@ExportMessage
long getArraySize() {
return size;
}
@ExportMessage
boolean isArrayElementReadable(long index) {
return index >= 0 && index < size && !buffer.isNull((int) index);
}
@ExportMessage
boolean isArrayElementModifiable(long index) {
return index >= 0 && index < size;
}
@ExportMessage
boolean isArrayElementInsertable(long index) {
return index >= 0 && index < size;
}
@ExportLibrary(InteropLibrary.class)
static class ArrowDate implements TruffleObject {
private LocalDate date;
public ArrowDate(LocalDate date) {
this.date = date;
}
@ExportMessage
public boolean isDate() {
return true;
}
@ExportMessage
public LocalDate asDate() {
return date;
}
}
@ExportLibrary(InteropLibrary.class)
static class ArrowZonedDateTime implements TruffleObject {
private ZonedDateTime dateTime;
public ArrowZonedDateTime(ZonedDateTime dateTime) {
this.dateTime = dateTime;
}
@ExportMessage
public boolean isDate() {
return true;
}
@ExportMessage
public LocalDate asDate() {
return dateTime.toLocalDate();
}
@ExportMessage
public boolean isTime() {
return true;
}
@ExportMessage
public LocalTime asTime() {
return dateTime.toLocalTime();
}
@ExportMessage
public boolean isTimeZone() {
return true;
}
@ExportMessage
public ZoneId asTimeZone() {
return dateTime.getZone();
}
}
@CompilerDirectives.TruffleBoundary
private static ByteBufferDirect allocateBuffer(int sizeInBytes, int size) {
return new ByteBufferDirect(sizeInBytes, size);
}
@CompilerDirectives.TruffleBoundary
private static LocalDate localDateFromDays(int daysSinceEpoch) {
return LocalDate.ofEpochDay(daysSinceEpoch);
}
@CompilerDirectives.TruffleBoundary
private static ZonedDateTime zonedDateTimeFromSeconds(long seconds, long nano, ZoneId zone) {
return Instant.ofEpochSecond(seconds, nano).atZone(zone);
}
@CompilerDirectives.TruffleBoundary
private static Instant instantForZone(
LocalDate date, LocalTime time, ZoneId zone, ZoneId target) {
return date.atTime(time).atZone(zone).withZoneSameLocal(target).toInstant();
}
@CompilerDirectives.TruffleBoundary
private static Instant instantForOffset(LocalDate date, LocalTime time, ZoneOffset offset) {
return date.atTime(time).toInstant(offset);
}
public enum DateUnit implements SizeInBytes {
Day(4),
Millisecond(8);
private final int bytes;
DateUnit(int bytes) {
this.bytes = bytes;
}
public int sizeInBytes() {
return bytes;
}
}
private static final long nanoDiv = 1000000000L;
private static final ZoneId utc = ZoneId.of("UTC");
private static int typeAdjustedIndex(long index, SizeInBytes unit) {
return Math.toIntExact(index * unit.sizeInBytes());
}
}

View File

@ -0,0 +1,189 @@
package org.enso.interpreter.arrow.runtime;
import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.dsl.Cached;
import com.oracle.truffle.api.dsl.ImportStatic;
import com.oracle.truffle.api.dsl.Specialization;
import com.oracle.truffle.api.interop.InteropLibrary;
import com.oracle.truffle.api.interop.TruffleObject;
import com.oracle.truffle.api.interop.UnsupportedMessageException;
import com.oracle.truffle.api.library.CachedLibrary;
import com.oracle.truffle.api.library.ExportLibrary;
import com.oracle.truffle.api.library.ExportMessage;
/**
 * Fixed-size, nullable array of 1, 2, 4 or 8-byte integers backed by a {@link ByteBufferDirect}
 * and exposed through the interop array messages.
 */
@ExportLibrary(InteropLibrary.class)
public final class ArrowFixedArrayInt implements TruffleObject {
  private final int size;
  private final ByteBufferDirect buffer;
  private final IntUnit unit;

  /**
   * Allocates a new array.
   *
   * @param size number of elements
   * @param unit width of the elements
   * @throws ArithmeticException when the size in bytes does not fit into an int
   */
  public ArrowFixedArrayInt(int size, IntUnit unit) {
    this.size = size;
    this.unit = unit;
    // multiplyExact: fail instead of silently allocating a wrapped-around (too small) buffer
    this.buffer = allocateBuffer(Math.multiplyExact(size, unit.sizeInBytes()), size);
  }

  /**
   * Wraps an existing buffer; the number of elements is derived from the buffer capacity.
   *
   * @param buffer data and non-null bitmap to wrap
   * @param unit width of the elements
   */
  public ArrowFixedArrayInt(ByteBufferDirect buffer, IntUnit unit)
      throws UnsupportedMessageException {
    this.size = buffer.capacity() / unit.sizeInBytes();
    this.unit = unit;
    this.buffer = buffer;
  }

  /** Width of the elements of this array; used by the specialization guards. */
  public IntUnit getUnit() {
    return unit;
  }

  @ExportMessage
  public boolean hasArrayElements() {
    return true;
  }

  /** Reads an element; slots the buffer reports as null yield {@code NullValue.get()}. */
  @ExportMessage
  @ImportStatic(IntUnit.class)
  static class ReadArrayElement {
    @Specialization(guards = "receiver.getUnit() == Byte1")
    public static Object doByte(ArrowFixedArrayInt receiver, long index)
        throws UnsupportedMessageException {
      if (receiver.buffer.isNull((int) index)) {
        return NullValue.get();
      }
      var at = typeAdjustedIndex(index, receiver.unit);
      return receiver.buffer.get(at);
    }

    @Specialization(guards = "receiver.getUnit() == Byte2")
    public static Object doShort(ArrowFixedArrayInt receiver, long index)
        throws UnsupportedMessageException {
      if (receiver.buffer.isNull((int) index)) {
        return NullValue.get();
      }
      var at = typeAdjustedIndex(index, receiver.unit);
      return receiver.buffer.getShort(at);
    }

    @Specialization(guards = "receiver.getUnit() == Byte4")
    public static Object doInt(ArrowFixedArrayInt receiver, long index)
        throws UnsupportedMessageException {
      if (receiver.buffer.isNull((int) index)) {
        return NullValue.get();
      }
      var at = typeAdjustedIndex(index, receiver.unit);
      return receiver.buffer.getInt(at);
    }

    @Specialization(guards = "receiver.getUnit() == Byte8")
    public static Object doLong(ArrowFixedArrayInt receiver, long index)
        throws UnsupportedMessageException {
      if (receiver.buffer.isNull((int) index)) {
        return NullValue.get();
      }
      var at = typeAdjustedIndex(index, receiver.unit);
      return receiver.buffer.getLong(at);
    }
  }

  /**
   * Writes an element. An interop null marks the slot as null (mirroring {@code
   * ArrowFixedArrayDate}); any other value must fit the element width, otherwise {@link
   * UnsupportedMessageException} is thrown.
   */
  @ExportMessage
  @ImportStatic(IntUnit.class)
  static class WriteArrayElement {
    @Specialization(guards = {"receiver.getUnit() == Byte1", "!iop.isNull(value)"})
    public static void doByte(
        ArrowFixedArrayInt receiver,
        long index,
        Object value,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException {
      if (!iop.fitsInByte(value)) {
        throw UnsupportedMessageException.create();
      }
      receiver.buffer.put(typeAdjustedIndex(index, receiver.unit), (iop.asByte(value)));
    }

    @Specialization(guards = {"receiver.getUnit() == Byte2", "!iop.isNull(value)"})
    public static void doShort(
        ArrowFixedArrayInt receiver,
        long index,
        Object value,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException {
      if (!iop.fitsInShort(value)) {
        throw UnsupportedMessageException.create();
      }
      receiver.buffer.putShort(typeAdjustedIndex(index, receiver.unit), (iop.asShort(value)));
    }

    @Specialization(guards = {"receiver.getUnit() == Byte4", "!iop.isNull(value)"})
    public static void doInt(
        ArrowFixedArrayInt receiver,
        long index,
        Object value,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException {
      if (!iop.fitsInInt(value)) {
        throw UnsupportedMessageException.create();
      }
      receiver.buffer.putInt(typeAdjustedIndex(index, receiver.unit), (iop.asInt(value)));
    }

    @Specialization(guards = {"receiver.getUnit() == Byte8", "!iop.isNull(value)"})
    public static void doLong(
        ArrowFixedArrayInt receiver,
        long index,
        Object value,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException {
      if (!iop.fitsInLong(value)) {
        throw UnsupportedMessageException.create();
      }
      receiver.buffer.putLong(typeAdjustedIndex(index, receiver.unit), (iop.asLong(value)));
    }

    // Reads already report null slots, so writes accept an interop null for every element width.
    @Specialization(guards = "iop.isNull(value)")
    public static void doNull(
        ArrowFixedArrayInt receiver,
        long index,
        Object value,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop) {
      receiver.buffer.setNull((int) index);
    }
  }

  /** Element widths; the constructor argument is the width in bits. */
  public enum IntUnit implements SizeInBytes {
    Byte1(8),
    Byte2(16),
    Byte4(32),
    Byte8(64);

    private final int bits;

    IntUnit(int bits) {
      this.bits = bits;
    }

    @Override
    public int sizeInBytes() {
      return bits / 8;
    }
  }

  @ExportMessage
  long getArraySize() {
    return size;
  }

  /** In-range slots are readable unless the buffer reports them as null. */
  @ExportMessage
  boolean isArrayElementReadable(long index) {
    return index >= 0 && index < size && !buffer.isNull((int) index);
  }

  @ExportMessage
  boolean isArrayElementModifiable(long index) {
    return index >= 0 && index < size;
  }

  /**
   * The array has a fixed size, so no index is insertable. The interop contract also forbids
   * reporting an index as both modifiable and insertable.
   */
  @ExportMessage
  boolean isArrayElementInsertable(long index) {
    return false;
  }

  // static: does not touch instance state and is called while the instance is being constructed
  @CompilerDirectives.TruffleBoundary
  private static ByteBufferDirect allocateBuffer(int sizeInBytes, int size) {
    return new ByteBufferDirect(sizeInBytes, size);
  }

  /** Converts an element index into a byte offset; fails if the offset exceeds an int. */
  private static int typeAdjustedIndex(long index, SizeInBytes unit) {
    return Math.toIntExact(index * unit.sizeInBytes());
  }
}

View File

@ -0,0 +1,111 @@
package org.enso.interpreter.arrow.runtime;
import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.dsl.Cached;
import com.oracle.truffle.api.dsl.Fallback;
import com.oracle.truffle.api.dsl.ImportStatic;
import com.oracle.truffle.api.dsl.Specialization;
import com.oracle.truffle.api.interop.InteropLibrary;
import com.oracle.truffle.api.interop.TruffleObject;
import com.oracle.truffle.api.interop.UnsupportedMessageException;
import com.oracle.truffle.api.library.CachedLibrary;
import com.oracle.truffle.api.library.ExportLibrary;
import com.oracle.truffle.api.library.ExportMessage;
import org.enso.interpreter.arrow.LogicalLayout;
/**
 * Instantiable interop object that allocates new fixed-size Arrow arrays of the configured
 * logical layout. It is instantiated with a single argument: the number of elements.
 */
@ExportLibrary(InteropLibrary.class)
public class ArrowFixedSizeArrayFactory implements TruffleObject {
  private final LogicalLayout logicalLayout;

  /**
   * @param logicalLayout logical layout of the elements of the arrays created by this factory
   */
  public ArrowFixedSizeArrayFactory(LogicalLayout logicalLayout) {
    this.logicalLayout = logicalLayout;
  }

  @ExportMessage
  public boolean isInstantiable() {
    return true;
  }

  /** Logical layout this factory was created for; used by the specialization guards. */
  public LogicalLayout getLayout() {
    return logicalLayout;
  }

  /** Implementation of the {@code instantiate} message, one specialization per layout. */
  @ExportMessage
  @ImportStatic(LogicalLayout.class)
  static class Instantiate {
    @Specialization(guards = "receiver.getLayout() == Date32")
    static Object doDate32(
        ArrowFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException {
      return new ArrowFixedArrayDate(arraySize(args, iop), ArrowFixedArrayDate.DateUnit.Day);
    }

    @Specialization(guards = "receiver.getLayout() == Date64")
    static Object doDate64(
        ArrowFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException {
      return new ArrowFixedArrayDate(
          arraySize(args, iop), ArrowFixedArrayDate.DateUnit.Millisecond);
    }

    @Specialization(guards = "receiver.getLayout() == Int8")
    static Object doInt8(
        ArrowFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException {
      return new ArrowFixedArrayInt(arraySize(args, iop), ArrowFixedArrayInt.IntUnit.Byte1);
    }

    @Specialization(guards = "receiver.getLayout() == Int16")
    static Object doInt16(
        ArrowFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException {
      return new ArrowFixedArrayInt(arraySize(args, iop), ArrowFixedArrayInt.IntUnit.Byte2);
    }

    @Specialization(guards = "receiver.getLayout() == Int32")
    static Object doInt32(
        ArrowFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException {
      return new ArrowFixedArrayInt(arraySize(args, iop), ArrowFixedArrayInt.IntUnit.Byte4);
    }

    @Specialization(guards = "receiver.getLayout() == Int64")
    static Object doInt64(
        ArrowFixedSizeArrayFactory receiver,
        Object[] args,
        @Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
        throws UnsupportedMessageException {
      return new ArrowFixedArrayInt(arraySize(args, iop), ArrowFixedArrayInt.IntUnit.Byte8);
    }

    /**
     * Extracts the requested number of elements from the arguments.
     *
     * @param args the instantiation arguments; exactly one is expected
     * @param interop library used to inspect and convert the argument
     * @return the non-negative number of elements
     * @throws UnsupportedMessageException when the argument count is not one, or the argument is
     *     not a non-negative number that fits into an int
     */
    @CompilerDirectives.TruffleBoundary
    private static int arraySize(Object[] args, InteropLibrary interop)
        throws UnsupportedMessageException {
      if (args.length != 1 || !interop.isNumber(args[0]) || !interop.fitsInInt(args[0])) {
        throw UnsupportedMessageException.create();
      }
      int requested = interop.asInt(args[0]);
      if (requested < 0) {
        // a negative size would otherwise fail inside the buffer allocation as a host exception
        throw UnsupportedMessageException.create();
      }
      return requested;
    }

    @Fallback
    static Object doOther(ArrowFixedSizeArrayFactory receiver, Object[] args) {
      throw CompilerDirectives.shouldNotReachHere(unknownLayoutMessage(receiver.getLayout()));
    }

    @CompilerDirectives.TruffleBoundary
    private static String unknownLayoutMessage(LogicalLayout layout) {
      return "unknown layout: " + layout.toString();
    }
  }
}

View File

@ -0,0 +1,164 @@
package org.enso.interpreter.arrow.runtime;
import com.oracle.truffle.api.interop.UnsupportedMessageException;
import java.nio.ByteBuffer;
/**
 * A data {@link ByteBuffer} paired with a validity ("non-null") bitmap.
 *
 * <p>The bitmap holds one bit per element: bit {@code i % 8} (counting from the least significant
 * bit) of bitmap byte {@code i / 8} is set when element {@code i} holds a value and cleared when
 * the element is null. Every write through one of the {@code put*} methods marks the written
 * element as non-null.
 *
 * <p>The indexes accepted by the absolute {@code get*}/{@code put*} methods are byte offsets into
 * the data buffer (they are forwarded unchanged to {@link ByteBuffer}), whereas {@link
 * #isNull(int)} and {@link #setNull(int)} take element indexes.
 */
final class ByteBufferDirect {
  /** Shift that converts an element index into the index of its bitmap byte (divide by 8). */
  private static final int BITMAP_BYTE_SHIFT = 3;

  /** Mask that extracts the bit position inside a bitmap byte (modulo 8). */
  private static final int BIT_POSITION_MASK = 7;

  private final ByteBuffer buffer;
  private final ByteBuffer nonNullBitmap;

  /**
   * Creates a fresh buffer with an empty non-null bitmap, i.e. every element starts out as null.
   *
   * @param sizeInBytes size of the new buffer in bytes
   * @param size size of the new buffer for the elements of the requested type
   */
  public ByteBufferDirect(int sizeInBytes, int size) {
    this.buffer = ByteBuffer.allocate(sizeInBytes);
    // ByteBuffer.allocate zero-initializes its content, so all validity bits start cleared.
    this.nonNullBitmap = ByteBuffer.allocate(bitmapSizeInBytes(size));
  }

  /**
   * Creates a new buffer from memory-mapped buffer and a corresponding memory-mapped non-null
   * bitmap.
   *
   * @param buffer memory-mapped buffer
   * @param nonNullBitmap memory-mapped buffer representing non-null bitmap
   */
  public ByteBufferDirect(ByteBuffer buffer, ByteBuffer nonNullBitmap) {
    this.buffer = buffer;
    this.nonNullBitmap = nonNullBitmap;
  }

  /**
   * Creates a new buffer from memory-mapped buffer of a given size assuming it has only non-null
   * values.
   *
   * @param buffer memory-mapped buffer
   * @param size number of validity bits to reserve; the bitmap is allocated with {@code size / 8 +
   *     1} bytes. NOTE(review): previously documented as "size of the buffer (in bytes)" - confirm
   *     against callers which unit is actually passed.
   */
  public ByteBufferDirect(ByteBuffer buffer, int size) {
    this.buffer = buffer;
    this.nonNullBitmap = ByteBuffer.allocate(bitmapSizeInBytes(size));
    for (int i = 0; i < nonNullBitmap.capacity(); i++) {
      nonNullBitmap.put(i, (byte) 0xFF);
    }
  }

  /** Writes a byte at the buffer's current position and marks that element as non-null. */
  public void put(byte b) throws UnsupportedMessageException {
    // The validity bit has to follow the position that the relative put is about to write to.
    setValidityBitmap(buffer.position(), 1);
    buffer.put(b);
  }

  /** Reads the byte stored at the given byte offset. */
  public byte get(int index) throws UnsupportedMessageException {
    return buffer.get(index);
  }

  /** Writes a byte at the given byte offset and marks that element as non-null. */
  public void put(int index, byte b) throws UnsupportedMessageException {
    setValidityBitmap(index, 1);
    buffer.put(index, b);
  }

  /** Writes a short at the buffer's current position and marks that element as non-null. */
  public void putShort(short value) throws UnsupportedMessageException {
    setValidityBitmap(buffer.position(), 2);
    buffer.putShort(value);
  }

  /** Reads the short stored at the given byte offset. */
  public short getShort(int index) throws UnsupportedMessageException {
    return buffer.getShort(index);
  }

  /** Writes a short at the given byte offset and marks that element as non-null. */
  public void putShort(int index, short value) throws UnsupportedMessageException {
    setValidityBitmap(index, 2);
    buffer.putShort(index, value);
  }

  /** Writes an int at the buffer's current position and marks that element as non-null. */
  public void putInt(int value) throws UnsupportedMessageException {
    setValidityBitmap(buffer.position(), 4);
    buffer.putInt(value);
  }

  /** Reads the int stored at the given byte offset. */
  public int getInt(int index) throws UnsupportedMessageException {
    return buffer.getInt(index);
  }

  /** Writes an int at the given byte offset and marks that element as non-null. */
  public void putInt(int index, int value) throws UnsupportedMessageException {
    setValidityBitmap(index, 4);
    buffer.putInt(index, value);
  }

  /** Writes a long at the buffer's current position and marks that element as non-null. */
  public void putLong(long value) throws UnsupportedMessageException {
    setValidityBitmap(buffer.position(), 8);
    buffer.putLong(value);
  }

  /** Reads the long stored at the given byte offset. */
  public long getLong(int index) throws UnsupportedMessageException {
    return buffer.getLong(index);
  }

  /** Writes a long at the given byte offset and marks that element as non-null. */
  public void putLong(int index, long value) throws UnsupportedMessageException {
    setValidityBitmap(index, 8);
    buffer.putLong(index, value);
  }

  /** Writes a float at the buffer's current position and marks that element as non-null. */
  public void putFloat(float value) throws UnsupportedMessageException {
    setValidityBitmap(buffer.position(), 4);
    buffer.putFloat(value);
  }

  /** Reads the float stored at the given byte offset. */
  public float getFloat(int index) throws UnsupportedMessageException {
    return buffer.getFloat(index);
  }

  /** Writes a float at the given byte offset and marks that element as non-null. */
  public void putFloat(int index, float value) throws UnsupportedMessageException {
    setValidityBitmap(index, 4);
    buffer.putFloat(index, value);
  }

  /** Writes a double at the buffer's current position and marks that element as non-null. */
  public void putDouble(double value) throws UnsupportedMessageException {
    setValidityBitmap(buffer.position(), 8);
    buffer.putDouble(value);
  }

  /** Reads the double stored at the given byte offset. */
  public double getDouble(int index) throws UnsupportedMessageException {
    return buffer.getDouble(index);
  }

  /** Writes a double at the given byte offset and marks that element as non-null. */
  public void putDouble(int index, double value) throws UnsupportedMessageException {
    setValidityBitmap(index, 8);
    buffer.putDouble(index, value);
  }

  /**
   * @return capacity of the underlying data buffer, in bytes
   */
  public int capacity() throws UnsupportedMessageException {
    return buffer.capacity();
  }

  /**
   * Checks whether the element at the given position is null.
   *
   * @param index element index (not a byte offset)
   * @return {@code true} when the element's validity bit is cleared
   */
  public boolean isNull(int index) {
    var slot = nonNullBitmap.get(bitmapByteIndex(index));
    return (slot & bitMask(index)) == 0;
  }

  /**
   * Marks the element at the given position as null by clearing its validity bit. The data buffer
   * itself is left untouched.
   *
   * @param index element index (not a byte offset)
   */
  public void setNull(int index) {
    var bufferIndex = bitmapByteIndex(index);
    var slot = nonNullBitmap.get(bufferIndex);
    nonNullBitmap.put(bufferIndex, (byte) (slot & ~bitMask(index)));
  }

  /**
   * Marks the element that contains the given byte offset as non-null.
   *
   * @param byteOffset offset into the data buffer, in bytes
   * @param unitSize size of a single element, in bytes
   */
  private void setValidityBitmap(int byteOffset, int unitSize) {
    var index = byteOffset / unitSize;
    var bufferIndex = bitmapByteIndex(index);
    var slot = nonNullBitmap.get(bufferIndex);
    nonNullBitmap.put(bufferIndex, (byte) (slot | bitMask(index)));
  }

  /** Number of bitmap bytes allocated for {@code size} validity bits (one spare byte included). */
  private static int bitmapSizeInBytes(int size) {
    return size / 8 + 1;
  }

  /** Index of the bitmap byte that holds the validity bit of the given element. */
  private static int bitmapByteIndex(int index) {
    return index >> BITMAP_BYTE_SHIFT;
  }

  /**
   * Single-bit mask selecting the given element's validity bit inside its bitmap byte. The bit
   * position is {@code index % 8}; the previous {@code index & ~(1 << 3)} merely cleared bit 3 and
   * produced masks outside of the byte for indexes of 16 and above.
   */
  private static int bitMask(int index) {
    return 1 << (index & BIT_POSITION_MASK);
  }
}

View File

@ -0,0 +1,27 @@
package org.enso.interpreter.arrow.runtime;
import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.interop.InteropLibrary;
import com.oracle.truffle.api.interop.TruffleObject;
import com.oracle.truffle.api.library.ExportLibrary;
import com.oracle.truffle.api.library.ExportMessage;
/**
 * Singleton interop object that reports itself as null through {@link InteropLibrary}.
 *
 * <p>The instance is created eagerly and stored in a {@code static final} field, so every caller
 * of {@link #get()} observes the very same object regardless of thread or compilation state. The
 * previous lazy initialization wrote a {@code @CompilationFinal} field without invalidating
 * compiled code and without synchronization, which could yield more than one instance.
 */
@ExportLibrary(InteropLibrary.class)
final class NullValue implements TruffleObject {
  private static final NullValue INSTANCE = new NullValue();

  private NullValue() {}

  /**
   * @return the single shared {@code NullValue} instance
   */
  public static NullValue get() {
    return INSTANCE;
  }

  /**
   * @return always {@code true}
   */
  @ExportMessage
  public boolean isNull() {
    return true;
  }
}

View File

@ -0,0 +1,5 @@
package org.enso.interpreter.arrow.runtime;
/** Implemented by types that can report a size expressed in bytes. */
public interface SizeInBytes {
  /**
   * @return the size in bytes. NOTE(review): presumably the width of a single element of a
   *     layout - confirm against the implementing types.
   */
  int sizeInBytes();
}

View File

@ -0,0 +1,54 @@
package org.enso.interpreter.arrow.util;
import com.oracle.truffle.api.CompilerDirectives;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
/**
 * Utilities for wrapping already allocated native memory in a {@link ByteBuffer} without copying.
 *
 * <p>The JDK offers no public API for this, so the class reflectively looks up the {@code (long,
 * long)} constructor of the runtime class returned by {@link ByteBuffer#allocateDirect(int)} and
 * keeps a {@link MethodHandle} to it.
 */
public final class MemoryUtil {
  /** Handle of the {@code (long address, long capacity)} direct buffer constructor. */
  private static final MethodHandle BYTE_BUFFER_CONSTRUCTOR = lookupDirectBufferConstructor();

  private MemoryUtil() {}

  /**
   * Resolves the constructor that creates a direct buffer from an address and a capacity.
   *
   * @return a method handle invoking that constructor
   * @throws ExceptionInInitializerError if the constructor is missing or cannot be accessed; the
   *     underlying reflective exception is reachable through the error's cause chain
   */
  private static MethodHandle lookupDirectBufferConstructor() {
    try {
      // A throw-away direct buffer is only needed to learn the concrete implementation class.
      Class<?> directBufferClass = ByteBuffer.allocateDirect(1).getClass();
      Constructor<?> constr = directBufferClass.getDeclaredConstructor(long.class, long.class);
      constr.setAccessible(true);
      return MethodHandles.lookup().unreflectConstructor(constr);
    } catch (NoSuchMethodException | IllegalAccessException e) {
      CompilerDirectives.transferToInterpreter();
      // ExceptionInInitializerError(String) forbids attaching a cause later on, hence the
      // description and the original exception travel together as the cause.
      throw new ExceptionInInitializerError(
          new IllegalStateException(
              "Unable to find a constructor for ByteBuffer created directly from a memory address",
              e));
    }
  }

  /**
   * Create a ByteBuffer directly from a (allocated) memory address and its size without copying.
   *
   * @param address address in memory to the start of the allocated chunk of memory
   * @param capacity size in bytes of the allocated chunk of memory
   * @return ByteBuffer instance
   * @throws RuntimeException wrapping whatever the underlying constructor invocation throws
   */
  public static ByteBuffer directBuffer(long address, long capacity) {
    try {
      return (ByteBuffer) BYTE_BUFFER_CONSTRUCTOR.invoke(address, capacity);
    } catch (Throwable e) {
      CompilerDirectives.transferToInterpreter();
      throw new RuntimeException(e);
    }
  }
}

View File

@ -0,0 +1,293 @@
package org.enso.interpreter.arrow;
import static org.junit.Assert.*;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.SimpleFormatter;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.IntVector;
import org.graalvm.polyglot.Context;
import org.graalvm.polyglot.Value;
import org.graalvm.polyglot.io.IOAccess;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Tests of the {@code arrow} polyglot language: allocating fixed-size arrays with {@code
 * new[<type>]} and wrapping the memory of existing Apache Arrow vectors with {@code
 * cast[<type>]}.
 */
public class VerifyArrowTest {
  private static Context ctx;
  private static Handler handler;

  @BeforeClass
  public static void initEnsoContext() {
    handler = new MockHandler();
    ctx =
        Context.newBuilder()
            .allowExperimentalOptions(true)
            .allowIO(IOAccess.ALL)
            .logHandler(handler)
            .out(System.out)
            .err(System.err)
            .allowAllAccess(true)
            .build();
  }

  @AfterClass
  public static void closeEnsoContext() throws Exception {
    if (ctx != null) {
      ctx.close();
    }
  }

  @Test
  public void arrowDate32() {
    var arrow = ctx.getEngine().getLanguages().get("arrow");
    assertNotNull("Arrow is available", arrow);
    var date32Constr = ctx.eval("arrow", "new[Date32]");
    Value date32Array = date32Constr.newInstance(10);
    assertNotNull("allocated value should not be null", date32Array);
    assertTrue("allocated value should be an array", date32Array.hasArrayElements());
    var startDate = LocalDate.now();
    populateArrayWithConsecutiveDays(date32Array, startDate);
    var rawDayPlus2 = date32Array.getArrayElement(2);
    var dayPlus2 = rawDayPlus2.asDate();
    assertFalse(rawDayPlus2.isTime() || rawDayPlus2.isTimeZone());
    assertEquals(startDate.plusDays(2), dayPlus2);

    var startDateTime = ZonedDateTime.now();
    populateArrayWithConsecutiveDays(date32Array, startDateTime);
    rawDayPlus2 = date32Array.getArrayElement(2);
    assertFalse(rawDayPlus2.isTime());
  }

  @Test
  public void arrowDate64() {
    var arrow = ctx.getEngine().getLanguages().get("arrow");
    assertNotNull("Arrow is available", arrow);
    var date64Constr = ctx.eval("arrow", "new[Date64]");
    Value date64Array = date64Constr.newInstance(10);
    assertNotNull("allocated value should not be null", date64Array);
    assertTrue("allocated value should be an array", date64Array.hasArrayElements());
    var startDate = ZonedDateTime.now(ZoneId.of("Europe/Paris"));
    var startDateZone = startDate.getZone();
    populateArrayWithConsecutiveDays(date64Array, startDate);
    var rawZonedDateTime = date64Array.getArrayElement(2);
    var dayPlus2 =
        rawZonedDateTime.asDate().atTime(rawZonedDateTime.asTime()).atZone(startDateZone);
    var startDateInstant = startDate.toInstant().atZone(startDate.getZone());
    assertTrue(startDateInstant.plusDays(2).isEqual(dayPlus2));
    assertFalse(startDate.isEqual(dayPlus2));

    var startDate2 = ZonedDateTime.parse("2023-11-01T02:00:01+01:00[Europe/Paris]");
    var startDate2Zone = startDate2.getZone();
    var startDate2Pnf = ZonedDateTime.parse("2023-11-01T02:00:01-07:00[US/Pacific]");
    populateArrayWithConsecutiveDays(date64Array, startDate2);
    rawZonedDateTime = date64Array.getArrayElement(2);
    dayPlus2 = rawZonedDateTime.asDate().atTime(rawZonedDateTime.asTime()).atZone(startDate2Zone);
    assertTrue(startDate2.plusDays(2).isEqual(dayPlus2));
    assertFalse(startDate2Pnf.plusDays(2).isEqual(dayPlus2));

    date64Array.setArrayElement(5, null);
    date64Array.setArrayElement(9, null);
    assertFalse(date64Array.getArrayElement(4).isNull());
    assertTrue(date64Array.getArrayElement(5).isNull());
    assertTrue(date64Array.getArrayElement(9).isNull());
  }

  @Test
  public void arrowInt8() {
    var arrow = ctx.getEngine().getLanguages().get("arrow");
    assertNotNull("Arrow is available", arrow);
    var int8Constr = ctx.eval("arrow", "new[Int8]");
    assertNotNull(int8Constr);
    Value int8Array = int8Constr.newInstance(10);
    assertNotNull(int8Array);
    populateIntArray(int8Array, (byte) 42);
    var v = int8Array.getArrayElement(5);
    assertEquals((byte) 47, v.asByte());
    int8Array.setArrayElement(5, 21);
    v = int8Array.getArrayElement(5);
    assertEquals((byte) 21, v.asByte());
    try {
      int8Array.setArrayElement(5, 300);
      fail("expected out of bounds exception");
    } catch (UnsupportedOperationException ignored) {
      // Expected: 300 does not fit into a single byte, so the write has to be rejected.
    }
  }

  @Test
  public void castInt() {
    var int32Layout = LogicalLayout.Int32;

    // Data buffer only: without a validity buffer every element is treated as non-null.
    var nonNullInts = new Object[] {3, 1, 5, 3};
    try (BufferAllocator allocator = new RootAllocator();
        BaseFixedWidthVector intVector =
            allocateFixedLengthVector(allocator, nonNullInts, int32Layout)) {
      var int32Constr = ctx.eval("arrow", "cast[" + int32Layout + "]");
      assertNotNull(int32Constr);
      Value int32Array =
          int32Constr.execute(
              intVector.getDataBufferAddress(),
              intVector.getDataBuffer().capacity() / int32Layout.sizeInBytes());
      assertNotNull(int32Array);
      assertElements(nonNullInts, int32Array, false);
    }

    // Data buffer together with a validity buffer: nulls have to be reported as such.
    var nullableInts = new Object[] {3, null, 5, 3, 7, 18, null, 9, 7, null, null, 100};
    try (BufferAllocator allocator = new RootAllocator();
        BaseFixedWidthVector intVector =
            allocateFixedLengthVector(allocator, nullableInts, int32Layout)) {
      var int32Constr = ctx.eval("arrow", "cast[" + int32Layout + "]");
      assertNotNull(int32Constr);
      Value int32Array =
          int32Constr.execute(
              intVector.getDataBufferAddress(),
              intVector.getDataBuffer().capacity() / int32Layout.sizeInBytes(),
              intVector.getValidityBufferAddress());
      assertNotNull(int32Array);
      assertElements(nullableInts, int32Array, false);

      // Verify vector is memory-mapped, not copied
      assertTrue(int32Array.getArrayElement(10).isNull());
      ((IntVector) intVector).set(10, 12);
      assertFalse(int32Array.getArrayElement(10).isNull());
      assertEquals(12, int32Array.getArrayElement(10).asInt());
    }

    var int64Layout = LogicalLayout.Int64;
    var nullableLongs = new Object[] {(long) 3, null, (long) 5, (long) 3};
    try (BufferAllocator allocator = new RootAllocator();
        BaseFixedWidthVector vector =
            allocateFixedLengthVector(allocator, nullableLongs, int64Layout)) {
      var int64Constr = ctx.eval("arrow", "cast[" + int64Layout + "]");
      assertNotNull(int64Constr);
      Value int64Array =
          int64Constr.execute(
              vector.getDataBufferAddress(),
              vector.getDataBuffer().capacity() / int64Layout.sizeInBytes(),
              vector.getValidityBufferAddress());
      assertNotNull(int64Array);
      assertElements(nullableLongs, int64Array, true);
    }
  }

  /**
   * Asserts that the polyglot array holds exactly the expected values, where a {@code null}
   * expectation means that the element has to report itself as null.
   *
   * @param expected expected boxed values ({@code Integer} or {@code Long}) or {@code null}
   * @param actual polyglot array under test
   * @param asLong whether elements are read with {@code asLong()} instead of {@code asInt()}
   */
  private static void assertElements(Object[] expected, Value actual, boolean asLong) {
    for (int i = 0; i < expected.length; i++) {
      Value element = actual.getArrayElement(i);
      if (expected[i] == null) {
        assertTrue(element.isNull());
      } else if (asLong) {
        assertEquals(expected[i], element.asLong());
      } else {
        assertEquals(expected[i], element.asInt());
      }
    }
  }

  /**
   * Allocates an Apache Arrow vector of the requested layout and fills it with the given values.
   *
   * @param allocator allocator owning the vector's memory
   * @param testValues boxed values to store; {@code null} entries become null slots
   * @param unit layout of the vector; only {@code Int32} and {@code Int64} are supported
   * @return the populated vector, to be closed by the caller
   */
  private BaseFixedWidthVector allocateFixedLengthVector(
      BufferAllocator allocator, Object[] testValues, LogicalLayout unit) {
    switch (unit) {
      case Int32:
        var intVector = new IntVector("fixed-size-primitive-layout", allocator);
        intVector.allocateNew(testValues.length);
        for (int i = 0; i < testValues.length; i++) {
          if (testValues[i] != null) {
            intVector.set(i, (int) testValues[i]);
          } else {
            intVector.setNull(i);
          }
        }
        // The value count of an Arrow vector covers all slots, null ones included.
        intVector.setValueCount(testValues.length);
        return intVector;
      case Int64:
        var bigIntVector = new BigIntVector("fixed-size-primitive-layout", allocator);
        bigIntVector.allocateNew(testValues.length);
        for (int i = 0; i < testValues.length; i++) {
          if (testValues[i] != null) {
            bigIntVector.set(i, (long) testValues[i]);
          } else {
            bigIntVector.setNull(i);
          }
        }
        // The value count of an Arrow vector covers all slots, null ones included.
        bigIntVector.setValueCount(testValues.length);
        return bigIntVector;
      default:
        throw new RuntimeException("unable to create a vector for " + unit);
    }
  }

  /**
   * Fills the array so that element {@code i} holds {@code startDate} plus {@code i} days.
   *
   * @param arr polyglot array to populate
   * @param startDate value stored at index 0
   */
  private void populateArrayWithConsecutiveDays(Value arr, Temporal startDate) {
    var len = arr.getArraySize();
    for (int i = 0; i < len; i++) {
      arr.setArrayElement(i, startDate.plus(i, java.time.temporal.ChronoUnit.DAYS));
    }
  }

  /**
   * Fills the array so that element {@code i} holds {@code startValue + i} narrowed to a byte.
   *
   * @param arr polyglot array to populate
   * @param startValue value stored at index 0
   */
  private void populateIntArray(Value arr, byte startValue) {
    var len = arr.getArraySize();
    for (int i = 0; i < len; i++) {
      var v = startValue + i;
      arr.setArrayElement(i, (byte) v);
    }
  }

  /** Log handler that records every published record and can flag a forbidden message. */
  private static class MockHandler extends Handler {
    private final Formatter fmt = new SimpleFormatter();
    private final List<LogRecord> records = new ArrayList<>();
    private String failMsg;
    private Error failure;

    public MockHandler() {}

    /** Registers a message whose appearance in the log is recorded as a failure. */
    public void failOnMessage(String msg) {
      this.failMsg = msg;
    }

    @Override
    public void publish(LogRecord lr) {
      records.add(lr);
      var msg = fmt.formatMessage(lr);
      if (failMsg != null && failMsg.equals(msg)) {
        failure = new AssertionError(this.toString() + "\nGot forbidden message: " + msg);
      }
    }

    @Override
    public void flush() {}

    @Override
    public void close() throws SecurityException {}

    @Override
    public String toString() {
      var sb = new StringBuilder();
      for (var r : records) {
        sb.append("\n").append(fmt.formatMessage(r));
      }
      return sb.toString();
    }
  }
}